Unverified commit 688a7d3a authored by Michael Schurter, committed by GitHub

Merge pull request #4094 from hashicorp/b-drain-panic

 drain: fix double-close panic on drain future
parents 3643f116 717bd751
Showing with 160 additions and 124 deletions (+160 -124)
@@ -256,7 +256,7 @@ func (n *NodeDrainer) handleDeadlinedNodes(nodes []string) {
// Submit the node transistions in a sharded form to ensure a reasonable
// Raft transaction size.
- for _, nodes := range partitionIds(nodes) {
+ for _, nodes := range partitionIds(defaultMaxIdsPerTxn, nodes) {
if _, err := n.raft.NodesDrainComplete(nodes); err != nil {
n.logger.Printf("[ERR] nomad.drain: failed to unset drain for nodes: %v", err)
}
@@ -326,7 +326,7 @@ func (n *NodeDrainer) handleMigratedAllocs(allocs []*structs.Allocation) {
// Submit the node transistions in a sharded form to ensure a reasonable
// Raft transaction size.
- for _, nodes := range partitionIds(done) {
+ for _, nodes := range partitionIds(defaultMaxIdsPerTxn, done) {
if _, err := n.raft.NodesDrainComplete(nodes); err != nil {
n.logger.Printf("[ERR] nomad.drain: failed to unset drain for nodes: %v", err)
}
@@ -397,10 +397,11 @@ func (n *NodeDrainer) drainAllocs(future *structs.BatchFuture, allocs []*structs
// Commit this update via Raft
var finalIndex uint64
- for _, u := range partitionAllocDrain(transistions, evals) {
- index, err := n.raft.AllocUpdateDesiredTransition(u.Transistions, u.Evals)
+ for _, u := range partitionAllocDrain(defaultMaxIdsPerTxn, transistions, evals) {
+ index, err := n.raft.AllocUpdateDesiredTransition(u.Transitions, u.Evals)
if err != nil {
- future.Respond(index, err)
+ future.Respond(0, err)
+ return
}
finalIndex = index
}
......
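The hunk above is the heart of the fix. On a Raft apply error, the old code responded to the future but kept running, so the function's final Respond fired a second time; since Respond closes a channel, that second call panicked. A minimal sketch of the failure mode, assuming nothing beyond Go's channel semantics (illustrative code, not Nomad's):

```go
package main

import "fmt"

// future mimics the BatchFuture pattern: Respond records the result and
// closes doneCh so that waiters unblock.
type future struct {
	doneCh chan struct{}
	index  uint64
	err    error
}

func (f *future) Respond(index uint64, err error) {
	f.index = index
	f.err = err
	close(f.doneCh) // a second Respond panics: close of closed channel
}

func main() {
	f := &future{doneCh: make(chan struct{})}
	f.Respond(0, fmt.Errorf("raft apply failed")) // error path responds once
	// Without the added `return`, control would also reach the normal-path
	// Respond, closing doneCh twice:
	// f.Respond(42, nil) // would panic
	fmt.Println("responded exactly once:", f.err)
}
```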
@@ -4,70 +4,70 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)
- var (
- // maxIdsPerTxn is the maximum number of IDs that can be included in a
- // single Raft transaction. This is to ensure that the Raft message does not
- // become too large.
- maxIdsPerTxn = (1024 * 256) / 36 // 0.25 MB of ids.
+ const (
+ // defaultMaxIdsPerTxn is the maximum number of IDs that can be included in a
+ // single Raft transaction. This is to ensure that the Raft message
+ // does not become too large.
+ defaultMaxIdsPerTxn = (1024 * 256) / 36 // 0.25 MB of ids.
)
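For scale: the divisor 36 is presumably the length of a canonical UUID string, so the constant works out to (1024 × 256) / 36 = 7,281 IDs per quarter-megabyte transaction.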
// partitionIds takes a set of IDs and returns a partitioned view of them such
// that no batch would result in an overly large raft transaction.
- func partitionIds(ids []string) [][]string {
+ func partitionIds(maxIds int, ids []string) [][]string {
index := 0
total := len(ids)
var partitions [][]string
for remaining := total - index; remaining > 0; remaining = total - index {
- if remaining < maxIdsPerTxn {
+ if remaining < maxIds {
partitions = append(partitions, ids[index:])
break
} else {
- partitions = append(partitions, ids[index:index+maxIdsPerTxn])
- index += maxIdsPerTxn
+ partitions = append(partitions, ids[index:index+maxIds])
+ index += maxIds
}
}
return partitions
}
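To make the batching behavior concrete, here is a self-contained sketch of the same slicing loop with sample output (the names here are illustrative, not the package's):

```go
package main

import "fmt"

// partitionIDs mirrors the helper above: full batches of maxIds,
// with any remainder in a final short batch.
func partitionIDs(maxIds int, ids []string) [][]string {
	var out [][]string
	for len(ids) > maxIds {
		out = append(out, ids[:maxIds])
		ids = ids[maxIds:]
	}
	if len(ids) > 0 {
		out = append(out, ids)
	}
	return out
}

func main() {
	fmt.Println(partitionIDs(2, []string{"1", "2", "3", "4", "5"}))
	// Output: [[1 2] [3 4] [5]]
}
```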
- // transistionTuple is used to group desired transistions and evals
- type transistionTuple struct {
- Transistions map[string]*structs.DesiredTransition
- Evals []*structs.Evaluation
+ // transitionTuple is used to group desired transitions and evals
+ type transitionTuple struct {
+ Transitions map[string]*structs.DesiredTransition
+ Evals []*structs.Evaluation
}
- // partitionAllocDrain returns a list of alloc transistions and evals to apply
+ // partitionAllocDrain returns a list of alloc transitions and evals to apply
// in a single raft transaction.This is necessary to ensure that the Raft
// transaction does not become too large.
- func partitionAllocDrain(transistions map[string]*structs.DesiredTransition,
- evals []*structs.Evaluation) []*transistionTuple {
+ func partitionAllocDrain(maxIds int, transitions map[string]*structs.DesiredTransition,
+ evals []*structs.Evaluation) []*transitionTuple {
- // Determine a stable ordering of the transistioning allocs
- allocs := make([]string, 0, len(transistions))
- for id := range transistions {
+ // Determine a stable ordering of the transitioning allocs
+ allocs := make([]string, 0, len(transitions))
+ for id := range transitions {
allocs = append(allocs, id)
}
- var requests []*transistionTuple
+ var requests []*transitionTuple
submittedEvals, submittedTrans := 0, 0
- for submittedEvals != len(evals) || submittedTrans != len(transistions) {
- req := &transistionTuple{
- Transistions: make(map[string]*structs.DesiredTransition),
+ for submittedEvals != len(evals) || submittedTrans != len(transitions) {
+ req := &transitionTuple{
+ Transitions: make(map[string]*structs.DesiredTransition),
}
requests = append(requests, req)
- available := maxIdsPerTxn
+ available := maxIds
// Add the allocs first
if remaining := len(allocs) - submittedTrans; remaining > 0 {
if remaining <= available {
for _, id := range allocs[submittedTrans:] {
- req.Transistions[id] = transistions[id]
+ req.Transitions[id] = transitions[id]
}
available -= remaining
submittedTrans += remaining
} else {
for _, id := range allocs[submittedTrans : submittedTrans+available] {
- req.Transistions[id] = transistions[id]
+ req.Transitions[id] = transitions[id]
}
submittedTrans += available
......
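The truncated remainder of partitionAllocDrain spends whatever budget is left after the transitions on evals. A standalone sketch of that budgeting (illustrative only; min is defined locally for older Go versions), which reproduces the batch shapes asserted in the test below:

```go
package main

import "fmt"

func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}

// Each request gets maxIds slots; transitions are packed first and evals
// fill the remaining budget — yielding the (2,0), (1,1), (0,2) batches
// the test below expects for 3 transitions, 3 evals, and maxIds = 2.
func main() {
	maxIds, trans, evals := 2, 3, 3
	for trans > 0 || evals > 0 {
		budget := maxIds
		t := min(trans, budget)
		trans, budget = trans-t, budget-t
		e := min(evals, budget)
		evals -= e
		fmt.Printf("batch: %d transitions, %d evals\n", t, e)
	}
}
```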
@@ -8,40 +8,38 @@ import (
)
func TestDrainer_PartitionAllocDrain(t *testing.T) {
t.Parallel()
// Set the max ids per reap to something lower.
- old := maxIdsPerTxn
- defer func() { maxIdsPerTxn = old }()
- maxIdsPerTxn = 2
+ maxIdsPerTxn := 2
require := require.New(t)
transistions := map[string]*structs.DesiredTransition{"a": nil, "b": nil, "c": nil}
evals := []*structs.Evaluation{nil, nil, nil}
- requests := partitionAllocDrain(transistions, evals)
+ requests := partitionAllocDrain(maxIdsPerTxn, transistions, evals)
require.Len(requests, 3)
first := requests[0]
- require.Len(first.Transistions, 2)
+ require.Len(first.Transitions, 2)
require.Len(first.Evals, 0)
second := requests[1]
- require.Len(second.Transistions, 1)
+ require.Len(second.Transitions, 1)
require.Len(second.Evals, 1)
third := requests[2]
- require.Len(third.Transistions, 0)
+ require.Len(third.Transitions, 0)
require.Len(third.Evals, 2)
}
func TestDrainer_PartitionIds(t *testing.T) {
t.Parallel()
require := require.New(t)
// Set the max ids per reap to something lower.
- old := maxIdsPerTxn
- defer func() { maxIdsPerTxn = old }()
- maxIdsPerTxn = 2
+ maxIdsPerTxn := 2
ids := []string{"1", "2", "3", "4", "5"}
- requests := partitionIds(ids)
+ requests := partitionIds(maxIdsPerTxn, ids)
require.Len(requests, 3)
require.Len(requests[0], 2)
require.Len(requests[1], 2)
......
@@ -18,10 +18,9 @@ import (
"github.com/stretchr/testify/require"
)
- func allocPromoter(t *testing.T, ctx context.Context,
+ func allocPromoter(errCh chan<- error, ctx context.Context,
state *state.StateStore, codec rpc.ClientCodec, nodeID string,
logger *log.Logger) {
- t.Helper()
nindex := uint64(1)
for {
@@ -31,7 +30,8 @@ func allocPromoter(t *testing.T, ctx context.Context,
return
}
t.Fatalf("failed to get node allocs: %v", err)
errCh <- fmt.Errorf("failed to get node allocs: %v", err)
return
}
nindex = index
@@ -67,12 +67,23 @@ func allocPromoter(t *testing.T, ctx context.Context,
if ctx.Err() == context.Canceled {
return
} else if err != nil {
- require.Nil(t, err)
+ errCh <- err
}
}
}
}
+ // checkAllocPromoter is a small helper to return an error or nil from an error
+ // chan like the one given to the allocPromoter goroutine.
+ func checkAllocPromoter(errCh chan error) error {
+ select {
+ case err := <-errCh:
+ return err
+ default:
+ return nil
+ }
+ }
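This refactor exists because t.Fatalf is only valid from the goroutine running the test; calling it from the allocPromoter goroutines was incorrect, so failures are now reported over a buffered channel and polled by the test. A hedged sketch of the pattern (hypothetical names, not the Nomad code):

```go
package demo

import (
	"errors"
	"testing"
)

// doWork is a hypothetical stand-in for the allocPromoter loop body.
func doWork() error { return errors.New("failed to get node allocs") }

// TestWorkerErrors sketches the pattern this PR adopts: background workers
// send failures to a buffered channel, which the test drains non-blockingly,
// exactly as checkAllocPromoter does above.
func TestWorkerErrors(t *testing.T) {
	errCh := make(chan error, 1)
	go func() {
		if err := doWork(); err != nil {
			errCh <- err // report instead of calling t.Fatalf here
		}
	}()

	// ... drive the system under test, then poll for worker failures:
	select {
	case err := <-errCh:
		t.Logf("worker reported: %v", err)
	default:
		// no worker error yet
	}
}
```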
func getNodeAllocs(ctx context.Context, state *state.StateStore, nodeID string, index uint64) ([]*structs.Allocation, uint64, error) {
resp, index, err := state.BlockingQuery(getNodeAllocsImpl(nodeID), index, ctx)
if err != nil {
@@ -169,10 +180,11 @@ func TestDrainer_Simple_ServiceOnly(t *testing.T) {
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
// Wait for the allocs to be replaced
+ errCh := make(chan error, 2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- go allocPromoter(t, ctx, state, codec, n1.ID, s1.logger)
- go allocPromoter(t, ctx, state, codec, n2.ID, s1.logger)
+ go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger)
+ go allocPromoter(errCh, ctx, state, codec, n2.ID, s1.logger)
testutil.WaitForResult(func() (bool, error) {
allocs, err := state.AllocsByNode(nil, n2.ID)
@@ -186,6 +198,9 @@ func TestDrainer_Simple_ServiceOnly(t *testing.T) {
// Check that the node drain is removed
testutil.WaitForResult(func() (bool, error) {
+ if err := checkAllocPromoter(errCh); err != nil {
+ return false, err
+ }
node, err := state.NodeByID(nil, n1.ID)
if err != nil {
return false, err
@@ -422,14 +437,19 @@ func TestDrainer_AllTypes_Deadline(t *testing.T) {
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
// Wait for the allocs to be replaced
+ errCh := make(chan error, 2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- go allocPromoter(t, ctx, state, codec, n1.ID, s1.logger)
- go allocPromoter(t, ctx, state, codec, n2.ID, s1.logger)
+ go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger)
+ go allocPromoter(errCh, ctx, state, codec, n2.ID, s1.logger)
// Wait for the allocs to be stopped
var finalAllocs []*structs.Allocation
testutil.WaitForResult(func() (bool, error) {
+ if err := checkAllocPromoter(errCh); err != nil {
+ return false, err
+ }
var err error
finalAllocs, err = state.AllocsByNode(nil, n1.ID)
if err != nil {
@@ -575,10 +595,11 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) {
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
// Wait for the allocs to be replaced
+ errCh := make(chan error, 2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- go allocPromoter(t, ctx, state, codec, n1.ID, s1.logger)
- go allocPromoter(t, ctx, state, codec, n2.ID, s1.logger)
+ go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger)
+ go allocPromoter(errCh, ctx, state, codec, n2.ID, s1.logger)
// Wait for the service allocs to be stopped on the draining node
testutil.WaitForResult(func() (bool, error) {
@@ -594,6 +615,9 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) {
return false, fmt.Errorf("got desired status %v", alloc.DesiredStatus)
}
}
+ if err := checkAllocPromoter(errCh); err != nil {
+ return false, err
+ }
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
@@ -635,7 +659,7 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) {
}
// Test that transistions to force drain work.
- func TestDrainer_Batch_TransistionToForce(t *testing.T) {
+ func TestDrainer_Batch_TransitionToForce(t *testing.T) {
t.Parallel()
require := require.New(t)
@@ -707,12 +731,17 @@ func TestDrainer_Batch_TransistionToForce(t *testing.T) {
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
// Wait for the allocs to be replaced
+ errCh := make(chan error, 1)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- go allocPromoter(t, ctx, state, codec, n1.ID, s1.logger)
+ go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger)
// Make sure the batch job isn't affected
testutil.AssertUntil(500*time.Millisecond, func() (bool, error) {
+ if err := checkAllocPromoter(errCh); err != nil {
+ return false, err
+ }
allocs, err := state.AllocsByNode(nil, n1.ID)
if err != nil {
return false, err
......
package structs
// BatchFuture is used to wait on a batch update to complete
type BatchFuture struct {
doneCh chan struct{}
err error
index uint64
}
// NewBatchFuture creates a new batch future
func NewBatchFuture() *BatchFuture {
return &BatchFuture{
doneCh: make(chan struct{}),
}
}
// Wait is used to block for the future to complete and returns the error
func (b *BatchFuture) Wait() error {
<-b.doneCh
return b.err
}
// WaitCh is used to block for the future to complete
func (b *BatchFuture) WaitCh() <-chan struct{} {
return b.doneCh
}
// Error is used to return the error of the batch, only after Wait()
func (b *BatchFuture) Error() error {
return b.err
}
// Index is used to return the index of the batch, only after Wait()
func (b *BatchFuture) Index() uint64 {
return b.index
}
// Respond is used to unblock the future
func (b *BatchFuture) Respond(index uint64, err error) {
b.index = index
b.err = err
close(b.doneCh)
}
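The standalone listing above is a new file: BatchFuture moves out of the monolithic structs file into its own file in package structs (its removal from the old location appears in the hunks further down, and its relocated test follows next). A usage sketch of the API exactly as shown — the only assumption is the import path implied by the package name:

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	bf := structs.NewBatchFuture()

	// Producer: Respond must run exactly once, since it closes doneCh.
	go bf.Respond(1000, nil)

	// Consumer: Wait blocks until Respond; Index and Error are then stable.
	if err := bf.Wait(); err != nil {
		fmt.Println("batch failed:", err)
		return
	}
	fmt.Println("batch committed at index", bf.Index())
}
```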
package structs
import (
"fmt"
"testing"
"time"
)
func TestBatchFuture(t *testing.T) {
t.Parallel()
bf := NewBatchFuture()
// Async respond to the future
expect := fmt.Errorf("testing")
go func() {
time.Sleep(10 * time.Millisecond)
bf.Respond(1000, expect)
}()
// Block for the result
start := time.Now()
err := bf.Wait()
diff := time.Since(start)
if diff < 5*time.Millisecond {
t.Fatalf("too fast")
}
// Check the results
if err != expect {
t.Fatalf("bad: %s", err)
}
if bf.Index() != 1000 {
t.Fatalf("bad: %d", bf.Index())
}
}
@@ -7112,45 +7112,3 @@ type ACLTokenUpsertResponse struct {
Tokens []*ACLToken
WriteMeta
}
- // BatchFuture is used to wait on a batch update to complete
- type BatchFuture struct {
- doneCh chan struct{}
- err error
- index uint64
- }
- // NewBatchFuture creates a new batch future
- func NewBatchFuture() *BatchFuture {
- return &BatchFuture{
- doneCh: make(chan struct{}),
- }
- }
- // Wait is used to block for the future to complete and returns the error
- func (b *BatchFuture) Wait() error {
- <-b.doneCh
- return b.err
- }
- // WaitCh is used to block for the future to complete
- func (b *BatchFuture) WaitCh() <-chan struct{} {
- return b.doneCh
- }
- // Error is used to return the error of the batch, only after Wait()
- func (b *BatchFuture) Error() error {
- return b.err
- }
- // Index is used to return the index of the batch, only after Wait()
- func (b *BatchFuture) Index() uint64 {
- return b.index
- }
- // Respond is used to unblock the future
- func (b *BatchFuture) Respond(index uint64, err error) {
- b.index = index
- b.err = err
- close(b.doneCh)
- }
@@ -3652,34 +3652,6 @@ func TestNetworkResourcesEquals(t *testing.T) {
}
}
- func TestBatchFuture(t *testing.T) {
- t.Parallel()
- bf := NewBatchFuture()
- // Async respond to the future
- expect := fmt.Errorf("testing")
- go func() {
- time.Sleep(10 * time.Millisecond)
- bf.Respond(1000, expect)
- }()
- // Block for the result
- start := time.Now()
- err := bf.Wait()
- diff := time.Since(start)
- if diff < 5*time.Millisecond {
- t.Fatalf("too fast")
- }
- // Check the results
- if err != expect {
- t.Fatalf("bad: %s", err)
- }
- if bf.Index() != 1000 {
- t.Fatalf("bad: %d", bf.Index())
- }
- }
func TestNode_Canonicalize(t *testing.T) {
t.Parallel()
require := require.New(t)
......