Unverified Commit 330680bf authored by Frame's avatar Frame Committed by GitHub
Browse files

koord-scheduler: improve reservation compatibility (#550)

Signed-off-by: default avatarsaintube <saintube@foxmail.com>
parent 063ccd71
Showing with 1120 additions and 381 deletions
+1120 -381
......@@ -5,6 +5,7 @@ go 1.17
require (
github.com/NVIDIA/go-nvml v0.11.6-0.0.20220715143214-a79f46f2a6f7
github.com/docker/docker v20.10.17+incompatible
github.com/evanphx/json-patch v4.11.0+incompatible
github.com/fsnotify/fsnotify v1.5.4
github.com/gin-gonic/gin v1.8.1
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
......@@ -66,7 +67,6 @@ require (
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/emicklei/go-restful v2.9.5+incompatible // indirect
github.com/evanphx/json-patch v4.11.0+incompatible // indirect
github.com/felixge/httpsnoop v1.0.1 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-logr/logr v0.4.0 // indirect
......
......@@ -34,7 +34,6 @@ import (
koordclientset "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned"
schedulingv1alpha1lister "github.com/koordinator-sh/koordinator/pkg/client/listers/scheduling/v1alpha1"
"github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext"
"github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/reservation"
"github.com/koordinator-sh/koordinator/pkg/util"
)
......@@ -48,14 +47,14 @@ func AddReservationErrorHandler(sched *scheduler.Scheduler, internalHandler Sche
sched.Error = func(podInfo *framework.QueuedPodInfo, schedulingErr error) {
pod := podInfo.Pod
// if the pod is not a reserve pod, use the default error handler
if !reservation.IsReservePod(pod) {
if !util.IsReservePod(pod) {
defaultErrorFn(podInfo, schedulingErr)
return
}
reservationErrorFn(podInfo, schedulingErr)
rName := reservation.GetReservationNameFromReservePod(pod)
rName := util.GetReservationNameFromReservePod(pod)
r, err := reservationLister.Get(rName)
if err != nil {
return
......@@ -84,7 +83,7 @@ func makeReservationErrorFunc(internalHandler SchedulerInternalHandler, reservat
}
// Check if the corresponding reservation exists in informer cache.
rName := reservation.GetReservationNameFromReservePod(pod)
rName := util.GetReservationNameFromReservePod(pod)
cachedR, err := reservationLister.Get(rName)
if err != nil {
klog.InfoS("Reservation doesn't exist in informer cache",
......@@ -93,12 +92,12 @@ func makeReservationErrorFunc(internalHandler SchedulerInternalHandler, reservat
}
// In the case of extender, the pod may have been bound successfully, but timed out returning its response to the scheduler.
// It could result in the live version to carry .spec.nodeName, and that's inconsistent with the internal-queued version.
if nodeName := reservation.GetReservationNodeName(cachedR); len(nodeName) != 0 {
if nodeName := util.GetReservationNodeName(cachedR); len(nodeName) != 0 {
klog.InfoS("Reservation has been assigned to node. Abort adding it back to queue.",
"pod", klog.KObj(pod), "reservation", rName, "node", nodeName)
return
}
podInfo.PodInfo = framework.NewPodInfo(reservation.NewReservePod(cachedR))
podInfo.PodInfo = framework.NewPodInfo(util.NewReservePod(cachedR))
if err = internalHandler.GetQueue().AddUnschedulableIfNotPresent(podInfo, internalHandler.GetQueue().SchedulingCycle()); err != nil {
klog.ErrorS(err, "Error occurred")
}
......@@ -117,7 +116,7 @@ func updateReservationStatus(client koordclientset.Interface, reservationLister
}
curR := r.DeepCopy()
reservation.SetReservationUnschedulable(curR, schedulingErr.Error())
util.SetReservationUnschedulable(curR, schedulingErr.Error())
_, err = client.SchedulingV1alpha1().Reservations().UpdateStatus(context.TODO(), curR, metav1.UpdateOptions{})
if err != nil {
klog.V(4).ErrorS(err, "failed to UpdateStatus for unschedulable", "reservation", klog.KObj(curR))
......@@ -148,7 +147,7 @@ func AddScheduleEventHandler(sched *scheduler.Scheduler, internalHandler Schedul
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *schedulingv1alpha1.Reservation:
return reservation.IsReservationAvailable(t)
return util.IsReservationAvailable(t)
case cache.DeletedFinalStateUnknown:
if _, ok := t.Obj.(*schedulingv1alpha1.Reservation); ok {
// DeletedFinalStateUnknown object can be stale, so just try to cleanup without check.
......@@ -178,8 +177,8 @@ func AddScheduleEventHandler(sched *scheduler.Scheduler, internalHandler Schedul
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *schedulingv1alpha1.Reservation:
return isResponsibleForReservation(sched.Profiles, t) && !reservation.IsReservationAvailable(t) &&
!reservation.IsReservationFailed(t) && !reservation.IsReservationSucceeded(t)
return isResponsibleForReservation(sched.Profiles, t) && !util.IsReservationAvailable(t) &&
!util.IsReservationFailed(t) && !util.IsReservationSucceeded(t)
case cache.DeletedFinalStateUnknown:
if r, ok := t.Obj.(*schedulingv1alpha1.Reservation); ok {
// DeletedFinalStateUnknown object can be stale, so just try to cleanup without check.
......@@ -210,7 +209,7 @@ func AddScheduleEventHandler(sched *scheduler.Scheduler, internalHandler Schedul
switch t := obj.(type) {
case *schedulingv1alpha1.Reservation:
// else should be processed by other handlers
return reservation.IsReservationFailed(t) || reservation.IsReservationSucceeded(t)
return util.IsReservationFailed(t) || util.IsReservationSucceeded(t)
default:
klog.Errorf("unable to handle object in %T: %T", obj, sched)
return false
......@@ -237,7 +236,7 @@ func addReservationToCache(sched *scheduler.Scheduler, internalHandler Scheduler
return
}
// only add valid reservation into cache
err := reservation.ValidateReservation(r)
err := util.ValidateReservation(r)
if err != nil {
klog.Errorf("addReservationToCache failed, invalid reservation, err: %v", err)
return
......@@ -245,7 +244,7 @@ func addReservationToCache(sched *scheduler.Scheduler, internalHandler Scheduler
klog.V(3).InfoS("Add event for scheduled reservation", "reservation", klog.KObj(r))
// update pod cache and trigger pod assigned event for scheduling queue
reservePod := reservation.NewReservePod(r)
reservePod := util.NewReservePod(r)
if err = internalHandler.GetCache().AddPod(reservePod); err != nil {
klog.Errorf("scheduler cache AddPod failed for reservation, reservation %s, err: %v", klog.KObj(reservePod), err)
}
......@@ -269,20 +268,20 @@ func updateReservationInCache(sched *scheduler.Scheduler, internalHandler Schedu
}
// nodeName update of the same reservations is not allowed and may corrupt the cache
if reservation.GetReservationNodeName(oldR) != reservation.GetReservationNodeName(newR) {
if util.GetReservationNodeName(oldR) != util.GetReservationNodeName(newR) {
klog.Errorf("updateReservationInCache failed, update on existing nodeName is forbidden, old %s, new %s",
reservation.GetReservationNodeName(oldR), reservation.GetReservationNodeName(newR))
util.GetReservationNodeName(oldR), util.GetReservationNodeName(newR))
return
}
// update pod cache and trigger pod assigned event for scheduling queue
err := reservation.ValidateReservation(newR)
err := util.ValidateReservation(newR)
if err != nil {
klog.Errorf("updateReservationInCache failed, invalid reservation, err: %v", err)
return
}
oldReservePod := reservation.NewReservePod(oldR)
newReservePod := reservation.NewReservePod(newR)
oldReservePod := util.NewReservePod(oldR)
newReservePod := util.NewReservePod(newR)
if err := internalHandler.GetCache().UpdatePod(oldReservePod, newReservePod); err != nil {
klog.Errorf("scheduler cache UpdatePod failed for reservation, old %s, new %s, err: %v", klog.KObj(oldR), klog.KObj(newR), err)
}
......@@ -308,12 +307,12 @@ func deleteReservationFromCache(sched *scheduler.Scheduler, internalHandler Sche
klog.V(3).InfoS("Delete event for scheduled reservation", "reservation", klog.KObj(r))
// delete pod cache and trigger pod deleted event for scheduling queue
err := reservation.ValidateReservation(r)
err := util.ValidateReservation(r)
if err != nil {
klog.Errorf("deleteReservationFromCache failed, invalid reservation, err: %v", err)
return
}
reservePod := reservation.NewReservePod(r)
reservePod := util.NewReservePod(r)
if err := internalHandler.GetCache().RemovePod(reservePod); err != nil {
klog.Errorf("scheduler cache RemovePod failed for reservation, reservation %s, err: %v", klog.KObj(r), err)
}
......@@ -328,7 +327,7 @@ func addReservationToSchedulingQueue(sched *scheduler.Scheduler, internalHandler
}
klog.V(3).InfoS("Add event for unscheduled reservation", "reservation", klog.KObj(r))
reservePod := reservation.NewReservePod(r)
reservePod := util.NewReservePod(r)
if err := internalHandler.GetQueue().Add(reservePod); err != nil {
klog.Errorf("failed to add reserve pod into scheduling queue, reservation %v, err: %v", klog.KObj(reservePod), err)
}
......@@ -347,7 +346,7 @@ func updateReservationInSchedulingQueue(sched *scheduler.Scheduler, internalHand
return
}
newReservePod := reservation.NewReservePod(newR)
newReservePod := util.NewReservePod(newR)
isAssumed, err := internalHandler.GetCache().IsAssumedPod(newReservePod)
if err != nil {
klog.Errorf("failed to check whether reserve pod %s is assumed, err: %v", klog.KObj(newReservePod), err)
......@@ -356,7 +355,7 @@ func updateReservationInSchedulingQueue(sched *scheduler.Scheduler, internalHand
return
}
oldReservePod := reservation.NewReservePod(oldR)
oldReservePod := util.NewReservePod(oldR)
if err = internalHandler.GetQueue().Update(oldReservePod, newReservePod); err != nil {
klog.Errorf("failed to update reserve pod in scheduling queue, old %s, new %s, err: %v", klog.KObj(oldReservePod), klog.KObj(newReservePod), err)
}
......@@ -380,7 +379,7 @@ func deleteReservationFromSchedulingQueue(sched *scheduler.Scheduler, internalHa
}
klog.V(3).InfoS("Delete event for unscheduled reservation", "reservation", klog.KObj(r))
reservePod := reservation.NewReservePod(r)
reservePod := util.NewReservePod(r)
if err := internalHandler.GetQueue().Delete(reservePod); err != nil {
klog.Errorf("failed to delete reserve pod in scheduling queue, reservation %s, err: %v", klog.KObj(r), err)
}
......@@ -396,7 +395,7 @@ func handleInactiveReservation(sched *scheduler.Scheduler, internalHandler Sched
}
// if the reservation has been scheduled, remove the reserve pod from the pod cache
reservePod := reservation.NewReservePod(r)
reservePod := util.NewReservePod(r)
// in case the pod has expired before scheduling cache initialized, or the pod just finished scheduling cycle and
// deleted, both we need to check if pod is cached
......@@ -410,7 +409,7 @@ func handleInactiveReservation(sched *scheduler.Scheduler, internalHandler Sched
internalHandler.MoveAllToActiveOrBackoffQueue(assignedPodDelete)
}
if len(reservation.GetReservationNodeName(r)) <= 0 {
if len(util.GetReservationNodeName(r)) <= 0 {
// pod is unscheduled, try dequeue the reserve pod from the scheduling queue
err = internalHandler.GetQueue().Delete(reservePod)
if err != nil {
......@@ -421,5 +420,5 @@ func handleInactiveReservation(sched *scheduler.Scheduler, internalHandler Sched
}
func isResponsibleForReservation(profiles profile.Map, r *schedulingv1alpha1.Reservation) bool {
return profiles.HandlesSchedulerName(reservation.GetReservationSchedulerName(r))
return profiles.HandlesSchedulerName(util.GetReservationSchedulerName(r))
}
......@@ -37,7 +37,7 @@ import (
koordfake "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned/fake"
koordinatorinformers "github.com/koordinator-sh/koordinator/pkg/client/informers/externalversions"
"github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext"
"github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/reservation"
"github.com/koordinator-sh/koordinator/pkg/util"
)
type fakeExtendHandle struct {
......@@ -76,7 +76,7 @@ func TestAddReservationErrorHandler(t *testing.T) {
NodeName: testNodeName,
},
}
testPod := reservation.NewReservePod(testR)
testPod := util.NewReservePod(testR)
t.Run("test not panic", func(t *testing.T) {
sched := &scheduler.Scheduler{}
......
......@@ -22,11 +22,7 @@ import (
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
apimachinerytypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
resourceapi "k8s.io/kubernetes/pkg/api/v1/resource"
"k8s.io/kubernetes/pkg/scheduler/framework"
......@@ -373,26 +369,14 @@ func (p *Plugin) PreBind(ctx context.Context, cycleState *framework.CycleState,
return framework.NewStatus(framework.Error, err.Error())
}
patchBytes, err := util.GeneratePodPatch(podOriginal, pod)
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
if string(patchBytes) == "{}" {
return nil
}
err = retry.OnError(
retry.DefaultRetry,
errors.IsTooManyRequests,
func() error {
_, err := p.handle.ClientSet().CoreV1().Pods(pod.Namespace).
Patch(ctx, pod.Name, apimachinerytypes.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
if err != nil {
klog.Error("Failed to patch Pod %s/%s, patch: %v, err: %v", pod.Namespace, pod.Name, string(patchBytes), err)
}
return err
})
// patch pod or reservation (if the pod is a reserve pod) with new annotations
err = util.RetryOnConflictOrTooManyRequests(func() error {
_, err1 := util.NewPatch().WithHandle(p.handle).AddAnnotations(pod.Annotations).PatchPodOrReservation(podOriginal)
return err1
})
if err != nil {
klog.V(3).ErrorS(err, "Failed to preBind Pod with CPUSet",
"pod", klog.KObj(pod), "CPUSet", state.allocatedCPUs, "node", nodeName)
return framework.NewStatus(framework.Error, err.Error())
}
......
......@@ -51,10 +51,10 @@ func (p *Plugin) gcReservations() {
if err = p.expireReservation(r); err != nil {
klog.Warningf("failed to update reservation %s as expired, err: %s", klog.KObj(r), err)
}
} else if IsReservationActive(r) {
} else if util.IsReservationActive(r) {
// sync active reservation for correct owner statuses
p.syncActiveReservation(r)
} else if IsReservationExpired(r) || IsReservationSucceeded(r) {
} else if util.IsReservationExpired(r) || util.IsReservationSucceeded(r) {
p.reservationCache.AddToInactive(r)
}
}
......@@ -183,7 +183,7 @@ func (p *Plugin) syncPodDeleted(pod *corev1.Pod) {
}
// check if the reservation is still scheduled; succeeded ones are ignored to update
if !IsReservationAvailable(r) {
if !util.IsReservationAvailable(r) {
klog.V(4).InfoS("skip sync for reservation no longer available or scheduled",
"reservation", klog.KObj(r))
return nil
......
......@@ -22,7 +22,6 @@ import (
"time"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
......
......@@ -63,7 +63,7 @@ func (h *Hook) PreFilterHook(handle frameworkext.ExtendedHandle, cycleState *fra
}
// skip if the pod is a reserve pod
if IsReservePod(pod) {
if util.IsReservePod(pod) {
return nil, false
}
......@@ -97,7 +97,7 @@ func (h *Hook) FilterHook(handle frameworkext.ExtendedHandle, cycleState *framew
}
// do not hook if not reserve state (where we should check if pod match any reservation on node)
if IsReservePod(pod) {
if util.IsReservePod(pod) {
return nil, nil, false
}
// abort the hook if node is not found
......@@ -160,7 +160,7 @@ func (h *Hook) prepareMatchReservationState(handle frameworkext.ExtendedHandle,
continue
}
// only count available reservations, ignore succeeded ones
if !IsReservationAvailable(r) {
if !util.IsReservationAvailable(r) {
continue
}
if matchReservation(pod, newReservationInfo(r)) {
......@@ -200,7 +200,7 @@ func preparePreFilterNodeInfo(nodeInfo *framework.NodeInfo, pod *corev1.Pod, mat
if existingPod == nil {
continue
}
if matchedCache.Get(GetReservePodKey(existingPod)) != nil {
if matchedCache.Get(util.GetReservePodKey(existingPod)) != nil {
// clean affinity terms for matched reservations
newPodInfo := podInfo.DeepCopy()
newPodInfo.RequiredAntiAffinityTerms = nil
......
......@@ -27,6 +27,7 @@ import (
schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
"github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext"
"github.com/koordinator-sh/koordinator/pkg/util"
)
func TestPreFilterHook(t *testing.T) {
......@@ -77,7 +78,7 @@ func TestPreFilterHook(t *testing.T) {
},
}
testHandle1 := &fakeExtendedHandle{
sharedLister: newFakeSharedLister([]*corev1.Pod{NewReservePod(rScheduled)}, []*corev1.Node{testNode}, false),
sharedLister: newFakeSharedLister([]*corev1.Pod{util.NewReservePod(rScheduled)}, []*corev1.Node{testNode}, false),
koordSharedInformerFactory: &fakeKoordinatorSharedInformerFactory{
informer: &fakeIndexedInformer{
rOnNode: map[string][]*schedulingv1alpha1.Reservation{
......
......@@ -27,7 +27,6 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
apimachinerytypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
listercorev1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
......@@ -179,10 +178,10 @@ func (p *Plugin) Name() string { return Name }
// Also do validations in this phase.
func (p *Plugin) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod) *framework.Status {
// if the pod is a reserve pod
if IsReservePod(pod) {
if util.IsReservePod(pod) {
// validate reserve pod and reservation
klog.V(4).InfoS("Attempting to pre-filter reserve pod", "pod", klog.KObj(pod))
rName := GetReservationNameFromReservePod(pod)
rName := util.GetReservationNameFromReservePod(pod)
r, err := p.rLister.Get(rName)
if errors.IsNotFound(err) {
klog.V(3).InfoS("skip the pre-filter for reservation since the object is not found",
......@@ -191,7 +190,7 @@ func (p *Plugin) PreFilter(ctx context.Context, cycleState *framework.CycleState
} else if err != nil {
return framework.NewStatus(framework.Error, "cannot get reservation, err: "+err.Error())
}
err = ValidateReservation(r)
err = util.ValidateReservation(r)
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
......@@ -214,10 +213,10 @@ func (p *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState, p
}
// the pod is a reserve pod
if IsReservePod(pod) {
if util.IsReservePod(pod) {
klog.V(4).InfoS("Attempting to filter reserve pod", "pod", klog.KObj(pod), "node", node.Name)
// if the reservation specifies a nodeName initially, check if the nodeName matches
rNodeName := GetReservePodNodeName(pod)
rNodeName := util.GetReservePodNodeName(pod)
if len(rNodeName) > 0 && rNodeName != nodeInfo.Node().Name {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonNodeNotMatchReservation)
}
......@@ -230,7 +229,7 @@ func (p *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState, p
}
func (p *Plugin) PostFilter(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
if IsReservePod(pod) {
if util.IsReservePod(pod) {
// return err to stop default preemption
return nil, framework.NewStatus(framework.Error)
}
......@@ -239,7 +238,7 @@ func (p *Plugin) PostFilter(ctx context.Context, state *framework.CycleState, po
func (p *Plugin) PreScore(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodes []*corev1.Node) *framework.Status {
// if pod is a reserve pod, ignored
if IsReservePod(pod) {
if util.IsReservePod(pod) {
return nil
}
......@@ -277,7 +276,7 @@ func (p *Plugin) PreScore(ctx context.Context, cycleState *framework.CycleState,
func (p *Plugin) Score(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) (int64, *framework.Status) {
// if pod is a reserve pod, ignored
if IsReservePod(pod) {
if util.IsReservePod(pod) {
return framework.MinNodeScore, nil
}
......@@ -320,7 +319,7 @@ func (p *Plugin) NormalizeScore(ctx context.Context, state *framework.CycleState
func (p *Plugin) Reserve(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status {
// if the pod is a reserve pod
if IsReservePod(pod) {
if util.IsReservePod(pod) {
return nil
}
......@@ -398,7 +397,7 @@ func (p *Plugin) Reserve(ctx context.Context, cycleState *framework.CycleState,
func (p *Plugin) Unreserve(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) {
// if the pod is a reserve pod
if IsReservePod(pod) {
if util.IsReservePod(pod) {
return
}
......@@ -445,7 +444,7 @@ func (p *Plugin) Unreserve(ctx context.Context, cycleState *framework.CycleState
} else if err1 != nil {
return err1
}
if !IsReservationAvailable(curR) {
if !util.IsReservationAvailable(curR) {
klog.V(5).InfoS("skip unreserve resources on a non-scheduled reservation",
"reservation", klog.KObj(curR), "phase", curR.Status.Phase)
return nil
......@@ -480,23 +479,19 @@ func (p *Plugin) Unreserve(ctx context.Context, cycleState *framework.CycleState
if !needPatch {
return
}
patchBytes, err := util.GeneratePodPatch(pod, newPod)
if err != nil {
klog.V(4).InfoS("failed to generate patch for pod unreserve",
"pod", klog.KObj(pod), "patch", patchBytes, "err", err)
return
}
_, err = p.handle.ClientSet().CoreV1().Pods(pod.Namespace).
Patch(ctx, pod.Name, apimachinerytypes.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
err = util.RetryOnConflictOrTooManyRequests(func() error {
_, err1 := util.NewPatch().WithClientset(p.handle.ClientSet()).AddAnnotations(newPod.Annotations).PatchPod(pod)
return err1
})
if err != nil {
klog.V(4).InfoS("failed to patch pod for unreserve",
"pod", klog.KObj(pod), "patch", patchBytes, "err", err)
"pod", klog.KObj(pod), "err", err)
}
}
func (p *Plugin) PreBind(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status {
// if the pod is a reserve pod
if IsReservePod(pod) {
if util.IsReservePod(pod) {
return nil
}
......@@ -528,7 +523,7 @@ func (p *Plugin) PreBind(ctx context.Context, cycleState *framework.CycleState,
return err1
}
if !IsReservationAvailable(curR) {
if !util.IsReservationAvailable(curR) {
klog.Warningf("failed to allocate resources on a non-scheduled reservation %v, phase %v",
klog.KObj(curR), curR.Status.Phase)
return fmt.Errorf(ErrReasonReservationInactive)
......@@ -547,8 +542,8 @@ func (p *Plugin) PreBind(ctx context.Context, cycleState *framework.CycleState,
_, err1 = p.client.Reservations().UpdateStatus(context.TODO(), curR, metav1.UpdateOptions{})
if err1 != nil {
klog.Warningf("failed to update reservation status for pod allocation, reservation %v, pod %v, err: %v",
klog.KObj(curR), klog.KObj(pod), err1)
klog.V(4).ErrorS(err1, "failed to update reservation status for pod allocation",
"reservation", klog.KObj(curR), "pod", klog.KObj(pod))
}
return err1
......@@ -566,20 +561,13 @@ func (p *Plugin) PreBind(ctx context.Context, cycleState *framework.CycleState,
// NOTE: the pod annotation can be stale, we should use reservation status as the ground-truth
newPod := pod.DeepCopy()
apiext.SetReservationAllocated(newPod, target)
patchBytes, err := util.GeneratePodPatch(pod, newPod)
if err != nil {
klog.V(4).InfoS("failed to generate patch for pod PreBind",
"pod", klog.KObj(pod), "patch", patchBytes, "err", err)
return nil
}
err = util.RetryOnConflictOrTooManyRequests(func() error {
_, err1 := p.handle.ClientSet().CoreV1().Pods(pod.Namespace).
Patch(ctx, pod.Name, apimachinerytypes.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
_, err1 := util.NewPatch().WithClientset(p.handle.ClientSet()).AddAnnotations(newPod.Annotations).PatchPod(pod)
return err1
})
if err != nil {
klog.V(4).InfoS("failed to patch pod for PreBind allocating reservation",
"pod", klog.KObj(pod), "patch", patchBytes, "err", err)
"pod", klog.KObj(pod), "err", err)
}
return nil
......@@ -588,11 +576,11 @@ func (p *Plugin) PreBind(ctx context.Context, cycleState *framework.CycleState,
// Bind fake binds reserve pod and mark corresponding reservation as Available.
// NOTE: This Bind plugin should get called before DefaultBinder; plugin order should be configured.
func (p *Plugin) Bind(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status {
if !IsReservePod(pod) {
if !util.IsReservePod(pod) {
return framework.NewStatus(framework.Skip, SkipReasonNotReservation)
}
rName := GetReservationNameFromReservePod(pod)
rName := util.GetReservationNameFromReservePod(pod)
klog.V(4).InfoS("Attempting to fake bind reserve pod to node",
"pod", klog.KObj(pod), "reservation", rName, "node", nodeName)
......@@ -610,7 +598,7 @@ func (p *Plugin) Bind(ctx context.Context, cycleState *framework.CycleState, pod
}
// check if the reservation has been inactive
if IsReservationFailed(reservation) || p.reservationCache.IsInactive(reservation) {
if util.IsReservationFailed(reservation) || p.reservationCache.IsInactive(reservation) {
return fmt.Errorf(ErrReasonReservationInactive)
}
......@@ -620,7 +608,7 @@ func (p *Plugin) Bind(ctx context.Context, cycleState *framework.CycleState, pod
setReservationAvailable(reservation, nodeName)
_, err = p.client.Reservations().UpdateStatus(context.TODO(), reservation, metav1.UpdateOptions{})
if err != nil {
klog.Warningf("failed to update reservation %v, err: %v", klog.KObj(reservation), err)
klog.V(4).ErrorS(err, "failed to update reservation", "reservation", klog.KObj(reservation))
}
return err
})
......@@ -639,9 +627,9 @@ func (p *Plugin) handleOnAdd(obj interface{}) {
klog.V(3).Infof("reservation cache add failed to parse, obj %T", obj)
return
}
if IsReservationActive(r) {
if util.IsReservationActive(r) {
p.reservationCache.AddToActive(r)
} else if IsReservationFailed(r) || IsReservationSucceeded(r) {
} else if util.IsReservationFailed(r) || util.IsReservationSucceeded(r) {
p.reservationCache.AddToInactive(r)
}
klog.V(5).InfoS("reservation cache add", "reservation", klog.KObj(r))
......@@ -668,9 +656,9 @@ func (p *Plugin) handleOnUpdate(oldObj, newObj interface{}) {
return
}
if IsReservationActive(newR) {
if util.IsReservationActive(newR) {
p.reservationCache.AddToActive(newR)
} else if IsReservationFailed(newR) || IsReservationSucceeded(newR) {
} else if util.IsReservationFailed(newR) || util.IsReservationSucceeded(newR) {
p.reservationCache.AddToInactive(newR)
}
klog.V(5).InfoS("reservation cache update", "reservation", klog.KObj(newR))
......
......@@ -53,6 +53,7 @@ import (
"github.com/koordinator-sh/koordinator/pkg/client/informers/externalversions/scheduling/v1alpha1"
listerschedulingv1alpha1 "github.com/koordinator-sh/koordinator/pkg/client/listers/scheduling/v1alpha1"
"github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext"
"github.com/koordinator-sh/koordinator/pkg/util"
)
var _ listerschedulingv1alpha1.ReservationLister = &fakeReservationLister{}
......@@ -521,7 +522,7 @@ func TestFilter(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "reserve-pod-0",
Annotations: map[string]string{
AnnotationReservationNode: testNode.Name,
util.AnnotationReservationNode: testNode.Name,
},
},
})
......@@ -529,7 +530,7 @@ func TestFilter(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "reserve-pod-1",
Annotations: map[string]string{
AnnotationReservationNode: testNode.Name,
util.AnnotationReservationNode: testNode.Name,
},
},
})
......@@ -537,7 +538,7 @@ func TestFilter(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "reserve-pod-1",
Annotations: map[string]string{
AnnotationReservationNode: "other-node",
util.AnnotationReservationNode: "other-node",
},
},
})
......@@ -625,7 +626,7 @@ func TestPostFilter(t *testing.T) {
Name: "reserve-pod-no-name",
},
})
delete(reservePodNoName.Annotations, AnnotationReservationName)
delete(reservePodNoName.Annotations, util.AnnotationReservationName)
r := &schedulingv1alpha1.Reservation{
ObjectMeta: metav1.ObjectMeta{
Name: "reserve-pod-0",
......@@ -1945,7 +1946,7 @@ func testGetReservePod(pod *corev1.Pod) *corev1.Pod {
if pod.Annotations == nil {
pod.Annotations = map[string]string{}
}
pod.Annotations[AnnotationReservePod] = "true"
pod.Annotations[AnnotationReservationName] = pod.Name
pod.Annotations[util.AnnotationReservePod] = "true"
pod.Annotations[util.AnnotationReservationName] = pod.Name
return pod
}
......@@ -29,6 +29,7 @@ import (
apiext "github.com/koordinator-sh/koordinator/apis/extension"
schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
"github.com/koordinator-sh/koordinator/pkg/util"
)
const (
......@@ -130,12 +131,12 @@ func newAvailableCache(rList ...*schedulingv1alpha1.Reservation) *AvailableCache
ownerToR: map[string]*reservationInfo{},
}
for _, r := range rList {
if !IsReservationAvailable(r) {
if !util.IsReservationAvailable(r) {
continue
}
rInfo := newReservationInfo(r)
a.reservations[GetReservationKey(r)] = rInfo
nodeName := GetReservationNodeName(r)
a.reservations[util.GetReservationKey(r)] = rInfo
nodeName := util.GetReservationNodeName(r)
a.nodeToR[nodeName] = append(a.nodeToR[nodeName], rInfo)
for _, owner := range r.Status.CurrentOwners { // one owner at most owns one reservation
a.ownerToR[getOwnerKey(&owner)] = rInfo
......@@ -156,8 +157,8 @@ func (a *AvailableCache) Add(r *schedulingv1alpha1.Reservation) {
a.lock.Lock()
defer a.lock.Unlock()
rInfo := newReservationInfo(r)
a.reservations[GetReservationKey(r)] = rInfo
nodeName := GetReservationNodeName(r)
a.reservations[util.GetReservationKey(r)] = rInfo
nodeName := util.GetReservationNodeName(r)
a.nodeToR[nodeName] = append(a.nodeToR[nodeName], rInfo)
for _, owner := range r.Status.CurrentOwners { // one owner at most owns one reservation
a.ownerToR[getOwnerKey(&owner)] = rInfo
......@@ -167,13 +168,13 @@ func (a *AvailableCache) Add(r *schedulingv1alpha1.Reservation) {
func (a *AvailableCache) Delete(r *schedulingv1alpha1.Reservation) {
a.lock.Lock()
defer a.lock.Unlock()
if r == nil || len(GetReservationNodeName(r)) <= 0 {
if r == nil || len(util.GetReservationNodeName(r)) <= 0 {
return
}
// cleanup r map
delete(a.reservations, GetReservationKey(r))
delete(a.reservations, util.GetReservationKey(r))
// cleanup nodeToR
nodeName := GetReservationNodeName(r)
nodeName := util.GetReservationNodeName(r)
rOnNode := a.nodeToR[nodeName]
for i, rInfo := range rOnNode {
if rInfo.Reservation.Name == r.Name {
......@@ -250,7 +251,7 @@ func (c *reservationCache) AddToActive(r *schedulingv1alpha1.Reservation) {
defer c.lock.Unlock()
c.active.Add(r)
// directly remove the assumed state if the reservation is in assumed cache but not shared any more
key := GetReservationKey(r)
key := util.GetReservationKey(r)
assumed, ok := c.assumed[key]
if ok && assumed.shared <= 0 {
delete(c.assumed, key)
......@@ -260,14 +261,14 @@ func (c *reservationCache) AddToActive(r *schedulingv1alpha1.Reservation) {
func (c *reservationCache) AddToInactive(r *schedulingv1alpha1.Reservation) {
c.lock.Lock()
defer c.lock.Unlock()
c.inactive[GetReservationKey(r)] = r
c.inactive[util.GetReservationKey(r)] = r
c.active.Delete(r)
}
func (c *reservationCache) Assume(r *schedulingv1alpha1.Reservation) {
c.lock.Lock()
defer c.lock.Unlock()
key := GetReservationKey(r)
key := util.GetReservationKey(r)
assumed, ok := c.assumed[key]
if ok {
assumed.shared++
......@@ -292,7 +293,7 @@ func (c *reservationCache) unassume(r *schedulingv1alpha1.Reservation, update bo
// Here are the common operations for unassuming:
// 1. (update=true) Restore: set assumed object into a version without the caller's assuming change.
// 2. (update=false) Accept: keep assumed object since the the caller's assuming change is accepted.
key := GetReservationKey(r)
key := util.GetReservationKey(r)
assumed, ok := c.assumed[key]
if ok {
assumed.shared--
......@@ -315,7 +316,7 @@ func (c *reservationCache) Delete(r *schedulingv1alpha1.Reservation) {
c.lock.Lock()
defer c.lock.Unlock()
c.active.Delete(r)
delete(c.inactive, GetReservationKey(r))
delete(c.inactive, util.GetReservationKey(r))
}
func (c *reservationCache) GetOwned(pod *corev1.Pod) *reservationInfo {
......@@ -327,14 +328,14 @@ func (c *reservationCache) GetOwned(pod *corev1.Pod) *reservationInfo {
func (c *reservationCache) GetInCache(r *schedulingv1alpha1.Reservation) *reservationInfo {
c.lock.RLock()
defer c.lock.RUnlock()
key := GetReservationKey(r)
key := util.GetReservationKey(r)
// if assumed, use the assumed state
assumed, ok := c.assumed[key]
if ok {
return assumed.info
}
// otherwise, use in active cache
return c.active.Get(GetReservationKey(r))
return c.active.Get(util.GetReservationKey(r))
}
func (c *reservationCache) GetAllInactive() map[string]*schedulingv1alpha1.Reservation {
......@@ -350,7 +351,7 @@ func (c *reservationCache) GetAllInactive() map[string]*schedulingv1alpha1.Reser
func (c *reservationCache) IsInactive(r *schedulingv1alpha1.Reservation) bool {
c.lock.RLock()
defer c.lock.RUnlock()
_, ok := c.inactive[GetReservationKey(r)]
_, ok := c.inactive[util.GetReservationKey(r)]
return ok
}
......
......@@ -50,59 +50,6 @@ func StatusNodeNameIndexFunc(obj interface{}) ([]string, error) {
return []string{r.Status.NodeName}, nil
}
// IsReservationActive checks if the reservation is scheduled and its status is Available/Waiting (active to use).
func IsReservationActive(r *schedulingv1alpha1.Reservation) bool {
return r != nil && len(GetReservationNodeName(r)) > 0 &&
(r.Status.Phase == schedulingv1alpha1.ReservationAvailable || r.Status.Phase == schedulingv1alpha1.ReservationWaiting)
}
// IsReservationAvailable checks if the reservation is scheduled on a node and its status is Available.
func IsReservationAvailable(r *schedulingv1alpha1.Reservation) bool {
return r != nil && len(GetReservationNodeName(r)) > 0 && r.Status.Phase == schedulingv1alpha1.ReservationAvailable
}
func IsReservationSucceeded(r *schedulingv1alpha1.Reservation) bool {
return r != nil && r.Status.Phase == schedulingv1alpha1.ReservationSucceeded
}
func IsReservationFailed(r *schedulingv1alpha1.Reservation) bool {
return r != nil && r.Status.Phase == schedulingv1alpha1.ReservationFailed
}
func IsReservationExpired(r *schedulingv1alpha1.Reservation) bool {
if r == nil || r.Status.Phase != schedulingv1alpha1.ReservationFailed {
return false
}
for _, condition := range r.Status.Conditions {
if condition.Type == schedulingv1alpha1.ReservationConditionReady {
return condition.Status == schedulingv1alpha1.ConditionStatusFalse &&
condition.Reason == schedulingv1alpha1.ReasonReservationExpired
}
}
return false
}
func GetReservationNodeName(r *schedulingv1alpha1.Reservation) string {
return r.Status.NodeName
}
func SetReservationNodeName(r *schedulingv1alpha1.Reservation, nodeName string) {
r.Status.NodeName = nodeName
}
func ValidateReservation(r *schedulingv1alpha1.Reservation) error {
if r == nil || r.Spec.Template == nil {
return fmt.Errorf("the reservation misses the template spec")
}
if len(r.Spec.Owners) <= 0 {
return fmt.Errorf("the reservation misses the owner spec")
}
if r.Spec.TTL == nil && r.Spec.Expires == nil {
return fmt.Errorf("the reservation misses the expiration spec")
}
return nil
}
func isReservationNeedExpiration(r *schedulingv1alpha1.Reservation) bool {
// 1. failed or succeeded reservations does not need to expire
if r.Status.Phase == schedulingv1alpha1.ReservationFailed || r.Status.Phase == schedulingv1alpha1.ReservationSucceeded {
......@@ -121,13 +68,13 @@ func isReservationNeedCleanup(r *schedulingv1alpha1.Reservation) bool {
if r == nil {
return true
}
if IsReservationExpired(r) {
if util.IsReservationExpired(r) {
for _, condition := range r.Status.Conditions {
if condition.Reason == schedulingv1alpha1.ReasonReservationExpired {
return time.Since(condition.LastTransitionTime.Time) > defaultGCDuration
}
}
} else if IsReservationSucceeded(r) {
} else if util.IsReservationSucceeded(r) {
for _, condition := range r.Status.Conditions {
if condition.Reason == schedulingv1alpha1.ReasonReservationSucceeded {
return time.Since(condition.LastProbeTime.Time) > defaultGCDuration
......@@ -139,7 +86,7 @@ func isReservationNeedCleanup(r *schedulingv1alpha1.Reservation) bool {
func setReservationAvailable(r *schedulingv1alpha1.Reservation, nodeName string) {
// just annotate scheduled node at status
SetReservationNodeName(r, nodeName)
util.SetReservationNodeName(r, nodeName)
r.Status.Phase = schedulingv1alpha1.ReservationAvailable
r.Status.CurrentOwners = make([]corev1.ObjectReference, 0)
......@@ -201,36 +148,6 @@ func setReservationExpired(r *schedulingv1alpha1.Reservation) {
}
}
func SetReservationUnschedulable(r *schedulingv1alpha1.Reservation, msg string) {
// unschedule reservations can try scheduling in next cycles, so we does not update its phase
// not duplicate condition info
idx := -1
isScheduled := false
for i, condition := range r.Status.Conditions {
if condition.Type == schedulingv1alpha1.ReservationConditionScheduled {
idx = i
isScheduled = condition.Status == schedulingv1alpha1.ConditionStatusTrue
}
}
if idx < 0 { // if not set condition
condition := schedulingv1alpha1.ReservationCondition{
Type: schedulingv1alpha1.ReservationConditionScheduled,
Status: schedulingv1alpha1.ConditionStatusFalse,
Reason: schedulingv1alpha1.ReasonReservationUnschedulable,
Message: msg,
LastProbeTime: metav1.Now(),
LastTransitionTime: metav1.Now(),
}
r.Status.Conditions = append(r.Status.Conditions, condition)
} else if isScheduled { // if is scheduled, keep the condition status
r.Status.Conditions[idx].LastProbeTime = metav1.Now()
} else { // if already unschedulable, update the message
r.Status.Conditions[idx].Reason = schedulingv1alpha1.ReasonReservationUnschedulable
r.Status.Conditions[idx].Message = msg
r.Status.Conditions[idx].LastProbeTime = metav1.Now()
}
}
func setReservationAllocated(r *schedulingv1alpha1.Reservation, pod *corev1.Pod) {
owner := getPodOwner(pod)
requests, _ := resourceapi.PodRequestsAndLimits(pod)
......
......@@ -18,7 +18,6 @@ package reservation
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
......@@ -58,75 +57,6 @@ func TestStatusNodeNameIndexFunc(t *testing.T) {
})
}
func TestNewReservePod(t *testing.T) {
t.Run("test not panic", func(t *testing.T) {
r := &schedulingv1alpha1.Reservation{
ObjectMeta: metav1.ObjectMeta{
Name: "reserve-pod-0",
},
Spec: schedulingv1alpha1.ReservationSpec{
Template: &corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: "reserve-pod-0",
},
Spec: corev1.PodSpec{
NodeName: "test-node-0",
},
},
Owners: []schedulingv1alpha1.ReservationOwner{
{
Object: &corev1.ObjectReference{
Kind: "Pod",
Name: "test-pod-0",
},
},
},
TTL: &metav1.Duration{Duration: 30 * time.Minute},
},
}
reservePod := NewReservePod(r)
assert.NotNil(t, reservePod)
assert.True(t, IsReservePod(reservePod))
})
}
func TestIsReservationActive(t *testing.T) {
t.Run("test not panic", func(t *testing.T) {
rPending := &schedulingv1alpha1.Reservation{
ObjectMeta: metav1.ObjectMeta{
Name: "reserve-pod-0",
},
Spec: schedulingv1alpha1.ReservationSpec{
Template: &corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: "reserve-pod-0",
},
Spec: corev1.PodSpec{
NodeName: "test-node-0",
},
},
Owners: []schedulingv1alpha1.ReservationOwner{
{
Object: &corev1.ObjectReference{
Kind: "Pod",
Name: "test-pod-0",
},
},
},
TTL: &metav1.Duration{Duration: 30 * time.Minute},
},
}
assert.Equal(t, false, IsReservationActive(rPending))
rActive := rPending.DeepCopy()
rActive.Status = schedulingv1alpha1.ReservationStatus{
Phase: schedulingv1alpha1.ReservationAvailable,
NodeName: "test-node-0",
}
assert.Equal(t, true, IsReservationActive(rActive))
})
}
func Test_matchReservationOwners(t *testing.T) {
type args struct {
pod *corev1.Pod
......@@ -313,50 +243,3 @@ func Test_matchReservationOwners(t *testing.T) {
})
}
}
func TestGetReservationSchedulerName(t *testing.T) {
tests := []struct {
name string
arg *schedulingv1alpha1.Reservation
want string
}{
{
name: "empty reservation",
arg: nil,
want: corev1.DefaultSchedulerName,
},
{
name: "empty template",
arg: &schedulingv1alpha1.Reservation{},
want: corev1.DefaultSchedulerName,
},
{
name: "empty scheduler name",
arg: &schedulingv1alpha1.Reservation{
Spec: schedulingv1alpha1.ReservationSpec{
Template: &corev1.PodTemplateSpec{},
},
},
want: corev1.DefaultSchedulerName,
},
{
name: "get scheduler name successfully",
arg: &schedulingv1alpha1.Reservation{
Spec: schedulingv1alpha1.ReservationSpec{
Template: &corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
SchedulerName: "test-scheduler",
},
},
},
},
want: "test-scheduler",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := GetReservationSchedulerName(tt.arg)
assert.Equal(t, tt.want, got)
})
}
}
......@@ -17,7 +17,6 @@ limitations under the License.
package util
import (
"encoding/json"
"fmt"
"io/ioutil"
"path"
......@@ -25,7 +24,6 @@ import (
"strings"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/strategicpatch"
quotav1 "k8s.io/apiserver/pkg/quota/v1"
"k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
......@@ -256,16 +254,3 @@ func GetPodRequest(pod *corev1.Pod, resourceNames ...corev1.ResourceName) corev1
func IsPodTerminated(pod *corev1.Pod) bool {
return pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed
}
func GeneratePodPatch(oldPod, newPod *corev1.Pod) ([]byte, error) {
oldData, err := json.Marshal(oldPod)
if err != nil {
return nil, err
}
newData, err := json.Marshal(newPod)
if err != nil {
return nil, err
}
return strategicpatch.CreateTwoWayMergePatch(oldData, newData, &corev1.Pod{})
}
......@@ -17,8 +17,6 @@ limitations under the License.
package util
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
......@@ -620,38 +618,3 @@ func Test_GetPodBEMemoryRequest(t *testing.T) {
})
}
}
func Test_GeneratePodPatch(t *testing.T) {
pod1 := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test-pod-1",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{Name: "test-container-1"},
{Name: "test-container-2"},
},
},
}
patchAnnotation := map[string]string{"test_case": "Test_GeneratePodPatch"}
pod2 := pod1.DeepCopy()
pod2.SetAnnotations(patchAnnotation)
patchBytes, err := GeneratePodPatch(pod1, pod2)
if err != nil {
t.Errorf("error creating patch bytes %v", err)
}
var patchMap map[string]interface{}
err = json.Unmarshal(patchBytes, &patchMap)
if err != nil {
t.Errorf("error unmarshalling json patch : %v", err)
}
metadata, ok := patchMap["metadata"].(map[string]interface{})
if !ok {
t.Errorf("error converting metadata to version map")
}
annotation, _ := metadata["annotations"].(map[string]interface{})
if fmt.Sprint(annotation) != fmt.Sprint(patchAnnotation) {
t.Errorf("expect patchBytes: %q, got: %q", patchAnnotation, annotation)
}
}
......@@ -14,10 +14,14 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package reservation
package util
import (
"fmt"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
apiext "github.com/koordinator-sh/koordinator/apis/extension"
......@@ -50,10 +54,20 @@ func NewReservePod(r *schedulingv1alpha1.Reservation) *corev1.Pod {
reservePod.Namespace = corev1.NamespaceDefault
}
// annotate the reservePod
// labels, annotations: `objectMeta` overwrites `template.objectMeta`
if reservePod.Labels == nil {
reservePod.Labels = map[string]string{}
}
for k, v := range r.Labels {
reservePod.Labels[k] = v
}
if reservePod.Annotations == nil {
reservePod.Annotations = map[string]string{}
}
for k, v := range r.Annotations {
reservePod.Annotations[k] = v
}
// annotate the reservePod
reservePod.Annotations[AnnotationReservePod] = "true"
reservePod.Annotations[AnnotationReservationName] = r.Name // for search inversely
......@@ -72,6 +86,22 @@ func NewReservePod(r *schedulingv1alpha1.Reservation) *corev1.Pod {
return reservePod
}
func ValidateReservation(r *schedulingv1alpha1.Reservation) error {
if r == nil {
return fmt.Errorf("the reservation is nil")
}
if r.Spec.Template == nil {
return fmt.Errorf("the reservation misses the template spec")
}
if len(r.Spec.Owners) <= 0 {
return fmt.Errorf("the reservation misses the owner spec")
}
if r.Spec.TTL == nil && r.Spec.Expires == nil {
return fmt.Errorf("the reservation misses the expiration spec")
}
return nil
}
func IsReservePod(pod *corev1.Pod) bool {
return pod != nil && pod.Annotations != nil && pod.Annotations[AnnotationReservePod] == "true"
}
......@@ -98,3 +128,171 @@ func GetReservationSchedulerName(r *schedulingv1alpha1.Reservation) string {
}
return r.Spec.Template.Spec.SchedulerName
}
// IsReservationActive checks if the reservation is scheduled and its status is Available/Waiting (active to use).
func IsReservationActive(r *schedulingv1alpha1.Reservation) bool {
return r != nil && len(GetReservationNodeName(r)) > 0 &&
(r.Status.Phase == schedulingv1alpha1.ReservationAvailable || r.Status.Phase == schedulingv1alpha1.ReservationWaiting)
}
// IsReservationAvailable checks if the reservation is scheduled on a node and its status is Available.
func IsReservationAvailable(r *schedulingv1alpha1.Reservation) bool {
return r != nil && len(GetReservationNodeName(r)) > 0 && r.Status.Phase == schedulingv1alpha1.ReservationAvailable
}
func IsReservationSucceeded(r *schedulingv1alpha1.Reservation) bool {
return r != nil && r.Status.Phase == schedulingv1alpha1.ReservationSucceeded
}
func IsReservationFailed(r *schedulingv1alpha1.Reservation) bool {
return r != nil && r.Status.Phase == schedulingv1alpha1.ReservationFailed
}
func IsReservationExpired(r *schedulingv1alpha1.Reservation) bool {
if r == nil || r.Status.Phase != schedulingv1alpha1.ReservationFailed {
return false
}
for _, condition := range r.Status.Conditions {
if condition.Type == schedulingv1alpha1.ReservationConditionReady {
return condition.Status == schedulingv1alpha1.ConditionStatusFalse &&
condition.Reason == schedulingv1alpha1.ReasonReservationExpired
}
}
return false
}
func GetReservationNodeName(r *schedulingv1alpha1.Reservation) string {
return r.Status.NodeName
}
func SetReservationNodeName(r *schedulingv1alpha1.Reservation, nodeName string) {
r.Status.NodeName = nodeName
}
func SetReservationUnschedulable(r *schedulingv1alpha1.Reservation, msg string) {
// unschedule reservations can try scheduling in next cycles, so we does not update its phase
// not duplicate condition info
idx := -1
isScheduled := false
for i, condition := range r.Status.Conditions {
if condition.Type == schedulingv1alpha1.ReservationConditionScheduled {
idx = i
isScheduled = condition.Status == schedulingv1alpha1.ConditionStatusTrue
}
}
if idx < 0 { // if not set condition
condition := schedulingv1alpha1.ReservationCondition{
Type: schedulingv1alpha1.ReservationConditionScheduled,
Status: schedulingv1alpha1.ConditionStatusFalse,
Reason: schedulingv1alpha1.ReasonReservationUnschedulable,
Message: msg,
LastProbeTime: metav1.Now(),
LastTransitionTime: metav1.Now(),
}
r.Status.Conditions = append(r.Status.Conditions, condition)
} else if isScheduled { // if is scheduled, keep the condition status
r.Status.Conditions[idx].LastProbeTime = metav1.Now()
} else { // if already unschedulable, update the message
r.Status.Conditions[idx].Reason = schedulingv1alpha1.ReasonReservationUnschedulable
r.Status.Conditions[idx].Message = msg
r.Status.Conditions[idx].LastProbeTime = metav1.Now()
}
}
func IsObjValidActiveReservation(obj interface{}) bool {
reservation, _ := obj.(*schedulingv1alpha1.Reservation)
err := ValidateReservation(reservation)
if err != nil {
klog.ErrorS(err, "failed to validate reservation obj", "reservation", klog.KObj(reservation))
return false
}
if !IsReservationActive(reservation) {
klog.V(6).InfoS("ignore reservation obj since it is not active",
"reservation", klog.KObj(reservation), "phase", reservation.Status.Phase)
return false
}
return true
}
// ReservationToPodEventHandlerFuncs can be used to handle reservation events with a pod event handler, which converts
// each reservation object into the corresponding reserve pod object.
// e.g.
// func registerReservationEventHandler(handle framework.Handle, podHandler podHandler) {
// extendedHandle, ok := handle.(frameworkext.ExtendedHandle)
// if !ok { // if not implement extendedHandle, ignore reservation events
// klog.V(3).Infof("registerReservationEventHandler aborted, cannot convert handle to frameworkext.ExtendedHandle, got %T", handle)
// return
// }
// extendedHandle.KoordinatorSharedInformerFactory().Scheduling().V1alpha1().Reservations().Informer().AddEventHandler(&util.ReservationToPodEventHandlerFuncs{
// FilterFunc: util.IsObjValidActiveReservation,
// PodHandler: &podHandler,
// })
// }
type ReservationToPodEventHandlerFuncs struct {
FilterFunc func(obj interface{}) bool
PodHandler cache.ResourceEventHandler
}
var _ cache.ResourceEventHandler = &ReservationToPodEventHandlerFuncs{}
func (r ReservationToPodEventHandlerFuncs) OnAdd(obj interface{}) {
reservation, ok := obj.(*schedulingv1alpha1.Reservation)
if !ok {
return
}
if !r.FilterFunc(reservation) {
return
}
pod := NewReservePod(reservation)
r.PodHandler.OnAdd(pod)
}
// OnUpdate calls UpdateFunc if it's not nil.
func (r ReservationToPodEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
oldR, oldOK := oldObj.(*schedulingv1alpha1.Reservation)
newR, newOK := newObj.(*schedulingv1alpha1.Reservation)
if !oldOK || !newOK {
return
}
oldOK = r.FilterFunc(oldR)
newOK = r.FilterFunc(newR)
switch {
case oldOK && newOK:
oldPod := NewReservePod(oldR)
newPod := NewReservePod(newR)
r.PodHandler.OnUpdate(oldPod, newPod)
case !oldOK && newOK:
newPod := NewReservePod(newR)
r.PodHandler.OnAdd(newPod)
case oldOK && !newOK:
oldPod := NewReservePod(oldR)
r.PodHandler.OnDelete(oldPod)
default:
// do nothing
}
}
// OnDelete calls DeleteFunc if it's not nil.
func (r ReservationToPodEventHandlerFuncs) OnDelete(obj interface{}) {
var reservation *schedulingv1alpha1.Reservation
switch t := obj.(type) {
case *schedulingv1alpha1.Reservation:
reservation = t
case cache.DeletedFinalStateUnknown:
var ok bool
reservation, ok = t.Obj.(*schedulingv1alpha1.Reservation)
if !ok {
return
}
default:
return
}
if !r.FilterFunc(reservation) {
return
}
pod := NewReservePod(reservation)
r.PodHandler.OnDelete(pod)
}
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
)
func TestNewReservePod(t *testing.T) {
t.Run("test not panic", func(t *testing.T) {
r := &schedulingv1alpha1.Reservation{
ObjectMeta: metav1.ObjectMeta{
Name: "reserve-pod-0",
},
Spec: schedulingv1alpha1.ReservationSpec{
Template: &corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: "reserve-pod-0",
},
Spec: corev1.PodSpec{
NodeName: "test-node-0",
},
},
Owners: []schedulingv1alpha1.ReservationOwner{
{
Object: &corev1.ObjectReference{
Kind: "Pod",
Name: "test-pod-0",
},
},
},
TTL: &metav1.Duration{Duration: 30 * time.Minute},
},
}
reservePod := NewReservePod(r)
assert.NotNil(t, reservePod)
assert.True(t, IsReservePod(reservePod))
})
}
func TestIsReservationActive(t *testing.T) {
t.Run("test not panic", func(t *testing.T) {
rPending := &schedulingv1alpha1.Reservation{
ObjectMeta: metav1.ObjectMeta{
Name: "reserve-pod-0",
},
Spec: schedulingv1alpha1.ReservationSpec{
Template: &corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: "reserve-pod-0",
},
Spec: corev1.PodSpec{
NodeName: "test-node-0",
},
},
Owners: []schedulingv1alpha1.ReservationOwner{
{
Object: &corev1.ObjectReference{
Kind: "Pod",
Name: "test-pod-0",
},
},
},
TTL: &metav1.Duration{Duration: 30 * time.Minute},
},
}
assert.Equal(t, false, IsReservationActive(rPending))
rActive := rPending.DeepCopy()
rActive.Status = schedulingv1alpha1.ReservationStatus{
Phase: schedulingv1alpha1.ReservationAvailable,
NodeName: "test-node-0",
}
assert.Equal(t, true, IsReservationActive(rActive))
})
}
func TestGetReservationSchedulerName(t *testing.T) {
tests := []struct {
name string
arg *schedulingv1alpha1.Reservation
want string
}{
{
name: "empty reservation",
arg: nil,
want: corev1.DefaultSchedulerName,
},
{
name: "empty template",
arg: &schedulingv1alpha1.Reservation{},
want: corev1.DefaultSchedulerName,
},
{
name: "empty scheduler name",
arg: &schedulingv1alpha1.Reservation{
Spec: schedulingv1alpha1.ReservationSpec{
Template: &corev1.PodTemplateSpec{},
},
},
want: corev1.DefaultSchedulerName,
},
{
name: "get scheduler name successfully",
arg: &schedulingv1alpha1.Reservation{
Spec: schedulingv1alpha1.ReservationSpec{
Template: &corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
SchedulerName: "test-scheduler",
},
},
},
},
want: "test-scheduler",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := GetReservationSchedulerName(tt.arg)
assert.Equal(t, tt.want, got)
})
}
}
func TestIsObjValidActiveReservation(t *testing.T) {
tests := []struct {
name string
arg interface{}
want bool
}{
{
name: "valid and active",
arg: &schedulingv1alpha1.Reservation{
ObjectMeta: metav1.ObjectMeta{
Name: "reserve-pod-0",
},
Spec: schedulingv1alpha1.ReservationSpec{
Template: &corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: "reserve-pod-0",
},
},
Owners: []schedulingv1alpha1.ReservationOwner{
{
Object: &corev1.ObjectReference{
Kind: "Pod",
Name: "test-pod-0",
},
},
},
TTL: &metav1.Duration{Duration: 30 * time.Minute},
},
Status: schedulingv1alpha1.ReservationStatus{
Phase: schedulingv1alpha1.ReservationAvailable,
NodeName: "test-node-0",
},
},
want: true,
},
{
name: "valid but not active",
arg: &schedulingv1alpha1.Reservation{
ObjectMeta: metav1.ObjectMeta{
Name: "reserve-pod-0",
},
Spec: schedulingv1alpha1.ReservationSpec{
Template: &corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: "reserve-pod-0",
},
},
Owners: []schedulingv1alpha1.ReservationOwner{
{
Object: &corev1.ObjectReference{
Kind: "Pod",
Name: "test-pod-0",
},
},
},
TTL: &metav1.Duration{Duration: 30 * time.Minute},
},
Status: schedulingv1alpha1.ReservationStatus{
Phase: schedulingv1alpha1.ReservationSucceeded,
NodeName: "test-node-0",
},
},
want: false,
},
{
name: "invalid",
arg: &schedulingv1alpha1.Reservation{
ObjectMeta: metav1.ObjectMeta{
Name: "reserve-pod-0",
},
Spec: schedulingv1alpha1.ReservationSpec{
Owners: []schedulingv1alpha1.ReservationOwner{
{
Object: &corev1.ObjectReference{
Kind: "Pod",
Name: "test-pod-0",
},
},
},
TTL: &metav1.Duration{Duration: 30 * time.Minute},
},
Status: schedulingv1alpha1.ReservationStatus{
Phase: schedulingv1alpha1.ReservationPending,
},
},
want: false,
},
{
name: "invalid 1",
arg: &schedulingv1alpha1.Reservation{
ObjectMeta: metav1.ObjectMeta{
Name: "reserve-pod-0",
},
Spec: schedulingv1alpha1.ReservationSpec{
Template: &corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: "reserve-pod-0",
},
},
TTL: &metav1.Duration{Duration: 30 * time.Minute},
},
Status: schedulingv1alpha1.ReservationStatus{
Phase: schedulingv1alpha1.ReservationPending,
},
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := IsObjValidActiveReservation(tt.arg)
assert.Equal(t, tt.want, got)
})
}
}
......@@ -17,12 +17,25 @@ limitations under the License.
package util
import (
"context"
"encoding/json"
"fmt"
"reflect"
jsonpatch "github.com/evanphx/json-patch"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apimachinerytypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
koordinatorclientset "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned"
"github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext"
)
// MergeCfg returns a merged interface. Value in new will
......@@ -67,3 +80,215 @@ func RetryOnConflictOrTooManyRequests(fn func() error) error {
return errors.IsConflict(err) || errors.IsTooManyRequests(err)
}, fn)
}
func GeneratePodPatch(oldPod, newPod *corev1.Pod) ([]byte, error) {
oldData, err := json.Marshal(oldPod)
if err != nil {
return nil, err
}
newData, err := json.Marshal(newPod)
if err != nil {
return nil, err
}
return strategicpatch.CreateTwoWayMergePatch(oldData, newData, &corev1.Pod{})
}
func PatchPod(clientset clientset.Interface, oldPod, newPod *corev1.Pod) (*corev1.Pod, error) {
// generate patch bytes for the update
patchBytes, err := GeneratePodPatch(oldPod, newPod)
if err != nil {
klog.V(5).InfoS("failed to generate pod patch", "pod", klog.KObj(oldPod), "err", err)
return nil, err
}
if string(patchBytes) == "{}" { // nothing to patch
return oldPod, nil
}
// patch with pod client
patched, err := clientset.CoreV1().Pods(oldPod.Namespace).
Patch(context.TODO(), oldPod.Name, apimachinerytypes.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
if err != nil {
klog.V(5).InfoS("failed to patch pod", "pod", klog.KObj(oldPod), "patch", string(patchBytes), "err", err)
return nil, err
}
klog.V(6).InfoS("successfully patch pod", "pod", klog.KObj(oldPod), "patch", string(patchBytes))
return patched, nil
}
func GenerateReservationPatch(oldReservation, newReservation *schedulingv1alpha1.Reservation) ([]byte, error) {
oldData, err := json.Marshal(oldReservation)
if err != nil {
return nil, err
}
newData, err := json.Marshal(newReservation)
if err != nil {
return nil, err
}
return jsonpatch.CreateMergePatch(oldData, newData)
}
func PatchReservation(clientset koordinatorclientset.Interface, oldReservation, newReservation *schedulingv1alpha1.Reservation) (*schedulingv1alpha1.Reservation, error) {
patchBytes, err := GenerateReservationPatch(oldReservation, newReservation)
if err != nil {
klog.V(5).InfoS("failed to generate reservation patch", "reservation", klog.KObj(oldReservation), "err", err)
return nil, err
}
if string(patchBytes) == "{}" { // nothing to patch
return oldReservation, nil
}
// NOTE: CRDs do not support strategy merge patch, so here falls back to merge patch.
// link: https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/#advanced-features-and-flexibility
patched, err := clientset.SchedulingV1alpha1().Reservations().
Patch(context.TODO(), oldReservation.Name, apimachinerytypes.MergePatchType, patchBytes, metav1.PatchOptions{})
if err != nil {
klog.V(5).InfoS("failed to patch pod", "pod", klog.KObj(oldReservation), "patch", string(patchBytes), "err", err)
return nil, err
}
klog.V(6).InfoS("successfully patch pod", "pod", klog.KObj(oldReservation), "patch", string(patchBytes))
return patched, nil
}
// Patch is for simply patching arbitrary objects (e.g. pods, koord CRDs).
type Patch struct {
Clientset clientset.Interface
KoordClientset koordinatorclientset.Interface
// patch data
// NOTE: add more fields if needed
LabelsAdd map[string]string
AnnotationsAdd map[string]string
LabelsRemove []string
AnnotationsRemove []string
}
func NewPatch() *Patch {
return &Patch{
LabelsAdd: map[string]string{},
AnnotationsAdd: map[string]string{},
}
}
func (p *Patch) WithHandle(handle framework.Handle) *Patch {
p.Clientset = handle.ClientSet()
// set KoordClientset if ExtendedHandle implemented
extendedHandle, ok := handle.(frameworkext.ExtendedHandle)
if ok {
p.KoordClientset = extendedHandle.KoordinatorClientSet()
}
return p
}
func (p *Patch) WithClientset(cs clientset.Interface) *Patch {
p.Clientset = cs
return p
}
func (p *Patch) WithKoordinatorClientSet(cs koordinatorclientset.Interface) *Patch {
p.KoordClientset = cs
return p
}
func (p *Patch) AddLabels(labels map[string]string) *Patch {
for k, v := range labels {
p.LabelsAdd[k] = v
}
return p
}
func (p *Patch) AddAnnotations(annotations map[string]string) *Patch {
for k, v := range annotations {
p.AnnotationsAdd[k] = v
}
return p
}
func (p *Patch) RemoveLabels(labelKeys []string) *Patch {
for _, key := range labelKeys {
p.LabelsRemove = append(p.LabelsRemove, key)
}
return p
}
func (p *Patch) RemoveAnnotations(annotationKeys []string) *Patch {
for _, key := range annotationKeys {
p.AnnotationsRemove = append(p.AnnotationsRemove, key)
}
return p
}
func (p *Patch) PatchPod(pod *corev1.Pod) (*corev1.Pod, error) {
if p.Clientset == nil || reflect.ValueOf(p.Clientset).IsNil() {
return nil, fmt.Errorf("missing clientset for pod")
}
newPod := pod.DeepCopy()
if newPod.Labels == nil {
newPod.Labels = map[string]string{}
}
for k, v := range p.LabelsAdd {
newPod.Labels[k] = v
}
for _, key := range p.LabelsRemove {
delete(newPod.Labels, key)
}
if newPod.Annotations == nil {
newPod.Annotations = map[string]string{}
}
for k, v := range p.AnnotationsAdd {
newPod.Annotations[k] = v
}
for _, key := range p.AnnotationsRemove {
delete(newPod.Annotations, key)
}
return PatchPod(p.Clientset, pod, newPod)
}
func (p *Patch) PatchReservation(r *schedulingv1alpha1.Reservation) (*schedulingv1alpha1.Reservation, error) {
if p.KoordClientset == nil || reflect.ValueOf(p.KoordClientset).IsNil() {
return nil, fmt.Errorf("missing clientset for reservation")
}
newR := r.DeepCopy()
if newR.Labels == nil {
newR.Labels = map[string]string{}
}
for k, v := range p.LabelsAdd {
newR.Labels[k] = v
}
for _, key := range p.LabelsRemove {
delete(newR.Labels, key)
}
if newR.Annotations == nil {
newR.Annotations = map[string]string{}
}
for k, v := range p.AnnotationsAdd {
newR.Annotations[k] = v
}
for _, key := range p.AnnotationsRemove {
delete(newR.Annotations, key)
}
return PatchReservation(p.KoordClientset, r, newR)
}
// PatchPodOrReservation patches the pod (if the pod is not a reserve pod) or corresponding reservation object (if the
// pod is a reserve pod) with the given patch data.
func (p *Patch) PatchPodOrReservation(pod *corev1.Pod) (interface{}, error) {
// if pod is not a reserve pod, patch the pod with pod client
if !IsReservePod(pod) {
return p.PatchPod(pod)
}
// otherwise the pod is a reserve pod, patch the reservation with reservation client
// fake reservation objects to generate patch
rName := GetReservationNameFromReservePod(pod)
reservation := &schedulingv1alpha1.Reservation{
ObjectMeta: pod.ObjectMeta,
}
reservation.Name = rName
return p.PatchReservation(reservation)
}
......@@ -17,10 +17,24 @@ limitations under the License.
package util
import (
"context"
"encoding/json"
"fmt"
"testing"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
kubefake "k8s.io/client-go/kubernetes/fake"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/utils/pointer"
schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
koordinatorclientset "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned"
clientschedulingv1alpha1 "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned/typed/scheduling/v1alpha1"
"github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext"
)
func Test_MergeCfg(t *testing.T) {
......@@ -219,3 +233,224 @@ func TestMaxInt64(t *testing.T) {
})
}
}
func Test_GeneratePodPatch(t *testing.T) {
pod1 := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test-pod-1",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{Name: "test-container-1"},
{Name: "test-container-2"},
},
},
}
patchAnnotation := map[string]string{"test_case": "Test_GeneratePodPatch"}
pod2 := pod1.DeepCopy()
pod2.SetAnnotations(patchAnnotation)
patchBytes, err := GeneratePodPatch(pod1, pod2)
if err != nil {
t.Errorf("error creating patch bytes %v", err)
}
var patchMap map[string]interface{}
err = json.Unmarshal(patchBytes, &patchMap)
if err != nil {
t.Errorf("error unmarshalling json patch : %v", err)
}
metadata, ok := patchMap["metadata"].(map[string]interface{})
if !ok {
t.Errorf("error converting metadata to version map")
}
annotation, _ := metadata["annotations"].(map[string]interface{})
if fmt.Sprint(annotation) != fmt.Sprint(patchAnnotation) {
t.Errorf("expect patchBytes: %q, got: %q", patchAnnotation, annotation)
}
}
type fakeReservationClientSet struct {
koordinatorclientset.Interface
clientschedulingv1alpha1.SchedulingV1alpha1Interface
clientschedulingv1alpha1.ReservationInterface
reservations map[string]*schedulingv1alpha1.Reservation
patchErr map[string]bool
}
func (f *fakeReservationClientSet) SchedulingV1alpha1() clientschedulingv1alpha1.SchedulingV1alpha1Interface {
return f
}
func (f *fakeReservationClientSet) Reservations() clientschedulingv1alpha1.ReservationInterface {
return f
}
func (f *fakeReservationClientSet) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *schedulingv1alpha1.Reservation, err error) {
if f.patchErr[name] {
return nil, fmt.Errorf("patch error")
}
r, ok := f.reservations[name]
if !ok {
return nil, fmt.Errorf("reservation not found")
}
return r, nil
}
type fakeExtendedHandle struct {
client *kubefake.Clientset
koordClient *fakeReservationClientSet
frameworkext.ExtendedHandle
}
func (f *fakeExtendedHandle) ClientSet() clientset.Interface {
return f.client
}
func (f *fakeExtendedHandle) KoordinatorClientSet() koordinatorclientset.Interface {
return f.koordClient
}
func TestPatch_PatchPodOrReservation(t *testing.T) {
testNormalPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod-1",
},
}
testR := &schedulingv1alpha1.Reservation{
ObjectMeta: metav1.ObjectMeta{
Name: "test-reservation-0",
UID: "123456",
},
}
testReservePod := NewReservePod(testR)
type fields struct {
handle framework.Handle
annotations map[string]string
}
type args struct {
pod *corev1.Pod
}
tests := []struct {
name string
fields fields
args args
wantErr bool
}{
{
name: "nothing to patch for normal pod",
fields: fields{
handle: &fakeExtendedHandle{
client: kubefake.NewSimpleClientset(testNormalPod),
},
},
args: args{
pod: &corev1.Pod{},
},
wantErr: false,
},
{
name: "patch successfully for normal pod",
fields: fields{
handle: &fakeExtendedHandle{
client: kubefake.NewSimpleClientset(testNormalPod),
},
annotations: map[string]string{
"aaa": "bbb",
},
},
args: args{
pod: testNormalPod,
},
wantErr: false,
},
{
name: "nothing to patch for reserve pod",
fields: fields{
handle: &fakeExtendedHandle{
koordClient: &fakeReservationClientSet{},
},
},
args: args{
pod: testReservePod,
},
wantErr: false,
},
{
name: "patch successfully for reserve pod",
fields: fields{
handle: &fakeExtendedHandle{
koordClient: &fakeReservationClientSet{
reservations: map[string]*schedulingv1alpha1.Reservation{
testR.Name: testR,
},
},
},
annotations: map[string]string{
"aaa": "bbb",
},
},
args: args{
pod: testReservePod,
},
wantErr: false,
},
{
name: "patch error for reserve pod",
fields: fields{
handle: &fakeExtendedHandle{
koordClient: &fakeReservationClientSet{
reservations: map[string]*schedulingv1alpha1.Reservation{
testR.Name: testR,
},
patchErr: map[string]bool{
testR.Name: true,
},
},
},
annotations: map[string]string{
"aaa": "bbb",
},
},
args: args{
pod: testReservePod,
},
wantErr: true,
},
{
name: "patch not found for reserve pod",
fields: fields{
handle: &fakeExtendedHandle{
koordClient: &fakeReservationClientSet{
reservations: map[string]*schedulingv1alpha1.Reservation{},
},
},
annotations: map[string]string{
"aaa": "bbb",
},
},
args: args{
pod: testReservePod,
},
wantErr: true,
},
{
name: "missing clientset for reserve pod",
fields: fields{
handle: &fakeExtendedHandle{},
annotations: map[string]string{
"aaa": "bbb",
},
},
args: args{
pod: testReservePod,
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, gotErr := NewPatch().WithHandle(tt.fields.handle).AddAnnotations(tt.fields.annotations).PatchPodOrReservation(tt.args.pod)
assert.Equal(t, tt.wantErr, gotErr != nil)
})
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment