Commit 45f64bfa authored by hzma's avatar hzma
Browse files

add process for pod attachment nic with subnet in default vpc

parent 712e6f49
Showing with 129 additions and 68 deletions
+129 -68
...@@ -88,12 +88,25 @@ func (c *Controller) enqueueAddPod(obj interface{}) { ...@@ -88,12 +88,25 @@ func (c *Controller) enqueueAddPod(obj interface{}) {
return return
} }
podNets, err := c.getPodKubeovnNets(p)
if err != nil {
klog.Errorf("failed to get pod nets %v", err)
return
}
// In case update event might lost during leader election // In case update event might lost during leader election
if p.Annotations != nil && if p.Annotations != nil &&
p.Annotations[util.AllocatedAnnotation] == "true" && p.Annotations[util.AllocatedAnnotation] == "true" &&
p.Annotations[util.RoutedAnnotation] != "true" &&
p.Status.HostIP != "" && p.Status.PodIP != "" { p.Status.HostIP != "" && p.Status.PodIP != "" {
c.updatePodQueue.Add(key) for _, podNet := range podNets {
if !isOvnSubnet(podNet.Subnet) {
continue
}
if p.Annotations[fmt.Sprintf(util.RoutedAnnotationTemplate, podNet.ProviderName)] != "true" {
c.updatePodQueue.Add(key)
break
}
}
return return
} }
...@@ -202,18 +215,28 @@ func (c *Controller) enqueueUpdatePod(oldObj, newObj interface{}) { ...@@ -202,18 +215,28 @@ func (c *Controller) enqueueUpdatePod(oldObj, newObj interface{}) {
} }
} }
podNets, err := c.getPodKubeovnNets(newPod)
if err != nil {
klog.Errorf("failed to get pod nets %v", err)
return
}
// pod assigned an ip // pod assigned an ip
if newPod.Annotations[util.AllocatedAnnotation] == "true" && if newPod.Annotations[util.AllocatedAnnotation] == "true" &&
newPod.Annotations[util.RoutedAnnotation] != "true" &&
newPod.Spec.NodeName != "" { newPod.Spec.NodeName != "" {
klog.V(3).Infof("enqueue update pod %s", key) for _, podNet := range podNets {
c.updatePodQueue.Add(key) if !isOvnSubnet(podNet.Subnet) {
} continue
}
podNets, err := c.getPodKubeovnNets(newPod) if newPod.Annotations[fmt.Sprintf(util.RoutedAnnotationTemplate, podNet.ProviderName)] != "true" {
if err != nil { klog.V(3).Infof("enqueue update pod %s", key)
return c.updatePodQueue.Add(key)
break
}
}
} }
// security policy changed // security policy changed
for _, podNet := range podNets { for _, podNet := range podNets {
oldSecurity := oldPod.Annotations[fmt.Sprintf(util.PortSecurityAnnotationTemplate, podNet.ProviderName)] oldSecurity := oldPod.Annotations[fmt.Sprintf(util.PortSecurityAnnotationTemplate, podNet.ProviderName)]
...@@ -428,6 +451,7 @@ func (c *Controller) handleAddPod(key string) error { ...@@ -428,6 +451,7 @@ func (c *Controller) handleAddPod(key string) error {
podNets, err := c.getPodKubeovnNets(pod) podNets, err := c.getPodKubeovnNets(pod)
if err != nil { if err != nil {
klog.Errorf("failed to get pod nets %v", err)
return err return err
} }
...@@ -692,87 +716,85 @@ func (c *Controller) handleUpdatePod(key string) error { ...@@ -692,87 +716,85 @@ func (c *Controller) handleUpdatePod(key string) error {
} }
for _, podNet := range podNets { for _, podNet := range podNets {
// routing should be configured only if the OVN network is the default network if !isOvnSubnet(podNet.Subnet) {
if !podNet.IsDefault || util.OvnProvider != podNet.ProviderName {
continue continue
} }
podIP = pod.Annotations[fmt.Sprintf(util.IpAddressAnnotationTemplate, podNet.ProviderName)] podIP = pod.Annotations[fmt.Sprintf(util.IpAddressAnnotationTemplate, podNet.ProviderName)]
subnet = podNet.Subnet subnet = podNet.Subnet
break
}
if podIP != "" && subnet.Spec.Vlan == "" && subnet.Spec.Vpc == util.DefaultVpc { if podIP != "" && subnet.Spec.Vlan == "" && subnet.Spec.Vpc == util.DefaultVpc {
if pod.Annotations[util.EipAnnotation] != "" || pod.Annotations[util.SnatAnnotation] != "" { if pod.Annotations[util.EipAnnotation] != "" || pod.Annotations[util.SnatAnnotation] != "" {
cm, err := c.configMapsLister.ConfigMaps("kube-system").Get(util.ExternalGatewayConfig) cm, err := c.configMapsLister.ConfigMaps("kube-system").Get(util.ExternalGatewayConfig)
if err != nil {
klog.Errorf("failed to get ex-gateway config, %v", err)
return err
}
nextHop := cm.Data["nic-ip"]
if nextHop == "" {
klog.Errorf("no available gateway nic address")
return fmt.Errorf("no available gateway nic address")
}
if !strings.Contains(nextHop, "/") {
klog.Errorf("gateway nic address's format is invalid")
return fmt.Errorf("gateway nic address's format is invalid")
}
nextHop = strings.Split(nextHop, "/")[0]
if addr := cm.Data["external-gw-addr"]; addr != "" {
nextHop = addr
}
if err := c.ovnClient.AddStaticRoute(ovs.PolicySrcIP, podIP, nextHop, c.config.ClusterRouter, util.NormalRouteType); err != nil {
klog.Errorf("failed to add static route, %v", err)
return err
}
} else {
if subnet.Spec.GatewayType == kubeovnv1.GWDistributedType && pod.Annotations[util.NorthGatewayAnnotation] == "" {
node, err := c.nodesLister.Get(pod.Spec.NodeName)
if err != nil { if err != nil {
klog.Errorf("get node %s failed %v", pod.Spec.NodeName, err) klog.Errorf("failed to get ex-gateway config, %v", err)
return err return err
} }
nodeTunlIPAddr, err := getNodeTunlIP(node) nextHop := cm.Data["nic-ip"]
if err != nil { if nextHop == "" {
klog.Errorf("no available gateway nic address")
return fmt.Errorf("no available gateway nic address")
}
if !strings.Contains(nextHop, "/") {
klog.Errorf("gateway nic address's format is invalid")
return fmt.Errorf("gateway nic address's format is invalid")
}
nextHop = strings.Split(nextHop, "/")[0]
if addr := cm.Data["external-gw-addr"]; addr != "" {
nextHop = addr
}
if err := c.ovnClient.AddStaticRoute(ovs.PolicySrcIP, podIP, nextHop, c.config.ClusterRouter, util.NormalRouteType); err != nil {
klog.Errorf("failed to add static route, %v", err)
return err return err
} }
} else {
if subnet.Spec.GatewayType == kubeovnv1.GWDistributedType && pod.Annotations[util.NorthGatewayAnnotation] == "" {
node, err := c.nodesLister.Get(pod.Spec.NodeName)
if err != nil {
klog.Errorf("get node %s failed %v", pod.Spec.NodeName, err)
return err
}
nodeTunlIPAddr, err := getNodeTunlIP(node)
if err != nil {
return err
}
for _, nodeAddr := range nodeTunlIPAddr { for _, nodeAddr := range nodeTunlIPAddr {
for _, podAddr := range strings.Split(podIP, ",") { for _, podAddr := range strings.Split(podIP, ",") {
if util.CheckProtocol(nodeAddr.String()) != util.CheckProtocol(podAddr) { if util.CheckProtocol(nodeAddr.String()) != util.CheckProtocol(podAddr) {
continue continue
} }
if err := c.ovnClient.AddStaticRoute(ovs.PolicySrcIP, podAddr, nodeAddr.String(), c.config.ClusterRouter, util.NormalRouteType); err != nil { if err := c.ovnClient.AddStaticRoute(ovs.PolicySrcIP, podAddr, nodeAddr.String(), c.config.ClusterRouter, util.NormalRouteType); err != nil {
klog.Errorf("failed to add static route, %v", err) klog.Errorf("failed to add static route, %v", err)
return err return err
}
} }
} }
} }
}
if pod.Annotations[util.NorthGatewayAnnotation] != "" { if pod.Annotations[util.NorthGatewayAnnotation] != "" {
if err := c.ovnClient.AddStaticRoute(ovs.PolicySrcIP, podIP, pod.Annotations[util.NorthGatewayAnnotation], c.config.ClusterRouter, util.NormalRouteType); err != nil { if err := c.ovnClient.AddStaticRoute(ovs.PolicySrcIP, podIP, pod.Annotations[util.NorthGatewayAnnotation], c.config.ClusterRouter, util.NormalRouteType); err != nil {
klog.Errorf("failed to add static route, %v", err) klog.Errorf("failed to add static route, %v", err)
return err return err
}
} }
} }
}
for _, ipStr := range strings.Split(podIP, ",") { for _, ipStr := range strings.Split(podIP, ",") {
if err := c.ovnClient.UpdateNatRule("dnat_and_snat", ipStr, pod.Annotations[util.EipAnnotation], c.config.ClusterRouter, pod.Annotations[util.MacAddressAnnotation], fmt.Sprintf("%s.%s", pod.Name, pod.Namespace)); err != nil { if err := c.ovnClient.UpdateNatRule("dnat_and_snat", ipStr, pod.Annotations[util.EipAnnotation], c.config.ClusterRouter, pod.Annotations[util.MacAddressAnnotation], fmt.Sprintf("%s.%s", pod.Name, pod.Namespace)); err != nil {
klog.Errorf("failed to add nat rules, %v", err) klog.Errorf("failed to add nat rules, %v", err)
return err return err
} }
if err := c.ovnClient.UpdateNatRule("snat", ipStr, pod.Annotations[util.SnatAnnotation], c.config.ClusterRouter, "", ""); err != nil { if err := c.ovnClient.UpdateNatRule("snat", ipStr, pod.Annotations[util.SnatAnnotation], c.config.ClusterRouter, "", ""); err != nil {
klog.Errorf("failed to add nat rules, %v", err) klog.Errorf("failed to add nat rules, %v", err)
return err return err
}
} }
} }
}
pod.Annotations[util.RoutedAnnotation] = "true" pod.Annotations[fmt.Sprintf(util.RoutedAnnotationTemplate, podNet.ProviderName)] = "true"
}
if _, err := c.config.KubeClient.CoreV1().Pods(namespace).Patch(context.Background(), name, types.JSONPatchType, generatePatchPayload(pod.Annotations, "replace"), metav1.PatchOptions{}, ""); err != nil { if _, err := c.config.KubeClient.CoreV1().Pods(namespace).Patch(context.Background(), name, types.JSONPatchType, generatePatchPayload(pod.Annotations, "replace"), metav1.PatchOptions{}, ""); err != nil {
if k8serrors.IsNotFound(err) { if k8serrors.IsNotFound(err) {
// Sometimes pod is deleted between kube-ovn configure ovn-nb and patch pod. // Sometimes pod is deleted between kube-ovn configure ovn-nb and patch pod.
......
...@@ -713,6 +713,10 @@ func (c *Controller) getLocalPodIPsNeedNAT(protocol string) ([]string, error) { ...@@ -713,6 +713,10 @@ func (c *Controller) getLocalPodIPsNeedNAT(protocol string) ([]string, error) {
} }
} }
} }
attachIps, err := c.getAttachmentLocalPodIPsNeedNAT(pod, hostname, protocol)
if len(attachIps) != 0 && err == nil {
localPodIPs = append(localPodIPs, attachIps...)
}
} }
klog.V(3).Infof("local pod ips %v", localPodIPs) klog.V(3).Infof("local pod ips %v", localPodIPs)
...@@ -1071,3 +1075,37 @@ func getIptablesRuleNum(table, chain, rule, dstNatIp string) (string, error) { ...@@ -1071,3 +1075,37 @@ func getIptablesRuleNum(table, chain, rule, dstNatIp string) (string, error) {
} }
return num, nil return num, nil
} }
func (c *Controller) getAttachmentLocalPodIPsNeedNAT(pod *v1.Pod, hostname, protocol string) ([]string, error) {
var attachPodIPs []string
attachNets, err := util.ParsePodNetworkAnnotation(pod.Annotations[util.AttachmentNetworkAnnotation], pod.Namespace)
if err != nil {
klog.Errorf("failed to parse attach net for pod '%s', %v", pod.Name, err)
return attachPodIPs, err
}
for _, multiNet := range attachNets {
provider := fmt.Sprintf("%s.%s.ovn", multiNet.Name, multiNet.Namespace)
if pod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, provider)] == "true" {
subnet, err := c.subnetsLister.Get(pod.Annotations[fmt.Sprintf(util.LogicalSwitchAnnotationTemplate, provider)])
if err != nil {
klog.Errorf("get subnet %s failed, %+v", pod.Annotations[fmt.Sprintf(util.LogicalSwitchAnnotationTemplate, provider)], err)
continue
}
if subnet.Spec.NatOutgoing &&
subnet.Spec.Vpc == util.DefaultVpc &&
subnet.Spec.GatewayType == kubeovnv1.GWDistributedType &&
pod.Spec.NodeName == hostname {
ipv4, ipv6 := util.SplitStringIP(pod.Annotations[fmt.Sprintf(util.IpAddressAnnotationTemplate, provider)])
if ipv4 != "" && protocol == kubeovnv1.ProtocolIPv4 {
attachPodIPs = append(attachPodIPs, ipv4)
}
if ipv6 != "" && protocol == kubeovnv1.ProtocolIPv6 {
attachPodIPs = append(attachPodIPs, ipv6)
}
}
}
}
return attachPodIPs, nil
}
...@@ -37,6 +37,7 @@ const ( ...@@ -37,6 +37,7 @@ const (
AllocatedAnnotationSuffix = ".kubernetes.io/allocated" AllocatedAnnotationSuffix = ".kubernetes.io/allocated"
AllocatedAnnotationTemplate = "%s.kubernetes.io/allocated" AllocatedAnnotationTemplate = "%s.kubernetes.io/allocated"
RoutedAnnotationTemplate = "%s.kubernetes.io/routed"
MacAddressAnnotationTemplate = "%s.kubernetes.io/mac_address" MacAddressAnnotationTemplate = "%s.kubernetes.io/mac_address"
IpAddressAnnotationTemplate = "%s.kubernetes.io/ip_address" IpAddressAnnotationTemplate = "%s.kubernetes.io/ip_address"
CidrAnnotationTemplate = "%s.kubernetes.io/cidr" CidrAnnotationTemplate = "%s.kubernetes.io/cidr"
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment