Unverified Commit 07257e9d authored by Alex Dadgar, committed by GitHub

Merge pull request #4118 from hashicorp/f-deployment-desired

Deployment watcher gates whether allocs can reschedule
parents 29acccc0 6f37a2a1
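For context, this change gates rescheduling of failed allocations that belong to a deployment: the reconciler now ignores them until the deployment watcher marks them with DesiredTransition.Reschedule = true. The sketch below is a trimmed, illustrative model of that gate (the types and the canRescheduleNow helper are stand-ins, not Nomad's actual structs):

package main

import "fmt"

// DesiredTransition mirrors the field added in this change.
type DesiredTransition struct {
	Reschedule *bool
}

// ShouldReschedule matches the accessor added in structs.go.
func (d *DesiredTransition) ShouldReschedule() bool {
	return d.Reschedule != nil && *d.Reschedule
}

// Alloc is a trimmed stand-in for structs.Allocation.
type Alloc struct {
	DeploymentID      string
	DesiredTransition DesiredTransition
}

// canRescheduleNow models the reconciler-side gate: allocations that are part
// of a deployment stay put until the deployment watcher marks them eligible.
func canRescheduleNow(a *Alloc) bool {
	if a.DeploymentID != "" && !a.DesiredTransition.ShouldReschedule() {
		return false
	}
	return true
}

func main() {
	yes := true
	gated := &Alloc{DeploymentID: "dep-1"}
	allowed := &Alloc{DeploymentID: "dep-1", DesiredTransition: DesiredTransition{Reschedule: &yes}}
	fmt.Println(canRescheduleNow(gated), canRescheduleNow(allowed)) // false true
}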
......@@ -214,6 +214,10 @@ type DesiredTransition struct {
// Migrate is used to indicate that this allocation should be stopped and
// migrated to another node.
Migrate *bool
// Reschedule is used to indicate that this allocation is eligible to be
// rescheduled.
Reschedule *bool
}
// ShouldMigrate returns whether the transition object dictates a migration.
......
......@@ -25,14 +25,6 @@ func (d *deploymentWatcherRaftShim) convertApplyErrors(applyResp interface{}, in
return index, err
}
func (d *deploymentWatcherRaftShim) UpsertEvals(evals []*structs.Evaluation) (uint64, error) {
update := &structs.EvalUpdateRequest{
Evals: evals,
}
fsmErrIntf, index, raftErr := d.apply(structs.EvalUpdateRequestType, update)
return d.convertApplyErrors(fsmErrIntf, index, raftErr)
}
func (d *deploymentWatcherRaftShim) UpsertJob(job *structs.Job) (uint64, error) {
job.SetSubmitTime()
update := &structs.JobRegisterRequest{
......@@ -56,3 +48,8 @@ func (d *deploymentWatcherRaftShim) UpdateDeploymentAllocHealth(req *structs.App
fsmErrIntf, index, raftErr := d.apply(structs.DeploymentAllocHealthRequestType, req)
return d.convertApplyErrors(fsmErrIntf, index, raftErr)
}
func (d *deploymentWatcherRaftShim) UpdateAllocDesiredTransition(req *structs.AllocUpdateDesiredTransitionRequest) (uint64, error) {
fsmErrIntf, index, raftErr := d.apply(structs.AllocUpdateDesiredTransitionRequestType, req)
return d.convertApplyErrors(fsmErrIntf, index, raftErr)
}
......@@ -7,58 +7,62 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)
// EvalBatcher is used to batch the creation of evaluations
type EvalBatcher struct {
// AllocUpdateBatcher is used to batch the updates to the desired transitions
// of allocations and the creation of evals.
type AllocUpdateBatcher struct {
// batch is the batching duration
batch time.Duration
// raft is used to actually commit the evaluations
// raft is used to actually commit the updates
raft DeploymentRaftEndpoints
// workCh is used to pass evaluations to the daemon process
workCh chan *evalWrapper
workCh chan *updateWrapper
// ctx is used to exit the daemon batcher
ctx context.Context
}
// NewEvalBatcher returns an EvalBatcher that uses the passed raft endpoints to
// create the evaluations and exits the batcher when the passed exit channel is
// closed.
func NewEvalBatcher(batchDuration time.Duration, raft DeploymentRaftEndpoints, ctx context.Context) *EvalBatcher {
b := &EvalBatcher{
// NewAllocUpdateBatcher returns an AllocUpdateBatcher that uses the passed raft endpoints to
// create the allocation desired transition updates and new evaluations and
// exits the batcher when the passed exit channel is closed.
func NewAllocUpdateBatcher(batchDuration time.Duration, raft DeploymentRaftEndpoints, ctx context.Context) *AllocUpdateBatcher {
b := &AllocUpdateBatcher{
batch: batchDuration,
raft: raft,
ctx: ctx,
workCh: make(chan *evalWrapper, 10),
workCh: make(chan *updateWrapper, 10),
}
go b.batcher()
return b
}
// CreateEval batches the creation of the evaluation and returns a future that
// tracks the evaluations creation.
func (b *EvalBatcher) CreateEval(e *structs.Evaluation) *EvalFuture {
wrapper := &evalWrapper{
e: e,
f: make(chan *EvalFuture, 1),
// CreateUpdate batches the allocation desired transition update and returns a
// future that tracks the completion of the request.
func (b *AllocUpdateBatcher) CreateUpdate(allocs map[string]*structs.DesiredTransition, eval *structs.Evaluation) *BatchFuture {
wrapper := &updateWrapper{
allocs: allocs,
e: eval,
f: make(chan *BatchFuture, 1),
}
b.workCh <- wrapper
return <-wrapper.f
}
type evalWrapper struct {
e *structs.Evaluation
f chan *EvalFuture
type updateWrapper struct {
allocs map[string]*structs.DesiredTransition
e *structs.Evaluation
f chan *BatchFuture
}
// batcher is the long lived batcher goroutine
func (b *EvalBatcher) batcher() {
func (b *AllocUpdateBatcher) batcher() {
var timerCh <-chan time.Time
allocs := make(map[string]*structs.DesiredTransition)
evals := make(map[string]*structs.Evaluation)
future := NewEvalFuture()
future := NewBatchFuture()
for {
select {
case <-b.ctx.Done():
......@@ -68,59 +72,68 @@ func (b *EvalBatcher) batcher() {
timerCh = time.After(b.batch)
}
// Store the eval and attach the future
// Store the eval and alloc updates, and attach the future
evals[w.e.DeploymentID] = w.e
for id, upd := range w.allocs {
allocs[id] = upd
}
w.f <- future
case <-timerCh:
// Capture the future and create a new one
f := future
future = NewEvalFuture()
future = NewBatchFuture()
// Shouldn't be possible
if f == nil {
panic("no future")
}
// Capture the evals
all := make([]*structs.Evaluation, 0, len(evals))
// Create the request
req := &structs.AllocUpdateDesiredTransitionRequest{
Allocs: allocs,
Evals: make([]*structs.Evaluation, 0, len(evals)),
}
for _, e := range evals {
all = append(all, e)
req.Evals = append(req.Evals, e)
}
// Upsert the evals in a go routine
go f.Set(b.raft.UpsertEvals(all))
go f.Set(b.raft.UpdateAllocDesiredTransition(req))
// Reset the evals list and timer
evals = make(map[string]*structs.Evaluation)
allocs = make(map[string]*structs.DesiredTransition)
timerCh = nil
}
}
}
// EvalFuture is a future that can be used to retrieve the index the eval was
// BatchFuture is a future that can be used to retrieve the index the eval was
// created at or any error in the creation process
type EvalFuture struct {
type BatchFuture struct {
index uint64
err error
waitCh chan struct{}
}
// NewEvalFuture returns a new EvalFuture
func NewEvalFuture() *EvalFuture {
return &EvalFuture{
// NewBatchFuture returns a new BatchFuture
func NewBatchFuture() *BatchFuture {
return &BatchFuture{
waitCh: make(chan struct{}),
}
}
// Set sets the results of the future, unblocking any client.
func (f *EvalFuture) Set(index uint64, err error) {
func (f *BatchFuture) Set(index uint64, err error) {
f.index = index
f.err = err
close(f.waitCh)
}
// Results returns the creation index and any error.
func (f *EvalFuture) Results() (uint64, error) {
func (f *BatchFuture) Results() (uint64, error) {
<-f.waitCh
return f.index, f.err
}
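One usage note on the future above: every CreateUpdate call that lands in the same batch window receives the same BatchFuture, so all callers observe one Raft index and one error. A minimal, self-contained sketch of that shared-future behavior (a standalone copy of the type, not the deploymentwatcher package itself):

package main

import (
	"fmt"
	"sync"
)

// BatchFuture is a standalone copy of the future above: callers that land in
// the same batch window share one future, and therefore one index and error.
type BatchFuture struct {
	index  uint64
	err    error
	waitCh chan struct{}
}

func NewBatchFuture() *BatchFuture { return &BatchFuture{waitCh: make(chan struct{})} }

func (f *BatchFuture) Set(index uint64, err error) {
	f.index = index
	f.err = err
	close(f.waitCh)
}

func (f *BatchFuture) Results() (uint64, error) {
	<-f.waitCh
	return f.index, f.err
}

func main() {
	f := NewBatchFuture()

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			idx, err := f.Results() // blocks until the batched commit resolves
			fmt.Printf("caller %d saw index %d, err %v\n", i, idx, err)
		}(i)
	}

	// The batcher goroutine resolves the shared future once per flush.
	f.Set(100, nil)
	wg.Wait()
}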
......@@ -22,11 +22,21 @@ const (
perJobEvalBatchPeriod = 1 * time.Second
)
var (
// allowRescheduleTransition is the transition that allows failed
// allocations that are part of a deployment to be rescheduled. We create a
// one-off variable to avoid creating a new object for every request.
allowRescheduleTransition = &structs.DesiredTransition{
Reschedule: helper.BoolToPtr(true),
}
)
// deploymentTriggers are the set of functions required to trigger changes on
// behalf of a deployment
type deploymentTriggers interface {
// createEvaluation is used to create an evaluation.
createEvaluation(eval *structs.Evaluation) (uint64, error)
// createUpdate is used to create allocation desired transition updates and
// an evaluation.
createUpdate(allocs map[string]*structs.DesiredTransition, eval *structs.Evaluation) (uint64, error)
// upsertJob is used to roll back a job when autoreverting for a deployment
upsertJob(job *structs.Job) (uint64, error)
......@@ -69,9 +79,13 @@ type deploymentWatcher struct {
j *structs.Job
// outstandingBatch marks whether an outstanding function exists to create
// the evaluation. Access should be done through the lock
// the evaluation. Access should be done through the lock.
outstandingBatch bool
// outstandingAllowReplacements is the map of allocations that will be
// marked as allowing a replacement. Access should be done through the lock.
outstandingAllowReplacements map[string]*structs.DesiredTransition
// latestEval is the latest eval for the job. It is updated by the watch
// loop and any time an evaluation is created. The field should be accessed
// by holding the lock or using the setter and getter methods.
......@@ -346,12 +360,12 @@ func (w *deploymentWatcher) StopWatch() {
// deployment changes. Its function is to create evaluations to trigger the
// scheduler when more progress can be made, to fail the deployment if it has
// failed, and to potentially roll back the job. Progress can be made when an
// allocation transistions to healthy, so we create an eval.
// allocation transitions to healthy, so we create an eval.
func (w *deploymentWatcher) watch() {
// Get the deadline. This is likely a zero time to begin with but we need to
// handle the case that the deployment has already progressed and we are now
// just starting to watch it. This most likely would occur if there was a
// leader transistion and we are now starting our watcher.
// leader transition and we are now starting our watcher.
currentDeadline := getDeploymentProgressCutoff(w.getDeployment())
var deadlineTimer *time.Timer
if currentDeadline.IsZero() {
......@@ -391,9 +405,10 @@ FAIL:
next := getDeploymentProgressCutoff(w.getDeployment())
if !next.Equal(currentDeadline) {
currentDeadline = next
if deadlineTimer.Reset(next.Sub(time.Now())) {
if !deadlineTimer.Stop() {
<-deadlineTimer.C
}
deadlineTimer.Reset(next.Sub(time.Now()))
}
case updates = <-w.getAllocsCh(allocIndex):
......@@ -427,8 +442,8 @@ FAIL:
}
// Create an eval to push the deployment along
if res.createEval {
w.createEvalBatched(allocIndex)
if res.createEval || len(res.allowReplacements) != 0 {
w.createBatchedUpdate(res.allowReplacements, allocIndex)
}
}
}
......@@ -470,9 +485,10 @@ FAIL:
// allocUpdateResult is used to return the desired actions given the newest set
// of allocations for the deployment.
type allocUpdateResult struct {
createEval bool
failDeployment bool
rollback bool
createEval bool
failDeployment bool
rollback bool
allowReplacements []string
}
// handleAllocUpdate is used to compute the set of actions to take based on the
......@@ -502,13 +518,18 @@ func (w *deploymentWatcher) handleAllocUpdate(allocs []*structs.AllocListStub) (
continue
}
// Determine if the update stanza for this group is progress based
progressBased := dstate.ProgressDeadline != 0
// We need to create an eval so the job can progress.
if alloc.DeploymentStatus.IsHealthy() {
res.createEval = true
} else if progressBased && alloc.DeploymentStatus.IsUnhealthy() && deployment.Active() && !alloc.DesiredTransition.ShouldReschedule() {
res.allowReplacements = append(res.allowReplacements, alloc.ID)
}
// If the group is using a deadline, we don't have to do anything.
if dstate.ProgressDeadline != 0 {
// If the group is using a progress deadline, we don't have to do anything.
if progressBased {
continue
}
......@@ -599,12 +620,21 @@ func (w *deploymentWatcher) latestStableJob() (*structs.Job, error) {
return stable, nil
}
// createEvalBatched creates an eval but batches calls together
func (w *deploymentWatcher) createEvalBatched(forIndex uint64) {
// createBatchedUpdate creates an eval for the given index as well as updating
// the given allocations to allow them to reschedule.
func (w *deploymentWatcher) createBatchedUpdate(allowReplacements []string, forIndex uint64) {
w.l.Lock()
defer w.l.Unlock()
if w.outstandingBatch || forIndex < w.latestEval {
// Store the allocations that can be replaced
for _, allocID := range allowReplacements {
if w.outstandingAllowReplacements == nil {
w.outstandingAllowReplacements = make(map[string]*structs.DesiredTransition, len(allowReplacements))
}
w.outstandingAllowReplacements[allocID] = allowRescheduleTransition
}
if w.outstandingBatch || (forIndex < w.latestEval && len(allowReplacements) == 0) {
return
}
......@@ -619,17 +649,18 @@ func (w *deploymentWatcher) createEvalBatched(forIndex uint64) {
default:
}
w.l.Lock()
replacements := w.outstandingAllowReplacements
w.outstandingAllowReplacements = nil
w.outstandingBatch = false
w.l.Unlock()
// Create the eval
if index, err := w.createEvaluation(w.getEval()); err != nil {
if index, err := w.createUpdate(replacements, w.getEval()); err != nil {
w.logger.Printf("[ERR] nomad.deployment_watcher: failed to create evaluation for deployment %q: %v", w.deploymentID, err)
} else {
w.setLatestEval(index)
}
w.l.Lock()
w.outstandingBatch = false
w.l.Unlock()
})
}
......
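The createBatchedUpdate change above keeps the existing first-caller-arms-the-timer pattern: the first call in a window schedules a flush via time.AfterFunc, and later calls only accumulate entries in outstandingAllowReplacements under the lock. A simplified, self-contained sketch of that pattern (illustrative names, not the watcher's real types):

package main

import (
	"fmt"
	"sync"
	"time"
)

// pendingBatcher accumulates IDs and flushes them at most once per window,
// mirroring the outstandingBatch / outstandingAllowReplacements bookkeeping.
type pendingBatcher struct {
	mu          sync.Mutex
	outstanding bool
	pending     []string
	window      time.Duration
	flush       func(ids []string)
}

func (b *pendingBatcher) add(id string) {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.pending = append(b.pending, id)
	if b.outstanding {
		return // a flush is already scheduled for this window
	}
	b.outstanding = true

	time.AfterFunc(b.window, func() {
		b.mu.Lock()
		ids := b.pending
		b.pending = nil
		b.outstanding = false
		b.mu.Unlock()

		b.flush(ids)
	})
}

func main() {
	b := &pendingBatcher{
		window: 100 * time.Millisecond,
		flush:  func(ids []string) { fmt.Println("flushing", ids) },
	}
	b.add("alloc-1")
	b.add("alloc-2") // coalesced into the same flush
	time.Sleep(200 * time.Millisecond)
}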
......@@ -19,9 +19,10 @@ const (
// second
LimitStateQueriesPerSecond = 100.0
// CrossDeploymentEvalBatchDuration is the duration in which evaluations are
// batched across all deployment watchers before committing to Raft.
CrossDeploymentEvalBatchDuration = 250 * time.Millisecond
// CrossDeploymentUpdateBatchDuration is the duration in which allocation
// desired transition and evaluation creation updates are batched across
// all deployment watchers before committing to Raft.
CrossDeploymentUpdateBatchDuration = 250 * time.Millisecond
)
var (
......@@ -33,9 +34,6 @@ var (
// DeploymentRaftEndpoints exposes the deployment watcher to a set of functions
// to apply data transforms via Raft.
type DeploymentRaftEndpoints interface {
// UpsertEvals is used to upsert a set of evaluations
UpsertEvals([]*structs.Evaluation) (uint64, error)
// UpsertJob is used to upsert a job
UpsertJob(job *structs.Job) (uint64, error)
......@@ -49,6 +47,10 @@ type DeploymentRaftEndpoints interface {
// UpdateDeploymentAllocHealth is used to set the health of allocations in a
// deployment
UpdateDeploymentAllocHealth(req *structs.ApplyDeploymentAllocHealthRequest) (uint64, error)
// UpdateAllocDesiredTransition is used to update the desired transition
// for allocations.
UpdateAllocDesiredTransition(req *structs.AllocUpdateDesiredTransitionRequest) (uint64, error)
}
// Watcher is used to watch deployments and their allocations created
......@@ -61,9 +63,9 @@ type Watcher struct {
// queryLimiter is used to limit the rate of blocking queries
queryLimiter *rate.Limiter
// evalBatchDuration is the duration to batch eval creation across all
// deployment watchers
evalBatchDuration time.Duration
// updateBatchDuration is the duration to batch allocation desired
// transition and eval creation across all deployment watchers
updateBatchDuration time.Duration
// raft contains the set of Raft endpoints that can be used by the
// deployments watcher
......@@ -75,8 +77,9 @@ type Watcher struct {
// watchers is the set of active watchers, one per deployment
watchers map[string]*deploymentWatcher
// evalBatcher is used to batch the creation of evaluations
evalBatcher *EvalBatcher
// allocUpdateBatcher is used to batch the creation of evaluations and
// allocation desired transition updates
allocUpdateBatcher *AllocUpdateBatcher
// ctx and exitFn are used to cancel the watcher
ctx context.Context
......@@ -89,13 +92,13 @@ type Watcher struct {
// deployments and trigger the scheduler as needed.
func NewDeploymentsWatcher(logger *log.Logger,
raft DeploymentRaftEndpoints, stateQueriesPerSecond float64,
evalBatchDuration time.Duration) *Watcher {
updateBatchDuration time.Duration) *Watcher {
return &Watcher{
raft: raft,
queryLimiter: rate.NewLimiter(rate.Limit(stateQueriesPerSecond), 100),
evalBatchDuration: evalBatchDuration,
logger: logger,
raft: raft,
queryLimiter: rate.NewLimiter(rate.Limit(stateQueriesPerSecond), 100),
updateBatchDuration: updateBatchDuration,
logger: logger,
}
}
......@@ -136,7 +139,7 @@ func (w *Watcher) flush() {
w.watchers = make(map[string]*deploymentWatcher, 32)
w.ctx, w.exitFn = context.WithCancel(context.Background())
w.evalBatcher = NewEvalBatcher(w.evalBatchDuration, w.raft, w.ctx)
w.allocUpdateBatcher = NewAllocUpdateBatcher(w.updateBatchDuration, w.raft, w.ctx)
}
// watchDeployments is the long lived go-routine that watches for deployments to
......@@ -354,10 +357,10 @@ func (w *Watcher) FailDeployment(req *structs.DeploymentFailRequest, resp *struc
return watcher.FailDeployment(req, resp)
}
// createEvaluation commits the given evaluation to Raft but batches the commit
// with other calls.
func (w *Watcher) createEvaluation(eval *structs.Evaluation) (uint64, error) {
return w.evalBatcher.CreateEval(eval).Results()
// createUpdate commits the given allocation desired transition and evaluation
// to Raft but batches the commit with other calls.
func (w *Watcher) createUpdate(allocs map[string]*structs.DesiredTransition, eval *structs.Evaluation) (uint64, error) {
return w.allocUpdateBatcher.CreateUpdate(allocs, eval).Results()
}
// upsertJob commits the given job to Raft
......
......@@ -22,7 +22,7 @@ func testDeploymentWatcher(t *testing.T, qps float64, batchDur time.Duration) (*
}
func defaultTestDeploymentWatcher(t *testing.T) (*Watcher, *mockBackend) {
return testDeploymentWatcher(t, LimitStateQueriesPerSecond, CrossDeploymentEvalBatchDuration)
return testDeploymentWatcher(t, LimitStateQueriesPerSecond, CrossDeploymentUpdateBatchDuration)
}
// Tests that the watcher properly watches for deployments and reconciles them
......@@ -712,10 +712,10 @@ func TestDeploymentWatcher_Watch_NoProgressDeadline(t *testing.T) {
j2.Stable = false
assert.Nil(m.state.UpsertJob(m.nextIndex(), j2), "UpsertJob2")
// Assert that we will get a createEvaluation call only once. This will
// Assert that we will get an update allocation call only once. This will
// verify that the watcher is batching allocation changes
m1 := matchUpsertEvals([]string{d.ID})
m.On("UpsertEvals", mocker.MatchedBy(m1)).Return(nil).Once()
m1 := matchUpdateAllocDesiredTransitions([]string{d.ID})
m.On("UpdateAllocDesiredTransition", mocker.MatchedBy(m1)).Return(nil).Once()
// Assert that we get a call to UpsertDeploymentStatusUpdate
c := &matchDeploymentStatusUpdateConfig{
......@@ -787,7 +787,7 @@ func TestDeploymentWatcher_Watch_NoProgressDeadline(t *testing.T) {
t.Fatal(err)
})
m.AssertCalled(t, "UpsertEvals", mocker.MatchedBy(m1))
m.AssertCalled(t, "UpdateAllocDesiredTransition", mocker.MatchedBy(m1))
// After we upsert the job version will go to 2. So use this to assert the
// original call happened.
......@@ -907,8 +907,8 @@ func TestDeploymentWatcher_RollbackFailed(t *testing.T) {
// Assert that we will get an UpdateAllocDesiredTransition call only once. This will
// verify that the watcher is batching allocation changes
m1 := matchUpsertEvals([]string{d.ID})
m.On("UpsertEvals", mocker.MatchedBy(m1)).Return(nil).Once()
m1 := matchUpdateAllocDesiredTransitions([]string{d.ID})
m.On("UpdateAllocDesiredTransition", mocker.MatchedBy(m1)).Return(nil).Once()
// Assert that we get a call to UpsertDeploymentStatusUpdate with roll back failed as the status
c := &matchDeploymentStatusUpdateConfig{
......@@ -980,15 +980,15 @@ func TestDeploymentWatcher_RollbackFailed(t *testing.T) {
t.Fatal(err)
})
m.AssertCalled(t, "UpsertEvals", mocker.MatchedBy(m1))
m.AssertCalled(t, "UpdateAllocDesiredTransition", mocker.MatchedBy(m1))
// verify that the job version hasn't changed after upsert
m.state.JobByID(nil, structs.DefaultNamespace, j.ID)
assert.Equal(uint64(0), j.Version, "Expected job version 0 but got ", j.Version)
}
// Test evaluations are batched between watchers
func TestWatcher_BatchEvals(t *testing.T) {
// Test that allocation updates and evaluation creation are batched between watchers
func TestWatcher_BatchAllocUpdates(t *testing.T) {
t.Parallel()
assert := assert.New(t)
w, m := testDeploymentWatcher(t, 1000.0, 1*time.Second)
......@@ -1024,8 +1024,8 @@ func TestWatcher_BatchEvals(t *testing.T) {
// Assert that we will get an UpdateAllocDesiredTransition call only once and it contains
// both deployments. This will verify that the watcher is batching
// allocation changes
m1 := matchUpsertEvals([]string{d1.ID, d2.ID})
m.On("UpsertEvals", mocker.MatchedBy(m1)).Return(nil).Once()
m1 := matchUpdateAllocDesiredTransitions([]string{d1.ID, d2.ID})
m.On("UpdateAllocDesiredTransition", mocker.MatchedBy(m1)).Return(nil).Once()
w.SetEnabled(true, m.state)
testutil.WaitForResult(func() (bool, error) { return 2 == len(w.watchers), nil },
......@@ -1062,11 +1062,11 @@ func TestWatcher_BatchEvals(t *testing.T) {
}
if l := len(evals1); l != 1 {
return false, fmt.Errorf("Got %d evals; want 1", l)
return false, fmt.Errorf("Got %d evals for job %v; want 1", l, j1.ID)
}
if l := len(evals2); l != 1 {
return false, fmt.Errorf("Got %d evals; want 1", l)
return false, fmt.Errorf("Got %d evals for job 2; want 1", l)
}
return true, nil
......@@ -1074,7 +1074,7 @@ func TestWatcher_BatchEvals(t *testing.T) {
t.Fatal(err)
})
m.AssertCalled(t, "UpsertEvals", mocker.MatchedBy(m1))
m.AssertCalled(t, "UpdateAllocDesiredTransition", mocker.MatchedBy(m1))
testutil.WaitForResult(func() (bool, error) { return 2 == len(w.watchers), nil },
func(err error) { assert.Equal(2, len(w.watchers), "Should have 2 deployment") })
}
......@@ -39,16 +39,16 @@ func (m *mockBackend) nextIndex() uint64 {
return i
}
func (m *mockBackend) UpsertEvals(evals []*structs.Evaluation) (uint64, error) {
m.Called(evals)
func (m *mockBackend) UpdateAllocDesiredTransition(u *structs.AllocUpdateDesiredTransitionRequest) (uint64, error) {
m.Called(u)
i := m.nextIndex()
return i, m.state.UpsertEvals(i, evals)
return i, m.state.UpdateAllocsDesiredTransitions(i, u.Allocs, u.Evals)
}
// matchUpsertEvals is used to match an upsert request
func matchUpsertEvals(deploymentIDs []string) func(evals []*structs.Evaluation) bool {
return func(evals []*structs.Evaluation) bool {
if len(evals) != len(deploymentIDs) {
// matchUpdateAllocDesiredTransitions is used to match an upsert request
func matchUpdateAllocDesiredTransitions(deploymentIDs []string) func(update *structs.AllocUpdateDesiredTransitionRequest) bool {
return func(update *structs.AllocUpdateDesiredTransitionRequest) bool {
if len(update.Evals) != len(deploymentIDs) {
return false
}
......@@ -57,7 +57,7 @@ func matchUpsertEvals(deploymentIDs []string) func(evals []*structs.Evaluation)
dmap[d] = struct{}{}
}
for _, e := range evals {
for _, e := range update.Evals {
if _, ok := dmap[e.DeploymentID]; !ok {
return false
}
......
......@@ -254,7 +254,7 @@ func (n *NodeDrainer) handleDeadlinedNodes(nodes []string) {
n.l.RUnlock()
n.batchDrainAllocs(forceStop)
// Submit the node transistions in a sharded form to ensure a reasonable
// Submit the node transitions in a sharded form to ensure a reasonable
// Raft transaction size.
for _, nodes := range partitionIds(defaultMaxIdsPerTxn, nodes) {
if _, err := n.raft.NodesDrainComplete(nodes); err != nil {
......@@ -324,7 +324,7 @@ func (n *NodeDrainer) handleMigratedAllocs(allocs []*structs.Allocation) {
}
}
// Submit the node transistions in a sharded form to ensure a reasonable
// Submit the node transitions in a sharded form to ensure a reasonable
// Raft transaction size.
for _, nodes := range partitionIds(defaultMaxIdsPerTxn, done) {
if _, err := n.raft.NodesDrainComplete(nodes); err != nil {
......@@ -374,9 +374,9 @@ func (n *NodeDrainer) batchDrainAllocs(allocs []*structs.Allocation) (uint64, er
func (n *NodeDrainer) drainAllocs(future *structs.BatchFuture, allocs []*structs.Allocation) {
// Compute the affected jobs and make the transition map
jobs := make(map[string]*structs.Allocation, 4)
transistions := make(map[string]*structs.DesiredTransition, len(allocs))
transitions := make(map[string]*structs.DesiredTransition, len(allocs))
for _, alloc := range allocs {
transistions[alloc.ID] = &structs.DesiredTransition{
transitions[alloc.ID] = &structs.DesiredTransition{
Migrate: helper.BoolToPtr(true),
}
jobs[alloc.JobID] = alloc
......@@ -397,7 +397,7 @@ func (n *NodeDrainer) drainAllocs(future *structs.BatchFuture, allocs []*structs
// Commit this update via Raft
var finalIndex uint64
for _, u := range partitionAllocDrain(defaultMaxIdsPerTxn, transistions, evals) {
for _, u := range partitionAllocDrain(defaultMaxIdsPerTxn, transitions, evals) {
index, err := n.raft.AllocUpdateDesiredTransition(u.Transitions, u.Evals)
if err != nil {
future.Respond(0, err)
......
......@@ -13,9 +13,9 @@ func TestDrainer_PartitionAllocDrain(t *testing.T) {
maxIdsPerTxn := 2
require := require.New(t)
transistions := map[string]*structs.DesiredTransition{"a": nil, "b": nil, "c": nil}
transitions := map[string]*structs.DesiredTransition{"a": nil, "b": nil, "c": nil}
evals := []*structs.Evaluation{nil, nil, nil}
requests := partitionAllocDrain(maxIdsPerTxn, transistions, evals)
requests := partitionAllocDrain(maxIdsPerTxn, transitions, evals)
require.Len(requests, 3)
first := requests[0]
......
......@@ -37,6 +37,7 @@ func allocPromoter(errCh chan<- error, ctx context.Context,
// For each alloc that doesn't have its deployment status set, set it
var updates []*structs.Allocation
now := time.Now()
for _, alloc := range allocs {
if alloc.Job.Type != structs.JobTypeService {
continue
......@@ -47,7 +48,8 @@ func allocPromoter(errCh chan<- error, ctx context.Context,
}
newAlloc := alloc.Copy()
newAlloc.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(true),
Healthy: helper.BoolToPtr(true),
Timestamp: now,
}
updates = append(updates, newAlloc)
logger.Printf("Marked deployment health for alloc %q", alloc.ID)
......@@ -658,7 +660,7 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) {
})
}
// Test that transistions to force drain work.
// Test that transitions to force drain work.
func TestDrainer_Batch_TransitionToForce(t *testing.T) {
t.Parallel()
require := require.New(t)
......
......@@ -582,19 +582,34 @@ func (n *nomadFSM) upsertEvals(index uint64, evals []*structs.Evaluation) error
return err
}
n.handleUpsertedEvals(evals)
return nil
}
// handleUpsertedEvals is a helper for taking action after upserting
// evaluations.
func (n *nomadFSM) handleUpsertedEvals(evals []*structs.Evaluation) {
for _, eval := range evals {
if eval.ShouldEnqueue() {
n.evalBroker.Enqueue(eval)
} else if eval.ShouldBlock() {
n.blockedEvals.Block(eval)
} else if eval.Status == structs.EvalStatusComplete &&
len(eval.FailedTGAllocs) == 0 {
// If we have a successful evaluation for a node, untrack any
// blocked evaluation
n.blockedEvals.Untrack(eval.JobID)
}
n.handleUpsertedEval(eval)
}
}
// handleUpsertedEval is a helper for taking action after upserting an eval.
func (n *nomadFSM) handleUpsertedEval(eval *structs.Evaluation) {
if eval == nil {
return
}
if eval.ShouldEnqueue() {
n.evalBroker.Enqueue(eval)
} else if eval.ShouldBlock() {
n.blockedEvals.Block(eval)
} else if eval.Status == structs.EvalStatusComplete &&
len(eval.FailedTGAllocs) == 0 {
// If we have a successful evaluation for a node, untrack any
// blocked evaluation
n.blockedEvals.Untrack(eval.JobID)
}
return nil
}
func (n *nomadFSM) applyDeleteEval(buf []byte, index uint64) interface{} {
......@@ -731,10 +746,7 @@ func (n *nomadFSM) applyAllocUpdateDesiredTransition(buf []byte, index uint64) i
return err
}
if err := n.upsertEvals(index, req.Evals); err != nil {
n.logger.Printf("[ERR] nomad.fsm: AllocUpdateDesiredTransition failed to upsert %d eval(s): %v", len(req.Evals), err)
return err
}
n.handleUpsertedEvals(req.Evals)
return nil
}
......@@ -826,10 +838,7 @@ func (n *nomadFSM) applyDeploymentStatusUpdate(buf []byte, index uint64) interfa
return err
}
if req.Eval != nil && req.Eval.ShouldEnqueue() {
n.evalBroker.Enqueue(req.Eval)
}
n.handleUpsertedEval(req.Eval)
return nil
}
......@@ -846,10 +855,7 @@ func (n *nomadFSM) applyDeploymentPromotion(buf []byte, index uint64) interface{
return err
}
if req.Eval != nil && req.Eval.ShouldEnqueue() {
n.evalBroker.Enqueue(req.Eval)
}
n.handleUpsertedEval(req.Eval)
return nil
}
......@@ -867,10 +873,7 @@ func (n *nomadFSM) applyDeploymentAllocHealth(buf []byte, index uint64) interfac
return err
}
if req.Eval != nil && req.Eval.ShouldEnqueue() {
n.evalBroker.Enqueue(req.Eval)
}
n.handleUpsertedEval(req.Eval)
return nil
}
......
......@@ -473,7 +473,7 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest,
}
reply.NodeModifyIndex = index
// If the node is transistioning to be eligible, create Node evaluations
// If the node is transitioning to be eligible, create Node evaluations
// because there may be a System job registered that should be evaluated.
if node.SchedulingEligibility == structs.NodeSchedulingIneligible && args.MarkEligible && args.DrainStrategy == nil {
evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index)
......@@ -553,7 +553,7 @@ func (n *Node) UpdateEligibility(args *structs.NodeUpdateEligibilityRequest,
}
}
// If the node is transistioning to be eligible, create Node evaluations
// If the node is transitioning to be eligible, create Node evaluations
// because there may be a System job registered that should be evaluated.
if node.SchedulingEligibility == structs.NodeSchedulingIneligible && args.Eligibility == structs.NodeSchedulingEligible {
evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index)
......
......@@ -893,7 +893,7 @@ func (s *Server) setupDeploymentWatcher() error {
s.deploymentWatcher = deploymentwatcher.NewDeploymentsWatcher(
s.logger, raftShim,
deploymentwatcher.LimitStateQueriesPerSecond,
deploymentwatcher.CrossDeploymentEvalBatchDuration)
deploymentwatcher.CrossDeploymentUpdateBatchDuration)
return nil
}
......
......@@ -2084,6 +2084,12 @@ func (s *StateStore) UpdateAllocsDesiredTransitions(index uint64, allocs map[str
}
}
for _, eval := range evals {
if err := s.nestedUpsertEval(txn, index, eval); err != nil {
return err
}
}
// Update the indexes
if err := txn.Insert("index", &IndexEntry{"allocs", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
......
......@@ -4016,6 +4016,11 @@ func TestStateStore_UpdateAllocDesiredTransition(t *testing.T) {
require.Nil(err)
require.EqualValues(1001, index)
// Check the eval is created
eout, err := state.EvalByID(nil, eval.ID)
require.Nil(err)
require.NotNil(eout)
m = map[string]*structs.DesiredTransition{alloc.ID: t2}
require.Nil(state.UpdateAllocsDesiredTransitions(1002, m, evals))
......
......@@ -5535,6 +5535,13 @@ type DesiredTransition struct {
// Migrate is used to indicate that this allocation should be stopped and
// migrated to another node.
Migrate *bool
// Reschedule is used to indicate that this allocation is eligible to be
// rescheduled. Most allocations are automatically eligible for
// rescheduling, so this field is only required when an allocation is not
// automatically eligible. An example is an allocation that is part of a
// deployment.
Reschedule *bool
}
// Merge merges the two desired transitions, preferring the values from the
......@@ -5543,6 +5550,10 @@ func (d *DesiredTransition) Merge(o *DesiredTransition) {
if o.Migrate != nil {
d.Migrate = o.Migrate
}
if o.Reschedule != nil {
d.Reschedule = o.Reschedule
}
}
// ShouldMigrate returns whether the transition object dictates a migration.
......@@ -5550,6 +5561,12 @@ func (d *DesiredTransition) ShouldMigrate() bool {
return d.Migrate != nil && *d.Migrate
}
// ShouldReschedule returns whether the transition object dictates a
// rescheduling.
func (d *DesiredTransition) ShouldReschedule() bool {
return d.Reschedule != nil && *d.Reschedule
}
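A small standalone illustration of the Merge and ShouldReschedule semantics added above (local copies of the methods; boolToPtr stands in for helper.BoolToPtr):

package main

import "fmt"

// Standalone copies of the struct and methods above, for illustration only.
type DesiredTransition struct {
	Migrate    *bool
	Reschedule *bool
}

func (d *DesiredTransition) Merge(o *DesiredTransition) {
	if o.Migrate != nil {
		d.Migrate = o.Migrate
	}
	if o.Reschedule != nil {
		d.Reschedule = o.Reschedule
	}
}

func (d *DesiredTransition) ShouldReschedule() bool {
	return d.Reschedule != nil && *d.Reschedule
}

func boolToPtr(b bool) *bool { return &b } // stands in for helper.BoolToPtr

func main() {
	// A drain set Migrate earlier; the deployment watcher later allows a reschedule.
	dt := &DesiredTransition{Migrate: boolToPtr(true)}
	dt.Merge(&DesiredTransition{Reschedule: boolToPtr(true)})

	// Merge only overwrites fields that are non-nil on the incoming transition,
	// so Migrate is preserved and ShouldReschedule now reports true.
	fmt.Println(*dt.Migrate, dt.ShouldReschedule()) // true true
}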
const (
AllocDesiredStatusRun = "run" // Allocation should run
AllocDesiredStatusStop = "stop" // Allocation should stop
......@@ -5969,6 +5986,7 @@ func (a *Allocation) Stub() *AllocListStub {
DesiredDescription: a.DesiredDescription,
ClientStatus: a.ClientStatus,
ClientDescription: a.ClientDescription,
DesiredTransition: a.DesiredTransition,
TaskStates: a.TaskStates,
DeploymentStatus: a.DeploymentStatus,
FollowupEvalID: a.FollowupEvalID,
......@@ -5992,6 +6010,7 @@ type AllocListStub struct {
DesiredDescription string
ClientStatus string
ClientDescription string
DesiredTransition DesiredTransition
TaskStates map[string]*TaskState
DeploymentStatus *AllocDeploymentStatus
FollowupEvalID string
......
......@@ -194,26 +194,8 @@ func (a *allocReconciler) Compute() *reconcileResults {
// Detect if the deployment is paused
if a.deployment != nil {
// XXX Fix
// XXX An idea for not replacing failed allocs that are part of
// deployment that will fail immediately is to only replace them if
// their desired transition has a replace bool set by the deployment
// watcher.
// Detect if any allocs associated with this deploy have failed
// Failed allocations could edge trigger an evaluation before the deployment watcher
// runs and marks the deploy as failed. This block makes sure that is still
// considered a failed deploy
failedAllocsInDeploy := false
//for _, as := range m {
//for _, alloc := range as {
//if alloc.DeploymentID == a.deployment.ID && alloc.ClientStatus == structs.AllocClientStatusFailed {
//failedAllocsInDeploy = true
//}
//}
//}
a.deploymentPaused = a.deployment.Status == structs.DeploymentStatusPaused
a.deploymentFailed = a.deployment.Status == structs.DeploymentStatusFailed || failedAllocsInDeploy
a.deploymentFailed = a.deployment.Status == structs.DeploymentStatusFailed
}
// Reconcile each group
......
......@@ -19,69 +19,6 @@ import (
"github.com/stretchr/testify/require"
)
/*
Basic Tests:
√ Place when there is nothing in the cluster
√ Place remainder when there is some in the cluster
√ Scale down from n to n-m where n != m
√ Scale down from n to zero
√ Inplace upgrade test
√ Inplace upgrade and scale up test
√ Inplace upgrade and scale down test
√ Destructive upgrade
√ Destructive upgrade and scale up test
√ Destructive upgrade and scale down test
√ Handle lost nodes
√ Handle lost nodes and scale up
√ Handle lost nodes and scale down
√ Handle draining nodes
√ Handle draining nodes and scale up
√ Handle draining nodes and scale down
√ Handle task group being removed
√ Handle job being stopped both as .Stopped and nil
√ Place more than one group
√ Handle delayed rescheduling failed allocs for batch jobs
√ Handle delayed rescheduling failed allocs for service jobs
√ Handle eligible now rescheduling failed allocs for batch jobs
√ Handle eligible now rescheduling failed allocs for service jobs
√ Previously rescheduled allocs should not be rescheduled again
√ Aggregated evaluations for allocations that fail close together
Update stanza Tests:
√ Stopped job cancels any active deployment
√ Stopped job doesn't cancel terminal deployment
√ JobIndex change cancels any active deployment
√ JobIndex change doesn't cancel any terminal deployment
√ Destructive changes create deployment and get rolled out via max_parallelism
√ Don't create a deployment if there are no changes
√ Deployment created by all inplace updates
√ Paused or failed deployment doesn't create any more canaries
√ Paused or failed deployment doesn't do any placements unless replacing lost allocs
√ Paused or failed deployment doesn't do destructive updates
√ Paused does do migrations
√ Failed deployment doesn't do migrations
√ Canary that is on a draining node
√ Canary that is on a lost node
√ Stop old canaries
√ Create new canaries on job change
√ Create new canaries on job change while scaling up
√ Create new canaries on job change while scaling down
√ Fill canaries if partial placement
√ Promote canaries unblocks max_parallel
√ Promote canaries when canaries == count
√ Only place as many as are healthy in deployment
√ Limit calculation accounts for healthy allocs on migrating/lost nodes
√ Failed deployment should not place anything
√ Run after canaries have been promoted, new allocs have been rolled out and there is no deployment
√ Failed deployment cancels non-promoted task groups
√ Failed deployment and updated job works
√ Finished deployment gets marked as complete
√ Change job change while scaling up
√ Update the job when all allocations from the previous job haven't been placed yet.
√ Paused or failed deployment doesn't do any rescheduling of failed allocs
√ Running deployment with failed allocs doesn't do any rescheduling of failed allocs
*/
var (
canaryUpdate = &structs.UpdateStrategy{
Canary: 2,
......@@ -3117,6 +3054,7 @@ func TestReconciler_PromoteCanaries_CanariesEqualCount(t *testing.T) {
DesiredTotal: 2,
DesiredCanaries: 2,
PlacedAllocs: 2,
HealthyAllocs: 2,
}
d.TaskGroups[job.TaskGroups[0].Name] = s
......@@ -3490,6 +3428,69 @@ func TestReconciler_CompleteDeployment(t *testing.T) {
})
}
// Tests that the reconciler marks a deployment as complete once there is
// nothing left to place even if there are failed allocations that are part of
// the deployment.
func TestReconciler_MarkDeploymentComplete_FailedAllocations(t *testing.T) {
job := mock.Job()
job.TaskGroups[0].Update = noCanaryUpdate
d := structs.NewDeployment(job)
d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{
DesiredTotal: 10,
PlacedAllocs: 20,
HealthyAllocs: 10,
}
// Create 10 healthy allocs and 10 allocs that are failed
var allocs []*structs.Allocation
for i := 0; i < 20; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = uuid.Generate()
alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i%10))
alloc.TaskGroup = job.TaskGroups[0].Name
alloc.DeploymentID = d.ID
alloc.DeploymentStatus = &structs.AllocDeploymentStatus{}
if i < 10 {
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc.DeploymentStatus.Healthy = helper.BoolToPtr(true)
} else {
alloc.DesiredStatus = structs.AllocDesiredStatusStop
alloc.ClientStatus = structs.AllocClientStatusFailed
alloc.DeploymentStatus.Healthy = helper.BoolToPtr(false)
}
allocs = append(allocs, alloc)
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, d, allocs, nil, "")
r := reconciler.Compute()
updates := []*structs.DeploymentStatusUpdate{
{
DeploymentID: d.ID,
Status: structs.DeploymentStatusSuccessful,
StatusDescription: structs.DeploymentStatusDescriptionSuccessful,
},
}
// Assert the correct results
assertResults(t, r, &resultExpectation{
createDeployment: nil,
deploymentUpdates: updates,
place: 0,
inplace: 0,
stop: 0,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
Ignore: 10,
},
},
})
}
// Test that a failed deployment cancels non-promoted canaries
func TestReconciler_FailedDeployment_CancelCanaries(t *testing.T) {
// Create a job with two task groups
......@@ -3913,7 +3914,8 @@ func TestReconciler_FailedDeployment_DontReschedule(t *testing.T) {
})
}
// Test that a running deployment with failed allocs will not result in rescheduling failed allocations
// Test that a running deployment with failed allocs will not result in
// rescheduling failed allocations unless they are marked as reschedulable.
func TestReconciler_DeploymentWithFailedAllocs_DontReschedule(t *testing.T) {
job := mock.Job()
job.TaskGroups[0].Update = noCanaryUpdate
......@@ -3925,13 +3927,13 @@ func TestReconciler_DeploymentWithFailedAllocs_DontReschedule(t *testing.T) {
d.Status = structs.DeploymentStatusRunning
d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{
Promoted: false,
DesiredTotal: 5,
PlacedAllocs: 4,
DesiredTotal: 10,
PlacedAllocs: 10,
}
// Create 4 allocations and mark two as failed
// Create 10 allocations
var allocs []*structs.Allocation
for i := 0; i < 4; i++ {
for i := 0; i < 10; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
......@@ -3939,31 +3941,30 @@ func TestReconciler_DeploymentWithFailedAllocs_DontReschedule(t *testing.T) {
alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
alloc.TaskGroup = job.TaskGroups[0].Name
alloc.DeploymentID = d.ID
alloc.ClientStatus = structs.AllocClientStatusFailed
alloc.TaskStates = map[string]*structs.TaskState{tgName: {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-10 * time.Second)}}
allocs = append(allocs, alloc)
}
// Create allocs that are reschedulable now
allocs[2].ClientStatus = structs.AllocClientStatusFailed
allocs[2].TaskStates = map[string]*structs.TaskState{tgName: {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-10 * time.Second)}}
allocs[3].ClientStatus = structs.AllocClientStatusFailed
allocs[3].TaskStates = map[string]*structs.TaskState{tgName: {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-10 * time.Second)}}
// Mark half of them as reschedulable
for i := 0; i < 5; i++ {
allocs[i].DesiredTransition.Reschedule = helper.BoolToPtr(true)
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, d, allocs, nil, "")
r := reconciler.Compute()
// Assert that only the allocs marked as reschedulable were replaced
assertResults(t, r, &resultExpectation{
place: 0,
place: 5,
createDeployment: nil,
deploymentUpdates: nil,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
Ignore: 2,
Place: 5,
Ignore: 5,
},
},
})
......@@ -3993,12 +3994,12 @@ func TestReconciler_FailedDeployment_AutoRevert_CancelCanaries(t *testing.T) {
jobv2.Version = 2
jobv2.TaskGroups[0].Meta = map[string]string{"version": "2"}
// Create an existing failed deployment that has promoted one task group
d := structs.NewDeployment(jobv2)
state := &structs.DeploymentState{
Promoted: false,
DesiredTotal: 3,
PlacedAllocs: 3,
Promoted: true,
DesiredTotal: 3,
PlacedAllocs: 3,
HealthyAllocs: 3,
}
d.TaskGroups[job.TaskGroups[0].Name] = state
......
......@@ -321,8 +321,15 @@ func shouldFilter(alloc *structs.Allocation, isBatch bool) (untainted, ignore bo
// updateByReschedulable is a helper method that encapsulates logic for whether a failed allocation
// should be rescheduled now, later or left in the untainted set
func updateByReschedulable(alloc *structs.Allocation, now time.Time, evalID string) (rescheduleNow, rescheduleLater bool, rescheduleTime time.Time) {
rescheduleTime, eligible := alloc.NextRescheduleTime()
// If the allocation is part of a deployment, only allow it to reschedule if
// it has been marked eligible for it explicitly.
if alloc.DeploymentID != "" && !alloc.DesiredTransition.ShouldReschedule() {
return
}
// Reschedule if the eval ID matches the alloc's followup evalID or if it's close to its reschedule time
rescheduleTime, eligible := alloc.NextRescheduleTime()
if eligible && (alloc.FollowupEvalID == evalID || rescheduleTime.Sub(now) <= rescheduleWindowSize) {
rescheduleNow = true
return
......