Commit aff85141 authored by Armon Dadgar's avatar Armon Dadgar
Browse files

nomad: adding client drain endpoint

parent 9f76b6a9
Showing with 105 additions and 0 deletions
+105 -0
......@@ -179,6 +179,59 @@ func (c *ClientEndpoint) UpdateStatus(args *structs.NodeUpdateStatusRequest, rep
return nil
}
// UpdateDrain is used to update the drain mode of a client node
func (c *ClientEndpoint) UpdateDrain(args *structs.NodeUpdateDrainRequest,
reply *structs.NodeDrainUpdateResponse) error {
if done, err := c.srv.forward("Client.UpdateDrain", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "client", "update_drain"}, time.Now())
// Verify the arguments
if args.NodeID == "" {
return fmt.Errorf("missing node ID for drain update")
}
// Look for the node
snap, err := c.srv.fsm.State().Snapshot()
if err != nil {
return err
}
node, err := snap.GetNodeByID(args.NodeID)
if err != nil {
return err
}
if node == nil {
return fmt.Errorf("node not found")
}
// Commit this update via Raft
var index uint64
if node.Drain != args.Drain {
_, index, err = c.srv.raftApply(structs.NodeUpdateDrainRequestType, args)
if err != nil {
c.srv.logger.Printf("[ERR] nomad.client: drain update failed: %v", err)
return err
}
reply.NodeModifyIndex = index
}
// Check if we should trigger evaluations
if args.Drain {
evalIDs, evalIndex, err := c.createNodeEvals(args.NodeID, index)
if err != nil {
c.srv.logger.Printf("[ERR] nomad.client: eval creation failed: %v", err)
return err
}
reply.EvalIDs = evalIDs
reply.EvalCreateIndex = evalIndex
}
// Set the reply index
reply.Index = index
return nil
}
// Evaluate is used to force a re-evaluation of the node
func (c *ClientEndpoint) Evaluate(args *structs.NodeEvaluateRequest, reply *structs.NodeUpdateResponse) error {
if done, err := c.srv.forward("Client.Evaluate", args, args, reply); done {
......
......@@ -195,6 +195,50 @@ func TestClientEndpoint_UpdateStatus_HeartbeatOnly(t *testing.T) {
}
}
func TestClientEndpoint_UpdateDrain(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
node := mock.Node()
reg := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "region1"},
}
// Fetch the response
var resp structs.NodeUpdateResponse
if err := msgpackrpc.CallWithCodec(codec, "Client.Register", reg, &resp); err != nil {
t.Fatalf("err: %v", err)
}
// Update the status
dereg := &structs.NodeUpdateDrainRequest{
NodeID: node.ID,
Drain: true,
WriteRequest: structs.WriteRequest{Region: "region1"},
}
var resp2 structs.NodeDrainUpdateResponse
if err := msgpackrpc.CallWithCodec(codec, "Client.UpdateDrain", dereg, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
if resp2.Index == 0 {
t.Fatalf("bad index: %d", resp2.Index)
}
// Check for the node in the FSM
state := s1.fsm.State()
out, err := state.GetNodeByID(node.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if !out.Drain {
t.Fatalf("bad: %#v", out)
}
}
func TestClientEndpoint_GetNode(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
......
......@@ -307,6 +307,14 @@ type NodeUpdateResponse struct {
QueryMeta
}
// NodeDrainUpdateResponse is used to respond to a node drain update
type NodeDrainUpdateResponse struct {
EvalIDs []string
EvalCreateIndex uint64
NodeModifyIndex uint64
QueryMeta
}
// NodeAllocsResponse is used to return allocs for a single node
type NodeAllocsResponse struct {
Allocs []*Allocation
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment