Commit 50131290 authored by Michael Schurter's avatar Michael Schurter
Browse files

drain: stop sys jobs after drain completes

System allocs should be drained when a node's deadline is hit or when
all other allocs on the node have stopped/migrated.
parent 3583f853
Showing with 50 additions and 19 deletions
+50 -19
......@@ -233,18 +233,18 @@ func (n *NodeDrainer) run(ctx context.Context) {
// marks them for migration.
func (n *NodeDrainer) handleDeadlinedNodes(nodes []string) {
// Retrieve the set of allocations that will be force stopped.
n.l.RLock()
var forceStop []*structs.Allocation
n.l.RLock()
for _, node := range nodes {
draining, ok := n.nodes[node]
if !ok {
n.logger.Printf("[DEBUG] nomad.node_drainer: skipping untracked deadlined node %q", node)
n.logger.Printf("[DEBUG] nomad.drain: skipping untracked deadlined node %q", node)
continue
}
allocs, err := draining.DeadlineAllocs()
allocs, err := draining.RemainingAllocs()
if err != nil {
n.logger.Printf("[ERR] nomad.node_drainer: failed to retrive allocs on deadlined node %q: %v", node, err)
n.logger.Printf("[ERR] nomad.drain: failed to retrive allocs on deadlined node %q: %v", node, err)
continue
}
......@@ -272,9 +272,11 @@ func (n *NodeDrainer) handleMigratedAllocs(allocs []*structs.Allocation) {
nodes[alloc.NodeID] = struct{}{}
}
var done []string
var remainingAllocs []*structs.Allocation
// For each node, check if it is now done
n.l.RLock()
var done []string
for node := range nodes {
draining, ok := n.nodes[node]
if !ok {
......@@ -283,7 +285,7 @@ func (n *NodeDrainer) handleMigratedAllocs(allocs []*structs.Allocation) {
isDone, err := draining.IsDone()
if err != nil {
n.logger.Printf("[ERR] nomad.drain: checking if node %q is done draining: %v", node, err)
n.logger.Printf("[ERR] nomad.drain: error checking if node %q is done draining: %v", node, err)
continue
}
......@@ -292,9 +294,27 @@ func (n *NodeDrainer) handleMigratedAllocs(allocs []*structs.Allocation) {
}
done = append(done, node)
remaining, err := draining.RemainingAllocs()
if err != nil {
n.logger.Printf("[ERR] nomad.drain: node %q is done draining but encountered an error getting remaining allocs: %v", node, err)
continue
}
remainingAllocs = append(remainingAllocs, remaining...)
}
n.l.RUnlock()
// Stop any running system jobs on otherwise done nodes
if len(remainingAllocs) > 0 {
future := structs.NewBatchFuture()
n.drainAllocs(future, remainingAllocs)
if err := future.Wait(); err != nil {
n.logger.Printf("[ERR] nomad.drain: failed to drain %d remaining allocs from done nodes: %v",
len(remainingAllocs), err)
}
}
// Submit the node transistions in a sharded form to ensure a reasonable
// Raft transaction size.
for _, nodes := range partitionIds(done) {
......
......@@ -47,7 +47,9 @@ func (n *drainingNode) DeadlineTime() (bool, time.Time) {
return n.node.DrainStrategy.DeadlineTime()
}
// IsDone returns if the node is done draining
// IsDone returns if the node is done draining batch and service allocs. System
// allocs must be stopped before marking drain complete unless they're being
// ignored.
func (n *drainingNode) IsDone() (bool, error) {
n.l.RLock()
defer n.l.RUnlock()
......@@ -57,9 +59,6 @@ func (n *drainingNode) IsDone() (bool, error) {
return false, fmt.Errorf("node doesn't have a drain strategy set")
}
// Grab the relevant drain info
ignoreSystem := n.node.DrainStrategy.IgnoreSystemJobs
// Retrieve the allocs on the node
allocs, err := n.state.AllocsByNode(nil, n.node.ID)
if err != nil {
......@@ -67,8 +66,9 @@ func (n *drainingNode) IsDone() (bool, error) {
}
for _, alloc := range allocs {
// Skip system if configured to
if alloc.Job.Type == structs.JobTypeSystem && ignoreSystem {
// System jobs are only stopped after a node is done draining
// everything else, so ignore them here.
if alloc.Job.Type == structs.JobTypeSystem {
continue
}
......@@ -82,9 +82,9 @@ func (n *drainingNode) IsDone() (bool, error) {
}
// TODO test that we return the right thing given the strategies
// DeadlineAllocs returns the set of allocations that should be drained given a
// node is at its deadline
func (n *drainingNode) DeadlineAllocs() ([]*structs.Allocation, error) {
// RemainingAllocs returns the set of allocations remaining on a node that
// still need to be drained.
func (n *drainingNode) RemainingAllocs() ([]*structs.Allocation, error) {
n.l.RLock()
defer n.l.RUnlock()
......@@ -94,10 +94,6 @@ func (n *drainingNode) DeadlineAllocs() ([]*structs.Allocation, error) {
}
// Grab the relevant drain info
inf, _ := n.node.DrainStrategy.DeadlineTime()
if inf {
return nil, nil
}
ignoreSystem := n.node.DrainStrategy.IgnoreSystemJobs
// Retrieve the allocs on the node
......
......@@ -87,6 +87,21 @@ func (n *NodeDrainer) Update(node *structs.Node) {
}
if done {
// Node is done draining. Stop remaining system allocs before
// marking node as complete.
remaining, err := draining.RemainingAllocs()
if err != nil {
n.logger.Printf("[ERR] nomad.drain: error getting remaining allocs on drained node %q: %v",
node.ID, err)
} else if len(remaining) > 0 {
future := structs.NewBatchFuture()
n.drainAllocs(future, remaining)
if err := future.Wait(); err != nil {
n.logger.Printf("[ERR] nomad.drain: failed to drain %d remaining allocs from done node %q: %v",
len(remaining), node.ID, err)
}
}
index, err := n.raft.NodesDrainComplete([]string{node.ID})
if err != nil {
n.logger.Printf("[ERR] nomad.drain: failed to unset drain for node %q: %v", node.ID, err)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment