Commit 34d21a5d authored by hzma's avatar hzma
Browse files

add containerId to qos external-ids

parent 0be7e330
Showing with 163 additions and 39 deletions
+163 -39
......@@ -995,8 +995,17 @@ func (c *Controller) handlePod(key string) error {
return err
}
var containerId string
qosIfaceContainerIdMap, err := ovs.ListContainerIds("interface")
if err != nil {
return err
}
// set default nic bandwidth
ifaceID := ovs.PodNameToPortName(pod.Name, pod.Namespace, util.OvnProvider)
if cid, ok := qosIfaceContainerIdMap[ifaceID]; ok {
containerId = cid
}
err = ovs.SetInterfaceBandwidth(pod.Name, pod.Namespace, ifaceID, pod.Annotations[util.EgressRateAnnotation], pod.Annotations[util.IngressRateAnnotation], pod.Annotations[util.PriorityAnnotation])
if err != nil {
return err
......@@ -1006,7 +1015,7 @@ func (c *Controller) handlePod(key string) error {
return err
}
// set linux-netem qos
err = ovs.SetNetemQos(pod.Name, pod.Namespace, ifaceID, pod.Annotations[util.NetemQosLatencyAnnotation], pod.Annotations[util.NetemQosLimitAnnotation], pod.Annotations[util.NetemQosLossAnnotation])
err = ovs.SetNetemQos(pod.Name, pod.Namespace, ifaceID, containerId, pod.Annotations[util.NetemQosLatencyAnnotation], pod.Annotations[util.NetemQosLimitAnnotation], pod.Annotations[util.NetemQosLossAnnotation])
if err != nil {
return err
}
......@@ -1020,6 +1029,9 @@ func (c *Controller) handlePod(key string) error {
provider := fmt.Sprintf("%s.%s.ovn", multiNet.Name, multiNet.Namespace)
if pod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, provider)] == "true" {
ifaceID = ovs.PodNameToPortName(pod.Name, pod.Namespace, provider)
if cid, ok := qosIfaceContainerIdMap[ifaceID]; ok {
containerId = cid
}
err = ovs.SetInterfaceBandwidth(pod.Name, pod.Namespace, ifaceID, pod.Annotations[fmt.Sprintf(util.EgressRateAnnotationTemplate, provider)], pod.Annotations[fmt.Sprintf(util.IngressRateAnnotationTemplate, provider)], pod.Annotations[fmt.Sprintf(util.PriorityAnnotationTemplate, provider)])
if err != nil {
return err
......@@ -1028,7 +1040,7 @@ func (c *Controller) handlePod(key string) error {
if err != nil {
return err
}
err = ovs.SetNetemQos(pod.Name, pod.Namespace, ifaceID, pod.Annotations[fmt.Sprintf(util.NetemQosLatencyAnnotationTemplate, provider)], pod.Annotations[fmt.Sprintf(util.NetemQosLimitAnnotationTemplate, provider)], pod.Annotations[fmt.Sprintf(util.NetemQosLossAnnotationTemplate, provider)])
err = ovs.SetNetemQos(pod.Name, pod.Namespace, ifaceID, containerId, pod.Annotations[fmt.Sprintf(util.NetemQosLatencyAnnotationTemplate, provider)], pod.Annotations[fmt.Sprintf(util.NetemQosLimitAnnotationTemplate, provider)], pod.Annotations[fmt.Sprintf(util.NetemQosLossAnnotationTemplate, provider)])
if err != nil {
return err
}
......@@ -1150,6 +1162,10 @@ func (c *Controller) setSubnetQosPriority(subnet *kubeovnv1.Subnet) error {
if err != nil {
return err
}
qosIfaceContainerIdMap, err := ovs.ListContainerIds("interface")
if err != nil {
return err
}
for _, pod := range pods {
if pod.Spec.HostNetwork ||
......@@ -1165,7 +1181,7 @@ func (c *Controller) setSubnetQosPriority(subnet *kubeovnv1.Subnet) error {
if pod.Annotations[util.PriorityAnnotation] != "" {
priority = pod.Annotations[util.PriorityAnnotation]
}
if err = ovs.SetPodQosPriority(pod.Name, pod.Namespace, ifaceID, priority, qosIfaceUidMap, queueIfaceUidMap); err != nil {
if err = ovs.SetPodQosPriority(pod.Name, pod.Namespace, ifaceID, priority, qosIfaceUidMap, queueIfaceUidMap, qosIfaceContainerIdMap); err != nil {
klog.Errorf("failed to set htbqos priority for pod %s/%s, iface %v: %v", pod.Namespace, pod.Name, ifaceID, err)
return err
}
......@@ -1189,7 +1205,7 @@ func (c *Controller) setSubnetQosPriority(subnet *kubeovnv1.Subnet) error {
priority = pod.Annotations[fmt.Sprintf(util.PriorityAnnotationTemplate, provider)]
}
if err = ovs.SetPodQosPriority(pod.Name, pod.Namespace, ifaceID, priority, qosIfaceUidMap, queueIfaceUidMap); err != nil {
if err = ovs.SetPodQosPriority(pod.Name, pod.Namespace, ifaceID, priority, qosIfaceUidMap, queueIfaceUidMap, qosIfaceContainerIdMap); err != nil {
klog.Errorf("failed to set htbqos priority for pod %s/%s, iface %v: %v", pod.Namespace, pod.Name, ifaceID, err)
return err
}
......@@ -1199,7 +1215,7 @@ func (c *Controller) setSubnetQosPriority(subnet *kubeovnv1.Subnet) error {
return nil
}
func (c *Controller) clearQos(podName, podNamespace, ifaceID string) error {
func (c *Controller) clearQos(podName, podNamespace, ifaceID, containerId string) error {
if htbQos, _ := ovs.IsHtbQos(ifaceID); !htbQos {
return nil
}
......@@ -1209,7 +1225,7 @@ func (c *Controller) clearQos(podName, podNamespace, ifaceID string) error {
return err
}
if err := ovs.ClearPodBandwidth(podName, podNamespace, ifaceID); err != nil {
if err := ovs.ClearPodBandwidth(podName, podNamespace, ifaceID, containerId); err != nil {
klog.Errorf("failed to delete htbqos record for pod %s/%s, iface %v: %v", podNamespace, podName, ifaceID, err)
return err
}
......@@ -1228,6 +1244,12 @@ func (c *Controller) deleteSubnetQos(subnet *kubeovnv1.Subnet) error {
return err
}
var containerId string
qosIfaceContainerIdMap, err := ovs.ListContainerIds("interface")
if err != nil {
return err
}
for _, pod := range pods {
if pod.Spec.HostNetwork ||
pod.DeletionTimestamp != nil {
......@@ -1241,7 +1263,10 @@ func (c *Controller) deleteSubnetQos(subnet *kubeovnv1.Subnet) error {
continue
}
ifaceID := ovs.PodNameToPortName(pod.Name, pod.Namespace, util.OvnProvider)
if err := c.clearQos(pod.Name, pod.Namespace, ifaceID); err != nil {
if cid, ok := qosIfaceContainerIdMap[ifaceID]; ok {
containerId = cid
}
if err := c.clearQos(pod.Name, pod.Namespace, ifaceID, containerId); err != nil {
return err
}
}
......@@ -1261,7 +1286,10 @@ func (c *Controller) deleteSubnetQos(subnet *kubeovnv1.Subnet) error {
if pod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, provider)] == "true" {
ifaceID := ovs.PodNameToPortName(pod.Name, pod.Namespace, provider)
if err := c.clearQos(pod.Name, pod.Namespace, ifaceID); err != nil {
if cid, ok := qosIfaceContainerIdMap[ifaceID]; ok {
containerId = cid
}
if err := c.clearQos(pod.Name, pod.Namespace, ifaceID, containerId); err != nil {
return err
}
}
......
......@@ -55,6 +55,7 @@ func (csh cniServerHandler) configureNic(podName, podNamespace, provider, netns,
fmt.Sprintf("external_ids:pod_name=%s", podName),
fmt.Sprintf("external_ids:pod_namespace=%s", podNamespace),
fmt.Sprintf("external_ids:ip=%s", ipStr),
fmt.Sprintf("external_ids:containerid=%s", containerID),
fmt.Sprintf("external_ids:pod_netns=%s", netns))
if err != nil {
return fmt.Errorf("add nic to ovs failed %v: %q", err, output)
......@@ -101,7 +102,7 @@ func (csh cniServerHandler) deleteNic(podName, podNamespace, containerID, device
return fmt.Errorf("failed to delete ovs port %v, %q", err, output)
}
if err = ovs.ClearPodBandwidth(podName, podNamespace, ""); err != nil {
if err = ovs.ClearPodBandwidth(podName, podNamespace, "", containerID); err != nil {
return err
}
if err = ovs.ClearHtbQosQueue(podName, podNamespace, ""); err != nil {
......@@ -884,6 +885,7 @@ func (csh cniServerHandler) configureNicWithInternalPort(podName, podNamespace,
fmt.Sprintf("external_ids:pod_name=%s", podName),
fmt.Sprintf("external_ids:pod_namespace=%s", podNamespace),
fmt.Sprintf("external_ids:ip=%s", ipStr),
fmt.Sprintf("external_ids:containerid=%s", containerID),
fmt.Sprintf("external_ids:pod_netns=%s", netns))
if err != nil {
return containerNicName, fmt.Errorf("add nic to ovs failed %v: %q", err, output)
......
......@@ -106,28 +106,63 @@ func ovsGet(table, record, column, key string) (string, error) {
return Exec(args...)
}
func ovsRemove(table, record, column, key string) error {
args := []string{"remove"}
if key == "" {
args = append(args, table, record, column)
} else {
args = append(args, table, record, column, key)
}
_, err := Exec(args...)
return err
}
// Bridges returns bridges created by Kube-OVN
func Bridges() ([]string, error) {
return ovsFind("bridge", "name", fmt.Sprintf("external-ids:vendor=%s", util.CniTypeName))
}
// ClearPodBandwidth remove qos related to this pod.
func ClearPodBandwidth(podName, podNamespace, ifaceID string) error {
var qosList, qosListByPod []string
func GetQosList(podName, podNamespace, ifaceID, containerId string) ([]string, error) {
var qosList []string
var err error
if ifaceID != "" {
if containerId != "" {
qosList, err = ovsFind("qos", "_uuid", fmt.Sprintf(`external-ids:containerid="%s"`, containerId))
if err != nil {
return qosList, err
}
} else if ifaceID != "" {
qosList, err = ovsFind("qos", "_uuid", fmt.Sprintf(`external-ids:iface-id="%s"`, ifaceID))
if err != nil {
return err
return qosList, err
}
} else {
qosListByPod, err = ovsFind("qos", "_uuid", fmt.Sprintf(`external-ids:pod="%s/%s"`, podNamespace, podName))
qosList, err = ovsFind("qos", "_uuid", fmt.Sprintf(`external-ids:pod="%s/%s"`, podNamespace, podName))
if err != nil {
return err
return qosList, err
}
}
return qosList, nil
}
// ClearPodBandwidth remove qos related to this pod.
func ClearPodBandwidth(podName, podNamespace, ifaceID, containerId string) error {
qosList, err := GetQosList(podName, podNamespace, ifaceID, containerId)
if err != nil {
return err
}
usedQosList, err := ovsFind("port", "qos", "qos!=[]")
if err != nil {
return err
}
for _, usedQosId := range usedQosList {
if util.ContainsString(qosList, usedQosId) {
qosList = util.RemoveString(qosList, usedQosId)
}
}
qosList = append(qosList, qosListByPod...)
qosList = util.UniqString(qosList)
for _, qos := range qosList {
if err := ovsDestroy("qos", qos); err != nil {
return err
......@@ -156,6 +191,11 @@ func SetInterfaceBandwidth(podName, podNamespace, iface, ingress, egress, podPri
return err
}
qosIfaceContainerIdMap, err := ListContainerIds("interface")
if err != nil {
return err
}
for _, ifName := range interfaceList {
// ingress_policing_rate is in Kbps
err := ovsSet("interface", ifName, fmt.Sprintf("ingress_policing_rate=%d", ingressKPS), fmt.Sprintf("ingress_policing_burst=%d", ingressKPS*8/10))
......@@ -172,7 +212,7 @@ func SetInterfaceBandwidth(podName, podNamespace, iface, ingress, egress, podPri
return err
}
if err = SetQosQueueBinding(podName, podNamespace, ifName, iface, queueUid, qosIfaceUidMap); err != nil {
if err = SetQosQueueBinding(podName, podNamespace, ifName, iface, queueUid, qosIfaceUidMap, qosIfaceContainerIdMap); err != nil {
return err
}
} else {
......@@ -196,7 +236,7 @@ func SetInterfaceBandwidth(podName, podNamespace, iface, ingress, egress, podPri
}
}
if err = SetHtbQosPriority(podName, podNamespace, iface, ifName, podPriority, qosIfaceUidMap, queueIfaceUidMap); err != nil {
if err = SetHtbQosPriority(podName, podNamespace, iface, ifName, podPriority, qosIfaceUidMap, queueIfaceUidMap, qosIfaceContainerIdMap); err != nil {
return err
}
}
......@@ -413,14 +453,14 @@ func SetHtbQosQueueRecord(podName, podNamespace, iface, priority string, maxRate
return queueIfaceUidMap[iface], nil
}
func SetHtbQosPriority(podName, podNamespace, iface, ifName, priority string, qosIfaceUidMap, queueIfaceUidMap map[string]string) error {
func SetHtbQosPriority(podName, podNamespace, iface, ifName, priority string, qosIfaceUidMap, queueIfaceUidMap, qosIfaceContainerIdMap map[string]string) error {
if priority != "" {
queueUid, err := SetHtbQosQueueRecord(podName, podNamespace, iface, priority, 0, queueIfaceUidMap)
if err != nil {
return err
}
if err = SetQosQueueBinding(podName, podNamespace, ifName, iface, queueUid, qosIfaceUidMap); err != nil {
if err = SetQosQueueBinding(podName, podNamespace, ifName, iface, queueUid, qosIfaceUidMap, qosIfaceContainerIdMap); err != nil {
return err
}
} else {
......@@ -452,7 +492,7 @@ func SetHtbQosPriority(podName, podNamespace, iface, ifName, priority string, qo
}
// SetQosQueueBinding set qos related to queue record.
func SetQosQueueBinding(podName, podNamespace, ifName, iface, queueUid string, qosIfaceUidMap map[string]string) error {
func SetQosQueueBinding(podName, podNamespace, ifName, iface, queueUid string, qosIfaceUidMap, qosIfaceContainerIdMap map[string]string) error {
var qosCommandValues []string
qosCommandValues = append(qosCommandValues, fmt.Sprintf("queues:0=%s", queueUid))
......@@ -461,6 +501,10 @@ func SetQosQueueBinding(podName, podNamespace, ifName, iface, queueUid string, q
if podNamespace != "" && podName != "" {
qosCommandValues = append(qosCommandValues, fmt.Sprintf("external-ids:pod=%s/%s", podNamespace, podName))
}
if containerId, ok := qosIfaceContainerIdMap[iface]; ok {
qosCommandValues = append(qosCommandValues, fmt.Sprintf(`external-ids:containerid="%s"`, containerId))
}
qos, err := ovsCreate("qos", qosCommandValues...)
if err != nil {
return err
......@@ -513,14 +557,14 @@ func ClearPortQosBinding(ifaceID string) error {
}
// SetPodQosPriority set qos to this pod port.
func SetPodQosPriority(podName, podNamespace, ifaceID, priority string, qosIfaceUidMap, queueIfaceUidMap map[string]string) error {
func SetPodQosPriority(podName, podNamespace, ifaceID, priority string, qosIfaceUidMap, queueIfaceUidMap, qosIfaceContainerIdMap map[string]string) error {
interfaceList, err := ovsFind("interface", "name", fmt.Sprintf("external-ids:iface-id=%s", ifaceID))
if err != nil {
return err
}
for _, ifName := range interfaceList {
if err = SetHtbQosPriority(podName, podNamespace, ifaceID, ifName, priority, qosIfaceUidMap, queueIfaceUidMap); err != nil {
if err = SetHtbQosPriority(podName, podNamespace, ifaceID, ifName, priority, qosIfaceUidMap, queueIfaceUidMap, qosIfaceContainerIdMap); err != nil {
return err
}
}
......@@ -528,7 +572,7 @@ func SetPodQosPriority(podName, podNamespace, ifaceID, priority string, qosIface
}
// The latency value expressed in us.
func SetNetemQos(podName, podNamespace, iface, latency, limit, loss string) error {
func SetNetemQos(podName, podNamespace, iface, containerId, latency, limit, loss string) error {
latencyMs, _ := strconv.Atoi(latency)
latencyUs := latencyMs * 1000
limitPkts, _ := strconv.Atoi(limit)
......@@ -540,7 +584,7 @@ func SetNetemQos(podName, podNamespace, iface, latency, limit, loss string) erro
}
for _, ifName := range interfaceList {
qosList, err := ovsFind("qos", "_uuid", fmt.Sprintf("external-ids:iface-id=%s", iface))
qosList, err := GetQosList(podName, podNamespace, iface, containerId)
if err != nil {
return err
}
......@@ -561,6 +605,9 @@ func SetNetemQos(podName, podNamespace, iface, latency, limit, loss string) erro
if podNamespace != "" && podName != "" {
qosCommandValues = append(qosCommandValues, fmt.Sprintf("external-ids:pod=%s/%s", podNamespace, podName))
}
if containerId != "" {
qosCommandValues = append(qosCommandValues, fmt.Sprintf(`external-ids:containerid="%s"`, containerId))
}
qos, err := ovsCreate("qos", qosCommandValues...)
if err != nil {
......@@ -584,6 +631,22 @@ func SetNetemQos(podName, podNamespace, iface, latency, limit, loss string) erro
if err := ovsSet("qos", qos, qosCommandValues...); err != nil {
return err
}
if latencyMs == 0 {
if err := ovsRemove("qos", qos, "other_config", "latency"); err != nil {
return err
}
}
if limitPkts == 0 {
if err := ovsRemove("qos", qos, "other_config", "limit"); err != nil {
return err
}
}
if lossPercent == 0 {
if err := ovsRemove("qos", qos, "other_config", "loss"); err != nil {
return err
}
}
}
}
} else {
......@@ -599,7 +662,7 @@ func SetNetemQos(podName, podNamespace, iface, latency, limit, loss string) erro
}
// reuse this function to delete qos record
if err = ClearPodBandwidth(podName, podNamespace, iface); err != nil {
if err = ClearPodBandwidth(podName, podNamespace, iface, containerId); err != nil {
klog.Errorf("failed to delete netemqos record for pod %s/%s: %v", podNamespace, podName, err)
return err
}
......@@ -639,3 +702,42 @@ func ListExternalIds(table string) (map[string]string, error) {
}
return result, nil
}
func ListContainerIds(table string) (map[string]string, error) {
var getIfaceId, getContainerId bool
var iface, containerId string
args := []string{"--data=bare", "--format=csv", "--no-heading", "--columns=external_ids", "find", table, "external_ids:iface-id!=[]"}
output, err := Exec(args...)
if err != nil {
klog.Errorf("failed to list %s, %v", table, err)
return nil, err
}
lines := strings.Split(output, "\n")
result := make(map[string]string, len(lines))
for _, l := range lines {
if len(strings.TrimSpace(l)) == 0 {
continue
}
getIfaceId = false
getContainerId = false
externalIds := strings.Fields(strings.TrimSpace(l))
for _, externalId := range externalIds {
if strings.Contains(externalId, "iface-id=") {
getIfaceId = true
iface = strings.TrimPrefix(strings.TrimSpace(externalId), "iface-id=")
}
if strings.Contains(externalId, "containerid=") {
getContainerId = true
containerId = strings.TrimPrefix(strings.TrimSpace(externalId), "containerid=")
}
if getIfaceId && getContainerId {
result[iface] = containerId
break
}
}
}
return result, nil
}
......@@ -265,18 +265,10 @@ func SplitIpsByProtocol(excludeIps []string) ([]string, []string) {
var v4ExcludeIps, v6ExcludeIps []string
for _, ex := range excludeIps {
ips := strings.Split(ex, "..")
if len(ips) == 1 {
if net.ParseIP(ips[0]).To4() != nil {
v4ExcludeIps = append(v4ExcludeIps, ips[0])
} else {
v6ExcludeIps = append(v6ExcludeIps, ips[0])
}
if net.ParseIP(ips[0]).To4() != nil {
v4ExcludeIps = append(v4ExcludeIps, ex)
} else {
if net.ParseIP(ips[0]).To4() != nil {
v4ExcludeIps = append(v4ExcludeIps, ex)
} else {
v6ExcludeIps = append(v6ExcludeIps, ex)
}
v6ExcludeIps = append(v6ExcludeIps, ex)
}
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment