Unverified Commit 9efd4bb2 authored by bobz965's avatar bobz965 Committed by GitHub
Browse files

fix: 5ms is too short for eip and nats creation (#1781)

* fix: 20ms is too short for eip and nats creation

* fix: 20ms is too short for eip and nats creation
parent 80425b7c
Showing with 53 additions and 41 deletions
+53 -41
...@@ -25,6 +25,7 @@ type Configuration struct { ...@@ -25,6 +25,7 @@ type Configuration struct {
OvnSbAddr string OvnSbAddr string
OvnTimeout int OvnTimeout int
CustCrdRetryMaxDelay int CustCrdRetryMaxDelay int
CustCrdRetryMinDelay int
KubeConfigFile string KubeConfigFile string
KubeRestConfig *rest.Config KubeRestConfig *rest.Config
...@@ -96,7 +97,8 @@ func ParseFlags() (*Configuration, error) { ...@@ -96,7 +97,8 @@ func ParseFlags() (*Configuration, error) {
argOvnNbAddr = pflag.String("ovn-nb-addr", "", "ovn-nb address") argOvnNbAddr = pflag.String("ovn-nb-addr", "", "ovn-nb address")
argOvnSbAddr = pflag.String("ovn-sb-addr", "", "ovn-sb address") argOvnSbAddr = pflag.String("ovn-sb-addr", "", "ovn-sb address")
argOvnTimeout = pflag.Int("ovn-timeout", 60, "") argOvnTimeout = pflag.Int("ovn-timeout", 60, "")
argCustCrdRetryMaxDelay = pflag.Int("cust-crd-retry-max-delay", 20, "The max delay between custom crd two retries") argCustCrdRetryMinDelay = pflag.Int("cust-crd-retry-min-delay", 2, "The min delay seconds between custom crd two retries")
argCustCrdRetryMaxDelay = pflag.Int("cust-crd-retry-max-delay", 20, "The max delay seconds between custom crd two retries")
argKubeConfigFile = pflag.String("kubeconfig", "", "Path to kubeconfig file with authorization and master location information. If not set use the inCluster token.") argKubeConfigFile = pflag.String("kubeconfig", "", "Path to kubeconfig file with authorization and master location information. If not set use the inCluster token.")
argDefaultLogicalSwitch = pflag.String("default-ls", util.DefaultSubnet, "The default logical switch name") argDefaultLogicalSwitch = pflag.String("default-ls", util.DefaultSubnet, "The default logical switch name")
...@@ -169,6 +171,7 @@ func ParseFlags() (*Configuration, error) { ...@@ -169,6 +171,7 @@ func ParseFlags() (*Configuration, error) {
OvnNbAddr: *argOvnNbAddr, OvnNbAddr: *argOvnNbAddr,
OvnSbAddr: *argOvnSbAddr, OvnSbAddr: *argOvnSbAddr,
OvnTimeout: *argOvnTimeout, OvnTimeout: *argOvnTimeout,
CustCrdRetryMinDelay: *argCustCrdRetryMinDelay,
CustCrdRetryMaxDelay: *argCustCrdRetryMaxDelay, CustCrdRetryMaxDelay: *argCustCrdRetryMaxDelay,
KubeConfigFile: *argKubeConfigFile, KubeConfigFile: *argKubeConfigFile,
DefaultLogicalSwitch: *argDefaultLogicalSwitch, DefaultLogicalSwitch: *argDefaultLogicalSwitch,
......
...@@ -185,7 +185,7 @@ func NewController(config *Configuration) *Controller { ...@@ -185,7 +185,7 @@ func NewController(config *Configuration) *Controller {
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeFactoryClient.CoreV1().Events("")}) eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeFactoryClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
custCrdRateLimiter := workqueue.NewMaxOfRateLimiter( custCrdRateLimiter := workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, time.Duration(config.CustCrdRetryMaxDelay)*time.Second), workqueue.NewItemExponentialFailureRateLimiter(time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
) )
......
...@@ -25,10 +25,10 @@ import ( ...@@ -25,10 +25,10 @@ import (
) )
var ( var (
vpcNatImage = "" vpcNatImage = ""
vpcNatEnabled = "unknown" vpcNatEnabled = "unknown"
VpcNatCmVersion = "" VpcNatCmVersion = ""
createAt = "" NAT_GW_CREATED_AT = ""
) )
const ( const (
...@@ -267,7 +267,7 @@ func (c *Controller) handleAddOrUpdateVpcNatGw(key string) error { ...@@ -267,7 +267,7 @@ func (c *Controller) handleAddOrUpdateVpcNatGw(key string) error {
if needToCreate { if needToCreate {
_, err := c.config.KubeClient.AppsV1().StatefulSets(c.config.PodNamespace). _, err := c.config.KubeClient.AppsV1().StatefulSets(c.config.PodNamespace).
Create(context.Background(), newSts, metav1.CreateOptions{}) Create(context.Background(), newSts, metav1.CreateOptions{})
// if pod create successfully, will add initVpcNatGatewayQueue, then syncVpcNatGwRules
if err != nil { if err != nil {
klog.Errorf("failed to create statefulset '%s', err: %v", newSts.Name, err) klog.Errorf("failed to create statefulset '%s', err: %v", newSts.Name, err)
return err return err
...@@ -282,17 +282,6 @@ func (c *Controller) handleAddOrUpdateVpcNatGw(key string) error { ...@@ -282,17 +282,6 @@ func (c *Controller) handleAddOrUpdateVpcNatGw(key string) error {
return err return err
} }
} }
pod, err := c.getNatGwPod(key)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
return err
}
if _, ok := pod.Annotations[util.VpcNatGatewayInitAnnotation]; ok {
return c.syncVpcNatGwRules(key)
}
return nil return nil
} }
...@@ -358,8 +347,8 @@ func (c *Controller) handleInitVpcNatGw(key string) error { ...@@ -358,8 +347,8 @@ func (c *Controller) handleInitVpcNatGw(key string) error {
if _, hasInit := pod.Annotations[util.VpcNatGatewayInitAnnotation]; hasInit { if _, hasInit := pod.Annotations[util.VpcNatGatewayInitAnnotation]; hasInit {
return nil return nil
} }
createAt = pod.CreationTimestamp.Format("2006-01-02T15:04:05") NAT_GW_CREATED_AT = pod.CreationTimestamp.Format("2006-01-02T15:04:05")
klog.V(3).Infof("nat gw pod '%s' inited at %s", key, createAt) klog.V(3).Infof("nat gw pod '%s' inited at %s", key, NAT_GW_CREATED_AT)
if err = c.execNatGwRules(pod, natGwInit, []string{v4Cidr}); err != nil { if err = c.execNatGwRules(pod, natGwInit, []string{v4Cidr}); err != nil {
klog.Errorf("failed to init vpc nat gateway, %v", err) klog.Errorf("failed to init vpc nat gateway, %v", err)
return err return err
...@@ -401,9 +390,9 @@ func (c *Controller) handleUpdateVpcFloatingIp(natGwKey string) error { ...@@ -401,9 +390,9 @@ func (c *Controller) handleUpdateVpcFloatingIp(natGwKey string) error {
} }
for _, fip := range fips.Items { for _, fip := range fips.Items {
if fip.Status.Redo != createAt { if fip.Status.Redo != NAT_GW_CREATED_AT {
klog.V(3).Infof("redo fip %s", fip.Name) klog.V(3).Infof("redo fip %s", fip.Name)
if err = c.redoFip(fip.Name, createAt, false); err != nil { if err = c.redoFip(fip.Name, NAT_GW_CREATED_AT, false); err != nil {
klog.Errorf("failed to update eip '%s' to make sure applied, %v", fip.Spec.EIP, err) klog.Errorf("failed to update eip '%s' to make sure applied, %v", fip.Spec.EIP, err)
return err return err
} }
...@@ -433,9 +422,9 @@ func (c *Controller) handleUpdateVpcEip(natGwKey string) error { ...@@ -433,9 +422,9 @@ func (c *Controller) handleUpdateVpcEip(natGwKey string) error {
return err return err
} }
for _, eip := range eips.Items { for _, eip := range eips.Items {
if eip.Spec.NatGwDp == natGwKey && eip.Status.Redo != createAt { if eip.Spec.NatGwDp == natGwKey && eip.Status.Redo != NAT_GW_CREATED_AT {
klog.V(3).Infof("redo eip %s", eip.Name) klog.V(3).Infof("redo eip %s", eip.Name)
if err = c.patchEipStatus(eip.Name, "", createAt, "", false); err != nil { if err = c.patchEipStatus(eip.Name, "", NAT_GW_CREATED_AT, "", false); err != nil {
klog.Errorf("failed to update eip '%s' to make sure applied, %v", eip.Name, err) klog.Errorf("failed to update eip '%s' to make sure applied, %v", eip.Name, err)
return err return err
} }
...@@ -465,9 +454,9 @@ func (c *Controller) handleUpdateVpcSnat(natGwKey string) error { ...@@ -465,9 +454,9 @@ func (c *Controller) handleUpdateVpcSnat(natGwKey string) error {
return err return err
} }
for _, snat := range snats.Items { for _, snat := range snats.Items {
if snat.Status.Redo != createAt { if snat.Status.Redo != NAT_GW_CREATED_AT {
klog.V(3).Infof("redo snat %s", snat.Name) klog.V(3).Infof("redo snat %s", snat.Name)
if err = c.redoSnat(snat.Name, createAt, false); err != nil { if err = c.redoSnat(snat.Name, NAT_GW_CREATED_AT, false); err != nil {
klog.Errorf("failed to update eip '%s' to make sure applied, %v", snat.Spec.EIP, err) klog.Errorf("failed to update eip '%s' to make sure applied, %v", snat.Spec.EIP, err)
return err return err
} }
...@@ -498,9 +487,9 @@ func (c *Controller) handleUpdateVpcDnat(natGwKey string) error { ...@@ -498,9 +487,9 @@ func (c *Controller) handleUpdateVpcDnat(natGwKey string) error {
return err return err
} }
for _, dnat := range dnats.Items { for _, dnat := range dnats.Items {
if dnat.Status.Redo != createAt { if dnat.Status.Redo != NAT_GW_CREATED_AT {
klog.V(3).Infof("redo dnat %s", dnat.Name) klog.V(3).Infof("redo dnat %s", dnat.Name)
if err = c.redoDnat(dnat.Name, createAt, false); err != nil { if err = c.redoDnat(dnat.Name, NAT_GW_CREATED_AT, false); err != nil {
klog.Errorf("failed to update dnat '%s' to make sure applied, %v", dnat.Name, err) klog.Errorf("failed to update dnat '%s' to make sure applied, %v", dnat.Name, err)
return err return err
} }
...@@ -768,7 +757,7 @@ func (c *Controller) checkVpcExternalNet() (err error) { ...@@ -768,7 +757,7 @@ func (c *Controller) checkVpcExternalNet() (err error) {
} }
func (c *Controller) initCreateAt(key string) (err error) { func (c *Controller) initCreateAt(key string) (err error) {
if createAt != "" { if NAT_GW_CREATED_AT != "" {
return nil return nil
} }
pod, err := c.getNatGwPod(key) pod, err := c.getNatGwPod(key)
...@@ -778,6 +767,6 @@ func (c *Controller) initCreateAt(key string) (err error) { ...@@ -778,6 +767,6 @@ func (c *Controller) initCreateAt(key string) (err error) {
} }
return err return err
} }
createAt = pod.CreationTimestamp.Format("2006-01-02T15:04:05") NAT_GW_CREATED_AT = pod.CreationTimestamp.Format("2006-01-02T15:04:05")
return nil return nil
} }
...@@ -227,7 +227,7 @@ func (c *Controller) handleAddIptablesEip(key string) error { ...@@ -227,7 +227,7 @@ func (c *Controller) handleAddIptablesEip(key string) error {
} }
return err return err
} }
if cachedEip.Spec.MacAddress != "" { if cachedEip.Status.Ready && cachedEip.Status.IP != "" {
// already ok // already ok
return nil return nil
} }
...@@ -261,7 +261,10 @@ func (c *Controller) handleAddIptablesEip(key string) error { ...@@ -261,7 +261,10 @@ func (c *Controller) handleAddIptablesEip(key string) error {
klog.Errorf("failed to update eip %s, %v", key, err) klog.Errorf("failed to update eip %s, %v", key, err)
return err return err
} }
if err = c.patchEipStatus(key, v4ip, "", "", true); err != nil { if err := c.initCreateAt(eip.Spec.NatGwDp); err != nil {
klog.Errorf("failed to init nat gw pod '%s' create at, %v", eip.Spec.NatGwDp, err)
}
if err = c.patchEipStatus(key, v4ip, NAT_GW_CREATED_AT, "", true); err != nil {
klog.Errorf("failed to patch status for eip %s, %v", key, err) klog.Errorf("failed to patch status for eip %s, %v", key, err)
return err return err
} }
...@@ -484,7 +487,8 @@ func (c *Controller) handleUpdateIptablesEip(key string) error { ...@@ -484,7 +487,8 @@ func (c *Controller) handleUpdateIptablesEip(key string) error {
} }
// redo // redo
if eip.Status.Redo != "" && if !eip.Status.Ready &&
eip.Status.Redo != "" &&
eip.Status.IP != "" && eip.Status.IP != "" &&
eip.DeletionTimestamp.IsZero() { eip.DeletionTimestamp.IsZero() {
eipV4Cidr, err := c.getEipV4Cidr(eip.Status.IP) eipV4Cidr, err := c.getEipV4Cidr(eip.Status.IP)
......
...@@ -520,6 +520,10 @@ func (c *Controller) handleAddIptablesFip(key string) error { ...@@ -520,6 +520,10 @@ func (c *Controller) handleAddIptablesFip(key string) error {
} }
return err return err
} }
if cachedFip.Status.Ready && cachedFip.Status.V4ip != "" {
// already ok
return nil
}
fip := cachedFip.DeepCopy() fip := cachedFip.DeepCopy()
klog.V(3).Infof("handle add fip %s", key) klog.V(3).Infof("handle add fip %s", key)
// get eip // get eip
...@@ -562,7 +566,10 @@ func (c *Controller) handleAddIptablesFip(key string) error { ...@@ -562,7 +566,10 @@ func (c *Controller) handleAddIptablesFip(key string) error {
klog.Errorf("failed to patch status for eip %s, %v", key, err) klog.Errorf("failed to patch status for eip %s, %v", key, err)
return err return err
} }
if err = c.patchFipStatus(key, eip.Spec.V4ip, eip.Spec.V6ip, eip.Spec.NatGwDp, "", true); err != nil { if err := c.initCreateAt(eip.Spec.NatGwDp); err != nil {
klog.Errorf("failed to init nat gw pod '%s' create at, %v", eip.Spec.NatGwDp, err)
}
if err = c.patchFipStatus(key, eip.Spec.V4ip, eip.Spec.V6ip, eip.Spec.NatGwDp, NAT_GW_CREATED_AT, true); err != nil {
klog.Errorf("failed to patch status for fip %s, %v", key, err) klog.Errorf("failed to patch status for fip %s, %v", key, err)
return err return err
} }
...@@ -652,7 +659,8 @@ func (c *Controller) handleUpdateIptablesFip(key string) error { ...@@ -652,7 +659,8 @@ func (c *Controller) handleUpdateIptablesFip(key string) error {
return nil return nil
} }
// redo // redo
if fip.Status.Redo != "" && if !fip.Status.Ready &&
fip.Status.Redo != "" &&
fip.Status.V4ip != "" && fip.Status.V4ip != "" &&
fip.DeletionTimestamp.IsZero() { fip.DeletionTimestamp.IsZero() {
klog.V(3).Infof("reapply fip '%s' in pod ", key) klog.V(3).Infof("reapply fip '%s' in pod ", key)
...@@ -694,7 +702,7 @@ func (c *Controller) handleAddIptablesDnatRule(key string) error { ...@@ -694,7 +702,7 @@ func (c *Controller) handleAddIptablesDnatRule(key string) error {
} }
return err return err
} }
if cachedDnat.Status.V4ip != "" { if cachedDnat.Status.Ready && cachedDnat.Status.V4ip != "" {
// already ok // already ok
return nil return nil
} }
...@@ -738,7 +746,10 @@ func (c *Controller) handleAddIptablesDnatRule(key string) error { ...@@ -738,7 +746,10 @@ func (c *Controller) handleAddIptablesDnatRule(key string) error {
klog.Errorf("failed to patch status for eip %s, %v", key, err) klog.Errorf("failed to patch status for eip %s, %v", key, err)
return err return err
} }
if err = c.patchDnatStatus(key, eip.Spec.V4ip, eip.Spec.V6ip, eip.Spec.NatGwDp, "", true); err != nil { if err := c.initCreateAt(eip.Spec.NatGwDp); err != nil {
klog.Errorf("failed to init nat gw pod '%s' create at, %v", eip.Spec.NatGwDp, err)
}
if err = c.patchDnatStatus(key, eip.Spec.V4ip, eip.Spec.V6ip, eip.Spec.NatGwDp, NAT_GW_CREATED_AT, true); err != nil {
klog.Errorf("failed to patch status for dnat %s, %v", key, err) klog.Errorf("failed to patch status for dnat %s, %v", key, err)
return err return err
} }
...@@ -831,7 +842,8 @@ func (c *Controller) handleUpdateIptablesDnatRule(key string) error { ...@@ -831,7 +842,8 @@ func (c *Controller) handleUpdateIptablesDnatRule(key string) error {
return nil return nil
} }
// redo // redo
if dnat.Status.Redo != "" && if !dnat.Status.Ready &&
dnat.Status.Redo != "" &&
dnat.Status.V4ip != "" && dnat.Status.V4ip != "" &&
dnat.DeletionTimestamp.IsZero() { dnat.DeletionTimestamp.IsZero() {
klog.V(3).Infof("reapply dnat in pod for %s", key) klog.V(3).Infof("reapply dnat in pod for %s", key)
...@@ -875,7 +887,7 @@ func (c *Controller) handleAddIptablesSnatRule(key string) error { ...@@ -875,7 +887,7 @@ func (c *Controller) handleAddIptablesSnatRule(key string) error {
} }
return err return err
} }
if cachedSnat.Status.V4ip != "" { if cachedSnat.Status.Ready && cachedSnat.Status.V4ip != "" {
// already ok // already ok
return nil return nil
} }
...@@ -919,7 +931,10 @@ func (c *Controller) handleAddIptablesSnatRule(key string) error { ...@@ -919,7 +931,10 @@ func (c *Controller) handleAddIptablesSnatRule(key string) error {
klog.Errorf("failed to patch status for eip %s, %v", key, err) klog.Errorf("failed to patch status for eip %s, %v", key, err)
return err return err
} }
if err = c.patchSnatStatus(key, eip.Spec.V4ip, eip.Spec.V6ip, eip.Spec.NatGwDp, "", true); err != nil { if err := c.initCreateAt(eip.Spec.NatGwDp); err != nil {
klog.Errorf("failed to init nat gw pod '%s' create at, %v", eip.Spec.NatGwDp, err)
}
if err = c.patchSnatStatus(key, eip.Spec.V4ip, eip.Spec.V6ip, eip.Spec.NatGwDp, NAT_GW_CREATED_AT, true); err != nil {
klog.Errorf("failed to update status for snat %s, %v", key, err) klog.Errorf("failed to update status for snat %s, %v", key, err)
return err return err
} }
...@@ -1008,7 +1023,8 @@ func (c *Controller) handleUpdateIptablesSnatRule(key string) error { ...@@ -1008,7 +1023,8 @@ func (c *Controller) handleUpdateIptablesSnatRule(key string) error {
return nil return nil
} }
// redo // redo
if snat.Status.Redo != "" && if !snat.Status.Ready &&
snat.Status.Redo != "" &&
snat.Status.V4ip != "" && snat.Status.V4ip != "" &&
snat.DeletionTimestamp.IsZero() { snat.DeletionTimestamp.IsZero() {
if err = c.createSnatInPod(snat.Status.NatGwDp, snat.Status.V4ip, v4Cidr); err != nil { if err = c.createSnatInPod(snat.Status.NatGwDp, snat.Status.V4ip, v4Cidr); err != nil {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment