Commit 46287a5b authored by hzma's avatar hzma
Browse files

replace static route with policy route

parent bf167a60
Showing with 1026 additions and 208 deletions
+1026 -208
......@@ -397,6 +397,9 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
if err := c.initSyncCrdVlans(); err != nil {
klog.Errorf("failed to sync crd vlans: %v", err)
}
if err := c.initDeleteOverlayPodsStaticRoutes(); err != nil {
klog.Errorf("failed to delete pod's static route in default vpc: %v", err)
}
// start workers to do all the network operations
c.startWorkers(stopCh)
......
......@@ -13,6 +13,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
"github.com/kubeovn/kube-ovn/pkg/ovs"
"github.com/kubeovn/kube-ovn/pkg/util"
)
......@@ -494,6 +495,21 @@ func (c *Controller) gcPortGroup() error {
for _, node := range nodes {
npNames = append(npNames, fmt.Sprintf("%s/%s", "node", node.Name))
}
// append overlay subnets port group to npNames to avoid gc distributed subnets port group
subnets, err := c.subnetsLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list subnets %v", err)
return err
}
for _, subnet := range subnets {
if subnet.Spec.Vpc != util.DefaultVpc || subnet.Spec.Vlan != "" || subnet.Name == c.config.NodeSwitch || subnet.Spec.GatewayType != kubeovnv1.GWDistributedType {
continue
}
for _, node := range nodes {
npNames = append(npNames, fmt.Sprintf("%s/%s", subnet.Name, node.Name))
}
}
}
pgs, err := c.ovnClient.ListNpPortGroup()
......
......@@ -45,6 +45,11 @@ func (c *Controller) InitOVN() error {
return err
}
if err := c.createOverlaySubnetsAddressSet(); err != nil {
klog.Errorf("failed to create overlay subnets address-set, %v", err)
return err
}
return nil
}
......@@ -629,3 +634,35 @@ func (c *Controller) initAppendNodeExternalIds(portName, nodeName string) error
}
return nil
}
func (c *Controller) initDeleteOverlayPodsStaticRoutes() error {
pods, err := c.podsLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list pods: %v", err)
return err
}
for _, pod := range pods {
if pod.Spec.HostNetwork {
continue
}
podNets, err := c.getPodKubeovnNets(pod)
if err != nil {
klog.Errorf("failed to get pod kubeovn nets %s.%s address %s: %v", pod.Name, pod.Namespace, pod.Annotations[util.IpAddressAnnotation], err)
continue
}
for _, podNet := range podNets {
if !isOvnSubnet(podNet.Subnet) || podNet.Subnet.Spec.Vpc != util.DefaultVpc || podNet.Subnet.Spec.Vlan != "" || podNet.Subnet.Spec.GatewayType != kubeovnv1.GWDistributedType {
continue
}
if pod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, podNet.ProviderName)] == "true" {
for _, podIP := range strings.Split(pod.Annotations[fmt.Sprintf(util.IpAddressAnnotationTemplate, podNet.ProviderName)], ",") {
if err := c.ovnClient.DeleteStaticRoute(podIP, podNet.Subnet.Spec.Vpc); err != nil {
return err
}
}
}
}
}
return nil
}
......@@ -223,7 +223,7 @@ func (c *Controller) handleUpdateNp(key string) error {
svcAsName = svcAsNameIPv6
svcIPs = svcIpv6s
}
if err := c.ovnClient.CreateAddressSet(svcAsName, np.Namespace, np.Name, "service"); err != nil {
if err := c.ovnClient.CreateNpAddressSet(svcAsName, np.Namespace, np.Name, "service"); err != nil {
klog.Errorf("failed to create address_set %s, %v", svcAsNameIPv4, err)
return err
}
......@@ -285,7 +285,7 @@ func (c *Controller) handleUpdateNp(key string) error {
}
}
klog.Infof("UpdateNp Ingress, allows is %v, excepts is %v", allows, excepts)
if err := c.ovnClient.CreateAddressSet(ingressAllowAsName, np.Namespace, np.Name, "ingress"); err != nil {
if err := c.ovnClient.CreateNpAddressSet(ingressAllowAsName, np.Namespace, np.Name, "ingress"); err != nil {
klog.Errorf("failed to create address_set %s, %v", ingressAllowAsName, err)
return err
}
......@@ -294,7 +294,7 @@ func (c *Controller) handleUpdateNp(key string) error {
return err
}
if err := c.ovnClient.CreateAddressSet(ingressExceptAsName, np.Namespace, np.Name, "ingress"); err != nil {
if err := c.ovnClient.CreateNpAddressSet(ingressExceptAsName, np.Namespace, np.Name, "ingress"); err != nil {
klog.Errorf("failed to create address_set %s, %v", ingressExceptAsName, err)
return err
}
......@@ -313,12 +313,12 @@ func (c *Controller) handleUpdateNp(key string) error {
if len(np.Spec.Ingress) == 0 {
ingressAllowAsName := fmt.Sprintf("%s.%s.all", ingressAllowAsNamePrefix, protocol)
ingressExceptAsName := fmt.Sprintf("%s.%s.all", ingressExceptAsNamePrefix, protocol)
if err := c.ovnClient.CreateAddressSet(ingressAllowAsName, np.Namespace, np.Name, "ingress"); err != nil {
if err := c.ovnClient.CreateNpAddressSet(ingressAllowAsName, np.Namespace, np.Name, "ingress"); err != nil {
klog.Errorf("failed to create address_set %s, %v", ingressAllowAsName, err)
return err
}
if err := c.ovnClient.CreateAddressSet(ingressExceptAsName, np.Namespace, np.Name, "ingress"); err != nil {
if err := c.ovnClient.CreateNpAddressSet(ingressExceptAsName, np.Namespace, np.Name, "ingress"); err != nil {
klog.Errorf("failed to create address_set %s, %v", ingressExceptAsName, err)
return err
}
......@@ -423,7 +423,7 @@ func (c *Controller) handleUpdateNp(key string) error {
}
}
klog.Infof("UpdateNp Egress, allows is %v, excepts is %v", allows, excepts)
if err := c.ovnClient.CreateAddressSet(egressAllowAsName, np.Namespace, np.Name, "egress"); err != nil {
if err := c.ovnClient.CreateNpAddressSet(egressAllowAsName, np.Namespace, np.Name, "egress"); err != nil {
klog.Errorf("failed to create address_set %s, %v", egressAllowAsName, err)
return err
}
......@@ -432,7 +432,7 @@ func (c *Controller) handleUpdateNp(key string) error {
return err
}
if err := c.ovnClient.CreateAddressSet(egressExceptAsName, np.Namespace, np.Name, "egress"); err != nil {
if err := c.ovnClient.CreateNpAddressSet(egressExceptAsName, np.Namespace, np.Name, "egress"); err != nil {
klog.Errorf("failed to create address_set %s, %v", egressExceptAsName, err)
return err
}
......@@ -451,12 +451,12 @@ func (c *Controller) handleUpdateNp(key string) error {
if len(np.Spec.Egress) == 0 {
egressAllowAsName := fmt.Sprintf("%s.%s.all", egressAllowAsNamePrefix, protocol)
egressExceptAsName := fmt.Sprintf("%s.%s.all", egressExceptAsNamePrefix, protocol)
if err := c.ovnClient.CreateAddressSet(egressAllowAsName, np.Namespace, np.Name, "egress"); err != nil {
if err := c.ovnClient.CreateNpAddressSet(egressAllowAsName, np.Namespace, np.Name, "egress"); err != nil {
klog.Errorf("failed to create address_set %s, %v", egressAllowAsName, err)
return err
}
if err := c.ovnClient.CreateAddressSet(egressExceptAsName, np.Namespace, np.Name, "egress"); err != nil {
if err := c.ovnClient.CreateNpAddressSet(egressExceptAsName, np.Namespace, np.Name, "egress"); err != nil {
klog.Errorf("failed to create address_set %s, %v", egressExceptAsName, err)
return err
}
......
......@@ -19,7 +19,6 @@ import (
"k8s.io/klog/v2"
kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
"github.com/kubeovn/kube-ovn/pkg/ovs"
"github.com/kubeovn/kube-ovn/pkg/util"
goping "github.com/oilbeater/go-ping"
)
......@@ -307,6 +306,11 @@ func (c *Controller) handleAddNode(key string) error {
return err
}
if err := c.addPolicyRouteForNode(node.Name, ipStr); err != nil {
klog.Errorf("failed to add policy route for node %s, %v", key, err)
return err
}
if err := c.RemoveRedundantChassis(node); err != nil {
return err
}
......@@ -425,6 +429,10 @@ func (c *Controller) handleDeleteNode(key string) error {
klog.Errorf("failed to delete port group %s for node, %v", portName, err)
return err
}
if err := c.deletePolicyRouteForNode(key); err != nil {
klog.Errorf("failed to delete policy route for node %s: %v", key, err)
return err
}
addresses := c.ipam.GetPodAddress(portName)
for _, addr := range addresses {
......@@ -661,15 +669,20 @@ func (c *Controller) checkGatewayReady() error {
for _, node := range nodes {
ipStr := node.Annotations[util.IpAddressAnnotation]
for _, ip := range strings.Split(ipStr, ",") {
var cidrBlock string
for _, cidrBlock = range strings.Split(subnet.Spec.CIDRBlock, ",") {
for _, cidrBlock := range strings.Split(subnet.Spec.CIDRBlock, ",") {
if util.CheckProtocol(cidrBlock) != util.CheckProtocol(ip) {
continue
}
exist, err := c.checkNodeEcmpRouteExist(ip, cidrBlock)
exist, err := c.checkPolicyRouteExistForNode(node.Name, cidrBlock)
if err != nil {
klog.Errorf("get ecmp static route for subnet %v, error %v", subnet.Name, err)
klog.Errorf("check ecmp policy route exist for subnet %v, error %v", subnet.Name, err)
break
}
nextHops, nameIpMap, err := c.getPolicyRouteParas(cidrBlock)
if err != nil {
klog.Errorf("get ecmp policy route paras for subnet %v, error %v", subnet.Name, err)
break
}
......@@ -697,27 +710,36 @@ func (c *Controller) checkGatewayReady() error {
}
if !success {
klog.Warningf("failed to ping ovn0 %s or node %v is not ready", ip, node.Name)
if exist {
if err := c.ovnClient.DeleteMatchedStaticRoute(cidrBlock, ip, c.config.ClusterRouter); err != nil {
klog.Errorf("failed to delete static route %s for node %s, %v", ip, node.Name, err)
klog.Warningf("failed to ping ovn0 %s or node %v is not ready, delete ecmp policy route for node", ip, node.Name)
nextHops = util.RemoveString(nextHops, ip)
delete(nameIpMap, node.Name)
if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops, nameIpMap); err != nil {
klog.Errorf("failed to delete ecmp policy route for subnet %s on node %s, %v", subnet.Name, node.Name, err)
return err
}
}
} else {
klog.V(3).Infof("succeed to ping gw %s", ip)
if !exist {
if err := c.ovnClient.AddStaticRoute(ovs.PolicySrcIP, subnet.Spec.CIDRBlock, ip, c.config.ClusterRouter, util.EcmpRouteType); err != nil {
klog.Errorf("failed to add static route for node %s, %v", node.Name, err)
nextHops = append(nextHops, ip)
if nameIpMap == nil {
nameIpMap = make(map[string]string, 1)
}
nameIpMap[node.Name] = ip
if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops, nameIpMap); err != nil {
klog.Errorf("failed to add ecmp policy route for subnet %s on node %s, %v", subnet.Name, node.Name, err)
return err
}
}
}
} else {
if exist {
klog.Infof("subnet %v gatewayNode does not contains node %v, should delete ecmp route for node ip %s", subnet.Name, node.Name, ip)
if err := c.ovnClient.DeleteMatchedStaticRoute(cidrBlock, ip, c.config.ClusterRouter); err != nil {
klog.Errorf("failed to delete static route %s for node %s, %v", ip, node.Name, err)
klog.Infof("subnet %v gatewayNode does not contains node %v, delete policy route for node ip %s", subnet.Name, node.Name, ip)
nextHops = util.RemoveString(nextHops, ip)
delete(nameIpMap, node.Name)
if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops, nameIpMap); err != nil {
klog.Errorf("failed to delete ecmp policy route for subnet %s on node %s, %v", subnet.Name, node.Name, err)
return err
}
}
......@@ -729,7 +751,7 @@ func (c *Controller) checkGatewayReady() error {
return nil
}
func (c *Controller) checkNodeEcmpRouteExist(nodeIp, cidrBlock string) (bool, error) {
func (c *Controller) checkRouteExist(nextHop, cidrBlock, routePolicy string) (bool, error) {
routes, err := c.ovnClient.GetStaticRouteList(c.config.ClusterRouter)
if err != nil {
klog.Errorf("failed to list static route %v", err)
......@@ -737,11 +759,12 @@ func (c *Controller) checkNodeEcmpRouteExist(nodeIp, cidrBlock string) (bool, er
}
for _, route := range routes {
if route.Policy != ovs.PolicySrcIP {
if route.Policy != routePolicy {
continue
}
if route.CIDR == cidrBlock && route.NextHop == nodeIp {
klog.V(3).Infof("src-ip static route exist for cidr %s, nexthop %v", cidrBlock, nodeIp)
if route.CIDR == cidrBlock && route.NextHop == nextHop {
klog.V(3).Infof("static route exists for cidr %s, nexthop %v", cidrBlock, nextHop)
return true, nil
}
}
......@@ -952,3 +975,161 @@ func (c *Controller) RemoveRedundantChassis(node *v1.Node) error {
}
return nil
}
func (c *Controller) getPolicyRouteParas(cidr string) ([]string, map[string]string, error) {
ipSuffix := "ip4"
subnetAsName := getOverlaySubnetsAddressSetName(c.config.ClusterRouter, kubeovnv1.ProtocolIPv4)
if util.CheckProtocol(cidr) == kubeovnv1.ProtocolIPv6 {
ipSuffix = "ip6"
subnetAsName = getOverlaySubnetsAddressSetName(c.config.ClusterRouter, kubeovnv1.ProtocolIPv6)
}
match := fmt.Sprintf("%s.src == %s && %s.dst != $%s", ipSuffix, cidr, ipSuffix, subnetAsName)
nextHops, nameIpMap, err := c.ovnClient.GetPolicyRouteParas(util.CentralSubnetPriority, match)
if err != nil {
klog.Errorf("failed to get policy route paras, %v", err)
return nextHops, nameIpMap, err
}
return nextHops, nameIpMap, nil
}
func (c *Controller) checkPolicyRouteExistForNode(nodeName, cidr string) (bool, error) {
_, nameIpMap, err := c.getPolicyRouteParas(cidr)
if err != nil {
klog.Errorf("failed to get policy route paras, %v", err)
return false, err
}
if _, ok := nameIpMap[nodeName]; ok {
return true, nil
}
return false, nil
}
func (c *Controller) deletePolicyRouteForNode(nodeName string) error {
subnets, err := c.subnetsLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to get subnets %v", err)
return err
}
for _, subnet := range subnets {
if subnet.Spec.Vlan != "" || subnet.Spec.Vpc != util.DefaultVpc || subnet.Name == c.config.NodeSwitch {
continue
}
if subnet.Spec.GatewayType == kubeovnv1.GWDistributedType {
pgName := getOverlaySubnetsPortGroupName(subnet.Name, nodeName)
if err = c.ovnClient.DeletePortGroup(pgName); err != nil {
klog.Errorf("failed to delete port group for subnet %s and node %s, %v", subnet.Name, nodeName, err)
return err
}
if err = c.deletePolicyRouteForDistributedSubnet(subnet, nodeName); err != nil {
klog.Errorf("failed to delete policy route for subnet %s and node %s, %v", subnet.Name, nodeName, err)
return err
}
}
if subnet.Spec.GatewayType == kubeovnv1.GWCentralizedType {
if c.config.EnableEcmp {
for _, cidrBlock := range strings.Split(subnet.Spec.CIDRBlock, ",") {
exist, err := c.checkPolicyRouteExistForNode(nodeName, cidrBlock)
if err != nil {
klog.Errorf("check ecmp policy route exist for subnet %v, error %v", subnet.Name, err)
continue
}
nextHops, nameIpMap, err := c.getPolicyRouteParas(cidrBlock)
if err != nil {
klog.Errorf("get ecmp policy route paras for subnet %v, error %v", subnet.Name, err)
continue
}
if exist {
nextHops = util.RemoveString(nextHops, nameIpMap[nodeName])
delete(nameIpMap, nodeName)
if len(nextHops) == 0 {
if err := c.deletePolicyRouteForCentralizedSubnet(subnet); err != nil {
klog.Errorf("failed to delete policy route for centralized subnet %s, %v", subnet.Name, err)
return err
}
} else {
if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops, nameIpMap); err != nil {
klog.Errorf("failed to update policy route for subnet %s on node %s, %v", subnet.Name, nodeName, err)
return err
}
}
}
}
} else {
if err := c.deletePolicyRouteForCentralizedSubnet(subnet); err != nil {
klog.Errorf("failed to delete policy route for centralized subnet %s, %v", subnet.Name, err)
return err
}
}
}
}
return nil
}
func (c *Controller) addPolicyRouteForNode(nodeName, nodeIP string) error {
subnets, err := c.subnetsLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to get subnets %v", err)
return err
}
for _, subnet := range subnets {
if subnet.Spec.Vlan != "" || subnet.Spec.Vpc != util.DefaultVpc || subnet.Name == c.config.NodeSwitch || subnet.Spec.GatewayType != kubeovnv1.GWCentralizedType {
continue
}
if c.config.EnableEcmp {
if !util.GatewayContains(subnet.Spec.GatewayNode, nodeName) {
continue
}
for _, cidrBlock := range strings.Split(subnet.Spec.CIDRBlock, ",") {
exist, err := c.checkPolicyRouteExistForNode(nodeName, cidrBlock)
if err != nil {
klog.Errorf("check ecmp policy route exist for subnet %v, error %v", subnet.Name, err)
continue
}
if exist {
continue
}
nextHops, nameIpMap, err := c.getPolicyRouteParas(cidrBlock)
if err != nil {
klog.Errorf("get ecmp policy route paras for subnet %v, error %v", subnet.Name, err)
continue
}
for _, nextHop := range strings.Split(nodeIP, ",") {
if util.CheckProtocol(cidrBlock) == util.CheckProtocol(nextHop) {
continue
}
nextHops = append(nextHops, nextHop)
nameIpMap[nodeName] = nextHop
if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops, nameIpMap); err != nil {
klog.Errorf("failed to update policy route for subnet %s on node %s, %v", subnet.Name, nodeName, err)
return err
}
}
}
} else {
if subnet.Status.ActivateGateway != nodeName {
continue
}
if err = c.addPolicyRouteForCentralizedSubnet(subnet, nodeName, nil, strings.Split(nodeIP, ",")); err != nil {
klog.Errorf("failed to add active-backup policy route for centralized subnet %s: %v", subnet.Name, err)
return err
}
}
}
return nil
}
......@@ -625,6 +625,7 @@ func (c *Controller) handleDeletePod(pod *v1.Pod) error {
if err != nil {
klog.Warningf("filed to get port '%s' sg, %v", portName, err)
}
// when lsp is deleted, the port of pod is deleted from any port-group automatically.
if err := c.ovnClient.DeleteLogicalSwitchPort(portName); err != nil {
klog.Errorf("failed to delete lsp %s, %v", portName, err)
return err
......@@ -708,13 +709,8 @@ func (c *Controller) handleUpdatePod(key string) error {
}
return err
}
podName := c.getNameByPod(oripod)
pod := oripod.DeepCopy()
// in case update handler overlap the annotation when cache is not in sync
if pod.Annotations[util.AllocatedAnnotation] == "" {
return fmt.Errorf("no address has been allocated to %s/%s", namespace, name)
}
podName := c.getNameByPod(pod)
klog.Infof("update pod %s/%s", namespace, name)
......@@ -726,83 +722,137 @@ func (c *Controller) handleUpdatePod(key string) error {
return err
}
_, idNameMap, err := c.ovnClient.ListLspForNodePortgroup()
if err != nil {
klog.Errorf("failed to list lsp info, %v", err)
return err
}
for _, podNet := range podNets {
// routing should be configured only if the OVN network is the default network
if !podNet.IsDefault || util.OvnProvider != podNet.ProviderName {
if !isOvnSubnet(podNet.Subnet) {
continue
}
// in case update handler overlap the annotation when cache is not in sync
if pod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, podNet.ProviderName)] == "" {
return fmt.Errorf("no address has been allocated to %s/%s", namespace, name)
}
podIP = pod.Annotations[fmt.Sprintf(util.IpAddressAnnotationTemplate, podNet.ProviderName)]
subnet = podNet.Subnet
break
}
if podIP != "" && subnet.Spec.Vlan == "" && subnet.Spec.Vpc == util.DefaultVpc {
if pod.Annotations[util.EipAnnotation] != "" || pod.Annotations[util.SnatAnnotation] != "" {
cm, err := c.configMapsLister.ConfigMaps("kube-system").Get(util.ExternalGatewayConfig)
if err != nil {
klog.Errorf("failed to get ex-gateway config, %v", err)
return err
}
nextHop := cm.Data["external-gw-addr"]
if nextHop == "" {
klog.Errorf("no available gateway nic address")
return fmt.Errorf("no available gateway nic address")
}
if strings.Contains(nextHop, "/") {
nextHop = strings.Split(nextHop, "/")[0]
}
if err := c.ovnClient.AddStaticRoute(ovs.PolicySrcIP, podIP, nextHop, c.config.ClusterRouter, util.NormalRouteType); err != nil {
klog.Errorf("failed to add static route, %v", err)
return err
}
} else {
if subnet.Spec.GatewayType == kubeovnv1.GWDistributedType && pod.Annotations[util.NorthGatewayAnnotation] == "" {
node, err := c.nodesLister.Get(pod.Spec.NodeName)
if podIP != "" && subnet.Spec.Vlan == "" && subnet.Spec.Vpc == util.DefaultVpc {
if pod.Annotations[util.EipAnnotation] != "" || pod.Annotations[util.SnatAnnotation] != "" {
cm, err := c.configMapsLister.ConfigMaps("kube-system").Get(util.ExternalGatewayConfig)
if err != nil {
klog.Errorf("get node %s failed %v", pod.Spec.NodeName, err)
klog.Errorf("failed to get ex-gateway config, %v", err)
return err
}
nodeTunlIPAddr, err := getNodeTunlIP(node)
if err != nil {
nextHop := cm.Data["external-gw-addr"]
if nextHop == "" {
klog.Errorf("no available gateway nic address")
return fmt.Errorf("no available gateway nic address")
}
if strings.Contains(nextHop, "/") {
nextHop = strings.Split(nextHop, "/")[0]
}
if err := c.ovnClient.AddStaticRoute(ovs.PolicySrcIP, podIP, nextHop, c.config.ClusterRouter, util.NormalRouteType); err != nil {
klog.Errorf("failed to add static route, %v", err)
return err
}
} else {
if subnet.Spec.GatewayType == kubeovnv1.GWDistributedType && pod.Annotations[util.NorthGatewayAnnotation] == "" {
node, err := c.nodesLister.Get(pod.Spec.NodeName)
if err != nil {
klog.Errorf("get node %s failed %v", pod.Spec.NodeName, err)
return err
}
for _, nodeAddr := range nodeTunlIPAddr {
for _, podAddr := range strings.Split(podIP, ",") {
if util.CheckProtocol(nodeAddr.String()) != util.CheckProtocol(podAddr) {
continue
}
if err := c.ovnClient.AddStaticRoute(ovs.PolicySrcIP, podAddr, nodeAddr.String(), c.config.ClusterRouter, util.NormalRouteType); err != nil {
klog.Errorf("failed to add static route, %v", err)
pgName := getOverlaySubnetsPortGroupName(subnet.Name, node.Name)
exist, err := c.ovnClient.PortGroupExists(pgName)
if err != nil {
return err
}
if !exist {
if err = c.ovnClient.CreateNpPortGroup(pgName, subnet.Name, node.Name); err != nil {
klog.Errorf("failed to create port group for subnet %s and node %s, %v", subnet.Name, node.Name, err)
return err
}
}
nodeTunlIPAddr, err := getNodeTunlIP(node)
if err != nil {
return err
}
for _, nodeAddr := range nodeTunlIPAddr {
for _, podAddr := range strings.Split(podIP, ",") {
if util.CheckProtocol(nodeAddr.String()) != util.CheckProtocol(podAddr) {
continue
}
pgPorts, err := c.getPgPorts(idNameMap, pgName)
if err != nil {
klog.Errorf("failed to fetch ports for pg %v, %v", pgName, err)
return err
}
portName := ovs.PodNameToPortName(podName, pod.Namespace, podNet.ProviderName)
if !util.IsStringIn(portName, pgPorts) {
pgPorts = append(pgPorts, portName)
if err = c.ovnClient.SetPortsToPortGroup(pgName, pgPorts); err != nil {
klog.Errorf("failed to set ports to port group %v, %v", pgName, err)
return err
}
}
ipSuffix := "ip4"
subnetAsName := getOverlaySubnetsAddressSetName(c.config.ClusterRouter, kubeovnv1.ProtocolIPv4)
if util.CheckProtocol(nodeAddr.String()) == kubeovnv1.ProtocolIPv6 {
ipSuffix = "ip6"
subnetAsName = getOverlaySubnetsAddressSetName(c.config.ClusterRouter, kubeovnv1.ProtocolIPv6)
}
pgAs := fmt.Sprintf("%s_%s", pgName, ipSuffix)
match := fmt.Sprintf("%s.src == $%s && %s.dst != $%s", ipSuffix, pgAs, ipSuffix, subnetAsName)
exist, err := c.ovnClient.PolicyRouteExists(util.PodRouterPolicyPriority, match)
if err != nil {
return err
}
if !exist {
if err = c.ovnClient.AddPolicyRoute(c.config.ClusterRouter, util.PodRouterPolicyPriority, match, "reroute", nodeAddr.String()); err != nil {
klog.Errorf("failed to add logical router policy for port-group address-set %s: %v", pgAs, err)
return err
}
}
}
}
}
}
if pod.Annotations[util.NorthGatewayAnnotation] != "" {
if err := c.ovnClient.AddStaticRoute(ovs.PolicySrcIP, podIP, pod.Annotations[util.NorthGatewayAnnotation], c.config.ClusterRouter, util.NormalRouteType); err != nil {
klog.Errorf("failed to add static route, %v", err)
return err
if pod.Annotations[util.NorthGatewayAnnotation] != "" {
if err := c.ovnClient.AddStaticRoute(ovs.PolicySrcIP, podIP, pod.Annotations[util.NorthGatewayAnnotation], c.config.ClusterRouter, util.NormalRouteType); err != nil {
klog.Errorf("failed to add static route, %v", err)
return err
}
}
}
}
for _, ipStr := range strings.Split(podIP, ",") {
if err := c.ovnClient.UpdateNatRule("dnat_and_snat", ipStr, pod.Annotations[util.EipAnnotation], c.config.ClusterRouter, pod.Annotations[util.MacAddressAnnotation], fmt.Sprintf("%s.%s", podName, pod.Namespace)); err != nil {
klog.Errorf("failed to add nat rules, %v", err)
return err
}
for _, ipStr := range strings.Split(podIP, ",") {
if err := c.ovnClient.UpdateNatRule("dnat_and_snat", ipStr, pod.Annotations[util.EipAnnotation], c.config.ClusterRouter, pod.Annotations[util.MacAddressAnnotation], fmt.Sprintf("%s.%s", podName, pod.Namespace)); err != nil {
klog.Errorf("failed to add nat rules, %v", err)
return err
}
if err := c.ovnClient.UpdateNatRule("snat", ipStr, pod.Annotations[util.SnatAnnotation], c.config.ClusterRouter, "", ""); err != nil {
klog.Errorf("failed to add nat rules, %v", err)
return err
if err := c.ovnClient.UpdateNatRule("snat", ipStr, pod.Annotations[util.SnatAnnotation], c.config.ClusterRouter, "", ""); err != nil {
klog.Errorf("failed to add nat rules, %v", err)
return err
}
}
}
}
pod.Annotations[util.RoutedAnnotation] = "true"
pod.Annotations[fmt.Sprintf(util.RoutedAnnotationTemplate, podNet.ProviderName)] = "true"
}
if _, err := c.config.KubeClient.CoreV1().Pods(namespace).Patch(context.Background(), name, types.JSONPatchType, generatePatchPayload(pod.Annotations, "replace"), metav1.PatchOptions{}, ""); err != nil {
if k8serrors.IsNotFound(err) {
// Sometimes pod is deleted between kube-ovn configure ovn-nb and patch pod.
......
This diff is collapsed.
......@@ -328,22 +328,22 @@ func (c Client) AddLbToLogicalSwitch(tcpLb, tcpSessLb, udpLb, udpSessLb, ls stri
func (c Client) RemoveLbFromLogicalSwitch(tcpLb, tcpSessLb, udpLb, udpSessLb, ls string) error {
if err := c.removeLoadBalancerFromLogicalSwitch(tcpLb, ls); err != nil {
klog.Errorf("failed to add tcp lb to %s, %v", ls, err)
klog.Errorf("failed to remove tcp lb from %s, %v", ls, err)
return err
}
if err := c.removeLoadBalancerFromLogicalSwitch(udpLb, ls); err != nil {
klog.Errorf("failed to add udp lb to %s, %v", ls, err)
klog.Errorf("failed to remove udp lb from %s, %v", ls, err)
return err
}
if err := c.removeLoadBalancerFromLogicalSwitch(tcpSessLb, ls); err != nil {
klog.Errorf("failed to add tcp session lb to %s, %v", ls, err)
klog.Errorf("failed to remove tcp session lb from %s, %v", ls, err)
return err
}
if err := c.removeLoadBalancerFromLogicalSwitch(udpSessLb, ls); err != nil {
klog.Errorf("failed to add udp session lb to %s, %v", ls, err)
klog.Errorf("failed to remove udp session lb from %s, %v", ls, err)
return err
}
......@@ -725,6 +725,107 @@ func (c Client) AddStaticRoute(policy, cidr, nextHop, router string, routeType s
return nil
}
// AddPolicyRoute add a policy route rule in ovn
func (c Client) AddPolicyRoute(router string, priority int32, match string, action string, nextHop string) error {
exist, err := c.IsPolicyRouteExist(router, priority, match)
if err != nil {
return err
}
if exist {
return nil
}
// lr-policy-add ROUTER PRIORITY MATCH ACTION [NEXTHOP]
args := []string{"lr-policy-add", router, strconv.Itoa(int(priority)), match, action}
if nextHop != "" {
args = append(args, nextHop)
}
if _, err := c.ovnNbCommand(args...); err != nil {
return err
}
return nil
}
// DeletePolicyRoute delete a policy route rule in ovn
func (c Client) DeletePolicyRoute(router string, priority int32, match string) error {
exist, err := c.IsPolicyRouteExist(router, priority, match)
if err != nil {
return err
}
if !exist {
return nil
}
var args = []string{"lr-policy-del", router}
// lr-policy-del ROUTER [PRIORITY [MATCH]]
if priority > 0 {
args = append(args, strconv.Itoa(int(priority)))
if match != "" {
args = append(args, match)
}
}
_, err = c.ovnNbCommand(args...)
return err
}
func (c Client) IsPolicyRouteExist(router string, priority int32, match string) (bool, error) {
existPolicyRoute, err := c.GetPolicyRouteList(router)
if err != nil {
return false, err
}
for _, rule := range existPolicyRoute {
if rule.Priority != priority {
continue
}
if match == "" || rule.Match == match {
return true, nil
}
}
return false, nil
}
type PolicyRoute struct {
Priority int32
Match string
Action string
NextHopIP string
}
func (c Client) GetPolicyRouteList(router string) (routeList []*PolicyRoute, err error) {
output, err := c.ovnNbCommand("lr-policy-list", router)
if err != nil {
klog.Errorf("failed to list logical router policy route: %v", err)
return nil, err
}
return parseLrPolicyRouteListOutput(output)
}
var policyRouteRegexp = regexp.MustCompile(`^\s*(\d+)\s+(.*)\b\s+(allow|drop|reroute)\s*(.*)?$`)
func parseLrPolicyRouteListOutput(output string) (routeList []*PolicyRoute, err error) {
lines := strings.Split(output, "\n")
routeList = make([]*PolicyRoute, 0, len(lines))
for _, l := range lines {
if len(l) == 0 {
continue
}
sm := policyRouteRegexp.FindStringSubmatch(l)
if len(sm) != 5 {
continue
}
priority, err := strconv.ParseInt(sm[1], 10, 32)
if err != nil {
return nil, fmt.Errorf("found unexpeted policy priority %s, please check", sm[1])
}
routeList = append(routeList, &PolicyRoute{
Priority: int32(priority),
Match: sm[2],
Action: sm[3],
NextHopIP: sm[4],
})
}
return routeList, nil
}
func (c Client) GetStaticRouteList(router string) (routeList []*StaticRoute, err error) {
output, err := c.ovnNbCommand("lr-route-list", router)
if err != nil {
......@@ -849,6 +950,10 @@ func (c Client) DeleteStaticRouteByNextHop(nextHop string) error {
if strings.TrimSpace(nextHop) == "" {
return nil
}
if util.CheckProtocol(nextHop) == kubeovnv1.ProtocolIPv6 {
nextHop = strings.ReplaceAll(nextHop, ":", "\\:")
}
output, err := c.ovnNbCommand("--format=csv", "--no-heading", "--data=bare", "--columns=ip_prefix", "find", "Logical_Router_Static_Route", fmt.Sprintf("nexthop=%s", nextHop))
if err != nil {
klog.Errorf("failed to list static route %s, %v", nextHop, err)
......@@ -1136,7 +1241,47 @@ func (c Client) ListNpPortGroup() ([]portGroup, error) {
return result, nil
}
func (c Client) CreateAddressSet(asName, npNamespace, npName, direction string) error {
func (c Client) CreateAddressSet(name string) error {
output, err := c.ovnNbCommand("--data=bare", "--no-heading", "--columns=_uuid", "find", "address_set", fmt.Sprintf("name=%s", name))
if err != nil {
klog.Errorf("failed to find address_set %s: %v, %q", name, err, output)
return err
}
if output != "" {
return nil
}
_, err = c.ovnNbCommand("create", "address_set", fmt.Sprintf("name=%s", name), fmt.Sprintf("external_ids:vendor=%s", util.CniTypeName))
return err
}
func (c Client) RemoveAddressSetAddresses(name string, address string) error {
output, err := c.ovnNbCommand("remove", "address_set", name, "addresses", strings.ReplaceAll(address, ":", `\:`))
if err != nil {
klog.Errorf("failed to remove address %s from address_set %s: %v, %q", address, name, err, output)
return err
}
return nil
}
func (c Client) ListAddressesByName(addressSetName string) ([]string, error) {
output, err := c.ovnNbCommand("--data=bare", "--no-heading", "--columns=addresses", "find", "address_set", fmt.Sprintf("name=%s", addressSetName))
if err != nil {
klog.Errorf("failed to list address_set of %s, error %v", addressSetName, err)
return nil, err
}
lines := strings.Split(output, "\n")
result := make([]string, 0, len(lines))
for _, l := range lines {
if len(strings.TrimSpace(l)) == 0 {
continue
}
result = append(result, strings.Fields(l)...)
}
return result, nil
}
func (c Client) CreateNpAddressSet(asName, npNamespace, npName, direction string) error {
output, err := c.ovnNbCommand("--data=bare", "--no-heading", "--columns=_uuid", "find", "address_set", fmt.Sprintf("name=%s", asName))
if err != nil {
klog.Errorf("failed to find address_set %s: %v, %q", asName, err, output)
......@@ -1728,3 +1873,78 @@ func (c *Client) AclExists(priority, direction string) (bool, error) {
}
return true, nil
}
func (c *Client) PortGroupExists(pgName string) (bool, error) {
results, err := c.CustomFindEntity("port_group", []string{"_uuid"}, fmt.Sprintf("name=%s", pgName))
if err != nil {
klog.Errorf("customFindEntity failed, %v", err)
return false, err
}
if len(results) == 0 {
return false, nil
}
return true, nil
}
func (c *Client) PolicyRouteExists(priority int32, match string) (bool, error) {
results, err := c.CustomFindEntity("Logical_Router_Policy", []string{"_uuid"}, fmt.Sprintf("priority=%d", priority), fmt.Sprintf("match=\"%s\"", match))
if err != nil {
klog.Errorf("customFindEntity failed, %v", err)
return false, err
}
if len(results) == 0 {
return false, nil
}
return true, nil
}
func (c *Client) GetPolicyRouteParas(priority int32, match string) ([]string, map[string]string, error) {
var nexthops []string
result, err := c.CustomFindEntity("Logical_Router_Policy", []string{"nexthops", "external_ids"}, fmt.Sprintf("priority=%d", priority), fmt.Sprintf("match=\"%s\"", match))
if err != nil {
klog.Errorf("customFindEntity failed, %v", err)
return nexthops, nil, err
}
if len(result) == 0 {
return nexthops, nil, nil
}
nexthops = append(nexthops, result[0]["nexthops"]...)
nameIpMap := make(map[string]string, len(result[0]["external_ids"]))
for _, l := range result[0]["external_ids"] {
if len(strings.TrimSpace(l)) == 0 {
continue
}
parts := strings.Split(strings.TrimSpace(l), "=")
if len(parts) != 2 {
continue
}
name := strings.TrimSpace(parts[0])
ip := strings.TrimSpace(parts[1])
nameIpMap[name] = ip
}
return nexthops, nameIpMap, nil
}
func (c Client) SetPolicyRouteExternalIds(priority int32, match string, nameIpMaps map[string]string) error {
result, err := c.CustomFindEntity("Logical_Router_Policy", []string{"_uuid"}, fmt.Sprintf("priority=%d", priority), fmt.Sprintf("match=\"%s\"", match))
if err != nil {
klog.Errorf("customFindEntity failed, %v", err)
return err
}
if len(result) == 0 {
return nil
}
uuid := result[0]["_uuid"][0]
ovnCmd := []string{"set", "logical-router-policy", uuid}
for nodeName, nodeIP := range nameIpMaps {
ovnCmd = append(ovnCmd, fmt.Sprintf("external_ids:%s=\"%s\"", nodeName, nodeIP))
}
if _, err := c.ovnNbCommand(ovnCmd...); err != nil {
return fmt.Errorf("failed to set logical-router-policy externalIds, %v", err)
}
return nil
}
......@@ -120,6 +120,9 @@ const (
OffloadType = "offload-port"
InternalType = "internal-port"
PodRouterPolicyPriority = 20000
CentralSubnetPriority = 25000
ChassisLoc = "/etc/openvswitch/system-id.conf"
HostnameEnv = "KUBE_NODE_NAME"
ChasRetryTime = 5
......
......@@ -111,6 +111,11 @@ var _ = Describe("[Subnet]", func() {
It("centralized gateway", func() {
name := f.GetName()
cidr := "11.11.0.0/16"
if isIPv6 {
cidr = "fd00:11:11::/112"
}
By("create subnet")
s := kubeovn.Subnet{
ObjectMeta: metav1.ObjectMeta{
......@@ -118,7 +123,7 @@ var _ = Describe("[Subnet]", func() {
Labels: map[string]string{"e2e": "true"},
},
Spec: kubeovn.SubnetSpec{
CIDRBlock: "11.11.0.0/16",
CIDRBlock: cidr,
GatewayType: kubeovn.GWCentralizedType,
GatewayNode: "kube-ovn-control-plane,kube-ovn-worker,kube-ovn-worker2",
},
......@@ -140,6 +145,11 @@ var _ = Describe("[Subnet]", func() {
Describe("Update", func() {
It("distributed to centralized", func() {
name := f.GetName()
cidr := "11.12.0.0/16"
if isIPv6 {
cidr = "fd00:11:12::/112"
}
By("create subnet")
s := &kubeovn.Subnet{
ObjectMeta: metav1.ObjectMeta{
......@@ -147,7 +157,7 @@ var _ = Describe("[Subnet]", func() {
Labels: map[string]string{"e2e": "true"},
},
Spec: kubeovn.SubnetSpec{
CIDRBlock: "11.12.0.0/16",
CIDRBlock: cidr,
},
}
_, err := f.OvnClientSet.KubeovnV1().Subnets().Create(context.Background(), s, metav1.CreateOptions{})
......@@ -174,6 +184,10 @@ var _ = Describe("[Subnet]", func() {
Describe("Delete", func() {
It("normal deletion", func() {
name := f.GetName()
cidr := "11.13.0.0/16"
if isIPv6 {
cidr = "fd00:11:13::/112"
}
By("create subnet")
s := kubeovn.Subnet{
ObjectMeta: metav1.ObjectMeta{
......@@ -181,7 +195,7 @@ var _ = Describe("[Subnet]", func() {
Labels: map[string]string{"e2e": "true"},
},
Spec: kubeovn.SubnetSpec{
CIDRBlock: "11.13.0.0/16",
CIDRBlock: cidr,
},
}
_, err := f.OvnClientSet.KubeovnV1().Subnets().Create(context.Background(), &s, metav1.CreateOptions{})
......@@ -205,6 +219,10 @@ var _ = Describe("[Subnet]", func() {
Describe("cidr with nonstandard style", func() {
It("cidr ends with nonzero", func() {
name := f.GetName()
cidr := "11.14.0.10/16"
if isIPv6 {
cidr = "fd00:11:14::10/112"
}
By("create subnet")
s := &kubeovn.Subnet{
ObjectMeta: metav1.ObjectMeta{
......@@ -212,7 +230,7 @@ var _ = Describe("[Subnet]", func() {
Labels: map[string]string{"e2e": "true"},
},
Spec: kubeovn.SubnetSpec{
CIDRBlock: "11.14.0.1/16",
CIDRBlock: cidr,
},
}
......@@ -224,7 +242,12 @@ var _ = Describe("[Subnet]", func() {
s, err = f.OvnClientSet.KubeovnV1().Subnets().Get(context.Background(), name, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
Expect(s.Spec.CIDRBlock).To(Equal("11.14.0.0/16"))
if !isIPv6 {
Expect(s.Spec.CIDRBlock).To(Equal("11.14.0.0/16"))
} else {
Expect(s.Spec.CIDRBlock).To(Equal("fd00:11:14::/112"))
}
})
})
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment