Commit 255df6c8 authored by Michael Schurter's avatar Michael Schurter
Browse files

wip - add migrate support to allow health watcher

parent 086ea327
Showing with 78 additions and 18 deletions
+78 -18
......@@ -31,7 +31,17 @@ func (r *AllocRunner) watchHealth(ctx context.Context) {
// See if we should watch the allocs health
alloc := r.Alloc()
if alloc.DeploymentID == "" || alloc.DeploymentStatus.IsHealthy() || alloc.DeploymentStatus.IsUnhealthy() {
if alloc.Job.Type == structs.JobTypeSystem || alloc.DeploymentStatus.IsHealthy() || alloc.DeploymentStatus.IsUnhealthy() {
// Neither deployments nor migrations apply to system jobs and
// we don't need to track allocations which already have a
// status
return
}
isDeploy := alloc.DeploymentID != ""
if isDeploy && alloc.Job.Type != structs.JobTypeService {
// Deployments don't track non-Service jobs
return
}
......@@ -39,7 +49,8 @@ func (r *AllocRunner) watchHealth(ctx context.Context) {
if tg == nil {
r.logger.Printf("[ERR] client.alloc_watcher: failed to lookup allocation's task group. Exiting watcher")
return
} else if tg.Update == nil || tg.Update.HealthCheck == structs.UpdateStrategyHealthCheck_Manual {
}
if isDeploy && (tg.Update == nil || tg.Update.HealthCheck == structs.UpdateStrategyHealthCheck_Manual) {
return
}
......@@ -47,14 +58,36 @@ func (r *AllocRunner) watchHealth(ctx context.Context) {
l := r.allocBroadcast.Listen()
defer l.Close()
// Define the deadline, health method, min healthy time from the
// deployment if this is a deployment; otherwise from the migration
// strategy.
var deadline time.Time
var useChecks bool
var minHealthyTime time.Duration
if isDeploy {
deadline = time.Now().Add(tg.Update.HealthyDeadline)
minHealthyTime = tg.Update.MinHealthyTime
useChecks = tg.Update.HealthCheck == structs.UpdateStrategyHealthCheck_Checks
} else {
strategy := tg.Migrate
if strategy == nil {
// For backwards compat with pre-0.8 allocations that
// don't have a migrate strategy set.
strategy = structs.DefaultMigrateStrategy()
}
deadline = time.Now().Add(strategy.HealthyDeadline)
minHealthyTime = strategy.MinHealthyTime
useChecks = strategy.HealthCheck == structs.MigrateStrategyHealthChecks
}
// Create a new context with the health deadline
deadline := time.Now().Add(tg.Update.HealthyDeadline)
healthCtx, healthCtxCancel := context.WithDeadline(ctx, deadline)
defer healthCtxCancel()
r.logger.Printf("[DEBUG] client.alloc_watcher: deadline (%v) for alloc %q is at %v", tg.Update.HealthyDeadline, alloc.ID, deadline)
// Create the health tracker object
tracker := newAllocHealthTracker(healthCtx, r.logger, alloc, l, r.consulClient)
tracker := newAllocHealthTracker(healthCtx, r.logger, alloc, l, r.consulClient, minHealthyTime, useChecks)
tracker.Start()
allocHealthy := false
......@@ -77,8 +110,8 @@ func (r *AllocRunner) watchHealth(ctx context.Context) {
r.allocHealth = helper.BoolToPtr(allocHealthy)
r.allocLock.Unlock()
// We are unhealthy so emit task events explaining why
if !allocHealthy {
// If deployment is unhealthy emit task events explaining why
if !allocHealthy && isDeploy {
r.taskLock.RLock()
for task, event := range tracker.TaskEvents() {
if tr, ok := r.tasks[task]; ok {
......@@ -107,6 +140,13 @@ type allocHealthTracker struct {
// tg is the task group we are tracking
tg *structs.TaskGroup
// minHealthyTime is the duration an alloc must remain healthy to be
// considered healthy
minHealthyTime time.Duration
// useChecks specifies whether to use Consul health checks or not
useChecks bool
// consulCheckCount is the number of checks the task group will attempt to
// register
consulCheckCount int
......@@ -146,7 +186,8 @@ type allocHealthTracker struct {
// alloc listener and consul API object are given so that the watcher can detect
// health changes.
func newAllocHealthTracker(parentCtx context.Context, logger *log.Logger, alloc *structs.Allocation,
allocUpdates *cstructs.AllocListener, consulClient ConsulServiceAPI) *allocHealthTracker {
allocUpdates *cstructs.AllocListener, consulClient ConsulServiceAPI,
minHealthyTime time.Duration, useChecks bool) *allocHealthTracker {
a := &allocHealthTracker{
logger: logger,
......@@ -154,8 +195,11 @@ func newAllocHealthTracker(parentCtx context.Context, logger *log.Logger, alloc
allocStopped: make(chan struct{}),
alloc: alloc,
tg: alloc.Job.LookupTaskGroup(alloc.TaskGroup),
allocUpdates: allocUpdates,
consulClient: consulClient,
//FIXME should i wrap all these parameters up in a struct?
minHealthyTime: minHealthyTime,
useChecks: useChecks,
allocUpdates: allocUpdates,
consulClient: consulClient,
}
a.taskHealth = make(map[string]*taskHealthState, len(a.tg.Tasks))
......@@ -176,7 +220,7 @@ func newAllocHealthTracker(parentCtx context.Context, logger *log.Logger, alloc
// Start starts the watcher.
func (a *allocHealthTracker) Start() {
go a.watchTaskEvents()
if a.tg.Update.HealthCheck == structs.UpdateStrategyHealthCheck_Checks {
if a.useChecks {
go a.watchConsulEvents()
}
}
......@@ -210,7 +254,9 @@ func (a *allocHealthTracker) TaskEvents() map[string]string {
// Go through our task information and build the event map
for task, state := range a.taskHealth {
if e, ok := state.event(deadline, a.tg.Update); ok {
//FIXME skip this for migrations?
useChecks := a.tg.Update.HealthCheck == structs.UpdateStrategyHealthCheck_Checks
if e, ok := state.event(deadline, a.tg.Update.MinHealthyTime, useChecks); ok {
events[task] = e
}
}
......@@ -227,7 +273,7 @@ func (a *allocHealthTracker) setTaskHealth(healthy, terminal bool) {
// If we are marked healthy but we also require Consul to be healthy and it
// isn't yet, return, unless the task is terminal
requireConsul := a.tg.Update.HealthCheck == structs.UpdateStrategyHealthCheck_Checks && a.consulCheckCount > 0
requireConsul := a.useChecks && a.consulCheckCount > 0
if !terminal && healthy && requireConsul && !a.checksHealthy {
return
}
......@@ -337,7 +383,7 @@ func (a *allocHealthTracker) watchTaskEvents() {
// Set the timer since all tasks are started
if !latestStartTime.IsZero() {
allStartedTime = latestStartTime
healthyTimer.Reset(a.tg.Update.MinHealthyTime)
healthyTimer.Reset(a.minHealthyTime)
}
}
......@@ -453,7 +499,7 @@ OUTER:
}
primed = true
healthyTimer.Reset(a.tg.Update.MinHealthyTime)
healthyTimer.Reset(a.minHealthyTime)
}
}
}
......@@ -470,7 +516,7 @@ type taskHealthState struct {
// event takes the deadline time for the allocation to be healthy, the minimum
// healthy time, and whether Consul checks are used to determine health. It
// returns true if the task has contributed to the allocation being unhealthy
// and if so, an event description of why.
func (t *taskHealthState) event(deadline time.Time, update *structs.UpdateStrategy) (string, bool) {
func (t *taskHealthState) event(deadline time.Time, minHealthyTime time.Duration, useChecks bool) (string, bool) {
requireChecks := false
desiredChecks := 0
for _, s := range t.task.Services {
......@@ -479,7 +525,7 @@ func (t *taskHealthState) event(deadline time.Time, update *structs.UpdateStrate
desiredChecks += nc
}
}
requireChecks = requireChecks && update.HealthCheck == structs.UpdateStrategyHealthCheck_Checks
requireChecks = requireChecks && useChecks
if t.state != nil {
if t.state.Failed {
......@@ -490,8 +536,9 @@ func (t *taskHealthState) event(deadline time.Time, update *structs.UpdateStrate
}
// We are running so check if we have been running long enough
if t.state.StartedAt.Add(update.MinHealthyTime).After(deadline) {
return fmt.Sprintf("Task not running for min_healthy_time of %v by deadline", update.MinHealthyTime), true
//FIXME need minHealthyTime here
if t.state.StartedAt.Add(minHealthyTime).After(deadline) {
return fmt.Sprintf("Task not running for min_healthy_time of %v by deadline", minHealthyTime), true
}
}
......
......@@ -2737,6 +2737,19 @@ type MigrateStrategy struct {
HealthyDeadline time.Duration
}
// DefaultMigrateStrategy builds the fallback migrate strategy used for
// backwards compatibility with pre-0.8 allocations, which lack a migrate
// strategy of their own.
//
// Keep the values here in sync with the counterpart in api/tasks.go.
func DefaultMigrateStrategy() *MigrateStrategy {
	strategy := new(MigrateStrategy)
	strategy.MaxParallel = 1
	strategy.HealthCheck = MigrateStrategyHealthChecks
	strategy.MinHealthyTime = 10 * time.Second
	strategy.HealthyDeadline = 5 * time.Minute
	return strategy
}
func (m *MigrateStrategy) Validate() error {
var mErr multierror.Error
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment