Commit c0bbb136 authored by Alex Dadgar's avatar Alex Dadgar
Browse files

Fix force deadline notification

Showing with 34 additions and 8 deletions
+34 -8
......@@ -54,19 +54,14 @@ func (d *deadlineHeap) watch() {
case <-timer.C:
default:
}
var nextDeadline time.Time
defer timer.Stop()
var nextDeadline time.Time
for {
select {
case <-d.ctx.Done():
return
case <-timer.C:
if nextDeadline.IsZero() {
continue
}
var batch []string
d.mu.Lock()
......@@ -96,7 +91,7 @@ func (d *deadlineHeap) watch() {
continue
}
if !deadline.Equal(nextDeadline) {
if deadline.IsZero() || !deadline.Equal(nextDeadline) {
timer.Reset(deadline.Sub(time.Now()))
nextDeadline = deadline
}
......
......@@ -147,3 +147,34 @@ func TestDeadlineHeap_WatchCoalesce(t *testing.T) {
case <-time.After(100 * time.Millisecond):
}
}
func TestDeadlineHeap_MultipleForce(t *testing.T) {
t.Parallel()
require := require.New(t)
h := NewDeadlineHeap(context.Background(), 1*time.Second)
nodeID := "1"
deadline := time.Time{}
h.Watch(nodeID, deadline)
var batch []string
select {
case batch = <-h.NextBatch():
case <-time.After(10 * time.Millisecond):
t.Fatal("timeout")
}
require.Len(batch, 1)
require.Equal(nodeID, batch[0])
nodeID = "2"
h.Watch(nodeID, deadline)
select {
case batch = <-h.NextBatch():
case <-time.After(10 * time.Millisecond):
t.Fatal("timeout")
}
require.Len(batch, 1)
require.Equal(nodeID, batch[0])
}
......@@ -727,7 +727,7 @@ func TestDrainer_Batch_Force(t *testing.T) {
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
// Make sure the batch job is migrated
testutil.AssertUntil(500*time.Millisecond, func() (bool, error) {
testutil.WaitForResult(func() (bool, error) {
allocs, err := state.AllocsByNode(nil, n1.ID)
if err != nil {
return false, err
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment