Commit 832b1d56 authored by Michael Schurter's avatar Michael Schurter
Browse files

switch to new raft DesiredTransition message

parent 7deabe95
Showing with 228 additions and 156 deletions
+228 -156
...@@ -81,7 +81,7 @@ type Allocation struct { ...@@ -81,7 +81,7 @@ type Allocation struct {
Metrics *AllocationMetric Metrics *AllocationMetric
DesiredStatus string DesiredStatus string
DesiredDescription string DesiredDescription string
DesiredTransistion DesiredTransistion DesiredTransition DesiredTransition
ClientStatus string ClientStatus string
ClientDescription string ClientDescription string
TaskStates map[string]*TaskState TaskStates map[string]*TaskState
...@@ -207,10 +207,10 @@ type RescheduleEvent struct { ...@@ -207,10 +207,10 @@ type RescheduleEvent struct {
PrevNodeID string PrevNodeID string
} }
// DesiredTransistion is used to mark an allocation as having a desired state // DesiredTransition is used to mark an allocation as having a desired state
// transistion. This information can be used by the scheduler to make the // transition. This information can be used by the scheduler to make the
// correct decision. // correct decision.
type DesiredTransistion struct { type DesiredTransition struct {
// Migrate is used to indicate that this allocation should be stopped and // Migrate is used to indicate that this allocation should be stopped and
// migrated to another node. // migrated to another node.
Migrate *bool Migrate *bool
......
...@@ -202,13 +202,13 @@ func (a *Alloc) GetAllocs(args *structs.AllocsGetRequest, ...@@ -202,13 +202,13 @@ func (a *Alloc) GetAllocs(args *structs.AllocsGetRequest,
return a.srv.blockingRPC(&opts) return a.srv.blockingRPC(&opts)
} }
// UpdateDesiredTransistion is used to update the desired transistions of an // UpdateDesiredTransition is used to update the desired transitions of an
// allocation. // allocation.
func (a *Alloc) UpdateDesiredTransistion(args *structs.AllocUpdateDesiredTransistionRequest, reply *structs.GenericResponse) error { func (a *Alloc) UpdateDesiredTransition(args *structs.AllocUpdateDesiredTransitionRequest, reply *structs.GenericResponse) error {
if done, err := a.srv.forward("Alloc.UpdateDesiredTransistion", args, args, reply); done { if done, err := a.srv.forward("Alloc.UpdateDesiredTransition", args, args, reply); done {
return err return err
} }
defer metrics.MeasureSince([]string{"nomad", "alloc", "update_desired_transistion"}, time.Now()) defer metrics.MeasureSince([]string{"nomad", "alloc", "update_desired_transition"}, time.Now())
// Check that it is a management token. // Check that it is a management token.
if aclObj, err := a.srv.ResolveToken(args.AuthToken); err != nil { if aclObj, err := a.srv.ResolveToken(args.AuthToken); err != nil {
...@@ -223,9 +223,9 @@ func (a *Alloc) UpdateDesiredTransistion(args *structs.AllocUpdateDesiredTransis ...@@ -223,9 +223,9 @@ func (a *Alloc) UpdateDesiredTransistion(args *structs.AllocUpdateDesiredTransis
} }
// Commit this update via Raft // Commit this update via Raft
_, index, err := a.srv.raftApply(structs.AllocUpdateDesiredTransistionRequestType, args) _, index, err := a.srv.raftApply(structs.AllocUpdateDesiredTransitionRequestType, args)
if err != nil { if err != nil {
a.srv.logger.Printf("[ERR] nomad.allocs: AllocUpdateDesiredTransistionRequest failed: %v", err) a.srv.logger.Printf("[ERR] nomad.allocs: AllocUpdateDesiredTransitionRequest failed: %v", err)
return err return err
} }
......
...@@ -484,7 +484,7 @@ func TestAllocEndpoint_GetAllocs_Blocking(t *testing.T) { ...@@ -484,7 +484,7 @@ func TestAllocEndpoint_GetAllocs_Blocking(t *testing.T) {
} }
} }
func TestAllocEndpoint_UpdateDesiredTransistion(t *testing.T) { func TestAllocEndpoint_UpdateDesiredTransition(t *testing.T) {
t.Parallel() t.Parallel()
require := require.New(t) require := require.New(t)
...@@ -501,16 +501,38 @@ func TestAllocEndpoint_UpdateDesiredTransistion(t *testing.T) { ...@@ -501,16 +501,38 @@ func TestAllocEndpoint_UpdateDesiredTransistion(t *testing.T) {
require.Nil(state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID))) require.Nil(state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID)))
require.Nil(state.UpsertAllocs(1000, []*structs.Allocation{alloc, alloc2})) require.Nil(state.UpsertAllocs(1000, []*structs.Allocation{alloc, alloc2}))
t1 := &structs.DesiredTransistion{ t1 := &structs.DesiredTransition{
Migrate: helper.BoolToPtr(true), Migrate: helper.BoolToPtr(true),
} }
// Update the allocs desired status // Update the allocs desired status
get := &structs.AllocUpdateDesiredTransistionRequest{ get := &structs.AllocUpdateDesiredTransitionRequest{
Allocs: map[string]*structs.DesiredTransistion{ Allocs: map[string]*structs.DesiredTransition{
alloc.ID: t1, alloc.ID: t1,
alloc2.ID: t1, alloc2.ID: t1,
}, },
Evals: []*structs.Evaluation{
{
ID: uuid.Generate(),
Namespace: alloc.Namespace,
Priority: alloc.Job.Priority,
Type: alloc.Job.Type,
TriggeredBy: structs.EvalTriggerNodeDrain,
JobID: alloc.Job.ID,
JobModifyIndex: alloc.Job.ModifyIndex,
Status: structs.EvalStatusPending,
},
{
ID: uuid.Generate(),
Namespace: alloc2.Namespace,
Priority: alloc2.Job.Priority,
Type: alloc2.Job.Type,
TriggeredBy: structs.EvalTriggerNodeDrain,
JobID: alloc2.Job.ID,
JobModifyIndex: alloc2.Job.ModifyIndex,
Status: structs.EvalStatusPending,
},
},
WriteRequest: structs.WriteRequest{ WriteRequest: structs.WriteRequest{
Region: "global", Region: "global",
}, },
...@@ -518,14 +540,14 @@ func TestAllocEndpoint_UpdateDesiredTransistion(t *testing.T) { ...@@ -518,14 +540,14 @@ func TestAllocEndpoint_UpdateDesiredTransistion(t *testing.T) {
// Try without permissions // Try without permissions
var resp structs.GenericResponse var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "Alloc.UpdateDesiredTransistion", get, &resp) err := msgpackrpc.CallWithCodec(codec, "Alloc.UpdateDesiredTransition", get, &resp)
require.NotNil(err) require.NotNil(err)
require.True(structs.IsErrPermissionDenied(err)) require.True(structs.IsErrPermissionDenied(err))
// Try with permissions // Try with permissions
get.WriteRequest.AuthToken = s1.getLeaderAcl() get.WriteRequest.AuthToken = s1.getLeaderAcl()
var resp2 structs.GenericResponse var resp2 structs.GenericResponse
require.Nil(msgpackrpc.CallWithCodec(codec, "Alloc.UpdateDesiredTransistion", get, &resp2)) require.Nil(msgpackrpc.CallWithCodec(codec, "Alloc.UpdateDesiredTransition", get, &resp2))
require.NotZero(resp2.Index) require.NotZero(resp2.Index)
// Look up the allocations // Look up the allocations
...@@ -533,9 +555,15 @@ func TestAllocEndpoint_UpdateDesiredTransistion(t *testing.T) { ...@@ -533,9 +555,15 @@ func TestAllocEndpoint_UpdateDesiredTransistion(t *testing.T) {
require.Nil(err) require.Nil(err)
out2, err := state.AllocByID(nil, alloc.ID) out2, err := state.AllocByID(nil, alloc.ID)
require.Nil(err) require.Nil(err)
e1, err := state.EvalByID(nil, get.Evals[0].ID)
require.Nil(err)
e2, err := state.EvalByID(nil, get.Evals[1].ID)
require.Nil(err)
require.NotNil(out1.DesiredTransistion.Migrate) require.NotNil(out1.DesiredTransition.Migrate)
require.NotNil(out2.DesiredTransistion.Migrate) require.NotNil(out2.DesiredTransition.Migrate)
require.True(*out1.DesiredTransistion.Migrate) require.NotNil(e1)
require.True(*out2.DesiredTransistion.Migrate) require.NotNil(e2)
require.True(*out1.DesiredTransition.Migrate)
require.True(*out2.DesiredTransition.Migrate)
} }
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"time" "time"
memdb "github.com/hashicorp/go-memdb" memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
...@@ -54,20 +55,17 @@ func makeTaskGroupKey(a *structs.Allocation) string { ...@@ -54,20 +55,17 @@ func makeTaskGroupKey(a *structs.Allocation) string {
// stopAllocs tracks allocs to drain by a unique TG key // stopAllocs tracks allocs to drain by a unique TG key
type stopAllocs struct { type stopAllocs struct {
allocBatch []*structs.Allocation allocBatch map[string]*structs.DesiredTransition
// namespace+jobid -> Job // namespace+jobid -> Job
jobBatch map[jobKey]*structs.Job jobBatch map[jobKey]*structs.Job
} }
//FIXME this method does an awful lot
func (s *stopAllocs) add(j *structs.Job, a *structs.Allocation) { func (s *stopAllocs) add(j *structs.Job, a *structs.Allocation) {
// Update the allocation // Add the desired migration transition to the batch
a.ModifyTime = time.Now().UnixNano() s.allocBatch[a.ID] = &structs.DesiredTransition{
a.DesiredStatus = structs.AllocDesiredStatusStop Migrate: helper.BoolToPtr(true),
}
// Add alloc to the allocation batch
s.allocBatch = append(s.allocBatch, a)
// Add job to the job batch // Add job to the job batch
s.jobBatch[jobKey{a.Namespace, a.JobID}] = j s.jobBatch[jobKey{a.Namespace, a.JobID}] = j
...@@ -204,6 +202,7 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) { ...@@ -204,6 +202,7 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
// track number of allocs left on this node to be drained // track number of allocs left on this node to be drained
allocsLeft := false allocsLeft := false
deadlineReached := node.DrainStrategy.DeadlineTime().Before(now)
for _, alloc := range allocs { for _, alloc := range allocs {
jobkey := jobKey{alloc.Namespace, alloc.JobID} jobkey := jobKey{alloc.Namespace, alloc.JobID}
...@@ -224,13 +223,6 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) { ...@@ -224,13 +223,6 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
panic(err) panic(err)
} }
// Don't bother collecting system jobs
if job.Type == structs.JobTypeSystem {
skipJob[jobkey] = struct{}{}
s.logger.Printf("[TRACE] nomad.drain: skipping system job %s", job.Name)
continue
}
// If alloc isn't yet terminal this node has // If alloc isn't yet terminal this node has
// allocs left to be drained // allocs left to be drained
if !alloc.TerminalStatus() { if !alloc.TerminalStatus() {
...@@ -240,9 +232,10 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) { ...@@ -240,9 +232,10 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
} }
} }
// Don't bother collecting batch jobs for nodes that haven't hit their deadline // Don't bother collecting system/batch jobs for nodes that haven't hit their deadline
if job.Type == structs.JobTypeBatch && node.DrainStrategy.DeadlineTime().After(now) { if job.Type != structs.JobTypeService && !deadlineReached {
s.logger.Printf("[TRACE] nomad.drain: not draining batch job %s because deadline isn't for %s", job.Name, node.DrainStrategy.DeadlineTime().Sub(now)) s.logger.Printf("[TRACE] nomad.drain: not draining %s job %s because deadline isn't for %s",
job.Type, job.Name, node.DrainStrategy.DeadlineTime().Sub(now))
skipJob[jobkey] = struct{}{} skipJob[jobkey] = struct{}{}
continue continue
} }
...@@ -273,26 +266,21 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) { ...@@ -273,26 +266,21 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
jobWatcher.watch(jobkey, nodeID) jobWatcher.watch(jobkey, nodeID)
} }
// if node has no allocs, it's done draining! // if node has no allocs or has hit its deadline, it's done draining!
if !allocsLeft { if !allocsLeft || deadlineReached {
s.logger.Printf("[TRACE] nomad.drain: node %s has no more allocs left to drain", nodeID) s.logger.Printf("[TRACE] nomad.drain: node %s has no more allocs left to drain or has reached deadline", nodeID)
jobWatcher.nodeDone(nodeID) jobWatcher.nodeDone(nodeID)
delete(nodes, nodeID)
doneNodes[nodeID] = node doneNodes[nodeID] = node
} }
} }
// stoplist are the allocations to stop and their jobs to emit // stoplist are the allocations to migrate and their jobs to emit
// evaluations for // evaluations for
stoplist := &stopAllocs{ stoplist := &stopAllocs{
allocBatch: make([]*structs.Allocation, 0, len(drainable)), allocBatch: make(map[string]*structs.DesiredTransition),
jobBatch: make(map[jobKey]*structs.Job), jobBatch: make(map[jobKey]*structs.Job),
} }
// deadlineNodes is a map of node IDs that have reached their
// deadline and allocs that will be stopped due to deadline
deadlineNodes := map[string]int{}
// build drain list considering deadline & max_parallel // build drain list considering deadline & max_parallel
for _, drainingJob := range drainable { for _, drainingJob := range drainable {
for _, alloc := range drainingJob.allocs { for _, alloc := range drainingJob.allocs {
...@@ -315,14 +303,13 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) { ...@@ -315,14 +303,13 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
stoplist.add(drainingJob.job, alloc) stoplist.add(drainingJob.job, alloc)
upPerTG[tgKey]-- upPerTG[tgKey]--
deadlineNodes[node.ID]++
continue continue
} }
// Batch jobs are only stopped when the node // Batch/System jobs are only stopped when the
// deadline is reached which has already been // node deadline is reached which has already
// done. // been done.
if drainingJob.job.Type == structs.JobTypeBatch { if drainingJob.job.Type != structs.JobTypeService {
continue continue
} }
...@@ -360,32 +347,9 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) { ...@@ -360,32 +347,9 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
} }
} }
// log drains due to node deadlines
for nodeID, remaining := range deadlineNodes {
s.logger.Printf("[DEBUG] nomad.drain: node %s drain deadline reached; stopping %d remaining allocs", nodeID, remaining)
jobWatcher.nodeDone(nodeID)
}
if len(stoplist.allocBatch) > 0 { if len(stoplist.allocBatch) > 0 {
s.logger.Printf("[DEBUG] nomad.drain: stopping %d alloc(s) for %d job(s)", len(stoplist.allocBatch), len(stoplist.jobBatch)) s.logger.Printf("[DEBUG] nomad.drain: stopping %d alloc(s) for %d job(s)", len(stoplist.allocBatch), len(stoplist.jobBatch))
// Stop allocs in stoplist and add them to drainingAllocs + prevAllocWatcher
batch := &structs.AllocUpdateRequest{
Alloc: stoplist.allocBatch,
WriteRequest: structs.WriteRequest{Region: s.config.Region},
}
// Commit this update via Raft
//TODO Not the right request
_, index, err := s.raftApply(structs.AllocClientUpdateRequestType, batch)
if err != nil {
//FIXME
panic(err)
}
//TODO i bet there's something useful to do with this index
_ = index
// Reevaluate affected jobs // Reevaluate affected jobs
evals := make([]*structs.Evaluation, 0, len(stoplist.jobBatch)) evals := make([]*structs.Evaluation, 0, len(stoplist.jobBatch))
for _, job := range stoplist.jobBatch { for _, job := range stoplist.jobBatch {
...@@ -401,17 +365,23 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) { ...@@ -401,17 +365,23 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
}) })
} }
evalUpdate := &structs.EvalUpdateRequest{ // Send raft request
batch := &structs.AllocUpdateDesiredTransitionRequest{
Allocs: stoplist.allocBatch,
Evals: evals, Evals: evals,
WriteRequest: structs.WriteRequest{Region: s.config.Region}, WriteRequest: structs.WriteRequest{Region: s.config.Region},
} }
// Commit this evaluation via Raft // Commit this update via Raft
_, _, err = s.raftApply(structs.EvalUpdateRequestType, evalUpdate) //TODO Not the right request
_, index, err := s.raftApply(structs.AllocUpdateDesiredTransitionRequestType, batch)
if err != nil { if err != nil {
//FIXME //FIXME
panic(err) panic(err)
} }
//TODO i bet there's something useful to do with this index
_ = index
} }
// Unset drain for nodes done draining // Unset drain for nodes done draining
...@@ -429,6 +399,7 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) { ...@@ -429,6 +399,7 @@ func (s *Server) startNodeDrainer(stopCh chan struct{}) {
panic(err) panic(err)
} }
s.logger.Printf("[INFO] nomad.drain: node %s (%s) completed draining", nodeID, node.Name) s.logger.Printf("[INFO] nomad.drain: node %s (%s) completed draining", nodeID, node.Name)
delete(nodes, nodeID)
} }
} }
} }
...@@ -529,8 +500,7 @@ func (n *nodeWatcher) queryNodeDrain(ws memdb.WatchSet, state *state.StateStore) ...@@ -529,8 +500,7 @@ func (n *nodeWatcher) queryNodeDrain(ws memdb.WatchSet, state *state.StateStore)
return nil, 0, err return nil, 0, err
} }
//FIXME initial cap? resp := make([]*structs.Node, 0, 8)
resp := make([]*structs.Node, 0, 1)
for { for {
raw := iter.Next() raw := iter.Next()
......
...@@ -15,6 +15,8 @@ import ( ...@@ -15,6 +15,8 @@ import (
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil" "github.com/hashicorp/nomad/testutil"
"github.com/hashicorp/nomad/testutil/rpcapi" "github.com/hashicorp/nomad/testutil/rpcapi"
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
...@@ -188,9 +190,16 @@ func TestNodeDrainer_SimpleDrain(t *testing.T) { ...@@ -188,9 +190,16 @@ func TestNodeDrainer_SimpleDrain(t *testing.T) {
t.Errorf("failed waiting for all allocs to migrate: %v", err) t.Errorf("failed waiting for all allocs to migrate: %v", err)
}) })
node1, err := rpc.NodeGet(c1.NodeID())
assert := assert.New(t)
require.Nil(err)
assert.False(node1.Node.Drain)
assert.Nil(node1.Node.DrainStrategy)
assert.Equal(structs.NodeSchedulingIneligible, node1.Node.SchedulingEligibility)
jobs, err := rpc.JobList() jobs, err := rpc.JobList()
require.Nil(err) require.Nil(err)
t.Logf("%d jobs", len(jobs.Jobs)) t.Logf("--> %d jobs", len(jobs.Jobs))
for _, job := range jobs.Jobs { for _, job := range jobs.Jobs {
t.Logf("job: %s status: %s %s", job.Name, job.Status, job.StatusDescription) t.Logf("job: %s status: %s %s", job.Name, job.Status, job.StatusDescription)
} }
...@@ -211,8 +220,9 @@ func TestNodeDrainer_SimpleDrain(t *testing.T) { ...@@ -211,8 +220,9 @@ func TestNodeDrainer_SimpleDrain(t *testing.T) {
panic("unreachable") panic("unreachable")
}) })
t.Logf("%d allocs", len(allocs)) t.Logf("--> %d allocs", len(allocs))
for _, alloc := range allocs { for _, alloc := range allocs {
t.Logf("job: %s node: %s alloc: %s desired: %s actual: %s replaces: %s", alloc.Job.Name, alloc.NodeID[:6], alloc.ID, alloc.DesiredStatus, alloc.ClientStatus, alloc.PreviousAllocation) t.Logf("job: %s node: %s alloc: %s desired_status: %s desired_transition: %s actual: %s replaces: %s",
alloc.Job.Name, alloc.NodeID[:6], alloc.ID[:6], alloc.DesiredStatus, pretty.Sprint(alloc.DesiredTransition.Migrate), alloc.ClientStatus, alloc.PreviousAllocation)
} }
} }
...@@ -240,7 +240,7 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { ...@@ -240,7 +240,7 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyUpsertNodeEvent(buf[1:], log.Index) return n.applyUpsertNodeEvent(buf[1:], log.Index)
case structs.JobBatchDeregisterRequestType: case structs.JobBatchDeregisterRequestType:
return n.applyBatchDeregisterJob(buf[1:], log.Index) return n.applyBatchDeregisterJob(buf[1:], log.Index)
case structs.AllocUpdateDesiredTransistionRequestType: case structs.AllocUpdateDesiredTransitionRequestType:
return n.applyAllocUpdateDesiredTransition(buf[1:], log.Index) return n.applyAllocUpdateDesiredTransition(buf[1:], log.Index)
} }
...@@ -653,17 +653,22 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{} ...@@ -653,17 +653,22 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{}
return nil return nil
} }
// applyAllocUpdateDesiredTransition is used to update the desired transistions // applyAllocUpdateDesiredTransition is used to update the desired transitions
// of a set of allocations. // of a set of allocations.
func (n *nomadFSM) applyAllocUpdateDesiredTransition(buf []byte, index uint64) interface{} { func (n *nomadFSM) applyAllocUpdateDesiredTransition(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "alloc_update_desired_transistion"}, time.Now()) defer metrics.MeasureSince([]string{"nomad", "fsm", "alloc_update_desired_transition"}, time.Now())
var req structs.AllocUpdateDesiredTransistionRequest var req structs.AllocUpdateDesiredTransitionRequest
if err := structs.Decode(buf, &req); err != nil { if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err)) panic(fmt.Errorf("failed to decode request: %v", err))
} }
if err := n.state.UpdateAllocsDesiredTransistions(index, req.Allocs); err != nil { if err := n.state.UpdateAllocsDesiredTransitions(index, req.Allocs, req.Evals); err != nil {
n.logger.Printf("[ERR] nomad.fsm: UpdateAllocsDesiredTransistions failed: %v", err) n.logger.Printf("[ERR] nomad.fsm: UpdateAllocsDesiredTransitions failed: %v", err)
return err
}
if err := n.upsertEvals(index, req.Evals); err != nil {
n.logger.Printf("[ERR] nomad.fsm: AllocUpdateDesiredTransition failed to upsert %d eval(s): %v", len(req.Evals), err)
return err return err
} }
return nil return nil
......
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
memdb "github.com/hashicorp/go-memdb" memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
...@@ -1241,7 +1242,7 @@ func TestFSM_UpdateAllocFromClient(t *testing.T) { ...@@ -1241,7 +1242,7 @@ func TestFSM_UpdateAllocFromClient(t *testing.T) {
require.Equal(eval, res) require.Equal(eval, res)
} }
func TestFSM_UpdateAllocDesiredTransistion(t *testing.T) { func TestFSM_UpdateAllocDesiredTransition(t *testing.T) {
t.Parallel() t.Parallel()
fsm := testFSM(t) fsm := testFSM(t)
state := fsm.State() state := fsm.State()
...@@ -1254,17 +1255,28 @@ func TestFSM_UpdateAllocDesiredTransistion(t *testing.T) { ...@@ -1254,17 +1255,28 @@ func TestFSM_UpdateAllocDesiredTransistion(t *testing.T) {
state.UpsertJobSummary(9, mock.JobSummary(alloc.JobID)) state.UpsertJobSummary(9, mock.JobSummary(alloc.JobID))
state.UpsertAllocs(10, []*structs.Allocation{alloc, alloc2}) state.UpsertAllocs(10, []*structs.Allocation{alloc, alloc2})
t1 := &structs.DesiredTransistion{ t1 := &structs.DesiredTransition{
Migrate: helper.BoolToPtr(true), Migrate: helper.BoolToPtr(true),
} }
req := structs.AllocUpdateDesiredTransistionRequest{ eval := &structs.Evaluation{
Allocs: map[string]*structs.DesiredTransistion{ ID: uuid.Generate(),
Namespace: alloc.Namespace,
Priority: alloc.Job.Priority,
Type: alloc.Job.Type,
TriggeredBy: structs.EvalTriggerNodeDrain,
JobID: alloc.Job.ID,
JobModifyIndex: alloc.Job.ModifyIndex,
Status: structs.EvalStatusPending,
}
req := structs.AllocUpdateDesiredTransitionRequest{
Allocs: map[string]*structs.DesiredTransition{
alloc.ID: t1, alloc.ID: t1,
alloc2.ID: t1, alloc2.ID: t1,
}, },
Evals: []*structs.Evaluation{eval},
} }
buf, err := structs.Encode(structs.AllocUpdateDesiredTransistionRequestType, req) buf, err := structs.Encode(structs.AllocUpdateDesiredTransitionRequestType, req)
require.Nil(err) require.Nil(err)
resp := fsm.Apply(makeLog(buf)) resp := fsm.Apply(makeLog(buf))
...@@ -1276,11 +1288,13 @@ func TestFSM_UpdateAllocDesiredTransistion(t *testing.T) { ...@@ -1276,11 +1288,13 @@ func TestFSM_UpdateAllocDesiredTransistion(t *testing.T) {
require.Nil(err) require.Nil(err)
out2, err := fsm.State().AllocByID(ws, alloc2.ID) out2, err := fsm.State().AllocByID(ws, alloc2.ID)
require.Nil(err) require.Nil(err)
_, err = fsm.State().EvalByID(ws, eval.ID)
require.Nil(err)
require.NotNil(out1.DesiredTransistion.Migrate) require.NotNil(out1.DesiredTransition.Migrate)
require.NotNil(out2.DesiredTransistion.Migrate) require.NotNil(out2.DesiredTransition.Migrate)
require.True(*out1.DesiredTransistion.Migrate) require.True(*out1.DesiredTransition.Migrate)
require.True(*out2.DesiredTransistion.Migrate) require.True(*out2.DesiredTransition.Migrate)
} }
func TestFSM_UpsertVaultAccessor(t *testing.T) { func TestFSM_UpsertVaultAccessor(t *testing.T) {
......
...@@ -644,8 +644,9 @@ func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string, drain bool) er ...@@ -644,8 +644,9 @@ func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string, drain bool) er
} }
copyNode.SchedulingEligibility = structs.NodeSchedulingIneligible copyNode.SchedulingEligibility = structs.NodeSchedulingIneligible
} else { } else {
// When stopping a drain unset the strategy but leave the node
// ineligible for scheduling
copyNode.DrainStrategy = nil copyNode.DrainStrategy = nil
copyNode.SchedulingEligibility = structs.NodeSchedulingEligible
} }
copyNode.ModifyIndex = index copyNode.ModifyIndex = index
...@@ -2008,15 +2009,17 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation ...@@ -2008,15 +2009,17 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation
return nil return nil
} }
// UpdateAllocsDesiredTransistions is used to update a set of allocations // UpdateAllocsDesiredTransitions is used to update a set of allocations
// desired transistions. // desired transitions.
func (s *StateStore) UpdateAllocsDesiredTransistions(index uint64, allocs map[string]*structs.DesiredTransistion) error { func (s *StateStore) UpdateAllocsDesiredTransitions(index uint64, allocs map[string]*structs.DesiredTransition,
evals []*structs.Evaluation) error {
txn := s.db.Txn(true) txn := s.db.Txn(true)
defer txn.Abort() defer txn.Abort()
// Handle each of the updated allocations // Handle each of the updated allocations
for id, transistion := range allocs { for id, transition := range allocs {
if err := s.nestedUpdateAllocDesiredTransition(txn, index, id, transistion); err != nil { if err := s.nestedUpdateAllocDesiredTransition(txn, index, id, transition); err != nil {
return err return err
} }
} }
...@@ -2031,10 +2034,10 @@ func (s *StateStore) UpdateAllocsDesiredTransistions(index uint64, allocs map[st ...@@ -2031,10 +2034,10 @@ func (s *StateStore) UpdateAllocsDesiredTransistions(index uint64, allocs map[st
} }
// nestedUpdateAllocDesiredTransition is used to nest an update of an // nestedUpdateAllocDesiredTransition is used to nest an update of an
// allocations desired transistion // allocations desired transition
func (s *StateStore) nestedUpdateAllocDesiredTransition( func (s *StateStore) nestedUpdateAllocDesiredTransition(
txn *memdb.Txn, index uint64, allocID string, txn *memdb.Txn, index uint64, allocID string,
transistion *structs.DesiredTransistion) error { transition *structs.DesiredTransition) error {
// Look for existing alloc // Look for existing alloc
existing, err := txn.First("allocs", "id", allocID) existing, err := txn.First("allocs", "id", allocID)
...@@ -2051,8 +2054,8 @@ func (s *StateStore) nestedUpdateAllocDesiredTransition( ...@@ -2051,8 +2054,8 @@ func (s *StateStore) nestedUpdateAllocDesiredTransition(
// Copy everything from the existing allocation // Copy everything from the existing allocation
copyAlloc := exist.Copy() copyAlloc := exist.Copy()
// Merge the desired transistions // Merge the desired transitions
copyAlloc.DesiredTransistion.Merge(transistion) copyAlloc.DesiredTransition.Merge(transition)
// Update the modify index // Update the modify index
copyAlloc.ModifyIndex = index copyAlloc.ModifyIndex = index
......
...@@ -3823,7 +3823,7 @@ func TestStateStore_UpdateAlloc_NoJob(t *testing.T) { ...@@ -3823,7 +3823,7 @@ func TestStateStore_UpdateAlloc_NoJob(t *testing.T) {
} }
} }
func TestStateStore_UpdateAllocDesiredTransistion(t *testing.T) { func TestStateStore_UpdateAllocDesiredTransition(t *testing.T) {
t.Parallel() t.Parallel()
require := require.New(t) require := require.New(t)
...@@ -3833,21 +3833,32 @@ func TestStateStore_UpdateAllocDesiredTransistion(t *testing.T) { ...@@ -3833,21 +3833,32 @@ func TestStateStore_UpdateAllocDesiredTransistion(t *testing.T) {
require.Nil(state.UpsertJob(999, alloc.Job)) require.Nil(state.UpsertJob(999, alloc.Job))
require.Nil(state.UpsertAllocs(1000, []*structs.Allocation{alloc})) require.Nil(state.UpsertAllocs(1000, []*structs.Allocation{alloc}))
t1 := &structs.DesiredTransistion{ t1 := &structs.DesiredTransition{
Migrate: helper.BoolToPtr(true), Migrate: helper.BoolToPtr(true),
} }
t2 := &structs.DesiredTransistion{ t2 := &structs.DesiredTransition{
Migrate: helper.BoolToPtr(false), Migrate: helper.BoolToPtr(false),
} }
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: alloc.Namespace,
Priority: alloc.Job.Priority,
Type: alloc.Job.Type,
TriggeredBy: structs.EvalTriggerNodeDrain,
JobID: alloc.Job.ID,
JobModifyIndex: alloc.Job.ModifyIndex,
Status: structs.EvalStatusPending,
}
evals := []*structs.Evaluation{eval}
m := map[string]*structs.DesiredTransistion{alloc.ID: t1} m := map[string]*structs.DesiredTransition{alloc.ID: t1}
require.Nil(state.UpdateAllocsDesiredTransistions(1001, m)) require.Nil(state.UpdateAllocsDesiredTransitions(1001, m, evals))
ws := memdb.NewWatchSet() ws := memdb.NewWatchSet()
out, err := state.AllocByID(ws, alloc.ID) out, err := state.AllocByID(ws, alloc.ID)
require.Nil(err) require.Nil(err)
require.NotNil(out.DesiredTransistion.Migrate) require.NotNil(out.DesiredTransition.Migrate)
require.True(*out.DesiredTransistion.Migrate) require.True(*out.DesiredTransition.Migrate)
require.EqualValues(1000, out.CreateIndex) require.EqualValues(1000, out.CreateIndex)
require.EqualValues(1001, out.ModifyIndex) require.EqualValues(1001, out.ModifyIndex)
...@@ -3855,14 +3866,14 @@ func TestStateStore_UpdateAllocDesiredTransistion(t *testing.T) { ...@@ -3855,14 +3866,14 @@ func TestStateStore_UpdateAllocDesiredTransistion(t *testing.T) {
require.Nil(err) require.Nil(err)
require.EqualValues(1001, index) require.EqualValues(1001, index)
m = map[string]*structs.DesiredTransistion{alloc.ID: t2} m = map[string]*structs.DesiredTransition{alloc.ID: t2}
require.Nil(state.UpdateAllocsDesiredTransistions(1002, m)) require.Nil(state.UpdateAllocsDesiredTransitions(1002, m, evals))
ws = memdb.NewWatchSet() ws = memdb.NewWatchSet()
out, err = state.AllocByID(ws, alloc.ID) out, err = state.AllocByID(ws, alloc.ID)
require.Nil(err) require.Nil(err)
require.NotNil(out.DesiredTransistion.Migrate) require.NotNil(out.DesiredTransition.Migrate)
require.False(*out.DesiredTransistion.Migrate) require.False(*out.DesiredTransition.Migrate)
require.EqualValues(1000, out.CreateIndex) require.EqualValues(1000, out.CreateIndex)
require.EqualValues(1002, out.ModifyIndex) require.EqualValues(1002, out.ModifyIndex)
...@@ -3871,8 +3882,8 @@ func TestStateStore_UpdateAllocDesiredTransistion(t *testing.T) { ...@@ -3871,8 +3882,8 @@ func TestStateStore_UpdateAllocDesiredTransistion(t *testing.T) {
require.EqualValues(1002, index) require.EqualValues(1002, index)
// Try with a bogus alloc id // Try with a bogus alloc id
m = map[string]*structs.DesiredTransistion{uuid.Generate(): t2} m = map[string]*structs.DesiredTransition{uuid.Generate(): t2}
require.Nil(state.UpdateAllocsDesiredTransistions(1003, m)) require.Nil(state.UpdateAllocsDesiredTransitions(1003, m, evals))
} }
func TestStateStore_JobSummary(t *testing.T) { func TestStateStore_JobSummary(t *testing.T) {
......
...@@ -78,7 +78,7 @@ const ( ...@@ -78,7 +78,7 @@ const (
AutopilotRequestType AutopilotRequestType
UpsertNodeEventsType UpsertNodeEventsType
JobBatchDeregisterRequestType JobBatchDeregisterRequestType
AllocUpdateDesiredTransistionRequestType AllocUpdateDesiredTransitionRequestType
) )
const ( const (
...@@ -574,12 +574,15 @@ type AllocUpdateRequest struct { ...@@ -574,12 +574,15 @@ type AllocUpdateRequest struct {
WriteRequest WriteRequest
} }
// AllocUpdateDesiredTransistionRequest is used to submit changes to allocations // AllocUpdateDesiredTransitionRequest is used to submit changes to allocations
// desired transistion state. // desired transition state.
type AllocUpdateDesiredTransistionRequest struct { type AllocUpdateDesiredTransitionRequest struct {
// Allocs is the mapping of allocation ids to their desired state // Allocs is the mapping of allocation ids to their desired state
// transistion // transition
Allocs map[string]*DesiredTransistion Allocs map[string]*DesiredTransition
// Evals is the set of evaluations to create
Evals []*Evaluation
WriteRequest WriteRequest
} }
...@@ -5349,10 +5352,10 @@ func (re *RescheduleEvent) Copy() *RescheduleEvent { ...@@ -5349,10 +5352,10 @@ func (re *RescheduleEvent) Copy() *RescheduleEvent {
return copy return copy
} }
// DesiredTransistion is used to mark an allocation as having a desired state // DesiredTransition is used to mark an allocation as having a desired state
// transistion. This information can be used by the scheduler to make the // transition. This information can be used by the scheduler to make the
// correct decision. // correct decision.
type DesiredTransistion struct { type DesiredTransition struct {
// Migrate is used to indicate that this allocation should be stopped and // Migrate is used to indicate that this allocation should be stopped and
// migrated to another node. // migrated to another node.
Migrate *bool Migrate *bool
...@@ -5360,14 +5363,14 @@ type DesiredTransistion struct { ...@@ -5360,14 +5363,14 @@ type DesiredTransistion struct {
// Merge merges the two desired transitions, preferring the values from the // Merge merges the two desired transitions, preferring the values from the
// passed in object. // passed in object.
func (d *DesiredTransistion) Merge(o *DesiredTransistion) { func (d *DesiredTransition) Merge(o *DesiredTransition) {
if o.Migrate != nil { if o.Migrate != nil {
d.Migrate = o.Migrate d.Migrate = o.Migrate
} }
} }
// ShouldMigrate returns whether the transistion object dictates a migration. // ShouldMigrate returns whether the transition object dictates a migration.
func (d *DesiredTransistion) ShouldMigrate() bool { func (d *DesiredTransition) ShouldMigrate() bool {
return d.Migrate != nil && *d.Migrate return d.Migrate != nil && *d.Migrate
} }
...@@ -5432,9 +5435,9 @@ type Allocation struct { ...@@ -5432,9 +5435,9 @@ type Allocation struct {
// DesiredStatusDescription is meant to provide more human useful information // DesiredStatusDescription is meant to provide more human useful information
DesiredDescription string DesiredDescription string
// DesiredTransistion is used to indicate that a state transistion // DesiredTransition is used to indicate that a state transition
// is desired for a given reason. // is desired for a given reason.
DesiredTransistion DesiredTransistion DesiredTransition DesiredTransition
// Status of the allocation on the client // Status of the allocation on the client
ClientStatus string ClientStatus string
......
...@@ -2245,7 +2245,7 @@ func TestServiceSched_NodeDown(t *testing.T) { ...@@ -2245,7 +2245,7 @@ func TestServiceSched_NodeDown(t *testing.T) {
// Mark appropriate allocs for migration // Mark appropriate allocs for migration
for i := 0; i < 7; i++ { for i := 0; i < 7; i++ {
out := allocs[i] out := allocs[i]
out.DesiredTransistion.Migrate = helper.BoolToPtr(true) out.DesiredTransition.Migrate = helper.BoolToPtr(true)
} }
noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
...@@ -2367,7 +2367,7 @@ func TestServiceSched_NodeDrain(t *testing.T) { ...@@ -2367,7 +2367,7 @@ func TestServiceSched_NodeDrain(t *testing.T) {
alloc.JobID = job.ID alloc.JobID = job.ID
alloc.NodeID = node.ID alloc.NodeID = node.ID
alloc.Name = fmt.Sprintf("my-job.web[%d]", i) alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
alloc.DesiredTransistion.Migrate = helper.BoolToPtr(true) alloc.DesiredTransition.Migrate = helper.BoolToPtr(true)
allocs = append(allocs, alloc) allocs = append(allocs, alloc)
} }
noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
...@@ -2453,7 +2453,7 @@ func TestServiceSched_NodeDrain_Down(t *testing.T) { ...@@ -2453,7 +2453,7 @@ func TestServiceSched_NodeDrain_Down(t *testing.T) {
for i := 0; i < 6; i++ { for i := 0; i < 6; i++ {
newAlloc := allocs[i].Copy() newAlloc := allocs[i].Copy()
newAlloc.ClientStatus = structs.AllocDesiredStatusStop newAlloc.ClientStatus = structs.AllocDesiredStatusStop
newAlloc.DesiredTransistion.Migrate = helper.BoolToPtr(true) newAlloc.DesiredTransition.Migrate = helper.BoolToPtr(true)
stop = append(stop, newAlloc) stop = append(stop, newAlloc)
} }
noErr(t, h.State.UpsertAllocs(h.NextIndex(), stop)) noErr(t, h.State.UpsertAllocs(h.NextIndex(), stop))
...@@ -2556,7 +2556,7 @@ func TestServiceSched_NodeDrain_Queued_Allocations(t *testing.T) { ...@@ -2556,7 +2556,7 @@ func TestServiceSched_NodeDrain_Queued_Allocations(t *testing.T) {
alloc.JobID = job.ID alloc.JobID = job.ID
alloc.NodeID = node.ID alloc.NodeID = node.ID
alloc.Name = fmt.Sprintf("my-job.web[%d]", i) alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
alloc.DesiredTransistion.Migrate = helper.BoolToPtr(true) alloc.DesiredTransition.Migrate = helper.BoolToPtr(true)
allocs = append(allocs, alloc) allocs = append(allocs, alloc)
} }
noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
...@@ -3948,7 +3948,7 @@ func TestServiceSched_NodeDrain_Sticky(t *testing.T) { ...@@ -3948,7 +3948,7 @@ func TestServiceSched_NodeDrain_Sticky(t *testing.T) {
alloc.NodeID = node.ID alloc.NodeID = node.ID
alloc.Job.TaskGroups[0].Count = 1 alloc.Job.TaskGroups[0].Count = 1
alloc.Job.TaskGroups[0].EphemeralDisk.Sticky = true alloc.Job.TaskGroups[0].EphemeralDisk.Sticky = true
alloc.DesiredTransistion.Migrate = helper.BoolToPtr(true) alloc.DesiredTransition.Migrate = helper.BoolToPtr(true)
noErr(t, h.State.UpsertJob(h.NextIndex(), alloc.Job)) noErr(t, h.State.UpsertJob(h.NextIndex(), alloc.Job))
noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc})) noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc}))
......
...@@ -927,7 +927,7 @@ func TestReconciler_DrainNode(t *testing.T) { ...@@ -927,7 +927,7 @@ func TestReconciler_DrainNode(t *testing.T) {
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
n := mock.Node() n := mock.Node()
n.ID = allocs[i].NodeID n.ID = allocs[i].NodeID
allocs[i].DesiredTransistion.Migrate = helper.BoolToPtr(true) allocs[i].DesiredTransition.Migrate = helper.BoolToPtr(true)
n.Drain = true n.Drain = true
tainted[n.ID] = n tainted[n.ID] = n
} }
...@@ -980,7 +980,7 @@ func TestReconciler_DrainNode_ScaleUp(t *testing.T) { ...@@ -980,7 +980,7 @@ func TestReconciler_DrainNode_ScaleUp(t *testing.T) {
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
n := mock.Node() n := mock.Node()
n.ID = allocs[i].NodeID n.ID = allocs[i].NodeID
allocs[i].DesiredTransistion.Migrate = helper.BoolToPtr(true) allocs[i].DesiredTransition.Migrate = helper.BoolToPtr(true)
n.Drain = true n.Drain = true
tainted[n.ID] = n tainted[n.ID] = n
} }
...@@ -1034,7 +1034,7 @@ func TestReconciler_DrainNode_ScaleDown(t *testing.T) { ...@@ -1034,7 +1034,7 @@ func TestReconciler_DrainNode_ScaleDown(t *testing.T) {
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
n := mock.Node() n := mock.Node()
n.ID = allocs[i].NodeID n.ID = allocs[i].NodeID
allocs[i].DesiredTransistion.Migrate = helper.BoolToPtr(true) allocs[i].DesiredTransition.Migrate = helper.BoolToPtr(true)
n.Drain = true n.Drain = true
tainted[n.ID] = n tainted[n.ID] = n
} }
...@@ -2216,7 +2216,7 @@ func TestReconciler_PausedOrFailedDeployment_Migrations(t *testing.T) { ...@@ -2216,7 +2216,7 @@ func TestReconciler_PausedOrFailedDeployment_Migrations(t *testing.T) {
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
n := mock.Node() n := mock.Node()
n.ID = allocs[i].NodeID n.ID = allocs[i].NodeID
allocs[i].DesiredTransistion.Migrate = helper.BoolToPtr(true) allocs[i].DesiredTransition.Migrate = helper.BoolToPtr(true)
n.Drain = true n.Drain = true
tainted[n.ID] = n tainted[n.ID] = n
} }
...@@ -2290,7 +2290,7 @@ func TestReconciler_DrainNode_Canary(t *testing.T) { ...@@ -2290,7 +2290,7 @@ func TestReconciler_DrainNode_Canary(t *testing.T) {
tainted := make(map[string]*structs.Node, 1) tainted := make(map[string]*structs.Node, 1)
n := mock.Node() n := mock.Node()
n.ID = allocs[11].NodeID n.ID = allocs[11].NodeID
allocs[11].DesiredTransistion.Migrate = helper.BoolToPtr(true) allocs[11].DesiredTransition.Migrate = helper.BoolToPtr(true)
n.Drain = true n.Drain = true
tainted[n.ID] = n tainted[n.ID] = n
...@@ -3030,7 +3030,7 @@ func TestReconciler_TaintedNode_RollingUpgrade(t *testing.T) { ...@@ -3030,7 +3030,7 @@ func TestReconciler_TaintedNode_RollingUpgrade(t *testing.T) {
n.Status = structs.NodeStatusDown n.Status = structs.NodeStatusDown
} else { } else {
n.Drain = true n.Drain = true
allocs[2+i].DesiredTransistion.Migrate = helper.BoolToPtr(true) allocs[2+i].DesiredTransition.Migrate = helper.BoolToPtr(true)
} }
tainted[n.ID] = n tainted[n.ID] = n
} }
...@@ -3116,7 +3116,7 @@ func TestReconciler_FailedDeployment_PlacementLost(t *testing.T) { ...@@ -3116,7 +3116,7 @@ func TestReconciler_FailedDeployment_PlacementLost(t *testing.T) {
n.Status = structs.NodeStatusDown n.Status = structs.NodeStatusDown
} else { } else {
n.Drain = true n.Drain = true
allocs[6+i].DesiredTransistion.Migrate = helper.BoolToPtr(true) allocs[6+i].DesiredTransition.Migrate = helper.BoolToPtr(true)
} }
tainted[n.ID] = n tainted[n.ID] = n
} }
...@@ -3442,7 +3442,7 @@ func TestReconciler_TaintedNode_MultiGroups(t *testing.T) { ...@@ -3442,7 +3442,7 @@ func TestReconciler_TaintedNode_MultiGroups(t *testing.T) {
for i := 0; i < 15; i++ { for i := 0; i < 15; i++ {
n := mock.Node() n := mock.Node()
n.ID = allocs[i].NodeID n.ID = allocs[i].NodeID
allocs[i].DesiredTransistion.Migrate = helper.BoolToPtr(true) allocs[i].DesiredTransition.Migrate = helper.BoolToPtr(true)
n.Drain = true n.Drain = true
tainted[n.ID] = n tainted[n.ID] = n
} }
......
...@@ -218,7 +218,7 @@ func (a allocSet) filterByTainted(nodes map[string]*structs.Node) (untainted, mi ...@@ -218,7 +218,7 @@ func (a allocSet) filterByTainted(nodes map[string]*structs.Node) (untainted, mi
if !alloc.TerminalStatus() { if !alloc.TerminalStatus() {
if n == nil || n.TerminalStatus() { if n == nil || n.TerminalStatus() {
lost[alloc.ID] = alloc lost[alloc.ID] = alloc
} else if alloc.DesiredTransistion.ShouldMigrate() { } else if alloc.DesiredTransition.ShouldMigrate() {
migrate[alloc.ID] = alloc migrate[alloc.ID] = alloc
} else { } else {
untainted[alloc.ID] = alloc untainted[alloc.ID] = alloc
......
...@@ -972,7 +972,7 @@ func TestSystemSched_NodeDown(t *testing.T) { ...@@ -972,7 +972,7 @@ func TestSystemSched_NodeDown(t *testing.T) {
alloc.JobID = job.ID alloc.JobID = job.ID
alloc.NodeID = node.ID alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]" alloc.Name = "my-job.web[0]"
alloc.DesiredTransistion.Migrate = helper.BoolToPtr(true) alloc.DesiredTransition.Migrate = helper.BoolToPtr(true)
noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc})) noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc}))
// Create a mock evaluation to deal with drain // Create a mock evaluation to deal with drain
...@@ -1101,7 +1101,7 @@ func TestSystemSched_NodeDrain(t *testing.T) { ...@@ -1101,7 +1101,7 @@ func TestSystemSched_NodeDrain(t *testing.T) {
alloc.JobID = job.ID alloc.JobID = job.ID
alloc.NodeID = node.ID alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]" alloc.Name = "my-job.web[0]"
alloc.DesiredTransistion.Migrate = helper.BoolToPtr(true) alloc.DesiredTransition.Migrate = helper.BoolToPtr(true)
noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc})) noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc}))
// Create a mock evaluation to deal with drain // Create a mock evaluation to deal with drain
...@@ -1415,7 +1415,7 @@ func TestSystemSched_PlanWithDrainedNode(t *testing.T) { ...@@ -1415,7 +1415,7 @@ func TestSystemSched_PlanWithDrainedNode(t *testing.T) {
alloc.JobID = job.ID alloc.JobID = job.ID
alloc.NodeID = node.ID alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]" alloc.Name = "my-job.web[0]"
alloc.DesiredTransistion.Migrate = helper.BoolToPtr(true) alloc.DesiredTransition.Migrate = helper.BoolToPtr(true)
alloc.TaskGroup = "web" alloc.TaskGroup = "web"
alloc2 := mock.Alloc() alloc2 := mock.Alloc()
......
...@@ -111,7 +111,7 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]*structs.Node, ...@@ -111,7 +111,7 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]*structs.Node,
TaskGroup: tg, TaskGroup: tg,
Alloc: exist, Alloc: exist,
}) })
} else if exist.DesiredTransistion.ShouldMigrate() { } else if exist.DesiredTransition.ShouldMigrate() {
result.migrate = append(result.migrate, allocTuple{ result.migrate = append(result.migrate, allocTuple{
Name: name, Name: name,
TaskGroup: tg, TaskGroup: tg,
......
...@@ -91,7 +91,7 @@ func TestDiffAllocs(t *testing.T) { ...@@ -91,7 +91,7 @@ func TestDiffAllocs(t *testing.T) {
NodeID: "drainNode", NodeID: "drainNode",
Name: "my-job.web[2]", Name: "my-job.web[2]",
Job: oldJob, Job: oldJob,
DesiredTransistion: structs.DesiredTransistion{ DesiredTransition: structs.DesiredTransition{
Migrate: helper.BoolToPtr(true), Migrate: helper.BoolToPtr(true),
}, },
}, },
...@@ -223,7 +223,7 @@ func TestDiffSystemAllocs(t *testing.T) { ...@@ -223,7 +223,7 @@ func TestDiffSystemAllocs(t *testing.T) {
NodeID: drainNode.ID, NodeID: drainNode.ID,
Name: "my-job.web[0]", Name: "my-job.web[0]",
Job: oldJob, Job: oldJob,
DesiredTransistion: structs.DesiredTransistion{ DesiredTransition: structs.DesiredTransition{
Migrate: helper.BoolToPtr(true), Migrate: helper.BoolToPtr(true),
}, },
}, },
......
...@@ -72,6 +72,21 @@ func (r *RPC) AllocGetAllocs(ids []string) (*structs.AllocsGetResponse, error) { ...@@ -72,6 +72,21 @@ func (r *RPC) AllocGetAllocs(ids []string) (*structs.AllocsGetResponse, error) {
return &resp, nil return &resp, nil
} }
// Eval.List RPC
func (r *RPC) EvalList() (*structs.EvalListResponse, error) {
get := &structs.EvalListRequest{
QueryOptions: structs.QueryOptions{
Region: r.Region,
Namespace: r.Namespace,
},
}
var resp structs.EvalListResponse
if err := msgpackrpc.CallWithCodec(r.codec, "Eval.List", get, &resp); err != nil {
return nil, err
}
return &resp, nil
}
// Job.List RPC // Job.List RPC
func (r *RPC) JobList() (*structs.JobListResponse, error) { func (r *RPC) JobList() (*structs.JobListResponse, error) {
get := &structs.JobListRequest{ get := &structs.JobListRequest{
...@@ -112,3 +127,16 @@ func (r *RPC) NodeGetAllocs(nodeID string) (*structs.NodeAllocsResponse, error) ...@@ -112,3 +127,16 @@ func (r *RPC) NodeGetAllocs(nodeID string) (*structs.NodeAllocsResponse, error)
} }
return &resp, nil return &resp, nil
} }
// Node.GetNode RPC
func (r *RPC) NodeGet(nodeID string) (*structs.SingleNodeResponse, error) {
get := &structs.NodeSpecificRequest{
NodeID: nodeID,
QueryOptions: structs.QueryOptions{Region: r.Region},
}
var resp structs.SingleNodeResponse
if err := msgpackrpc.CallWithCodec(r.codec, "Node.GetNode", get, &resp); err != nil {
return nil, err
}
return &resp, nil
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment