Commit 3dc8a05b authored by Alex Dadgar's avatar Alex Dadgar
Browse files

batch test

parent 18fb7e8a
Showing with 175 additions and 50 deletions
+175 -50
......@@ -8,14 +8,11 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)
const (
// evalBatchDuration is the duration in which evaluations are batched before
// commiting to Raft.
evalBatchDuration = 200 * time.Millisecond
)
// EvalBatcher is used to batch the creation of evaluations
type EvalBatcher struct {
// batch is the batching duration
batch time.Duration
// raft is used to actually commit the evaluations
raft DeploymentRaftEndpoints
......@@ -34,11 +31,12 @@ type EvalBatcher struct {
// NewEvalBatcher returns an EvalBatcher that uses the passed raft endpoints to
// create the evaluations and exits the batcher when the passed exit channel is
// closed.
func NewEvalBatcher(raft DeploymentRaftEndpoints, ctx context.Context) *EvalBatcher {
func NewEvalBatcher(batchDuration time.Duration, raft DeploymentRaftEndpoints, ctx context.Context) *EvalBatcher {
b := &EvalBatcher{
raft: raft,
ctx: ctx,
inCh: make(chan *structs.Evaluation, 10),
batch: batchDuration,
raft: raft,
ctx: ctx,
inCh: make(chan *structs.Evaluation, 10),
}
go b.batcher()
......@@ -49,11 +47,10 @@ func NewEvalBatcher(raft DeploymentRaftEndpoints, ctx context.Context) *EvalBatc
// tracks the evaluations creation.
func (b *EvalBatcher) CreateEval(e *structs.Evaluation) *EvalFuture {
b.l.Lock()
defer b.l.Unlock()
if b.f == nil {
b.f = NewEvalFuture()
}
b.l.Unlock()
b.inCh <- e
return b.f
......@@ -61,17 +58,26 @@ func (b *EvalBatcher) CreateEval(e *structs.Evaluation) *EvalFuture {
// batcher is the long lived batcher goroutine
func (b *EvalBatcher) batcher() {
ticker := time.NewTicker(evalBatchDuration)
timer := time.NewTimer(b.batch)
evals := make(map[string]*structs.Evaluation)
for {
select {
case <-b.ctx.Done():
ticker.Stop()
timer.Stop()
return
case e := <-b.inCh:
if len(evals) == 0 {
if !timer.Stop() {
<-timer.C
}
timer.Reset(b.batch)
}
evals[e.DeploymentID] = e
case <-ticker.C:
case <-timer.C:
if len(evals) == 0 {
// Reset the timer
timer.Reset(b.batch)
continue
}
......@@ -83,6 +89,8 @@ func (b *EvalBatcher) batcher() {
// Shouldn't be possible but protect ourselves
if f == nil {
// Reset the timer
timer.Reset(b.batch)
continue
}
......@@ -97,6 +105,9 @@ func (b *EvalBatcher) batcher() {
// Reset the evals list
evals = make(map[string]*structs.Evaluation)
// Reset the timer
timer.Reset(b.batch)
}
}
}
......
......@@ -5,6 +5,7 @@ import (
"fmt"
"log"
"sync"
"time"
"golang.org/x/time/rate"
......@@ -54,9 +55,13 @@ type DeploymentStateWatchers interface {
}
const (
// limitStateQueriesPerSecond is the number of state queries allowed per
// LimitStateQueriesPerSecond is the number of state queries allowed per
// second
limitStateQueriesPerSecond = 15.0
LimitStateQueriesPerSecond = 15.0
// EvalBatchDuration is the duration in which evaluations are batched before
// commiting to Raft.
EvalBatchDuration = 250 * time.Millisecond
)
// Watcher is used to watch deployments and their allocations created
......@@ -69,6 +74,10 @@ type Watcher struct {
// queryLimiter is used to limit the rate of blocking queries
queryLimiter *rate.Limiter
// evalBatchDuration is the duration to batch eval creation across all
// deployment watchers
evalBatchDuration time.Duration
// raft contains the set of Raft endpoints that can be used by the
// deployments watcher
raft DeploymentRaftEndpoints
......@@ -92,17 +101,23 @@ type Watcher struct {
// NewDeploymentsWatcher returns a deployments watcher that is used to watch
// deployments and trigger the scheduler as needed.
func NewDeploymentsWatcher(logger *log.Logger, w DeploymentStateWatchers, raft DeploymentRaftEndpoints) *Watcher {
func NewDeploymentsWatcher(
logger *log.Logger,
w DeploymentStateWatchers,
raft DeploymentRaftEndpoints,
stateQueriesPerSecond float64,
evalBatchDuration time.Duration) *Watcher {
ctx, exitFn := context.WithCancel(context.Background())
return &Watcher{
queryLimiter: rate.NewLimiter(limitStateQueriesPerSecond, 100),
stateWatchers: w,
raft: raft,
watchers: make(map[string]*deploymentWatcher, 32),
evalBatcher: NewEvalBatcher(raft, ctx),
logger: logger,
ctx: ctx,
exitFn: exitFn,
queryLimiter: rate.NewLimiter(rate.Limit(stateQueriesPerSecond), 100),
evalBatchDuration: evalBatchDuration,
stateWatchers: w,
raft: raft,
watchers: make(map[string]*deploymentWatcher, 32),
evalBatcher: NewEvalBatcher(evalBatchDuration, raft, ctx),
logger: logger,
ctx: ctx,
exitFn: exitFn,
}
}
......@@ -136,7 +151,7 @@ func (w *Watcher) Flush() {
w.watchers = make(map[string]*deploymentWatcher, 32)
w.ctx, w.exitFn = context.WithCancel(context.Background())
w.evalBatcher = NewEvalBatcher(w.raft, w.ctx)
w.evalBatcher = NewEvalBatcher(w.evalBatchDuration, w.raft, w.ctx)
}
// watchDeployments is the long lived go-routine that watches for deployments to
......@@ -309,11 +324,7 @@ func (w *Watcher) PauseDeployment(req *structs.DeploymentPauseRequest, resp *str
// createEvaluation commits the given evaluation to Raft but batches the commit
// with other calls.
func (w *Watcher) createEvaluation(eval *structs.Evaluation) (uint64, error) {
w.l.Lock()
f := w.evalBatcher.CreateEval(eval)
w.l.Unlock()
return f.Results()
return w.evalBatcher.CreateEval(eval).Results()
}
// upsertJob commits the given job to Raft
......
......@@ -18,7 +18,7 @@ import (
func TestWatcher_WatchDeployments(t *testing.T) {
assert := assert.New(t)
m := newMockBackend(t)
w := NewDeploymentsWatcher(testLogger(), m, m)
w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration)
// Return no allocations or evals
m.On("Allocations", mocker.Anything, mocker.Anything).Return(nil).Run(func(args mocker.Arguments) {
......@@ -100,7 +100,7 @@ func TestWatcher_WatchDeployments(t *testing.T) {
func TestWatcher_UnknownDeployment(t *testing.T) {
assert := assert.New(t)
m := newMockBackend(t)
w := NewDeploymentsWatcher(testLogger(), m, m)
w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration)
w.SetEnabled(true)
// Set up the calls for retrieving deployments
......@@ -146,7 +146,7 @@ func TestWatcher_UnknownDeployment(t *testing.T) {
func TestWatcher_SetAllocHealth_Unknown(t *testing.T) {
assert := assert.New(t)
m := newMockBackend(t)
w := NewDeploymentsWatcher(testLogger(), m, m)
w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration)
// Create a job, and a deployment
j := mock.Job()
......@@ -195,7 +195,7 @@ func TestWatcher_SetAllocHealth_Unknown(t *testing.T) {
func TestWatcher_SetAllocHealth_Healthy(t *testing.T) {
assert := assert.New(t)
m := newMockBackend(t)
w := NewDeploymentsWatcher(testLogger(), m, m)
w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration)
// Create a job, alloc, and a deployment
j := mock.Job()
......@@ -245,7 +245,7 @@ func TestWatcher_SetAllocHealth_Healthy(t *testing.T) {
func TestWatcher_SetAllocHealth_Unhealthy(t *testing.T) {
assert := assert.New(t)
m := newMockBackend(t)
w := NewDeploymentsWatcher(testLogger(), m, m)
w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration)
// Create a job, alloc, and a deployment
j := mock.Job()
......@@ -302,7 +302,7 @@ func TestWatcher_SetAllocHealth_Unhealthy(t *testing.T) {
func TestWatcher_SetAllocHealth_Unhealthy_Rollback(t *testing.T) {
assert := assert.New(t)
m := newMockBackend(t)
w := NewDeploymentsWatcher(testLogger(), m, m)
w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration)
// Create a job, alloc, and a deployment
j := mock.Job()
......@@ -371,7 +371,7 @@ func TestWatcher_SetAllocHealth_Unhealthy_Rollback(t *testing.T) {
func TestWatcher_PromoteDeployment_HealthyCanaries(t *testing.T) {
assert := assert.New(t)
m := newMockBackend(t)
w := NewDeploymentsWatcher(testLogger(), m, m)
w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration)
// Create a job, canary alloc, and a deployment
j := mock.Job()
......@@ -430,7 +430,7 @@ func TestWatcher_PromoteDeployment_HealthyCanaries(t *testing.T) {
func TestWatcher_PromoteDeployment_UnhealthyCanaries(t *testing.T) {
assert := assert.New(t)
m := newMockBackend(t)
w := NewDeploymentsWatcher(testLogger(), m, m)
w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration)
// Create a job, canary alloc, and a deployment
j := mock.Job()
......@@ -489,7 +489,7 @@ func TestWatcher_PromoteDeployment_UnhealthyCanaries(t *testing.T) {
func TestWatcher_PauseDeployment_Pause_Running(t *testing.T) {
assert := assert.New(t)
m := newMockBackend(t)
w := NewDeploymentsWatcher(testLogger(), m, m)
w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration)
// Create a job and a deployment
j := mock.Job()
......@@ -537,7 +537,7 @@ func TestWatcher_PauseDeployment_Pause_Running(t *testing.T) {
func TestWatcher_PauseDeployment_Pause_Paused(t *testing.T) {
assert := assert.New(t)
m := newMockBackend(t)
w := NewDeploymentsWatcher(testLogger(), m, m)
w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration)
// Create a job and a deployment
j := mock.Job()
......@@ -586,7 +586,7 @@ func TestWatcher_PauseDeployment_Pause_Paused(t *testing.T) {
func TestWatcher_PauseDeployment_Unpause_Paused(t *testing.T) {
assert := assert.New(t)
m := newMockBackend(t)
w := NewDeploymentsWatcher(testLogger(), m, m)
w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration)
// Create a job and a deployment
j := mock.Job()
......@@ -636,7 +636,7 @@ func TestWatcher_PauseDeployment_Unpause_Paused(t *testing.T) {
func TestWatcher_PauseDeployment_Unpause_Running(t *testing.T) {
assert := assert.New(t)
m := newMockBackend(t)
w := NewDeploymentsWatcher(testLogger(), m, m)
w := NewDeploymentsWatcher(testLogger(), m, m, LimitStateQueriesPerSecond, EvalBatchDuration)
// Create a job and a deployment
j := mock.Job()
......@@ -686,7 +686,7 @@ func TestWatcher_PauseDeployment_Unpause_Running(t *testing.T) {
func TestDeploymentWatcher_Watch(t *testing.T) {
assert := assert.New(t)
m := newMockBackend(t)
w := NewDeploymentsWatcher(testLogger(), m, m)
w := NewDeploymentsWatcher(testLogger(), m, m, 1000.0, 1*time.Millisecond)
// Create a job, alloc, and a deployment
j := mock.Job()
......@@ -735,8 +735,7 @@ func TestDeploymentWatcher_Watch(t *testing.T) {
HealthyAllocationIDs: []string{a.ID},
},
}
i := m.nextIndex()
assert.Nil(m.state.UpsertDeploymentAllocHealth(i, req), "UpsertDeploymentAllocHealth")
assert.Nil(m.state.UpsertDeploymentAllocHealth(m.nextIndex(), req), "UpsertDeploymentAllocHealth")
}
// Wait for there to be one eval
......@@ -775,8 +774,7 @@ func TestDeploymentWatcher_Watch(t *testing.T) {
UnhealthyAllocationIDs: []string{a.ID},
},
}
i := m.nextIndex()
assert.Nil(m.state.UpsertDeploymentAllocHealth(i, req2), "UpsertDeploymentAllocHealth")
assert.Nil(m.state.UpsertDeploymentAllocHealth(m.nextIndex(), req2), "UpsertDeploymentAllocHealth")
// Wait for there to be one eval
testutil.WaitForResult(func() (bool, error) {
......@@ -813,3 +811,108 @@ func TestDeploymentWatcher_Watch(t *testing.T) {
}
// Test evaluations are batched between watchers
func TestWatcher_BatchEvals(t *testing.T) {
assert := assert.New(t)
m := newMockBackend(t)
w := NewDeploymentsWatcher(testLogger(), m, m, 1000.0, EvalBatchDuration)
// Create a job, alloc, for two deployments
j1 := mock.Job()
d1 := mock.Deployment()
d1.JobID = j1.ID
a1 := mock.Alloc()
a1.DeploymentID = d1.ID
j2 := mock.Job()
d2 := mock.Deployment()
d2.JobID = j2.ID
a2 := mock.Alloc()
a2.DeploymentID = d2.ID
assert.Nil(m.state.UpsertJob(m.nextIndex(), j1), "UpsertJob")
assert.Nil(m.state.UpsertJob(m.nextIndex(), j2), "UpsertJob")
assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d1, false), "UpsertDeployment")
assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d2, false), "UpsertDeployment")
assert.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a1}), "UpsertAllocs")
assert.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a2}), "UpsertAllocs")
// Assert the following methods will be called
m.On("List", mocker.Anything, mocker.Anything).Return(nil).Run(m.listFromState)
m.On("Allocations", mocker.MatchedBy(matchDeploymentSpecificRequest(d1.ID)),
mocker.Anything).Return(nil).Run(m.allocationsFromState)
m.On("Allocations", mocker.MatchedBy(matchDeploymentSpecificRequest(d2.ID)),
mocker.Anything).Return(nil).Run(m.allocationsFromState)
m.On("Evaluations", mocker.MatchedBy(matchJobSpecificRequest(j1.ID)),
mocker.Anything).Return(nil).Run(m.evaluationsFromState)
m.On("Evaluations", mocker.MatchedBy(matchJobSpecificRequest(j2.ID)),
mocker.Anything).Return(nil).Run(m.evaluationsFromState)
m.On("GetJob", mocker.MatchedBy(matchJobSpecificRequest(j1.ID)),
mocker.Anything).Return(nil).Run(m.getJobFromState)
m.On("GetJob", mocker.MatchedBy(matchJobSpecificRequest(j2.ID)),
mocker.Anything).Return(nil).Run(m.getJobFromState)
m.On("GetJobVersions", mocker.MatchedBy(matchJobSpecificRequest(j1.ID)),
mocker.Anything).Return(nil).Run(m.getJobVersionsFromState)
m.On("GetJobVersions", mocker.MatchedBy(matchJobSpecificRequest(j2.ID)),
mocker.Anything).Return(nil).Run(m.getJobVersionsFromState)
w.SetEnabled(true)
testutil.WaitForResult(func() (bool, error) { return 2 == len(w.watchers), nil },
func(err error) { assert.Equal(2, len(w.watchers), "Should have 2 deployment") })
// Assert that we will get a createEvaluation call only once and it contains
// both deployments. This will verify that the watcher is batching
// allocation changes
m1 := matchUpsertEvals([]string{d1.ID, d2.ID})
m.On("UpsertEvals", mocker.MatchedBy(m1)).Return(nil).Once()
// Update the allocs health to healthy which should create an evaluation
req := &structs.ApplyDeploymentAllocHealthRequest{
DeploymentAllocHealthRequest: structs.DeploymentAllocHealthRequest{
DeploymentID: d1.ID,
HealthyAllocationIDs: []string{a1.ID},
},
}
assert.Nil(m.state.UpsertDeploymentAllocHealth(m.nextIndex(), req), "UpsertDeploymentAllocHealth")
req2 := &structs.ApplyDeploymentAllocHealthRequest{
DeploymentAllocHealthRequest: structs.DeploymentAllocHealthRequest{
DeploymentID: d2.ID,
HealthyAllocationIDs: []string{a2.ID},
},
}
assert.Nil(m.state.UpsertDeploymentAllocHealth(m.nextIndex(), req2), "UpsertDeploymentAllocHealth")
// Wait for there to be one eval for each job
testutil.WaitForResult(func() (bool, error) {
ws := memdb.NewWatchSet()
evals1, err := m.state.EvalsByJob(ws, j1.ID)
if err != nil {
return false, err
}
evals2, err := m.state.EvalsByJob(ws, j2.ID)
if err != nil {
return false, err
}
if l := len(evals1); l != 1 {
return false, fmt.Errorf("Got %d evals; want 1", l)
}
if l := len(evals2); l != 1 {
return false, fmt.Errorf("Got %d evals; want 1", l)
}
return true, nil
}, func(err error) {
t.Fatal(err)
})
m.AssertCalled(t, "UpsertEvals", mocker.MatchedBy(m1))
testutil.WaitForResult(func() (bool, error) { return 2 == len(w.watchers), nil },
func(err error) { assert.Equal(2, len(w.watchers), "Should have 2 deployment") })
}
......@@ -15,7 +15,7 @@ import (
)
func testLogger() *log.Logger {
return log.New(os.Stderr, "", log.LstdFlags)
return log.New(os.Stderr, "", log.LstdFlags|log.Lmicroseconds)
}
type mockBackend struct {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment