Commit 76194c74 authored by Michael Schurter

consul service hook

Deregistration works but is difficult to test because terminal updates are not
yet fully implemented in the new client/ar/tr.
parent bb273896
Showing with 399 additions and 54 deletions
......@@ -28,7 +28,6 @@ import (
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/ugorji/go/codec"
......@@ -1495,14 +1494,15 @@ func (r *TaskRunner) startTask() error {
// registerServices and checks with Consul.
func (r *TaskRunner) registerServices(d driver.Driver, h driver.DriverHandle, n *cstructs.DriverNetwork) error {
var exec driver.ScriptExecutor
if d.Abilities().Exec {
// Only set the script executor if the driver supports it
exec = h
}
interpolatedTask := interpolateServices(r.envBuilder.Build(), r.task)
taskServices := consul.NewTaskServices(r.alloc, interpolatedTask, r, exec, n)
return r.consul.RegisterTask(taskServices)
//var exec driver.ScriptExecutor
//if d.Abilities().Exec {
// // Only set the script executor if the driver supports it
// exec = h
//}
//interpolatedTask := interpolateServices(r.envBuilder.Build(), r.task)
//taskServices := consul.NewTaskServices(r.alloc, interpolatedTask, r, exec, n)
panic("XXX broken during transition to allocrunnerv2")
return r.consul.RegisterTask(nil)
}
// interpolateServices interpolates tags in a service and checks with values from the
......@@ -1706,31 +1706,34 @@ func (r *TaskRunner) updateServices(d driver.Driver, h driver.ScriptExecutor,
oldAlloc *structs.Allocation, oldTask *structs.Task,
newAlloc *structs.Allocation, newTask *structs.Task) error {
var exec driver.ScriptExecutor
if d.Abilities().Exec {
// Only set the script executor if the driver supports it
exec = h
}
r.driverNetLock.Lock()
net := r.driverNet.Copy()
r.driverNetLock.Unlock()
oldTaskServices := consul.NewTaskServices(oldAlloc, oldTask, r, exec, net)
newTaskServices := consul.NewTaskServices(newAlloc, newTask, r, exec, net)
return r.consul.UpdateTask(oldTaskServices, newTaskServices)
//var exec driver.ScriptExecutor
//if d.Abilities().Exec {
// // Only set the script executor if the driver supports it
// exec = h
//}
//r.driverNetLock.Lock()
//net := r.driverNet.Copy()
//r.driverNetLock.Unlock()
//oldTaskServices := consul.NewTaskServices(oldAlloc, oldTask, r, exec, net)
//newTaskServices := consul.NewTaskServices(newAlloc, newTask, r, exec, net)
panic("XXX broken during transition to allocrunnerv2")
//return r.consul.UpdateTask(oldTaskServices, newTaskServices)
return r.consul.UpdateTask(nil, nil)
}
// removeServices and checks from Consul. Handles interpolation and deleting
// Canary=true and Canary=false versions in case Canary=false is set at the
// same time as the alloc is stopped.
func (r *TaskRunner) removeServices() {
interpTask := interpolateServices(r.envBuilder.Build(), r.task)
taskServices := consul.NewTaskServices(r.alloc, interpTask, r, nil, nil)
r.consul.RemoveTask(taskServices)
panic("XXX broken during transition to allocrunnerv2")
//interpTask := interpolateServices(r.envBuilder.Build(), r.task)
//taskServices := consul.NewTaskServices(r.alloc, interpTask, r, nil, nil)
//r.consul.RemoveTask(taskServices)
// Flip Canary and remove again in case canary is getting flipped at
// the same time as the alloc is being destroyed
taskServices.Canary = !taskServices.Canary
r.consul.RemoveTask(taskServices)
//taskServices.Canary = !taskServices.Canary
//r.consul.RemoveTask(taskServices)
}
// handleDestroy kills the task handle. In the case that killing fails,
......
......@@ -15,6 +15,7 @@ import (
"github.com/hashicorp/nomad/client/allocrunnerv2/state"
"github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/consul"
cinterfaces "github.com/hashicorp/nomad/client/interfaces"
clientstate "github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
......@@ -36,6 +37,10 @@ type allocRunner struct {
// stateUpdater is used to emit updated task state
stateUpdater cinterfaces.AllocStateHandler
// consulClient is the client used by the consul service hook for
// registering services and checks
consulClient consul.ConsulServiceAPI
// vaultClient is used to manage Vault tokens
vaultClient vaultclient.VaultClient
......@@ -81,6 +86,7 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
id: alloc.ID,
alloc: alloc,
clientConfig: config.ClientConfig,
consulClient: config.Consul,
vaultClient: config.Vault,
tasks: make(map[string]*taskrunner.TaskRunner, len(tg.Tasks)),
waitCh: make(chan struct{}),
......@@ -119,6 +125,7 @@ func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error {
Logger: ar.logger,
StateDB: ar.stateDB,
StateUpdater: ar,
Consul: ar.consulClient,
VaultClient: ar.vaultClient,
}
......
......@@ -4,6 +4,7 @@ import (
"github.com/boltdb/bolt"
log "github.com/hashicorp/go-hclog"
clientconfig "github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/interfaces"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/nomad/structs"
......@@ -23,6 +24,9 @@ type Config struct {
// StateDB is used to store and restore state.
StateDB *bolt.DB
// Consul is the Consul client used to register task services and checks
Consul consul.ConsulServiceAPI
// Vault is the Vault client to use to retrieve Vault tokens
Vault vaultclient.VaultClient
......
......@@ -3,7 +3,9 @@ package interfaces
import (
"context"
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/client/driver/env"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
)
......@@ -74,7 +76,14 @@ type TaskPrestartHook interface {
}
type TaskPoststartRequest struct {
// Network info
// Exec hook (may be nil)
DriverExec driver.ScriptExecutor
// Network info (may be nil)
DriverNetwork *cstructs.DriverNetwork
// TaskEnv is the task's environment
TaskEnv *env.TaskEnv
}
type TaskPoststartResponse struct{}
......@@ -107,6 +116,13 @@ type TaskExitedHook interface {
type TaskUpdateRequest struct {
VaultToken string
// Alloc is the current version of the allocation (may have been
// updated since the hook was created)
Alloc *structs.Allocation
// TaskEnv is the task's environment
TaskEnv *env.TaskEnv
}
type TaskUpdateResponse struct{}
......
......@@ -9,7 +9,7 @@ import (
func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
// Grab the handle
handle := tr.getDriverHandle()
handle, _ := tr.getDriverHandle()
// Check it is running
if handle == nil {
......@@ -41,7 +41,7 @@ func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, fai
func (tr *TaskRunner) Signal(event *structs.TaskEvent, s os.Signal) error {
// Grab the handle
handle := tr.getDriverHandle()
handle, _ := tr.getDriverHandle()
// Check it is running
if handle == nil {
......@@ -57,7 +57,7 @@ func (tr *TaskRunner) Signal(event *structs.TaskEvent, s os.Signal) error {
func (tr *TaskRunner) Kill(ctx context.Context, event *structs.TaskEvent) error {
// Grab the handle
handle := tr.getDriverHandle()
handle, _ := tr.getDriverHandle()
// Check if the handle is running
if handle == nil {
......
package taskrunner
import (
"context"
"fmt"
"sync"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunnerv2/interfaces"
"github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/client/driver/env"
cstructs "github.com/hashicorp/nomad/client/structs"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/structs"
)
type serviceHookConfig struct {
alloc *structs.Allocation
task *structs.Task
consul consul.ConsulServiceAPI
// Restarter is a subset of the TaskLifecycle interface
restarter agentconsul.TaskRestarter
logger log.Logger
}
type serviceHook struct {
consul consul.ConsulServiceAPI
allocID string
taskName string
restarter agentconsul.TaskRestarter
logger log.Logger
// The following fields may be updated
driverExec driver.ScriptExecutor
driverNet *cstructs.DriverNetwork
canary bool
services []*structs.Service
networks structs.Networks
taskEnv *env.TaskEnv
// Since Update() may be called concurrently with any other hook all
// hook methods must be fully serialized
mu sync.Mutex
}
func newServiceHook(c serviceHookConfig) *serviceHook {
h := &serviceHook{
consul: c.consul,
allocID: c.alloc.ID,
taskName: c.task.Name,
services: c.task.Services,
restarter: c.restarter,
}
if res := c.alloc.TaskResources[c.task.Name]; res != nil {
h.networks = res.Networks
}
if c.alloc.DeploymentStatus != nil && c.alloc.DeploymentStatus.Canary {
h.canary = true
}
h.logger = c.logger.Named(h.Name())
return h
}
func (h *serviceHook) Name() string {
return "consul_services"
}
func (h *serviceHook) Poststart(ctx context.Context, req *interfaces.TaskPoststartRequest, _ *interfaces.TaskPoststartResponse) error {
h.mu.Lock()
defer h.mu.Unlock()
// Store the TaskEnv for interpolating now and when Updating
h.driverExec = req.DriverExec
h.driverNet = req.DriverNetwork
h.taskEnv = req.TaskEnv
// Create task services struct with request's driver metadata
taskServices := h.getTaskServices()
return h.consul.RegisterTask(taskServices)
}
func (h *serviceHook) Update(ctx context.Context, req *interfaces.TaskUpdateRequest, _ *interfaces.TaskUpdateResponse) error {
h.mu.Lock()
defer h.mu.Unlock()
// Create old task services struct with request's driver metadata as it
// can't change due to Updates
oldTaskServices := h.getTaskServices()
// Store new updated values out of request
canary := false
if req.Alloc.DeploymentStatus != nil {
canary = req.Alloc.DeploymentStatus.Canary
}
var networks structs.Networks
if res := req.Alloc.TaskResources[h.taskName]; res != nil {
networks = res.Networks
}
task := req.Alloc.LookupTask(h.taskName)
if task == nil {
return fmt.Errorf("task %q not found in updated alloc", h.taskName)
}
// Update service hook fields
h.taskEnv = req.TaskEnv
h.services = task.Services
h.networks = networks
h.canary = canary
// Create new task services struct with those new values
newTaskServices := h.getTaskServices()
return h.consul.UpdateTask(oldTaskServices, newTaskServices)
}
func (h *serviceHook) Exited(ctx context.Context, req *interfaces.TaskExitedRequest, _ *interfaces.TaskExitedResponse) error {
h.mu.Lock()
defer h.mu.Unlock()
taskServices := h.getTaskServices()
h.consul.RemoveTask(taskServices)
// Canary flag may be getting flipped when the alloc is being
// destroyed, so remove both variations of the service
taskServices.Canary = !taskServices.Canary
h.consul.RemoveTask(taskServices)
return nil
}
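Taken together, Poststart, Update, and Exited give the new hook a register, re-register, deregister lifecycle against the three ConsulServiceAPI calls, and the double RemoveTask covers a canary flip racing with alloc shutdown. A minimal, self-contained sketch of that flow, with a recording fake standing in for the Consul client (every type and name below is an illustrative stand-in, not the real client/consul or command/agent/consul package):

package main

import "fmt"

// taskServices is a pared-down stand-in for agentconsul.TaskServices.
type taskServices struct {
    AllocID string
    Name    string
    Canary  bool
}

// recordingConsul mirrors the three calls the hook makes and records
// them, which is roughly how a test could observe deregistration.
type recordingConsul struct {
    ops []string
}

func (r *recordingConsul) RegisterTask(ts *taskServices) error {
    r.ops = append(r.ops, fmt.Sprintf("register %s canary=%v", ts.Name, ts.Canary))
    return nil
}

func (r *recordingConsul) UpdateTask(old, updated *taskServices) error {
    r.ops = append(r.ops, fmt.Sprintf("update %s canary=%v->%v", updated.Name, old.Canary, updated.Canary))
    return nil
}

func (r *recordingConsul) RemoveTask(ts *taskServices) {
    r.ops = append(r.ops, fmt.Sprintf("remove %s canary=%v", ts.Name, ts.Canary))
}

func main() {
    c := &recordingConsul{}
    ts := &taskServices{AllocID: "alloc-1", Name: "web"}

    // Poststart: register services and checks once the driver handle exists.
    c.RegisterTask(ts)

    // Update: re-register using values from the updated alloc.
    updated := *ts
    updated.Canary = true
    c.UpdateTask(ts, &updated)

    // Exited: remove both canary variations in case the flag flipped
    // while the alloc was being destroyed.
    c.RemoveTask(&updated)
    flipped := updated
    flipped.Canary = !flipped.Canary
    c.RemoveTask(&flipped)

    for _, op := range c.ops {
        fmt.Println(op)
    }
}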
func (h *serviceHook) getTaskServices() *agentconsul.TaskServices {
// Interpolate with the task's environment
interpolatedServices := interpolateServices(h.taskEnv, h.services)
// Create task services struct with request's driver metadata
return &agentconsul.TaskServices{
AllocID: h.allocID,
Name: h.taskName,
Restarter: h.restarter,
Services: interpolatedServices,
DriverExec: h.driverExec,
DriverNetwork: h.driverNet,
Networks: h.networks,
Canary: h.canary,
}
}
// interpolateServices returns an interpolated copy of services and checks with
// values from the task's environment.
func interpolateServices(taskEnv *env.TaskEnv, services []*structs.Service) []*structs.Service {
interpolated := make([]*structs.Service, len(services))
for i, origService := range services {
// Create a copy as we need to reinterpolate every time the
// environment changes
service := origService.Copy()
for _, check := range service.Checks {
check.Name = taskEnv.ReplaceEnv(check.Name)
check.Type = taskEnv.ReplaceEnv(check.Type)
check.Command = taskEnv.ReplaceEnv(check.Command)
check.Args = taskEnv.ParseAndReplace(check.Args)
check.Path = taskEnv.ReplaceEnv(check.Path)
check.Protocol = taskEnv.ReplaceEnv(check.Protocol)
check.PortLabel = taskEnv.ReplaceEnv(check.PortLabel)
check.InitialStatus = taskEnv.ReplaceEnv(check.InitialStatus)
check.Method = taskEnv.ReplaceEnv(check.Method)
check.GRPCService = taskEnv.ReplaceEnv(check.GRPCService)
if len(check.Header) > 0 {
header := make(map[string][]string, len(check.Header))
for k, vs := range check.Header {
newVals := make([]string, len(vs))
for i, v := range vs {
newVals[i] = taskEnv.ReplaceEnv(v)
}
header[taskEnv.ReplaceEnv(k)] = newVals
}
check.Header = header
}
}
service.Name = taskEnv.ReplaceEnv(service.Name)
service.PortLabel = taskEnv.ReplaceEnv(service.PortLabel)
service.Tags = taskEnv.ParseAndReplace(service.Tags)
service.CanaryTags = taskEnv.ParseAndReplace(service.CanaryTags)
interpolated[i] = service
}
return interpolated
}
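For a concrete sense of what the interpolation pass above produces, here is a simplified, self-contained illustration. The real code calls env.TaskEnv.ReplaceEnv and ParseAndReplace on every service and check field; a plain map plus os.Expand stands in for them here, and the variable values are only examples:

package main

import (
    "fmt"
    "os"
)

func main() {
    // Hypothetical task environment; the real TaskEnv exposes the full
    // set of NOMAD_* variables for the task.
    envMap := map[string]string{
        "NOMAD_PORT_http": "23456",
        "NOMAD_META_env":  "prod",
    }
    replace := func(s string) string {
        return os.Expand(s, func(k string) string { return envMap[k] })
    }

    // Service and check fields as they might appear in a jobspec.
    serviceName := "web-${NOMAD_META_env}"
    checkPortLabel := "${NOMAD_PORT_http}"
    tags := []string{"env:${NOMAD_META_env}", "static"}

    fmt.Println(replace(serviceName))    // web-prod
    fmt.Println(replace(checkPortLabel)) // 23456
    for _, t := range tags {
        fmt.Println(replace(t)) // env:prod, then static
    }
}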
......@@ -16,9 +16,11 @@ import (
"github.com/hashicorp/nomad/client/allocrunnerv2/interfaces"
"github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner/state"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/client/driver/env"
clientstate "github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/ugorji/go/codec"
......@@ -100,8 +102,8 @@ type TaskRunner struct {
// driver is the driver for the task.
driver driver.Driver
// handle is the handle to the currently running driver
handle driver.DriverHandle
handle driver.DriverHandle // the handle to the running driver
driverNet *cstructs.DriverNetwork // driver network if one exists
handleLock sync.Mutex
// task is the task being run
......@@ -121,6 +123,10 @@ type TaskRunner struct {
// transitions.
runnerHooks []interfaces.TaskHook
// consulClient is the client used by the consul service hook for
// registering services and checks
consulClient consul.ConsulServiceAPI
// vaultClient is the client to use to derive and renew Vault tokens
vaultClient vaultclient.VaultClient
......@@ -137,6 +143,7 @@ type TaskRunner struct {
type Config struct {
Alloc *structs.Allocation
ClientConfig *config.Config
Consul consul.ConsulServiceAPI
Task *structs.Task
TaskDir *allocdir.TaskDir
Logger log.Logger
......@@ -174,6 +181,7 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
taskDir: config.TaskDir,
taskName: config.Task.Name,
envBuilder: envBuilder,
consulClient: config.Consul,
vaultClient: config.VaultClient,
//XXX Make a Copy to avoid races?
state: config.Alloc.TaskStates[config.Task.Name],
......@@ -278,12 +286,12 @@ MAIN:
}
// Grab the handle
handle = tr.getDriverHandle()
handle, _ = tr.getDriverHandle()
select {
case waitRes := <-handle.WaitCh():
// Clear the handle
tr.setDriverHandle(nil)
tr.clearDriverHandle()
// Store the wait result on the restart tracker
tr.restartTracker.SetWaitResult(waitRes)
......@@ -291,7 +299,9 @@ MAIN:
tr.logger.Debug("task killed")
}
// TODO Need to run exited hooks
if err := tr.exited(); err != nil {
tr.logger.Error("exited hooks failed", "error", err)
}
RESTART:
// Actually restart by sleeping and also watching for destroy events
......@@ -365,10 +375,8 @@ func (tr *TaskRunner) runDriver() error {
return err
}
// Grab the handle
tr.setDriverHandle(sresp.Handle)
//XXX need to capture the driver network
// Store the driver handle and associated metadata
tr.setDriverHandle(sresp.Handle, sresp.Network)
// Emit an event that we started
tr.SetState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted))
......
......@@ -2,6 +2,7 @@ package taskrunner
import (
"github.com/hashicorp/nomad/client/driver"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
)
......@@ -35,14 +36,26 @@ func (tr *TaskRunner) setVaultToken(token string) {
tr.vaultToken = token
}
func (tr *TaskRunner) getDriverHandle() driver.DriverHandle {
// getDriverHandle returns the DriverHandle and associated driver metadata (at
// this point just the network) if it exists.
func (tr *TaskRunner) getDriverHandle() (driver.DriverHandle, *cstructs.DriverNetwork) {
tr.handleLock.Lock()
defer tr.handleLock.Unlock()
return tr.handle
return tr.handle, tr.driverNet
}
func (tr *TaskRunner) setDriverHandle(handle driver.DriverHandle) {
func (tr *TaskRunner) setDriverHandle(handle driver.DriverHandle, net *cstructs.DriverNetwork) {
tr.handleLock.Lock()
defer tr.handleLock.Unlock()
tr.handle = handle
tr.driverNet = net
}
// clearDriverHandle clears the driver handle and associated driver metadata
// (driver network).
func (tr *TaskRunner) clearDriverHandle() {
tr.handleLock.Lock()
defer tr.handleLock.Unlock()
tr.handle = nil
tr.driverNet = nil
}
......@@ -5,6 +5,7 @@ import (
"fmt"
"time"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocrunnerv2/interfaces"
"github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner/state"
"github.com/hashicorp/nomad/nomad/structs"
......@@ -49,6 +50,17 @@ func (tr *TaskRunner) initHooks() {
envBuilder: tr.envBuilder,
}))
}
// If there are any services, add the hook
if len(task.Services) != 0 {
tr.runnerHooks = append(tr.runnerHooks, newServiceHook(serviceHookConfig{
alloc: tr.Alloc(),
task: tr.Task(),
consul: tr.consulClient,
restarter: tr,
logger: hookLogger,
}))
}
}
// prestart is used to run the runners prestart hooks.
......@@ -159,6 +171,9 @@ func (tr *TaskRunner) poststart() error {
}()
}
handle, net := tr.getDriverHandle()
var merr multierror.Error
for _, hook := range tr.runnerHooks {
post, ok := hook.(interfaces.TaskPoststartHook)
if !ok {
......@@ -172,11 +187,14 @@ func (tr *TaskRunner) poststart() error {
tr.logger.Trace("running poststart hook", "name", name, "start", start)
}
req := interfaces.TaskPoststartRequest{}
req := interfaces.TaskPoststartRequest{
DriverExec: handle,
DriverNetwork: net,
TaskEnv: tr.envBuilder.Build(),
}
var resp interfaces.TaskPoststartResponse
// XXX We shouldn't exit on the first one
if err := post.Poststart(tr.ctx, &req, &resp); err != nil {
return fmt.Errorf("poststart hook %q failed: %v", name, err)
merr.Errors = append(merr.Errors, fmt.Errorf("poststart hook %q failed: %v", name, err))
}
if tr.logger.IsTrace() {
......@@ -185,7 +203,48 @@ func (tr *TaskRunner) poststart() error {
}
}
return nil
return merr.ErrorOrNil()
}
// exited is used to run the exited hooks before a task is stopped.
func (tr *TaskRunner) exited() error {
if tr.logger.IsTrace() {
start := time.Now()
tr.logger.Trace("running exited hooks", "start", start)
defer func() {
end := time.Now()
tr.logger.Trace("finished exited hooks", "end", end, "duration", end.Sub(start))
}()
}
var merr multierror.Error
for _, hook := range tr.runnerHooks {
post, ok := hook.(interfaces.TaskExitedHook)
if !ok {
continue
}
name := post.Name()
var start time.Time
if tr.logger.IsTrace() {
start = time.Now()
tr.logger.Trace("running exited hook", "name", name, "start", start)
}
req := interfaces.TaskExitedRequest{}
var resp interfaces.TaskExitedResponse
if err := post.Exited(tr.ctx, &req, &resp); err != nil {
merr.Errors = append(merr.Errors, fmt.Errorf("exited hook %q failed: %v", name, err))
}
if tr.logger.IsTrace() {
end := time.Now()
tr.logger.Trace("finished exited hooks", "name", name, "end", end, "duration", end.Sub(start))
}
}
return merr.ErrorOrNil()
}
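The poststart, exited, and stop loops now collect hook failures with go-multierror instead of returning on the first error, so one failing hook no longer prevents the remaining hooks from running. A standalone sketch of that pattern (the hook names below are made up):

package main

import (
    "errors"
    "fmt"

    multierror "github.com/hashicorp/go-multierror"
)

type hook struct {
    name string
    run  func() error
}

func main() {
    hooks := []hook{
        {"consul_services", func() error { return nil }},
        {"vault", func() error { return errors.New("token revoked") }},
        {"logmon", func() error { return errors.New("plugin exited") }},
    }

    var merr multierror.Error
    for _, h := range hooks {
        if err := h.run(); err != nil {
            // Record the failure and keep going so every hook runs.
            merr.Errors = append(merr.Errors, fmt.Errorf("hook %q failed: %v", h.name, err))
        }
    }

    // ErrorOrNil returns nil when nothing failed, or an aggregate error.
    if err := merr.ErrorOrNil(); err != nil {
        fmt.Println(err)
    }
}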
// stop is used to run the stop hooks.
......@@ -199,6 +258,7 @@ func (tr *TaskRunner) stop() error {
}()
}
var merr multierror.Error
for _, hook := range tr.runnerHooks {
post, ok := hook.(interfaces.TaskStopHook)
if !ok {
......@@ -214,9 +274,8 @@ func (tr *TaskRunner) stop() error {
req := interfaces.TaskStopRequest{}
var resp interfaces.TaskStopResponse
// XXX We shouldn't exit on the first one
if err := post.Stop(tr.ctx, &req, &resp); err != nil {
return fmt.Errorf("stop hook %q failed: %v", name, err)
merr.Errors = append(merr.Errors, fmt.Errorf("stop hook %q failed: %v", name, err))
}
if tr.logger.IsTrace() {
......@@ -225,7 +284,7 @@ func (tr *TaskRunner) stop() error {
}
}
return nil
return merr.ErrorOrNil()
}
// update is used to run the runners update hooks.
......@@ -239,6 +298,10 @@ func (tr *TaskRunner) updateHooks() {
}()
}
// Prepare state needed by Update hooks
alloc := tr.Alloc()
// Execute Update hooks
for _, hook := range tr.runnerHooks {
upd, ok := hook.(interfaces.TaskUpdateHook)
if !ok {
......@@ -251,6 +314,8 @@ func (tr *TaskRunner) updateHooks() {
// Build the request
req := interfaces.TaskUpdateRequest{
VaultToken: tr.getVaultToken(),
Alloc: alloc,
TaskEnv: tr.envBuilder.Build(),
}
// Time the update hook
......
......@@ -766,6 +766,7 @@ func (c *Client) restoreState() error {
ClientConfig: c.config,
StateDB: c.stateDB,
StateUpdater: c,
Consul: c.consulService,
Vault: c.vaultClient,
}
c.configLock.RUnlock()
......@@ -1946,6 +1947,7 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error
Logger: logger,
ClientConfig: c.config,
StateDB: c.stateDB,
Consul: c.consulService,
Vault: c.vaultClient,
StateUpdater: c,
}
......
......@@ -24,7 +24,7 @@ type ChecksAPI interface {
// TaskRestarter allows the checkWatcher to restart tasks.
type TaskRestarter interface {
Restart(source, reason string, failure bool)
Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error
}
// checkRestart handles restarting a task if a check is unhealthy.
......@@ -60,7 +60,7 @@ type checkRestart struct {
//
// Returns true if a restart was triggered in which case this check should be
// removed (checks are added on task startup).
func (c *checkRestart) apply(now time.Time, status string) bool {
func (c *checkRestart) apply(ctx context.Context, now time.Time, status string) bool {
healthy := func() {
if !c.unhealthyState.IsZero() {
c.logger.Debug("canceling restart because check became healthy")
......@@ -104,7 +104,13 @@ func (c *checkRestart) apply(now time.Time, status string) bool {
// Tell TaskRunner to restart due to failure
const failure = true
c.task.Restart("healthcheck", fmt.Sprintf("check %q unhealthy", c.checkName), failure)
reason := fmt.Sprintf("healthcheck: check %q unhealthy", c.checkName)
event := structs.NewTaskEvent(structs.TaskRestartSignal).SetRestartReason(reason)
err := c.task.Restart(ctx, event, failure)
if err != nil {
// Restart failed; keep the check registered so a later unhealthy result can try again
return false
}
return true
}
......@@ -228,6 +234,11 @@ func (w *checkWatcher) Run(ctx context.Context) {
// Loop over watched checks and update their status from results
for cid, check := range checks {
// Shortcircuit if told to exit
if ctx.Err() != nil {
return
}
if _, ok := restartedTasks[check.taskKey]; ok {
// Check for this task already restarted; remove and skip check
delete(checks, cid)
......@@ -243,7 +254,7 @@ func (w *checkWatcher) Run(ctx context.Context) {
continue
}
restarted := check.apply(now, result.Status)
restarted := check.apply(ctx, now, result.Status)
if restarted {
// Checks are registered+watched on
// startup, so it's safe to remove them
......
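The check watcher now threads a context into apply and short-circuits between checks when it is canceled. A condensed, self-contained version of that polling pattern (names and intervals are illustrative, not the real checkWatcher):

package main

import (
    "context"
    "fmt"
    "time"
)

// watch polls a set of check results until the context is canceled,
// mirroring the early exits added to checkWatcher.Run above.
func watch(ctx context.Context, checks map[string]string) {
    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
        }

        for id, status := range checks {
            // Short-circuit if told to exit mid-iteration.
            if ctx.Err() != nil {
                return
            }
            fmt.Println("check", id, "status", status)
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
    defer cancel()
    watch(ctx, map[string]string{"web-http": "passing", "api-grpc": "critical"})
}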
......@@ -6964,7 +6964,8 @@ type Allocation struct {
// COMPAT(0.11): Remove in 0.11
// Resources is the total set of resources allocated as part
// of this allocation of the task group.
// of this allocation of the task group. Dynamic ports will be set by
// the scheduler.
Resources *Resources
// COMPAT(0.11): Remove in 0.11
......@@ -6974,7 +6975,8 @@ type Allocation struct {
// COMPAT(0.11): Remove in 0.11
// TaskResources is the set of resources allocated to each
// task. These should sum to the total Resources.
// task. These should sum to the total Resources. Dynamic ports will be
// set by the scheduler.
TaskResources map[string]*Resources
// AllocatedResources is the total resources allocated for the task group.
......@@ -7375,6 +7377,21 @@ func (a *Allocation) ComparableResources() *ComparableResources {
}
}
// LookupTask by name from the Allocation. Returns nil if the Job is not set, the
// TaskGroup does not exist, or the task name cannot be found.
func (a *Allocation) LookupTask(name string) *Task {
if a.Job == nil {
return nil
}
tg := a.Job.LookupTaskGroup(a.TaskGroup)
if tg == nil {
return nil
}
return tg.LookupTask(name)
}
// Stub returns a list stub for the allocation
func (a *Allocation) Stub() *AllocListStub {
return &AllocListStub{
......