Unverified Commit bbb054d0 authored by hzma's avatar hzma Committed by GitHub
Browse files

Merge pull request #1110 from kubeovn/qos-query

reduce qos query with ovs-vsctl cmd
parents 5a29f34e 61817bf4
Showing with 116 additions and 87 deletions
+116 -87
......@@ -1142,6 +1142,15 @@ func (c *Controller) setSubnetQosPriority(subnet *kubeovnv1.Subnet) error {
return err
}
qosIfaceUidMap, err := ovs.ListExternalIds("qos")
if err != nil {
return err
}
queueIfaceUidMap, err := ovs.ListExternalIds("queue")
if err != nil {
return err
}
for _, pod := range pods {
if pod.Spec.HostNetwork ||
pod.DeletionTimestamp != nil ||
......@@ -1156,7 +1165,7 @@ func (c *Controller) setSubnetQosPriority(subnet *kubeovnv1.Subnet) error {
if pod.Annotations[util.PriorityAnnotation] != "" {
priority = pod.Annotations[util.PriorityAnnotation]
}
if err = ovs.SetPodQosPriority(pod.Name, pod.Namespace, ifaceID, priority); err != nil {
if err = ovs.SetPodQosPriority(pod.Name, pod.Namespace, ifaceID, priority, qosIfaceUidMap, queueIfaceUidMap); err != nil {
klog.Errorf("failed to set htbqos priority for pod %s/%s, iface %v: %v", pod.Namespace, pod.Name, ifaceID, err)
return err
}
......@@ -1180,7 +1189,7 @@ func (c *Controller) setSubnetQosPriority(subnet *kubeovnv1.Subnet) error {
priority = pod.Annotations[fmt.Sprintf(util.PriorityAnnotationTemplate, provider)]
}
if err = ovs.SetPodQosPriority(pod.Name, pod.Namespace, ifaceID, priority); err != nil {
if err = ovs.SetPodQosPriority(pod.Name, pod.Namespace, ifaceID, priority, qosIfaceUidMap, queueIfaceUidMap); err != nil {
klog.Errorf("failed to set htbqos priority for pod %s/%s, iface %v: %v", pod.Namespace, pod.Name, ifaceID, err)
return err
}
......
......@@ -145,6 +145,16 @@ func SetInterfaceBandwidth(podName, podNamespace, iface, ingress, egress, podPri
return err
}
qosIfaceUidMap, err := ListExternalIds("qos")
if err != nil {
return err
}
queueIfaceUidMap, err := ListExternalIds("queue")
if err != nil {
return err
}
for _, ifName := range interfaceList {
// ingress_policing_rate is in Kbps
err := ovsSet("interface", ifName, fmt.Sprintf("ingress_policing_rate=%d", ingressKPS), fmt.Sprintf("ingress_policing_burst=%d", ingressKPS*8/10))
......@@ -155,29 +165,25 @@ func SetInterfaceBandwidth(podName, podNamespace, iface, ingress, egress, podPri
egressMPS, _ := strconv.Atoi(egress)
egressBPS := egressMPS * 1000 * 1000
qosList, err := ovsFind("qos", "_uuid", fmt.Sprintf("external-ids:iface-id=%s", iface))
if err != nil {
return err
}
if egressBPS > 0 {
queueList, err := SetHtbQosQueueRecord(podName, podNamespace, iface, podPriority, egressBPS)
queueUid, err := SetHtbQosQueueRecord(podName, podNamespace, iface, podPriority, egressBPS, queueIfaceUidMap)
if err != nil {
return err
}
if err = SetQosQueueBinding(podName, podNamespace, ifName, iface, qosList, queueList); err != nil {
if err = SetQosQueueBinding(podName, podNamespace, ifName, iface, queueUid, qosIfaceUidMap); err != nil {
return err
}
} else {
for _, qos := range qosList {
qosType, err := ovsGet("qos", qos, "type", "")
if qosUid, ok := qosIfaceUidMap[iface]; ok {
qosType, err := ovsGet("qos", qosUid, "type", "")
if err != nil {
return err
}
if qosType != util.HtbQos {
continue
}
queueId, err := ovsGet("qos", qos, "queues", "0")
queueId, err := ovsGet("qos", qosUid, "queues", "0")
if err != nil {
return err
}
......@@ -189,7 +195,7 @@ func SetInterfaceBandwidth(podName, podNamespace, iface, ingress, egress, podPri
}
}
if err = SetHtbQosPriority(podName, podNamespace, iface, ifName, podPriority); err != nil {
if err = SetHtbQosPriority(podName, podNamespace, iface, ifName, podPriority, qosIfaceUidMap, queueIfaceUidMap); err != nil {
return err
}
}
......@@ -376,13 +382,9 @@ func IsHtbQos(iface string) (bool, error) {
return false, nil
}
func SetHtbQosQueueRecord(podName, podNamespace, iface, priority string, maxRateBPS int) ([]string, error) {
queueList, err := ovsFind("queue", "_uuid", fmt.Sprintf("external-ids:iface-id=%s", iface))
if err != nil {
return queueList, err
}
func SetHtbQosQueueRecord(podName, podNamespace, iface, priority string, maxRateBPS int, queueIfaceUidMap map[string]string) (string, error) {
var queueCommandValues []string
var err error
if maxRateBPS > 0 {
queueCommandValues = append(queueCommandValues, fmt.Sprintf("other_config:max-rate=%d", maxRateBPS))
}
......@@ -390,71 +392,58 @@ func SetHtbQosQueueRecord(podName, podNamespace, iface, priority string, maxRate
queueCommandValues = append(queueCommandValues, fmt.Sprintf("other_config:priority=%s", priority))
}
if len(queueList) == 0 {
if queueUid, ok := queueIfaceUidMap[iface]; ok {
if err := ovsSet("queue", queueUid, queueCommandValues...); err != nil {
return queueUid, err
}
} else {
queueCommandValues = append(queueCommandValues, fmt.Sprintf("external-ids:iface-id=%s", iface))
if podNamespace != "" && podName != "" {
queueCommandValues = append(queueCommandValues, fmt.Sprintf("external-ids:pod=%s/%s", podNamespace, podName))
}
if _, err := ovsCreate("queue", queueCommandValues...); err != nil {
return queueList, err
}
} else {
for _, queueId := range queueList {
if err := ovsSet("queue", queueId, queueCommandValues...); err != nil {
return queueList, err
}
}
}
for {
queueList, err = ovsFind("queue", "_uuid", fmt.Sprintf("external-ids:iface-id=%s", iface))
if err != nil {
return queueList, err
}
if len(queueList) == 0 {
klog.Infof("queue record does not exist, please wait creating...")
time.Sleep(3 * time.Second)
continue
var queueId string
if queueId, err = ovsCreate("queue", queueCommandValues...); err != nil {
return queueUid, err
}
break
queueIfaceUidMap[iface] = queueId
}
return queueList, nil
return queueIfaceUidMap[iface], nil
}
func SetHtbQosPriority(podName, podNamespace, iface, ifName, priority string) error {
qosList, err := ovsFind("qos", "_uuid", fmt.Sprintf("external-ids:iface-id=%s", iface))
if err != nil {
return err
}
func SetHtbQosPriority(podName, podNamespace, iface, ifName, priority string, qosIfaceUidMap, queueIfaceUidMap map[string]string) error {
if priority != "" {
queueList, err := SetHtbQosQueueRecord(podName, podNamespace, iface, priority, 0)
queueUid, err := SetHtbQosQueueRecord(podName, podNamespace, iface, priority, 0, queueIfaceUidMap)
if err != nil {
return err
}
if err = SetQosQueueBinding(podName, podNamespace, ifName, iface, qosList, queueList); err != nil {
if err = SetQosQueueBinding(podName, podNamespace, ifName, iface, queueUid, qosIfaceUidMap); err != nil {
return err
}
} else {
for _, qos := range qosList {
qosType, err := ovsGet("qos", qos, "type", "")
if err != nil {
return err
}
if qosType != util.HtbQos {
continue
}
queueId, err := ovsGet("qos", qos, "queues", "0")
if err != nil {
return err
}
var qosUid string
var ok bool
if qosUid, ok = qosIfaceUidMap[iface]; !ok {
return nil
}
// It's difficult to check if qos and queue should be destroyed here since can not get subnet info here. So leave destroy operation in subnet loop check
if _, err := Exec("remove", "queue", queueId, "other_config", "priority"); err != nil {
return fmt.Errorf("failed to remove priority for queue in pod %v/%v, %v", podNamespace, podName, err)
}
qosType, err := ovsGet("qos", qosUid, "type", "")
if err != nil {
return err
}
if qosType != util.HtbQos {
return nil
}
queueId, err := ovsGet("qos", qosUid, "queues", "0")
if err != nil {
return err
}
// It's difficult to check if qos and queue should be destroyed here since can not get subnet info here. So leave destroy operation in subnet loop check
if _, err := Exec("remove", "queue", queueId, "other_config", "priority"); err != nil {
return fmt.Errorf("failed to remove priority for queue in pod %v/%v, %v", podNamespace, podName, err)
}
}
......@@ -462,10 +451,11 @@ func SetHtbQosPriority(podName, podNamespace, iface, ifName, priority string) er
}
// SetQosQueueBinding set qos related to queue record.
func SetQosQueueBinding(podName, podNamespace, ifName, iface string, qosList, queueList []string) error {
func SetQosQueueBinding(podName, podNamespace, ifName, iface, queueUid string, qosIfaceUidMap map[string]string) error {
var qosCommandValues []string
qosCommandValues = append(qosCommandValues, fmt.Sprintf("queues:0=%s", queueList[0]))
if len(qosList) == 0 {
qosCommandValues = append(qosCommandValues, fmt.Sprintf("queues:0=%s", queueUid))
if qosUid, ok := qosIfaceUidMap[iface]; !ok {
qosCommandValues = append(qosCommandValues, "type=linux-htb", fmt.Sprintf(`external-ids:iface-id="%s"`, iface))
if podNamespace != "" && podName != "" {
qosCommandValues = append(qosCommandValues, fmt.Sprintf("external-ids:pod=%s/%s", podNamespace, podName))
......@@ -478,30 +468,29 @@ func SetQosQueueBinding(podName, podNamespace, ifName, iface string, qosList, qu
if err != nil {
return err
}
qosIfaceUidMap[iface] = qos
} else {
for _, qos := range qosList {
qosType, err := ovsGet("qos", qos, "type", "")
qosType, err := ovsGet("qos", qosUid, "type", "")
if err != nil {
return err
}
if qosType != util.HtbQos {
klog.Errorf("netem qos exists for pod %s/%s, conflict with current qos, will be changed to htb qos", podNamespace, podName)
qosCommandValues = append(qosCommandValues, "type=linux-htb")
}
if qosType == util.HtbQos {
queueId, err := ovsGet("qos", qosUid, "queues", "0")
if err != nil {
return err
}
if qosType != util.HtbQos {
klog.Errorf("netem qos exists for pod %s/%s, conflict with current qos, will be changed to htb qos", podNamespace, podName)
qosCommandValues = append(qosCommandValues, "type=linux-htb")
}
if qosType == util.HtbQos {
queueId, err := ovsGet("qos", qos, "queues", "0")
if err != nil {
return err
}
if queueId == queueList[0] {
return nil
}
if queueId == queueUid {
return nil
}
}
if err := ovsSet("qos", qos, qosCommandValues...); err != nil {
return err
}
if err := ovsSet("qos", qosUid, qosCommandValues...); err != nil {
return err
}
}
return nil
......@@ -523,14 +512,14 @@ func ClearPortQosBinding(ifaceID string) error {
}
// SetPodQosPriority set qos to this pod port.
func SetPodQosPriority(podName, podNamespace, ifaceID, priority string) error {
func SetPodQosPriority(podName, podNamespace, ifaceID, priority string, qosIfaceUidMap, queueIfaceUidMap map[string]string) error {
interfaceList, err := ovsFind("interface", "name", fmt.Sprintf("external-ids:iface-id=%s", ifaceID))
if err != nil {
return err
}
for _, ifName := range interfaceList {
if err = SetHtbQosPriority(podName, podNamespace, ifaceID, ifName, priority); err != nil {
if err = SetHtbQosPriority(podName, podNamespace, ifaceID, ifName, priority, qosIfaceUidMap, queueIfaceUidMap); err != nil {
return err
}
}
......@@ -618,3 +607,34 @@ func SetNetemQos(podName, podNamespace, iface, latency, limit, loss string) erro
}
return nil
}
func ListExternalIds(table string) (map[string]string, error) {
args := []string{"--data=bare", "--format=csv", "--no-heading", "--columns=_uuid,external_ids", "find", table, "external_ids:iface-id!=[]"}
output, err := Exec(args...)
if err != nil {
klog.Errorf("failed to list %s, %v", table, err)
return nil, err
}
lines := strings.Split(output, "\n")
result := make(map[string]string, len(lines))
for _, l := range lines {
if len(strings.TrimSpace(l)) == 0 {
continue
}
parts := strings.Split(strings.TrimSpace(l), ",")
if len(parts) != 2 {
continue
}
uuid := strings.TrimSpace(parts[0])
externalIds := strings.Fields(parts[1])
for _, externalId := range externalIds {
if !strings.Contains(externalId, "iface-id=") {
continue
}
iface := strings.TrimPrefix(strings.TrimSpace(externalId), "iface-id=")
result[iface] = uuid
break
}
}
return result, nil
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment