Commit 8f9a00ef authored by Tim Gross's avatar Tim Gross
Browse files

eval broker: shed blocked evals older than acknowledged eval

parent 54474a95
Showing with 226 additions and 150 deletions
+226 -150
......@@ -64,7 +64,11 @@ type EvalBroker struct {
jobEvals map[structs.NamespacedID]string
// blocked tracks the blocked evaluations by JobID in a priority queue
blocked map[structs.NamespacedID]PendingEvaluations
blocked map[structs.NamespacedID]BlockedEvaluations
// cancelable tracks previously blocked evaluations that are now safe for
// the Eval.Ack RPC to cancel in batches
cancelable PendingEvaluations
// ready tracks the ready jobs by scheduler in a priority queue
ready map[string]PendingEvaluations
......@@ -139,7 +143,8 @@ func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration,
stats: new(BrokerStats),
evals: make(map[string]int),
jobEvals: make(map[structs.NamespacedID]string),
blocked: make(map[structs.NamespacedID]PendingEvaluations),
blocked: make(map[structs.NamespacedID]BlockedEvaluations),
cancelable: PendingEvaluations{},
ready: make(map[string]PendingEvaluations),
unack: make(map[string]*unackEval),
waiting: make(map[string]chan struct{}),
......@@ -559,6 +564,7 @@ func (b *EvalBroker) Ack(evalID, token string) error {
return fmt.Errorf("Token does not match for Evaluation ID")
}
jobID := unack.Eval.JobID
oldestUnackedIndex := unack.Eval.ModifyIndex
// Ensure we were able to stop the timer
if !unack.NackTimer.Stop() {
......@@ -588,6 +594,13 @@ func (b *EvalBroker) Ack(evalID, token string) error {
if blocked := b.blocked[namespacedID]; len(blocked) != 0 {
raw := heap.Pop(&blocked)
if len(blocked) > 0 {
// Any blocked evaluations with ModifyIndexes older than the
// just-ack'd evaluation are no longer useful, so it's safe to drop
// them. We do this *after* we've popped the highest priority
// evaluation as a belt-and-suspenders; we can process one extra
// evaluation uselessly but can skip many.
cancelable := blocked.MarkForCancel(oldestUnackedIndex)
b.cancelable = append(b.cancelable, cancelable...)
b.blocked[namespacedID] = blocked
} else {
delete(b.blocked, namespacedID)
......@@ -737,7 +750,8 @@ func (b *EvalBroker) flush() {
b.stats.ByScheduler = make(map[string]*SchedulerStats)
b.evals = make(map[string]int)
b.jobEvals = make(map[structs.NamespacedID]string)
b.blocked = make(map[structs.NamespacedID]PendingEvaluations)
b.blocked = make(map[structs.NamespacedID]BlockedEvaluations)
b.cancelable = PendingEvaluations{}
b.ready = make(map[string]PendingEvaluations)
b.unack = make(map[string]*unackEval)
b.timeWait = make(map[string]*time.Timer)
......@@ -841,6 +855,15 @@ func (b *EvalBroker) Stats() *BrokerStats {
return stats
}
// Cancelable retrieves a batch of previously-blocked evaluations that are now
// stale and ready to mark for canceling. The eval RPC will call this with a
// batch size set to avoid sending overly large raft messages.
func (b *EvalBroker) Cancelable(batchSize int) []*structs.Evaluation {
b.l.RLock()
defer b.l.RUnlock()
return b.cancelable.PopN(batchSize)
}
// EmitStats is used to export metrics about the broker while enabled
func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) {
timer, stop := helper.NewSafeTimer(period)
......@@ -904,7 +927,7 @@ func (p PendingEvaluations) Less(i, j int) bool {
if p[i].JobID != p[j].JobID && p[i].Priority != p[j].Priority {
return !(p[i].Priority < p[j].Priority)
}
return p[i].CreateIndex < p[j].CreateIndex
return (p[i].CreateIndex < p[j].CreateIndex)
}
// Swap is for the sorting interface
......@@ -934,3 +957,92 @@ func (p PendingEvaluations) Peek() *structs.Evaluation {
}
return p[n-1]
}
// PopN removes and returns the minimum N evaluations from the slice.
func (p *PendingEvaluations) PopN(n int) []*structs.Evaluation {
if n > len(*p) {
n = len(*p)
}
popped := []*structs.Evaluation{}
for i := 0; i < n; i++ {
raw := heap.Pop(p)
eval := raw.(*structs.Evaluation)
popped = append(popped, eval)
}
return popped
}
// BlockedEvaluations are those evaluations blocked from being
// enqueued while another evaluation for the same job is being
// processed (is in JobEvals and/or pending)
type BlockedEvaluations []*structs.Evaluation
// Len is for the sorting interface
func (p BlockedEvaluations) Len() int {
return len(p)
}
// Swap is for the sorting interface
func (p BlockedEvaluations) Swap(i, j int) {
p[i], p[j] = p[j], p[i]
}
// Less is for the sorting interface. We flip the check
// so that the "min" in the min-heap is the element with the
// highest priority
func (p BlockedEvaluations) Less(i, j int) bool {
if p[i].JobID != p[j].JobID && p[i].Priority != p[j].Priority {
return !(p[i].Priority < p[j].Priority)
}
return !(p[i].CreateIndex < p[j].CreateIndex)
}
// Push is used to add a new evaluation to the slice
func (p *BlockedEvaluations) Push(e interface{}) {
*p = append(*p, e.(*structs.Evaluation))
}
// Pop is used to remove an evaluation from the slice
func (p *BlockedEvaluations) Pop() interface{} {
n := len(*p)
e := (*p)[n-1]
(*p)[n-1] = nil
*p = (*p)[:n-1]
return e
}
// MarkForCancel removes any evaluations older than the index from the blocked
// list and returns a list of cancelable evals so that Eval.Ack RPCs can write
// batched raft entries to cancel them. This must be called inside the broker's
// lock.
func (p *BlockedEvaluations) MarkForCancel(index uint64) PendingEvaluations {
// In common but pathological cases, we can have a large number of blocked
// evals but will want to cancel most of them. Using heap.Remove requires we
// re-sort for each eval we remove. Because the slice is already sorted, we
// can shift removed items to the left in a single pass and then truncate
// the slice.
cancelable := PendingEvaluations{}
startLen := len(*p) // starting length
j := 0 // position to move next eval to keep
for i := 0; i < len(*p); i++ {
eval := (*p)[i]
if eval.ModifyIndex >= index {
(*p)[j] = eval
j++
} else {
heap.Push(&cancelable, eval)
(*p)[i] = nil
}
}
// truncate the slice
if len(cancelable) > 0 {
*p = (*p)[:startLen-cancelable.Len()]
}
return cancelable
}
package nomad
import (
"container/heap"
"encoding/json"
"errors"
"fmt"
......@@ -11,6 +12,7 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
)
......@@ -422,207 +424,113 @@ func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) {
b.Enqueue(eval5)
stats := b.Stats()
if stats.TotalReady != 2 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalBlocked != 3 {
t.Fatalf("bad: %#v", stats)
}
must.Eq(t, stats.TotalReady, 2, must.Sprintf("expected 2 ready: %#v", stats))
must.Eq(t, stats.TotalBlocked, 3, must.Sprintf("expected 3 blocked: %#v", stats))
// Dequeue should work
out, token, err := b.Dequeue(defaultSched, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != eval {
t.Fatalf("bad : %#v", out)
}
must.NoError(t, err)
must.Eq(t, out, eval, must.Sprint("expected 1st eval"))
// Check the stats
stats = b.Stats()
if stats.TotalReady != 1 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalUnacked != 1 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalBlocked != 3 {
t.Fatalf("bad: %#v", stats)
}
must.Eq(t, stats.TotalReady, 1, must.Sprintf("expected 1 ready: %#v", stats))
must.Eq(t, stats.TotalUnacked, 1, must.Sprintf("expected 1 unacked: %#v", stats))
must.Eq(t, stats.TotalBlocked, 3, must.Sprintf("expected 3 blocked: %#v", stats))
// Ack out
err = b.Ack(eval.ID, token)
if err != nil {
t.Fatalf("err: %v", err)
}
must.NoError(t, err)
// Check the stats
stats = b.Stats()
if stats.TotalReady != 2 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalUnacked != 0 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalBlocked != 2 {
t.Fatalf("bad: %#v", stats)
}
must.Eq(t, stats.TotalReady, 2, must.Sprintf("expected 2 ready: %#v", stats))
must.Eq(t, stats.TotalUnacked, 0, must.Sprintf("expected 0 unacked: %#v", stats))
must.Eq(t, stats.TotalBlocked, 2, must.Sprintf("expected 2 blocked: %#v", stats))
// Dequeue should work
out, token, err = b.Dequeue(defaultSched, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != eval2 {
t.Fatalf("bad : %#v", out)
}
must.NoError(t, err)
must.Eq(t, out, eval2, must.Sprint("expected 2nd eval"))
// Check the stats
stats = b.Stats()
if stats.TotalReady != 1 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalUnacked != 1 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalBlocked != 2 {
t.Fatalf("bad: %#v", stats)
}
must.Eq(t, stats.TotalReady, 1, must.Sprintf("expected 1 ready: %#v", stats))
must.Eq(t, stats.TotalUnacked, 1, must.Sprintf("expected 1 unacked: %#v", stats))
must.Eq(t, stats.TotalBlocked, 2, must.Sprintf("expected 2 blocked: %#v", stats))
// Ack out
err = b.Ack(eval2.ID, token)
if err != nil {
t.Fatalf("err: %v", err)
}
must.NoError(t, err)
// Check the stats
stats = b.Stats()
if stats.TotalReady != 2 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalUnacked != 0 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalBlocked != 1 {
t.Fatalf("bad: %#v", stats)
}
must.Eq(t, stats.TotalReady, 2, must.Sprintf("expected 2 ready: %#v", stats))
must.Eq(t, stats.TotalUnacked, 0, must.Sprintf("expected 0 unacked: %#v", stats))
must.Eq(t, stats.TotalBlocked, 1, must.Sprintf("expected 1 blocked: %#v", stats))
// Dequeue should work
out, token, err = b.Dequeue(defaultSched, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != eval3 {
t.Fatalf("bad : %#v", out)
}
must.NoError(t, err)
must.Eq(t, out, eval3, must.Sprint("expected 3rd eval"))
// Check the stats
stats = b.Stats()
if stats.TotalReady != 1 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalUnacked != 1 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalBlocked != 1 {
t.Fatalf("bad: %#v", stats)
}
must.Eq(t, stats.TotalReady, 1, must.Sprintf("expected 1 ready: %#v", stats))
must.Eq(t, stats.TotalUnacked, 1, must.Sprintf("expected 1 unacked: %#v", stats))
must.Eq(t, stats.TotalBlocked, 1, must.Sprintf("expected 1 blocked: %#v", stats))
// Ack out
err = b.Ack(eval3.ID, token)
if err != nil {
t.Fatalf("err: %v", err)
}
must.NoError(t, err)
// Check the stats
stats = b.Stats()
if stats.TotalReady != 1 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalUnacked != 0 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalBlocked != 1 {
t.Fatalf("bad: %#v", stats)
}
must.Eq(t, stats.TotalReady, 1, must.Sprintf("expected 1 ready: %#v", stats))
must.Eq(t, stats.TotalUnacked, 0, must.Sprintf("expected 0 unacked: %#v", stats))
must.Eq(t, stats.TotalBlocked, 1, must.Sprintf("expected 1 blocked: %#v", stats))
// Dequeue should work
out, token, err = b.Dequeue(defaultSched, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != eval4 {
t.Fatalf("bad : %#v", out)
}
must.NoError(t, err)
must.Eq(t, out, eval4, must.Sprint("expected 4th eval"))
// Check the stats
stats = b.Stats()
if stats.TotalReady != 0 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalUnacked != 1 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalBlocked != 1 {
t.Fatalf("bad: %#v", stats)
}
must.Eq(t, stats.TotalReady, 0, must.Sprintf("expected 0 ready: %#v", stats))
must.Eq(t, stats.TotalUnacked, 1, must.Sprintf("expected 1 unacked: %#v", stats))
must.Eq(t, stats.TotalBlocked, 1, must.Sprintf("expected 1 blocked: %#v", stats))
// Ack out
err = b.Ack(eval4.ID, token)
if err != nil {
t.Fatalf("err: %v", err)
}
must.NoError(t, err)
// Check the stats
stats = b.Stats()
if stats.TotalReady != 1 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalUnacked != 0 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalBlocked != 0 {
t.Fatalf("bad: %#v", stats)
}
must.Eq(t, stats.TotalReady, 1, must.Sprintf("expected 1 ready: %#v", stats))
must.Eq(t, stats.TotalUnacked, 0, must.Sprintf("expected 0 unacked: %#v", stats))
must.Eq(t, stats.TotalBlocked, 0, must.Sprintf("expected 0 blocked: %#v", stats))
// Dequeue should work
out, token, err = b.Dequeue(defaultSched, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != eval5 {
t.Fatalf("bad : %#v", out)
}
must.NoError(t, err)
must.Eq(t, out, eval5, must.Sprint("expected 5th eval"))
// Check the stats
stats = b.Stats()
if stats.TotalReady != 0 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalUnacked != 1 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalBlocked != 0 {
t.Fatalf("bad: %#v", stats)
}
must.Eq(t, stats.TotalReady, 0, must.Sprintf("expected 0 ready: %#v", stats))
must.Eq(t, stats.TotalUnacked, 1, must.Sprintf("expected 1 unacked: %#v", stats))
must.Eq(t, stats.TotalBlocked, 0, must.Sprintf("expected 0 blocked: %#v", stats))
// Ack out
err = b.Ack(eval5.ID, token)
if err != nil {
t.Fatalf("err: %v", err)
}
must.NoError(t, err)
// Check the stats
stats = b.Stats()
if stats.TotalReady != 0 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalUnacked != 0 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalBlocked != 0 {
t.Fatalf("bad: %#v", stats)
}
must.Eq(t, stats.TotalReady, 0, must.Sprintf("expected 0 ready: %#v", stats))
must.Eq(t, stats.TotalUnacked, 0, must.Sprintf("expected 0 unacked: %#v", stats))
must.Eq(t, stats.TotalBlocked, 0, must.Sprintf("expected 0 blocked: %#v", stats))
}
func TestEvalBroker_Enqueue_Disable(t *testing.T) {
......@@ -813,18 +721,18 @@ func TestEvalBroker_Dequeue_FIFO(t *testing.T) {
b.SetEnabled(true)
NUM := 100
for i := 0; i < NUM; i++ {
for i := NUM; i > 0; i-- {
eval1 := mock.Eval()
eval1.CreateIndex = uint64(i)
eval1.ModifyIndex = uint64(i)
b.Enqueue(eval1)
}
for i := 0; i < NUM; i++ {
for i := 1; i < NUM; i++ {
out1, _, _ := b.Dequeue(defaultSched, time.Second)
if out1.CreateIndex != uint64(i) {
t.Fatalf("bad: %d %#v", i, out1)
}
must.Eq(t, out1.CreateIndex, uint64(i),
must.Sprintf("eval was not FIFO by CreateIndex"),
)
}
}
......@@ -1506,3 +1414,38 @@ func TestEvalBroker_NamespacedJobs(t *testing.T) {
require.Equal(1, len(b.blocked))
}
func TestEvalBroker_BlockedEvals_MarkForCancel(t *testing.T) {
ci.Parallel(t)
blocked := BlockedEvaluations{}
// evals are pushed on the blocked queue LIFO by CreateIndex
// order, so Push them in reverse order here to assert we're
// getting the correct order
for i := 100; i > 0; i -= 10 {
eval := mock.Eval()
eval.JobID = "example"
eval.CreateIndex = uint64(i)
eval.ModifyIndex = uint64(i)
heap.Push(&blocked, eval)
}
canceled := blocked.MarkForCancel(40)
must.Len(t, 4, canceled)
must.Len(t, 6, blocked)
got := []uint64{}
for _, eval := range blocked {
must.NotNil(t, eval)
got = append(got, eval.CreateIndex)
}
must.Eq(t, []uint64{100, 90, 80, 70, 60, 50}, got)
popped := canceled.PopN(2)
must.Len(t, 2, canceled)
must.Len(t, 2, popped)
must.Eq(t, popped[0].CreateIndex, 10)
must.Eq(t, popped[1].CreateIndex, 20)
}
......@@ -229,6 +229,27 @@ func (e *Eval) Ack(args *structs.EvalAckRequest,
if err := e.srv.evalBroker.Ack(args.EvalID, args.Token); err != nil {
return err
}
const cancelDesc = "cancelled after more recent eval was processed"
cancelable := e.srv.evalBroker.Cancelable(1000)
if len(cancelable) > 0 {
for _, eval := range cancelable {
eval.Status = structs.EvalStatusCancelled
eval.StatusDescription = cancelDesc
}
update := &structs.EvalUpdateRequest{
Evals: cancelable,
WriteRequest: structs.WriteRequest{Region: args.Region},
}
_, _, err = e.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
e.logger.Error("eval cancel failed", "error", err, "method", "ack")
return err
}
}
return nil
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment