diff --git a/nomad/drainer/drainer.go b/nomad/drainer/drainer.go
index 46dcad696d4cc33519ad1e798fff574aae74e01b..f167cf1f5cbeffff93d35061246d759b4059d94e 100644
--- a/nomad/drainer/drainer.go
+++ b/nomad/drainer/drainer.go
@@ -233,18 +233,18 @@ func (n *NodeDrainer) run(ctx context.Context) {
 // marks them for migration.
 func (n *NodeDrainer) handleDeadlinedNodes(nodes []string) {
 	// Retrieve the set of allocations that will be force stopped.
-	n.l.RLock()
 	var forceStop []*structs.Allocation
+	n.l.RLock()
 	for _, node := range nodes {
 		draining, ok := n.nodes[node]
 		if !ok {
-			n.logger.Printf("[DEBUG] nomad.node_drainer: skipping untracked deadlined node %q", node)
+			n.logger.Printf("[DEBUG] nomad.drain: skipping untracked deadlined node %q", node)
 			continue
 		}
 
-		allocs, err := draining.DeadlineAllocs()
+		allocs, err := draining.RemainingAllocs()
 		if err != nil {
-			n.logger.Printf("[ERR] nomad.node_drainer: failed to retrive allocs on deadlined node %q: %v", node, err)
+			n.logger.Printf("[ERR] nomad.drain: failed to retrive allocs on deadlined node %q: %v", node, err)
 			continue
 		}
 
@@ -272,9 +272,11 @@ func (n *NodeDrainer) handleMigratedAllocs(allocs []*structs.Allocation) {
 		nodes[alloc.NodeID] = struct{}{}
 	}
 
+	var done []string
+	var remainingAllocs []*structs.Allocation
+
 	// For each node, check if it is now done
 	n.l.RLock()
-	var done []string
 	for node := range nodes {
 		draining, ok := n.nodes[node]
 		if !ok {
@@ -283,7 +285,7 @@ func (n *NodeDrainer) handleMigratedAllocs(allocs []*structs.Allocation) {
 
 		isDone, err := draining.IsDone()
 		if err != nil {
-			n.logger.Printf("[ERR] nomad.drain: checking if node %q is done draining: %v", node, err)
+			n.logger.Printf("[ERR] nomad.drain: error checking if node %q is done draining: %v", node, err)
 			continue
 		}
 
@@ -292,9 +294,27 @@ func (n *NodeDrainer) handleMigratedAllocs(allocs []*structs.Allocation) {
 		}
 
 		done = append(done, node)
+
+		remaining, err := draining.RemainingAllocs()
+		if err != nil {
+			n.logger.Printf("[ERR] nomad.drain: node %q is done draining but encountered an error getting remaining allocs: %v", node, err)
+			continue
+		}
+
+		remainingAllocs = append(remainingAllocs, remaining...)
 	}
 	n.l.RUnlock()
 
+	// Stop any running system jobs on otherwise done nodes
+	if len(remainingAllocs) > 0 {
+		future := structs.NewBatchFuture()
+		n.drainAllocs(future, remainingAllocs)
+		if err := future.Wait(); err != nil {
+			n.logger.Printf("[ERR] nomad.drain: failed to drain %d remaining allocs from done nodes: %v",
+				len(remainingAllocs), err)
+		}
+	}
+
 	// Submit the node transistions in a sharded form to ensure a reasonable
 	// Raft transaction size.
 	for _, nodes := range partitionIds(done) {
diff --git a/nomad/drainer/draining_node.go b/nomad/drainer/draining_node.go
index af5c094b808901c26ef06d1e4dda921272593ddd..28780e7d5824c0b6f67192333ab56a828072cb19 100644
--- a/nomad/drainer/draining_node.go
+++ b/nomad/drainer/draining_node.go
@@ -47,7 +47,9 @@ func (n *drainingNode) DeadlineTime() (bool, time.Time) {
 	return n.node.DrainStrategy.DeadlineTime()
 }
 
-// IsDone returns if the node is done draining
+// IsDone returns if the node is done draining batch and service allocs. System
+// allocs must be stopped before marking drain complete unless they're being
+// ignored.
 func (n *drainingNode) IsDone() (bool, error) {
 	n.l.RLock()
 	defer n.l.RUnlock()
@@ -57,9 +59,6 @@ func (n *drainingNode) IsDone() (bool, error) {
 		return false, fmt.Errorf("node doesn't have a drain strategy set")
 	}
 
-	// Grab the relevant drain info
-	ignoreSystem := n.node.DrainStrategy.IgnoreSystemJobs
-
 	// Retrieve the allocs on the node
 	allocs, err := n.state.AllocsByNode(nil, n.node.ID)
 	if err != nil {
@@ -67,8 +66,9 @@ func (n *drainingNode) IsDone() (bool, error) {
 	}
 
 	for _, alloc := range allocs {
-		// Skip system if configured to
-		if alloc.Job.Type == structs.JobTypeSystem && ignoreSystem {
+		// System jobs are only stopped after a node is done draining
+		// everything else, so ignore them here.
+		if alloc.Job.Type == structs.JobTypeSystem {
 			continue
 		}
 
@@ -82,9 +82,9 @@ func (n *drainingNode) IsDone() (bool, error) {
 }
 
 // TODO test that we return the right thing given the strategies
-// DeadlineAllocs returns the set of allocations that should be drained given a
-// node is at its deadline
-func (n *drainingNode) DeadlineAllocs() ([]*structs.Allocation, error) {
+// RemainingAllocs returns the set of allocations remaining on a node that
+// still need to be drained.
+func (n *drainingNode) RemainingAllocs() ([]*structs.Allocation, error) {
 	n.l.RLock()
 	defer n.l.RUnlock()
 
@@ -94,10 +94,6 @@ func (n *drainingNode) DeadlineAllocs() ([]*structs.Allocation, error) {
 	}
 
 	// Grab the relevant drain info
-	inf, _ := n.node.DrainStrategy.DeadlineTime()
-	if inf {
-		return nil, nil
-	}
 	ignoreSystem := n.node.DrainStrategy.IgnoreSystemJobs
 
 	// Retrieve the allocs on the node
diff --git a/nomad/drainer/watch_nodes.go b/nomad/drainer/watch_nodes.go
index 97c6cf8b24ce7c9a631f56ffa83ebe07fba5393a..8b816ae1f610608347848f724b53740acb1dd879 100644
--- a/nomad/drainer/watch_nodes.go
+++ b/nomad/drainer/watch_nodes.go
@@ -87,6 +87,21 @@ func (n *NodeDrainer) Update(node *structs.Node) {
 	}
 
 	if done {
+		// Node is done draining. Stop remaining system allocs before
+		// marking node as complete.
+		remaining, err := draining.RemainingAllocs()
+		if err != nil {
+			n.logger.Printf("[ERR] nomad.drain: error getting remaining allocs on drained node %q: %v",
+				node.ID, err)
+		} else if len(remaining) > 0 {
+			future := structs.NewBatchFuture()
+			n.drainAllocs(future, remaining)
+			if err := future.Wait(); err != nil {
+				n.logger.Printf("[ERR] nomad.drain: failed to drain %d remaining allocs from done node %q: %v",
+					len(remaining), node.ID, err)
+			}
+		}
+
 		index, err := n.raft.NodesDrainComplete([]string{node.ID})
 		if err != nil {
 			n.logger.Printf("[ERR] nomad.drain: failed to unset drain for node %q: %v", node.ID, err)