Unverified commit 1b81bfb6 authored by Michael Schurter, committed by GitHub

Merge pull request #4053 from hashicorp/b-drain-sys-jobs-2

drain: fix draining of system jobs
Showing with 367 additions and 22 deletions
......@@ -410,7 +410,7 @@ func monitorDrain(output func(string), nodeClient *api.Nodes, nodeID string, ind
// Loop on alloc messages for a bit longer as we may have gotten the
// "node done" first (since the watchers run concurrently the events
// may be received out of order)
deadline := 250 * time.Millisecond
deadline := 500 * time.Millisecond
timer := time.NewTimer(deadline)
for {
select {
......
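The hunk above doubles monitorDrain's grace window from 250ms to 500ms: the node-done and alloc watchers run concurrently, so the final alloc messages can arrive after the node has already been reported done. Below is a minimal, self-contained sketch of that pattern; the channel and function names are invented for illustration and are not Nomad's actual CLI code.

```go
package main

import (
	"fmt"
	"time"
)

// drainGracePeriod shows the pattern used in monitorDrain: once the "node
// done" signal has been seen, keep reading alloc messages for a short
// deadline so late events from the concurrent alloc watcher are not
// silently dropped. The channel name is illustrative only.
func drainGracePeriod(allocMsgs <-chan string, deadline time.Duration) {
	timer := time.NewTimer(deadline)
	defer timer.Stop()
	for {
		select {
		case msg, ok := <-allocMsgs:
			if !ok {
				return
			}
			fmt.Println(msg)
		case <-timer.C:
			// Grace period expired; stop waiting for stragglers.
			return
		}
	}
}

func main() {
	msgs := make(chan string, 2)
	msgs <- "Alloc \"abc123\" marked for migration"
	msgs <- "Alloc \"abc123\" draining"
	drainGracePeriod(msgs, 500*time.Millisecond)
}
```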
......@@ -233,18 +233,18 @@ func (n *NodeDrainer) run(ctx context.Context) {
// marks them for migration.
func (n *NodeDrainer) handleDeadlinedNodes(nodes []string) {
// Retrieve the set of allocations that will be force stopped.
n.l.RLock()
var forceStop []*structs.Allocation
n.l.RLock()
for _, node := range nodes {
draining, ok := n.nodes[node]
if !ok {
n.logger.Printf("[DEBUG] nomad.node_drainer: skipping untracked deadlined node %q", node)
n.logger.Printf("[DEBUG] nomad.drain: skipping untracked deadlined node %q", node)
continue
}
allocs, err := draining.DeadlineAllocs()
allocs, err := draining.RemainingAllocs()
if err != nil {
n.logger.Printf("[ERR] nomad.node_drainer: failed to retrive allocs on deadlined node %q: %v", node, err)
n.logger.Printf("[ERR] nomad.drain: failed to retrive allocs on deadlined node %q: %v", node, err)
continue
}
......@@ -272,9 +272,11 @@ func (n *NodeDrainer) handleMigratedAllocs(allocs []*structs.Allocation) {
nodes[alloc.NodeID] = struct{}{}
}
var done []string
var remainingAllocs []*structs.Allocation
// For each node, check if it is now done
n.l.RLock()
var done []string
for node := range nodes {
draining, ok := n.nodes[node]
if !ok {
......@@ -283,7 +285,7 @@ func (n *NodeDrainer) handleMigratedAllocs(allocs []*structs.Allocation) {
isDone, err := draining.IsDone()
if err != nil {
n.logger.Printf("[ERR] nomad.drain: checking if node %q is done draining: %v", node, err)
n.logger.Printf("[ERR] nomad.drain: error checking if node %q is done draining: %v", node, err)
continue
}
......@@ -292,9 +294,27 @@ func (n *NodeDrainer) handleMigratedAllocs(allocs []*structs.Allocation) {
}
done = append(done, node)
remaining, err := draining.RemainingAllocs()
if err != nil {
n.logger.Printf("[ERR] nomad.drain: node %q is done draining but encountered an error getting remaining allocs: %v", node, err)
continue
}
remainingAllocs = append(remainingAllocs, remaining...)
}
n.l.RUnlock()
// Stop any running system jobs on otherwise done nodes
if len(remainingAllocs) > 0 {
future := structs.NewBatchFuture()
n.drainAllocs(future, remainingAllocs)
if err := future.Wait(); err != nil {
n.logger.Printf("[ERR] nomad.drain: failed to drain %d remaining allocs from done nodes: %v",
len(remainingAllocs), err)
}
}
// Submit the node transitions in a sharded form to ensure a reasonable
// Raft transaction size.
for _, nodes := range partitionIds(done) {
......
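Once the done nodes are known, their IDs are submitted in shards so no single Raft transaction grows unreasonably large. The diff relies on an existing partitionIds helper; the sketch below only illustrates the sharding idea with an invented function and batch size, not Nomad's actual implementation.

```go
package main

import "fmt"

// partitionIDs sketches the idea behind partitionIds(done): split a
// potentially large slice of node IDs into fixed-size batches so each Raft
// transaction stays small. The name and batch size are illustrative.
func partitionIDs(ids []string, batchSize int) [][]string {
	var out [][]string
	for len(ids) > 0 {
		n := batchSize
		if n > len(ids) {
			n = len(ids)
		}
		out = append(out, ids[:n])
		ids = ids[n:]
	}
	return out
}

func main() {
	done := []string{"n1", "n2", "n3", "n4", "n5"}
	for _, batch := range partitionIDs(done, 2) {
		// Each batch would become its own NodesDrainComplete submission.
		fmt.Println(batch)
	}
}
```

Keeping each submission to a bounded number of node IDs keeps individual Raft log entries small even when many nodes finish or hit their deadline at once.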
......@@ -47,7 +47,9 @@ func (n *drainingNode) DeadlineTime() (bool, time.Time) {
return n.node.DrainStrategy.DeadlineTime()
}
// IsDone returns if the node is done draining
// IsDone returns if the node is done draining batch and service allocs. System
// allocs must be stopped before marking drain complete unless they're being
// ignored.
func (n *drainingNode) IsDone() (bool, error) {
n.l.RLock()
defer n.l.RUnlock()
......@@ -57,9 +59,6 @@ func (n *drainingNode) IsDone() (bool, error) {
return false, fmt.Errorf("node doesn't have a drain strategy set")
}
// Grab the relevant drain info
ignoreSystem := n.node.DrainStrategy.IgnoreSystemJobs
// Retrieve the allocs on the node
allocs, err := n.state.AllocsByNode(nil, n.node.ID)
if err != nil {
......@@ -67,8 +66,9 @@ func (n *drainingNode) IsDone() (bool, error) {
}
for _, alloc := range allocs {
// Skip system if configured to
if alloc.Job.Type == structs.JobTypeSystem && ignoreSystem {
// System jobs are only stopped after a node is done draining
// everything else, so ignore them here.
if alloc.Job.Type == structs.JobTypeSystem {
continue
}
......@@ -81,10 +81,9 @@ func (n *drainingNode) IsDone() (bool, error) {
return true, nil
}
// TODO test that we return the right thing given the strategies
// DeadlineAllocs returns the set of allocations that should be drained given a
// node is at its deadline
func (n *drainingNode) DeadlineAllocs() ([]*structs.Allocation, error) {
// RemainingAllocs returns the set of allocations remaining on a node that
// still need to be drained.
func (n *drainingNode) RemainingAllocs() ([]*structs.Allocation, error) {
n.l.RLock()
defer n.l.RUnlock()
......@@ -94,10 +93,6 @@ func (n *drainingNode) DeadlineAllocs() ([]*structs.Allocation, error) {
}
// Grab the relevant drain info
inf, _ := n.node.DrainStrategy.DeadlineTime()
if inf {
return nil, nil
}
ignoreSystem := n.node.DrainStrategy.IgnoreSystemJobs
// Retrieve the allocs on the node
......@@ -124,7 +119,7 @@ func (n *drainingNode) DeadlineAllocs() ([]*structs.Allocation, error) {
return drain, nil
}
// RunningServices returns the set of jobs on the node
// RunningServices returns the set of service jobs on the node.
func (n *drainingNode) RunningServices() ([]structs.NamespacedID, error) {
n.l.RLock()
defer n.l.RUnlock()
......
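The drainingNode changes split the two responsibilities: IsDone now ignores system allocs entirely (they are stopped only after everything else has drained), while RemainingAllocs reports whatever still needs stopping, honoring IgnoreSystemJobs. A self-contained sketch of those two decisions, using simplified stand-in types instead of nomad/structs:

```go
package main

import "fmt"

// alloc is a tiny stand-in for structs.Allocation with only the fields the
// drain decision needs in this sketch.
type alloc struct {
	jobType  string // "service", "batch", or "system"
	terminal bool
}

// isDone mirrors the updated IsDone: a node is done draining once every
// non-system alloc is terminal. System allocs never block "done".
func isDone(allocs []alloc) bool {
	for _, a := range allocs {
		if a.jobType == "system" {
			continue
		}
		if !a.terminal {
			return false
		}
	}
	return true
}

// remaining mirrors RemainingAllocs: everything still running that the drain
// is responsible for stopping, optionally skipping system allocs when the
// drain strategy ignores system jobs.
func remaining(allocs []alloc, ignoreSystem bool) []alloc {
	var out []alloc
	for _, a := range allocs {
		if a.terminal {
			continue
		}
		if ignoreSystem && a.jobType == "system" {
			continue
		}
		out = append(out, a)
	}
	return out
}

func main() {
	allocs := []alloc{
		{jobType: "service", terminal: true},
		{jobType: "system", terminal: false},
	}
	fmt.Println(isDone(allocs))                // true: only a system alloc is left
	fmt.Println(len(remaining(allocs, false))) // 1: the system alloc still needs stopping
}
```

This split is what lets the "System" and "AllTerminalButSystem" test cases below expect isDone to be true while still reporting one remaining alloc.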
package drainer
import (
"testing"
"time"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// testDrainingNode creates a *drainingNode with a 1h deadline but no allocs
func testDrainingNode(t *testing.T) *drainingNode {
t.Helper()
sconfig := &state.StateStoreConfig{
LogOutput: testlog.NewWriter(t),
Region: "global",
}
state, err := state.NewStateStore(sconfig)
require.Nil(t, err)
node := mock.Node()
node.DrainStrategy = &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: time.Hour,
},
ForceDeadline: time.Now().Add(time.Hour),
}
require.Nil(t, state.UpsertNode(100, node))
return NewDrainingNode(node, state)
}
func assertDrainingNode(t *testing.T, dn *drainingNode, isDone bool, remaining, running int) {
t.Helper()
done, err := dn.IsDone()
require.Nil(t, err)
assert.Equal(t, isDone, done, "IsDone mismatch")
allocs, err := dn.RemainingAllocs()
require.Nil(t, err)
assert.Len(t, allocs, remaining, "RemainingAllocs mismatch")
jobs, err := dn.RunningServices()
require.Nil(t, err)
assert.Len(t, jobs, running, "RunningServices mismatch")
}
func TestDrainingNode_Table(t *testing.T) {
cases := []struct {
name string
isDone bool
remaining int
running int
setup func(*testing.T, *drainingNode)
}{
{
name: "Empty",
isDone: true,
remaining: 0,
running: 0,
setup: func(*testing.T, *drainingNode) {},
},
{
name: "Batch",
isDone: false,
remaining: 1,
running: 0,
setup: func(t *testing.T, dn *drainingNode) {
alloc := mock.BatchAlloc()
alloc.NodeID = dn.node.ID
require.Nil(t, dn.state.UpsertJob(101, alloc.Job))
require.Nil(t, dn.state.UpsertAllocs(102, []*structs.Allocation{alloc}))
},
},
{
name: "Service",
isDone: false,
remaining: 1,
running: 1,
setup: func(t *testing.T, dn *drainingNode) {
alloc := mock.Alloc()
alloc.NodeID = dn.node.ID
require.Nil(t, dn.state.UpsertJob(101, alloc.Job))
require.Nil(t, dn.state.UpsertAllocs(102, []*structs.Allocation{alloc}))
},
},
{
name: "System",
isDone: true,
remaining: 1,
running: 0,
setup: func(t *testing.T, dn *drainingNode) {
alloc := mock.SystemAlloc()
alloc.NodeID = dn.node.ID
require.Nil(t, dn.state.UpsertJob(101, alloc.Job))
require.Nil(t, dn.state.UpsertAllocs(102, []*structs.Allocation{alloc}))
},
},
{
name: "AllTerminal",
isDone: true,
remaining: 0,
running: 0,
setup: func(t *testing.T, dn *drainingNode) {
allocs := []*structs.Allocation{mock.Alloc(), mock.BatchAlloc(), mock.SystemAlloc()}
for _, a := range allocs {
a.NodeID = dn.node.ID
require.Nil(t, dn.state.UpsertJob(101, a.Job))
}
require.Nil(t, dn.state.UpsertAllocs(102, allocs))
// StateStore doesn't like inserting new allocs
// with a terminal status, so set the status in
// a second pass
for _, a := range allocs {
a.ClientStatus = structs.AllocClientStatusComplete
}
require.Nil(t, dn.state.UpsertAllocs(103, allocs))
},
},
{
name: "ServiceTerminal",
isDone: false,
remaining: 2,
running: 0,
setup: func(t *testing.T, dn *drainingNode) {
allocs := []*structs.Allocation{mock.Alloc(), mock.BatchAlloc(), mock.SystemAlloc()}
for _, a := range allocs {
a.NodeID = dn.node.ID
require.Nil(t, dn.state.UpsertJob(101, a.Job))
}
require.Nil(t, dn.state.UpsertAllocs(102, allocs))
// Set only the service job as terminal
allocs[0].ClientStatus = structs.AllocClientStatusComplete
require.Nil(t, dn.state.UpsertAllocs(103, allocs))
},
},
{
name: "AllTerminalButBatch",
isDone: false,
remaining: 1,
running: 0,
setup: func(t *testing.T, dn *drainingNode) {
allocs := []*structs.Allocation{mock.Alloc(), mock.BatchAlloc(), mock.SystemAlloc()}
for _, a := range allocs {
a.NodeID = dn.node.ID
require.Nil(t, dn.state.UpsertJob(101, a.Job))
}
require.Nil(t, dn.state.UpsertAllocs(102, allocs))
// Set only the service and system jobs as terminal
allocs[0].ClientStatus = structs.AllocClientStatusComplete
allocs[2].ClientStatus = structs.AllocClientStatusComplete
require.Nil(t, dn.state.UpsertAllocs(103, allocs))
},
},
{
name: "AllTerminalButSystem",
isDone: true,
remaining: 1,
running: 0,
setup: func(t *testing.T, dn *drainingNode) {
allocs := []*structs.Allocation{mock.Alloc(), mock.BatchAlloc(), mock.SystemAlloc()}
for _, a := range allocs {
a.NodeID = dn.node.ID
require.Nil(t, dn.state.UpsertJob(101, a.Job))
}
require.Nil(t, dn.state.UpsertAllocs(102, allocs))
// Set only the service and batch jobs as terminal
allocs[0].ClientStatus = structs.AllocClientStatusComplete
allocs[1].ClientStatus = structs.AllocClientStatusComplete
require.Nil(t, dn.state.UpsertAllocs(103, allocs))
},
},
{
name: "HalfTerminal",
isDone: false,
remaining: 3,
running: 1,
setup: func(t *testing.T, dn *drainingNode) {
allocs := []*structs.Allocation{
mock.Alloc(),
mock.BatchAlloc(),
mock.SystemAlloc(),
mock.Alloc(),
mock.BatchAlloc(),
mock.SystemAlloc(),
}
for _, a := range allocs {
a.NodeID = dn.node.ID
require.Nil(t, dn.state.UpsertJob(101, a.Job))
}
require.Nil(t, dn.state.UpsertAllocs(102, allocs))
// Set the first service, batch, and system allocs as terminal
allocs[0].ClientStatus = structs.AllocClientStatusComplete
allocs[1].ClientStatus = structs.AllocClientStatusComplete
allocs[2].ClientStatus = structs.AllocClientStatusComplete
require.Nil(t, dn.state.UpsertAllocs(103, allocs))
},
},
}
// Default test drainingNode has no allocs, so it should be done and
// have no remaining allocs.
for _, tc := range cases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
dn := testDrainingNode(t)
tc.setup(t, dn)
assertDrainingNode(t, dn, tc.isDone, tc.remaining, tc.running)
})
}
}
......@@ -87,6 +87,21 @@ func (n *NodeDrainer) Update(node *structs.Node) {
}
if done {
// Node is done draining. Stop remaining system allocs before
// marking node as complete.
remaining, err := draining.RemainingAllocs()
if err != nil {
n.logger.Printf("[ERR] nomad.drain: error getting remaining allocs on drained node %q: %v",
node.ID, err)
} else if len(remaining) > 0 {
future := structs.NewBatchFuture()
n.drainAllocs(future, remaining)
if err := future.Wait(); err != nil {
n.logger.Printf("[ERR] nomad.drain: failed to drain %d remaining allocs from done node %q: %v",
len(remaining), node.ID, err)
}
}
index, err := n.raft.NodesDrainComplete([]string{node.ID})
if err != nil {
n.logger.Printf("[ERR] nomad.drain: failed to unset drain for node %q: %v", node.ID, err)
......
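In Update, a node that is otherwise done draining first has its remaining (system) allocs stopped through a batch future before NodesDrainComplete is called; a failure there is logged but does not block marking the drain complete. The sketch below imitates that wait-then-proceed flow with a homemade batch future, since only NewBatchFuture and Wait are visible in the diff; it is not Nomad's structs.BatchFuture.

```go
package main

import (
	"fmt"
	"sync"
)

// batchFuture is a minimal stand-in: several async operations report into
// it, and wait blocks until all have finished, returning the first error.
type batchFuture struct {
	wg  sync.WaitGroup
	mu  sync.Mutex
	err error
}

func (b *batchFuture) add(n int) { b.wg.Add(n) }

func (b *batchFuture) respond(err error) {
	b.mu.Lock()
	if b.err == nil && err != nil {
		b.err = err
	}
	b.mu.Unlock()
	b.wg.Done()
}

func (b *batchFuture) wait() error {
	b.wg.Wait()
	return b.err
}

func main() {
	// Mirror the flow in Update: stop each remaining system alloc, wait for
	// all stop requests, log any error, then mark the drain complete anyway.
	remaining := []string{"system-alloc-1", "system-alloc-2"}
	future := &batchFuture{}
	future.add(len(remaining))
	for _, id := range remaining {
		go func(id string) {
			// Hypothetical stop call; the real code batches desired-status
			// updates through drainAllocs and Raft.
			fmt.Println("stopping", id)
			future.respond(nil)
		}(id)
	}
	if err := future.wait(); err != nil {
		fmt.Println("failed to drain remaining allocs:", err)
	}
	fmt.Println("marking node drain complete")
}
```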
......@@ -387,6 +387,98 @@ func Alloc() *structs.Allocation {
return alloc
}
func BatchAlloc() *structs.Allocation {
alloc := &structs.Allocation{
ID: uuid.Generate(),
EvalID: uuid.Generate(),
NodeID: "12345678-abcd-efab-cdef-123456789abc",
Namespace: structs.DefaultNamespace,
TaskGroup: "worker",
Resources: &structs.Resources{
CPU: 500,
MemoryMB: 256,
DiskMB: 150,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
ReservedPorts: []structs.Port{{Label: "admin", Value: 5000}},
MBits: 50,
DynamicPorts: []structs.Port{{Label: "http"}},
},
},
},
TaskResources: map[string]*structs.Resources{
"worker": {
CPU: 100,
MemoryMB: 100,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
MBits: 50,
},
},
},
},
SharedResources: &structs.Resources{
DiskMB: 150,
},
Job: BatchJob(),
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,
}
alloc.JobID = alloc.Job.ID
return alloc
}
func SystemAlloc() *structs.Allocation {
alloc := &structs.Allocation{
ID: uuid.Generate(),
EvalID: uuid.Generate(),
NodeID: "12345678-abcd-efab-cdef-123456789abc",
Namespace: structs.DefaultNamespace,
TaskGroup: "web",
Resources: &structs.Resources{
CPU: 500,
MemoryMB: 256,
DiskMB: 150,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
ReservedPorts: []structs.Port{{Label: "admin", Value: 5000}},
MBits: 50,
DynamicPorts: []structs.Port{{Label: "http"}},
},
},
},
TaskResources: map[string]*structs.Resources{
"web": {
CPU: 500,
MemoryMB: 256,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
ReservedPorts: []structs.Port{{Label: "admin", Value: 5000}},
MBits: 50,
DynamicPorts: []structs.Port{{Label: "http", Value: 9876}},
},
},
},
},
SharedResources: &structs.Resources{
DiskMB: 150,
},
Job: SystemJob(),
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,
}
alloc.JobID = alloc.Job.ID
return alloc
}
func VaultAccessor() *structs.VaultAccessor {
return &structs.VaultAccessor{
Accessor: uuid.Generate(),
......
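The new mock.BatchAlloc and mock.SystemAlloc helpers exist so drain tests can place batch and system allocations on a node. A hypothetical test in the drainer package, reusing the testDrainingNode helper shown earlier, might use them like this:

```go
package drainer

import (
	"testing"

	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/stretchr/testify/require"
)

// TestSystemAllocRemains is an illustrative example, not part of the diff:
// insert a system alloc on a draining node and check that it counts as
// "remaining" without blocking IsDone.
func TestSystemAllocRemains(t *testing.T) {
	dn := testDrainingNode(t)

	alloc := mock.SystemAlloc()
	alloc.NodeID = dn.node.ID
	require.Nil(t, dn.state.UpsertJob(101, alloc.Job))
	require.Nil(t, dn.state.UpsertAllocs(102, []*structs.Allocation{alloc}))

	done, err := dn.IsDone()
	require.Nil(t, err)
	require.True(t, done, "system allocs should not block IsDone")

	remaining, err := dn.RemainingAllocs()
	require.Nil(t, err)
	require.Len(t, remaining, 1, "system alloc should still need draining")
}
```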