Unverified Commit 84c9bc95 authored by Michael Schurter, committed by GitHub

Merge pull request #4792 from hashicorp/r-clientv2-rebased

AllocRunner v2 Feature Branch PR
parents fb3b8c09 2361c190
Showing with 2126 additions and 2363 deletions
## 0.9.0 (Unreleased)
__BACKWARDS INCOMPATIBILITIES:__
* core: Switch to structured logging using [go-hclog](https://github.com/hashicorp/go-hclog)
IMPROVEMENTS:
* core: Added advertise address to client node meta data [[GH-4390](https://github.com/hashicorp/nomad/issues/4390)]
* core: Added support for specifying node affinities. Affinities allow job operators to specify weighted placement preferences
according to different node attributes [[GH-4512](https://github.com/hashicorp/nomad/issues/4512)]
* core: Added support for spreading allocations across a specific attribute. Operators can specify spread
target percentages across failure domains such as datacenter or rack [[GH-4512](https://github.com/hashicorp/nomad/issues/4512)]
* client: Refactor client to support plugins and improve state handling [[GH-4792](https://github.com/hashicorp/nomad/pull/4792)]
* client: Extend timeout to 60 seconds for Windows CPU fingerprinting [[GH-4441](https://github.com/hashicorp/nomad/pull/4441)]
* driver/docker: Add support for specifying `cpu_cfs_period` in the Docker driver [[GH-4462](https://github.com/hashicorp/nomad/issues/4462)]
* telemetry: All client metrics include a new `node_class` tag [[GH-3882](https://github.com/hashicorp/nomad/issues/3882)]
......@@ -15,6 +19,7 @@ IMPROVEMENTS:
BUG FIXES:
* core: Fixed bug in reconciler where allocs already stopped were being unnecessarily updated [[GH-4764](https://github.com/hashicorp/nomad/issues/4764)]
* client: Fix an issue reloading the client config [[GH-4730](https://github.com/hashicorp/nomad/issues/4730)]
## 0.8.6 (September 26, 2018)
......
......@@ -134,7 +134,7 @@ func (c *Client) resolveTokenValue(secretID string) (*structs.ACLToken, error) {
if err := c.RPC("ACL.ResolveToken", &req, &resp); err != nil {
// If we encounter an error but have a cached value, mask the error and extend the cache
if ok {
c.logger.Printf("[WARN] client: failed to resolve token, using expired cached value: %v", err)
c.logger.Warn("failed to resolve token, using expired cached value", "error", err)
cached := raw.(*cachedACLValue)
return cached.Token, nil
}
......@@ -198,7 +198,7 @@ func (c *Client) resolvePolicies(secretID string, policies []string) ([]*structs
if err := c.RPC("ACL.GetPolicies", &req, &resp); err != nil {
// If we encounter an error but have cached policies, mask the error and extend the cache
if len(missing) == 0 {
c.logger.Printf("[WARN] client: failed to resolve policies, using expired cached value: %v", err)
c.logger.Warn("failed to resolve policies, using expired cached value", "error", err)
out = append(out, expired...)
return out, nil
}
......
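The two hunks above show the pattern used throughout this branch for the move to structured logging: a Printf-style message with embedded formatting verbs becomes a constant message plus key/value pairs on an hclog.Logger. A minimal, self-contained sketch of the same call shape (the logger name and error value here are illustrative, not from the PR):

package main

import (
	"errors"

	hclog "github.com/hashicorp/go-hclog"
)

func main() {
	// Root logger for the process, then a named sub-logger as the client does.
	logger := hclog.New(&hclog.LoggerOptions{Name: "client"})
	aclLogger := logger.Named("acl")

	// Structured call: constant message, variable data as key/value pairs.
	err := errors.New("rpc timed out")
	aclLogger.Warn("failed to resolve token, using expired cached value", "error", err)
}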
......@@ -76,6 +76,7 @@ func TestAllocations_GarbageCollectAll_ACL(t *testing.T) {
}
func TestAllocations_GarbageCollect(t *testing.T) {
t.Skip("missing mock driver plugin implementation")
t.Parallel()
require := require.New(t)
client := TestClient(t, func(c *config.Config) {
......@@ -174,6 +175,7 @@ func TestAllocations_GarbageCollect_ACL(t *testing.T) {
}
func TestAllocations_Stats(t *testing.T) {
t.Skip("missing exec driver plugin implementation")
t.Parallel()
require := require.New(t)
client := TestClient(t, nil)
......
......@@ -19,6 +19,7 @@ import (
// TestPrevAlloc_StreamAllocDir_TLS asserts ephemeral disk migrations still
// work when TLS is enabled.
func TestPrevAlloc_StreamAllocDir_TLS(t *testing.T) {
t.Skip("missing mock driver plugin implementation")
const (
caFn = "../helper/tlsutil/testdata/global-ca.pem"
serverCertFn = "../helper/tlsutil/testdata/global-server.pem"
......
......@@ -6,11 +6,12 @@ import (
"fmt"
"io"
"io/ioutil"
"log"
"os"
"path/filepath"
"sync"
"time"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
......@@ -58,6 +59,8 @@ var (
TaskDirs = map[string]os.FileMode{TmpDirName: os.ModeSticky | 0777}
)
// AllocDir allows creating, destroying, and accessing an allocation's
// directory. All methods are safe for concurrent use.
type AllocDir struct {
// AllocDir is the directory used for storing any state
// of this allocation. It will be purged on alloc destroy.
......@@ -73,7 +76,9 @@ type AllocDir struct {
// built is true if Build has successfully run
built bool
logger *log.Logger
mu sync.RWMutex
logger hclog.Logger
}
// AllocDirFS exposes file operations on the alloc dir
......@@ -88,7 +93,8 @@ type AllocDirFS interface {
// NewAllocDir initializes the AllocDir struct with allocDir as base path for
// the allocation directory.
func NewAllocDir(logger *log.Logger, allocDir string) *AllocDir {
func NewAllocDir(logger hclog.Logger, allocDir string) *AllocDir {
logger = logger.Named("alloc_dir")
return &AllocDir{
AllocDir: allocDir,
SharedDir: filepath.Join(allocDir, SharedAllocName),
......@@ -100,6 +106,9 @@ func NewAllocDir(logger *log.Logger, allocDir string) *AllocDir {
// Copy an AllocDir and all of its TaskDirs. Returns nil if AllocDir is
// nil.
func (d *AllocDir) Copy() *AllocDir {
if d == nil {
return nil
}
d.mu.RLock()
defer d.mu.RUnlock()
......@@ -117,6 +126,9 @@ func (d *AllocDir) Copy() *AllocDir {
// NewTaskDir creates a new TaskDir and adds it to the AllocDirs TaskDirs map.
func (d *AllocDir) NewTaskDir(name string) *TaskDir {
d.mu.Lock()
defer d.mu.Unlock()
td := newTaskDir(d.logger, d.AllocDir, name)
d.TaskDirs[name] = td
return td
......@@ -129,6 +141,9 @@ func (d *AllocDir) NewTaskDir(name string) *TaskDir {
// file "NOMAD-${ALLOC_ID}-ERROR.log" will be appended to the tar with the
// error message as the contents.
func (d *AllocDir) Snapshot(w io.Writer) error {
d.mu.RLock()
defer d.mu.RUnlock()
allocDataDir := filepath.Join(d.SharedDir, SharedDataDir)
rootPaths := []string{allocDataDir}
for _, taskdir := range d.TaskDirs {
......@@ -195,7 +210,7 @@ func (d *AllocDir) Snapshot(w io.Writer) error {
// the snapshotting side closed the connection
// prematurely and won't try to use the tar
// anyway.
d.logger.Printf("[WARN] client: snapshotting failed and unable to write error marker: %v", writeErr)
d.logger.Warn("snapshotting failed and unable to write error marker", "error", writeErr)
}
return fmt.Errorf("failed to snapshot %s: %v", path, err)
}
......@@ -206,11 +221,16 @@ func (d *AllocDir) Snapshot(w io.Writer) error {
// Move other alloc directory's shared path and local dir to this alloc dir.
func (d *AllocDir) Move(other *AllocDir, tasks []*structs.Task) error {
d.mu.RLock()
if !d.built {
// Enforce the invariant that Build is called before Move
d.mu.RUnlock()
return fmt.Errorf("unable to move to %q - alloc dir is not built", d.AllocDir)
}
// Moving is slow and only reads immutable fields, so unlock during heavy IO
d.mu.RUnlock()
// Move the data directory
otherDataDir := filepath.Join(other.SharedDir, SharedDataDir)
dataDir := filepath.Join(d.SharedDir, SharedDataDir)
......@@ -246,7 +266,6 @@ func (d *AllocDir) Move(other *AllocDir, tasks []*structs.Task) error {
// Destroy tears down the previously built directory structure.
func (d *AllocDir) Destroy() error {
// Unmount all mounted shared alloc dirs.
var mErr multierror.Error
if err := d.UnmountAll(); err != nil {
......@@ -258,12 +277,17 @@ func (d *AllocDir) Destroy() error {
}
// Unset built since the alloc dir has been destroyed.
d.mu.Lock()
d.built = false
d.mu.Unlock()
return mErr.ErrorOrNil()
}
// UnmountAll unmounts all linked/mounted directories in task dirs.
func (d *AllocDir) UnmountAll() error {
d.mu.RLock()
defer d.mu.RUnlock()
var mErr multierror.Error
for _, dir := range d.TaskDirs {
// Check if the directory has the shared alloc mounted.
......@@ -322,7 +346,9 @@ func (d *AllocDir) Build() error {
}
// Mark as built
d.mu.Lock()
d.built = true
d.mu.Unlock()
return nil
}
......@@ -386,11 +412,14 @@ func (d *AllocDir) ReadAt(path string, offset int64) (io.ReadCloser, error) {
p := filepath.Join(d.AllocDir, path)
// Check if it is trying to read into a secret directory
d.mu.RLock()
for _, dir := range d.TaskDirs {
if filepath.HasPrefix(p, dir.SecretsDir) {
d.mu.RUnlock()
return nil, fmt.Errorf("Reading secret file prohibited: %s", path)
}
}
d.mu.RUnlock()
f, err := os.Open(p)
if err != nil {
......
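Taken together, the allocdir changes above swap the logger type and move locking inside the struct, so callers use the API the same way as before. A rough usage sketch under those assumptions (the path and task name are placeholders):

package example

import (
	hclog "github.com/hashicorp/go-hclog"

	"github.com/hashicorp/nomad/client/allocdir"
)

func buildAllocDir() error {
	logger := hclog.New(&hclog.LoggerOptions{Name: "example"})

	// NewAllocDir now takes an hclog.Logger and derives its own named sub-logger.
	d := allocdir.NewAllocDir(logger, "/tmp/example-alloc")

	// Register a task dir, then create the directory tree on disk.
	d.NewTaskDir("web")
	if err := d.Build(); err != nil {
		return err
	}

	// Destroy unmounts everything, removes the tree, and clears the built flag.
	return d.Destroy()
}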
......@@ -54,7 +54,7 @@ func TestAllocDir_BuildAlloc(t *testing.T) {
}
defer os.RemoveAll(tmp)
d := NewAllocDir(testlog.Logger(t), tmp)
d := NewAllocDir(testlog.HCLogger(t), tmp)
defer d.Destroy()
d.NewTaskDir(t1.Name)
d.NewTaskDir(t2.Name)
......@@ -91,7 +91,7 @@ func TestAllocDir_MountSharedAlloc(t *testing.T) {
}
defer os.RemoveAll(tmp)
d := NewAllocDir(testlog.Logger(t), tmp)
d := NewAllocDir(testlog.HCLogger(t), tmp)
defer d.Destroy()
if err := d.Build(); err != nil {
t.Fatalf("Build() failed: %v", err)
......@@ -136,7 +136,7 @@ func TestAllocDir_Snapshot(t *testing.T) {
}
defer os.RemoveAll(tmp)
d := NewAllocDir(testlog.Logger(t), tmp)
d := NewAllocDir(testlog.HCLogger(t), tmp)
defer d.Destroy()
if err := d.Build(); err != nil {
t.Fatalf("Build() failed: %v", err)
......@@ -223,13 +223,13 @@ func TestAllocDir_Move(t *testing.T) {
defer os.RemoveAll(tmp2)
// Create two alloc dirs
d1 := NewAllocDir(testlog.Logger(t), tmp1)
d1 := NewAllocDir(testlog.HCLogger(t), tmp1)
if err := d1.Build(); err != nil {
t.Fatalf("Build() failed: %v", err)
}
defer d1.Destroy()
d2 := NewAllocDir(testlog.Logger(t), tmp2)
d2 := NewAllocDir(testlog.HCLogger(t), tmp2)
if err := d2.Build(); err != nil {
t.Fatalf("Build() failed: %v", err)
}
......@@ -284,7 +284,7 @@ func TestAllocDir_EscapeChecking(t *testing.T) {
}
defer os.RemoveAll(tmp)
d := NewAllocDir(testlog.Logger(t), tmp)
d := NewAllocDir(testlog.HCLogger(t), tmp)
if err := d.Build(); err != nil {
t.Fatalf("Build() failed: %v", err)
}
......@@ -325,7 +325,7 @@ func TestAllocDir_ReadAt_SecretDir(t *testing.T) {
}
defer os.RemoveAll(tmp)
d := NewAllocDir(testlog.Logger(t), tmp)
d := NewAllocDir(testlog.HCLogger(t), tmp)
if err := d.Build(); err != nil {
t.Fatalf("Build() failed: %v", err)
}
......@@ -410,7 +410,7 @@ func TestAllocDir_CreateDir(t *testing.T) {
// TestAllocDir_Copy asserts that AllocDir.Copy does a deep copy of itself and
// all TaskDirs.
func TestAllocDir_Copy(t *testing.T) {
a := NewAllocDir(testlog.Logger(t), "foo")
a := NewAllocDir(testlog.HCLogger(t), "foo")
a.NewTaskDir("bar")
a.NewTaskDir("baz")
......
......@@ -3,16 +3,19 @@ package allocdir
import (
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
hclog "github.com/hashicorp/go-hclog"
cstructs "github.com/hashicorp/nomad/client/structs"
)
// TaskDir contains all of the paths relevant to a task. All paths are on the
// host system so drivers should mount/link into task containers as necessary.
type TaskDir struct {
// AllocDir is the path to the alloc directory on the host
AllocDir string
// Dir is the path to Task directory on the host
Dir string
......@@ -37,16 +40,20 @@ type TaskDir struct {
// <task_dir>/secrets/
SecretsDir string
logger *log.Logger
logger hclog.Logger
}
// newTaskDir creates a TaskDir struct with paths set. Call Build() to
// create paths on disk.
//
// Call AllocDir.NewTaskDir to create new TaskDirs
func newTaskDir(logger *log.Logger, allocDir, taskName string) *TaskDir {
func newTaskDir(logger hclog.Logger, allocDir, taskName string) *TaskDir {
taskDir := filepath.Join(allocDir, taskName)
logger = logger.Named("task_dir").With("task_name", taskName)
return &TaskDir{
AllocDir: allocDir,
Dir: taskDir,
SharedAllocDir: filepath.Join(allocDir, SharedAllocName),
LogDir: filepath.Join(allocDir, SharedAllocName, LogDirName),
......
......@@ -22,7 +22,7 @@ func TestLinuxSpecialDirs(t *testing.T) {
}
defer os.RemoveAll(allocDir)
td := newTaskDir(testlog.Logger(t), allocDir, "test")
td := newTaskDir(testlog.HCLogger(t), allocDir, "test")
// Despite the task dir not existing, unmountSpecialDirs should *not*
// return an error
......
......@@ -18,7 +18,7 @@ func TestTaskDir_EmbedNonexistent(t *testing.T) {
}
defer os.RemoveAll(tmp)
d := NewAllocDir(testlog.Logger(t), tmp)
d := NewAllocDir(testlog.HCLogger(t), tmp)
defer d.Destroy()
td := d.NewTaskDir(t1.Name)
if err := d.Build(); err != nil {
......@@ -40,7 +40,7 @@ func TestTaskDir_EmbedDirs(t *testing.T) {
}
defer os.RemoveAll(tmp)
d := NewAllocDir(testlog.Logger(t), tmp)
d := NewAllocDir(testlog.HCLogger(t), tmp)
defer d.Destroy()
td := d.NewTaskDir(t1.Name)
if err := d.Build(); err != nil {
......@@ -97,7 +97,7 @@ func TestTaskDir_NonRoot_Image(t *testing.T) {
}
defer os.RemoveAll(tmp)
d := NewAllocDir(testlog.Logger(t), tmp)
d := NewAllocDir(testlog.HCLogger(t), tmp)
defer d.Destroy()
td := d.NewTaskDir(t1.Name)
if err := d.Build(); err != nil {
......@@ -120,7 +120,7 @@ func TestTaskDir_NonRoot(t *testing.T) {
}
defer os.RemoveAll(tmp)
d := NewAllocDir(testlog.Logger(t), tmp)
d := NewAllocDir(testlog.HCLogger(t), tmp)
defer d.Destroy()
td := d.NewTaskDir(t1.Name)
if err := d.Build(); err != nil {
......
package allochealth
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/hashicorp/consul/api"
hclog "github.com/hashicorp/go-hclog"
cconsul "github.com/hashicorp/nomad/client/consul"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/structs"
)
const (
// allocHealthEventSource is the source used for emitting task events
allocHealthEventSource = "Alloc Unhealthy"
// consulCheckLookupInterval is the interval at which we check if the
// Consul checks are healthy or unhealthy.
consulCheckLookupInterval = 500 * time.Millisecond
)
// Tracker tracks the health of an allocation and makes health events watchable
// via channels.
type Tracker struct {
// ctx and cancelFn is used to shutdown the tracker
ctx context.Context
cancelFn context.CancelFunc
// alloc is the alloc we are tracking
alloc *structs.Allocation
// tg is the task group we are tracking
tg *structs.TaskGroup
// minHealthyTime is the duration an alloc must remain healthy to be
// considered healthy
minHealthyTime time.Duration
// useChecks specifies whether to use Consul health checks or not
useChecks bool
// consulCheckCount is the number of checks the task group will attempt to
// register
consulCheckCount int
// allocUpdates is a listener for retrieving new alloc updates
allocUpdates *cstructs.AllocListener
// consulClient is used to look up the state of the task's checks
consulClient cconsul.ConsulServiceAPI
// healthy is used to signal whether we have determined the allocation to be
// healthy or unhealthy
healthy chan bool
// allocStopped is triggered when the allocation is stopped and tracking is
// not needed
allocStopped chan struct{}
// l is used to lock shared fields listed below
l sync.Mutex
// tasksHealthy marks whether all the tasks have met their health check
// (disregards Consul)
tasksHealthy bool
// allocFailed marks whether the allocation failed
allocFailed bool
// checksHealthy marks whether all the task's Consul checks are healthy
checksHealthy bool
// taskHealth contains the health state for each task
taskHealth map[string]*taskHealthState
logger hclog.Logger
}
// NewTracker returns a health tracker for the given allocation. An alloc
// listener and consul API object are given so that the watcher can detect
// health changes.
func NewTracker(parentCtx context.Context, logger hclog.Logger, alloc *structs.Allocation,
allocUpdates *cstructs.AllocListener, consulClient cconsul.ConsulServiceAPI,
minHealthyTime time.Duration, useChecks bool) *Tracker {
// Do not create a named sub-logger as the hook controlling
// this struct should pass in an appropriately named
// sub-logger.
t := &Tracker{
healthy: make(chan bool, 1),
allocStopped: make(chan struct{}),
alloc: alloc,
tg: alloc.Job.LookupTaskGroup(alloc.TaskGroup),
minHealthyTime: minHealthyTime,
useChecks: useChecks,
allocUpdates: allocUpdates,
consulClient: consulClient,
logger: logger,
}
t.taskHealth = make(map[string]*taskHealthState, len(t.tg.Tasks))
for _, task := range t.tg.Tasks {
t.taskHealth[task.Name] = &taskHealthState{task: task}
}
for _, task := range t.tg.Tasks {
for _, s := range task.Services {
t.consulCheckCount += len(s.Checks)
}
}
t.ctx, t.cancelFn = context.WithCancel(parentCtx)
return t
}
// Start starts the watcher.
func (t *Tracker) Start() {
go t.watchTaskEvents()
if t.useChecks {
go t.watchConsulEvents()
}
}
// HealthyCh returns a channel that will emit a boolean indicating the health of
// the allocation.
func (t *Tracker) HealthyCh() <-chan bool {
return t.healthy
}
// AllocStoppedCh returns a channel that will be fired if the allocation is
// stopped. This means that health will not be set.
func (t *Tracker) AllocStoppedCh() <-chan struct{} {
return t.allocStopped
}
// TaskEvents returns a map of events by task. This should only be called after
// health has been determined. Only tasks that have contributed to the
// allocation being unhealthy will have an event.
func (t *Tracker) TaskEvents() map[string]*structs.TaskEvent {
t.l.Lock()
defer t.l.Unlock()
// Nothing to do since the failure wasn't task related
if t.allocFailed {
return nil
}
deadline, _ := t.ctx.Deadline()
events := make(map[string]*structs.TaskEvent, len(t.tg.Tasks))
// Go through our task information and build the event map
for task, state := range t.taskHealth {
useChecks := t.tg.Update.HealthCheck == structs.UpdateStrategyHealthCheck_Checks
if e, ok := state.event(deadline, t.tg.Update.MinHealthyTime, useChecks); ok {
events[task] = structs.NewTaskEvent(allocHealthEventSource).SetMessage(e)
}
}
return events
}
// setTaskHealth is used to set the tasks' health as healthy or unhealthy. If the
// allocation is terminal, health is immediately broadcasted.
func (t *Tracker) setTaskHealth(healthy, terminal bool) {
t.l.Lock()
defer t.l.Unlock()
t.tasksHealthy = healthy
// If we are marked healthy but we also require Consul to be healthy and it
// isn't yet, return, unless the task is terminal
requireConsul := t.useChecks && t.consulCheckCount > 0
if !terminal && healthy && requireConsul && !t.checksHealthy {
return
}
select {
case t.healthy <- healthy:
default:
}
// Shutdown the tracker
t.cancelFn()
}
// setCheckHealth is used to mark the checks as either healthy or unhealthy.
func (t *Tracker) setCheckHealth(healthy bool) {
t.l.Lock()
defer t.l.Unlock()
t.checksHealthy = healthy
// Only signal if we are healthy and so are the tasks
if !healthy || !t.tasksHealthy {
return
}
select {
case t.healthy <- healthy:
default:
}
// Shutdown the tracker
t.cancelFn()
}
// markAllocStopped is used to mark the allocation as having stopped.
func (t *Tracker) markAllocStopped() {
close(t.allocStopped)
t.cancelFn()
}
// watchTaskEvents is a long lived watcher that watches for the health of the
// allocation's tasks.
func (t *Tracker) watchTaskEvents() {
alloc := t.alloc
allStartedTime := time.Time{}
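// healthyTimer is created stopped and drained so the Reset below starts a
// clean countdown once all tasks are running.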
healthyTimer := time.NewTimer(0)
if !healthyTimer.Stop() {
select {
case <-healthyTimer.C:
default:
}
}
for {
// If the alloc is being stopped by the server just exit
switch alloc.DesiredStatus {
case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict:
t.logger.Trace("desired status is terminal for alloc", "alloc_id", alloc.ID, "desired_status", alloc.DesiredStatus)
t.markAllocStopped()
return
}
// Store the task states
t.l.Lock()
for task, state := range alloc.TaskStates {
t.taskHealth[task].state = state
}
t.l.Unlock()
// Detect if the alloc is unhealthy or if all tasks have started yet
latestStartTime := time.Time{}
for _, state := range alloc.TaskStates {
// One of the tasks has failed so we can exit watching
if state.Failed || !state.FinishedAt.IsZero() {
t.setTaskHealth(false, true)
return
}
if state.State != structs.TaskStateRunning {
latestStartTime = time.Time{}
break
} else if state.StartedAt.After(latestStartTime) {
latestStartTime = state.StartedAt
}
}
// If the alloc is marked as failed by the client but none of the
// individual tasks failed, that means something failed at the alloc
// level.
if alloc.ClientStatus == structs.AllocClientStatusFailed {
t.l.Lock()
t.allocFailed = true
t.l.Unlock()
t.setTaskHealth(false, true)
return
}
if !latestStartTime.Equal(allStartedTime) {
// Prevent the timer from firing at the old start time
if !healthyTimer.Stop() {
select {
case <-healthyTimer.C:
default:
}
}
// Set the timer since all tasks are started
if !latestStartTime.IsZero() {
allStartedTime = latestStartTime
healthyTimer.Reset(t.minHealthyTime)
}
}
select {
case <-t.ctx.Done():
return
case newAlloc, ok := <-t.allocUpdates.Ch:
if !ok {
return
}
alloc = newAlloc
case <-healthyTimer.C:
t.setTaskHealth(true, false)
}
}
}
// watchConsulEvents is a long-lived watcher that watches for the health of the
// allocation's Consul checks.
func (t *Tracker) watchConsulEvents() {
// checkTicker is the ticker that triggers us to look at the checks in
// Consul
checkTicker := time.NewTicker(consulCheckLookupInterval)
defer checkTicker.Stop()
// healthyTimer fires when the checks have been healthy for the
// MinHealthyTime
healthyTimer := time.NewTimer(0)
if !healthyTimer.Stop() {
select {
case <-healthyTimer.C:
default:
}
}
// primed marks whether the healthy timer has been set
primed := false
// Store whether the last Consul checks call was successful or not
consulChecksErr := false
// allocReg are the registered objects in Consul for the allocation
var allocReg *consul.AllocRegistration
OUTER:
for {
select {
case <-t.ctx.Done():
return
case <-checkTicker.C:
newAllocReg, err := t.consulClient.AllocRegistrations(t.alloc.ID)
if err != nil {
if !consulChecksErr {
consulChecksErr = true
t.logger.Warn("error looking up Consul registrations for allocation", "error", err, "alloc_id", t.alloc.ID)
}
continue OUTER
} else {
consulChecksErr = false
allocReg = newAllocReg
}
case <-healthyTimer.C:
t.setCheckHealth(true)
}
if allocReg == nil {
continue
}
// Store the task registrations
t.l.Lock()
for task, reg := range allocReg.Tasks {
t.taskHealth[task].taskRegistrations = reg
}
t.l.Unlock()
// Detect if all the checks are passing
passed := true
CHECKS:
for _, treg := range allocReg.Tasks {
for _, sreg := range treg.Services {
for _, check := range sreg.Checks {
if check.Status == api.HealthPassing {
continue
}
passed = false
t.setCheckHealth(false)
break CHECKS
}
}
}
if !passed {
// Reset the timer since we have transitioned back to unhealthy
if primed {
if !healthyTimer.Stop() {
select {
case <-healthyTimer.C:
default:
}
}
primed = false
}
} else if !primed {
// Reset the timer to fire after MinHealthyTime
if !healthyTimer.Stop() {
select {
case <-healthyTimer.C:
default:
}
}
primed = true
healthyTimer.Reset(t.minHealthyTime)
}
}
}
// taskHealthState captures all known health information about a task. It is
// largely used to determine if the task has contributed to the allocation being
// unhealthy.
type taskHealthState struct {
task *structs.Task
state *structs.TaskState
taskRegistrations *consul.TaskRegistration
}
// event takes the deadline time for the allocation to be healthy and the update
// strategy of the group. It returns true if the task has contributed to the
// allocation being unhealthy and if so, an event description of why.
func (t *taskHealthState) event(deadline time.Time, minHealthyTime time.Duration, useChecks bool) (string, bool) {
requireChecks := false
desiredChecks := 0
for _, s := range t.task.Services {
if nc := len(s.Checks); nc > 0 {
requireChecks = true
desiredChecks += nc
}
}
requireChecks = requireChecks && useChecks
if t.state != nil {
if t.state.Failed {
return "Unhealthy because of failed task", true
}
if t.state.State != structs.TaskStateRunning {
return "Task not running by deadline", true
}
// We are running so check if we have been running long enough
if t.state.StartedAt.Add(minHealthyTime).After(deadline) {
return fmt.Sprintf("Task not running for min_healthy_time of %v by deadline", minHealthyTime), true
}
}
if t.taskRegistrations != nil {
var notPassing []string
passing := 0
OUTER:
for _, sreg := range t.taskRegistrations.Services {
for _, check := range sreg.Checks {
if check.Status != api.HealthPassing {
notPassing = append(notPassing, sreg.Service.Service)
continue OUTER
} else {
passing++
}
}
}
if len(notPassing) != 0 {
return fmt.Sprintf("Services not healthy by deadline: %s", strings.Join(notPassing, ", ")), true
}
if passing != desiredChecks {
return fmt.Sprintf("Only %d out of %d checks registered and passing", passing, desiredChecks), true
}
} else if requireChecks {
return "Service checks not registered", true
}
return "", false
}
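For context, a caller drives the Tracker above roughly as follows (the health watcher hook later in this PR does essentially this); the listener, Consul client, and durations here are placeholders:

package example

import (
	"context"
	"time"

	hclog "github.com/hashicorp/go-hclog"

	"github.com/hashicorp/nomad/client/allochealth"
	cconsul "github.com/hashicorp/nomad/client/consul"
	cstructs "github.com/hashicorp/nomad/client/structs"
	"github.com/hashicorp/nomad/nomad/structs"
)

func watchAllocHealth(parent context.Context, logger hclog.Logger, alloc *structs.Allocation,
	listener *cstructs.AllocListener, consulClient cconsul.ConsulServiceAPI) {

	// Bound the watch by a deadline; canceling the context shuts the tracker down.
	ctx, cancel := context.WithTimeout(parent, 5*time.Minute)
	defer cancel()

	tracker := allochealth.NewTracker(ctx, logger, alloc, listener, consulClient,
		10*time.Second /* minHealthyTime */, true /* useChecks */)
	tracker.Start()

	select {
	case <-tracker.AllocStoppedCh():
		// The alloc was stopped before health could be determined.
	case healthy := <-tracker.HealthyCh():
		logger.Info("alloc health determined", "alloc_id", alloc.ID, "healthy", healthy)
	}
}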
package allocrunner
import (
"context"
"fmt"
"time"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/nomad/structs"
)
// allocHealthSetter is a shim to allow the alloc health watcher hook to set
// and clear the alloc health without full access to the alloc runner state
type allocHealthSetter struct {
ar *allocRunner
}
// ClearHealth allows the health watcher hook to clear the alloc's deployment
// health if the deployment id changes. It does not update the server as the
// status is only cleared when already receiving an update from the server.
//
// Only for use by health hook.
func (a *allocHealthSetter) ClearHealth() {
a.ar.stateLock.Lock()
a.ar.state.ClearDeploymentStatus()
a.ar.stateLock.Unlock()
}
// SetHealth allows the health watcher hook to set the alloc's
// deployment/migration health and emit task events.
//
// Only for use by health hook.
func (a *allocHealthSetter) SetHealth(healthy, isDeploy bool, trackerTaskEvents map[string]*structs.TaskEvent) {
// Updating alloc deployment state is tricky because it may be nil, but
// if it's not then we need to maintain the values of Canary and
// ModifyIndex as they're only mutated by the server.
a.ar.stateLock.Lock()
a.ar.state.SetDeploymentStatus(time.Now(), healthy)
a.ar.stateLock.Unlock()
// If deployment is unhealthy emit task events explaining why
a.ar.tasksLock.RLock()
if !healthy && isDeploy {
for task, event := range trackerTaskEvents {
if tr, ok := a.ar.tasks[task]; ok {
// Append but don't emit event since the server
// will be updated below
tr.AppendEvent(event)
}
}
}
// Gather the state of the other tasks
states := make(map[string]*structs.TaskState, len(a.ar.tasks))
for name, tr := range a.ar.tasks {
states[name] = tr.TaskState()
}
a.ar.tasksLock.RUnlock()
// Build the client allocation
calloc := a.ar.clientAlloc(states)
// Update the server
a.ar.stateUpdater.AllocStateUpdated(calloc)
// Broadcast client alloc to listeners
a.ar.allocBroadcaster.Send(calloc)
}
// initRunnerHooks initializes the runner's hooks.
func (ar *allocRunner) initRunnerHooks() {
hookLogger := ar.logger.Named("runner_hook")
// create health setting shim
hs := &allocHealthSetter{ar}
// Create the alloc directory hook. This is run first to ensure the
// directory path exists for other hooks.
ar.runnerHooks = []interfaces.RunnerHook{
newAllocDirHook(hookLogger, ar.allocDir),
newDiskMigrationHook(hookLogger, ar.prevAllocWatcher, ar.allocDir),
newAllocHealthWatcherHook(hookLogger, ar.Alloc(), hs, ar.Listener(), ar.consulClient),
}
}
// prerun is used to run the runner's prerun hooks.
func (ar *allocRunner) prerun() error {
if ar.logger.IsTrace() {
start := time.Now()
ar.logger.Trace("running pre-run hooks", "start", start)
defer func() {
end := time.Now()
ar.logger.Trace("finished pre-run hooks", "end", end, "duration", end.Sub(start))
}()
}
for _, hook := range ar.runnerHooks {
pre, ok := hook.(interfaces.RunnerPrerunHook)
if !ok {
continue
}
//TODO Check hook state
name := pre.Name()
var start time.Time
if ar.logger.IsTrace() {
start = time.Now()
ar.logger.Trace("running pre-run hook", "name", name, "start", start)
}
if err := pre.Prerun(context.TODO()); err != nil {
return fmt.Errorf("pre-run hook %q failed: %v", name, err)
}
//TODO Persist hook state locally
if ar.logger.IsTrace() {
end := time.Now()
ar.logger.Trace("finished pre-run hooks", "name", name, "end", end, "duration", end.Sub(start))
}
}
return nil
}
// update runs the alloc runner update hooks. Update hooks are run
// asynchronously with all other alloc runner operations.
func (ar *allocRunner) update(update *structs.Allocation) error {
if ar.logger.IsTrace() {
start := time.Now()
ar.logger.Trace("running update hooks", "start", start)
defer func() {
end := time.Now()
ar.logger.Trace("finished update hooks", "end", end, "duration", end.Sub(start))
}()
}
req := &interfaces.RunnerUpdateRequest{
Alloc: update,
}
var merr multierror.Error
for _, hook := range ar.runnerHooks {
h, ok := hook.(interfaces.RunnerUpdateHook)
if !ok {
continue
}
name := h.Name()
var start time.Time
if ar.logger.IsTrace() {
start = time.Now()
ar.logger.Trace("running pre-run hook", "name", name, "start", start)
}
if err := h.Update(req); err != nil {
merr.Errors = append(merr.Errors, fmt.Errorf("update hook %q failed: %v", name, err))
}
if ar.logger.IsTrace() {
end := time.Now()
ar.logger.Trace("finished update hooks", "name", name, "end", end, "duration", end.Sub(start))
}
}
return merr.ErrorOrNil()
}
// postrun is used to run the runner's postrun hooks.
func (ar *allocRunner) postrun() error {
if ar.logger.IsTrace() {
start := time.Now()
ar.logger.Trace("running post-run hooks", "start", start)
defer func() {
end := time.Now()
ar.logger.Trace("finished post-run hooks", "end", end, "duration", end.Sub(start))
}()
}
for _, hook := range ar.runnerHooks {
post, ok := hook.(interfaces.RunnerPostrunHook)
if !ok {
continue
}
name := post.Name()
var start time.Time
if ar.logger.IsTrace() {
start = time.Now()
ar.logger.Trace("running post-run hook", "name", name, "start", start)
}
if err := post.Postrun(); err != nil {
return fmt.Errorf("hook %q failed: %v", name, err)
}
if ar.logger.IsTrace() {
end := time.Now()
ar.logger.Trace("finished post-run hooks", "name", name, "end", end, "duration", end.Sub(start))
}
}
return nil
}
// destroy is used to run the runner's destroy hooks. All hooks are run and
// errors are returned as a multierror.
func (ar *allocRunner) destroy() error {
if ar.logger.IsTrace() {
start := time.Now()
ar.logger.Trace("running destroy hooks", "start", start)
defer func() {
end := time.Now()
ar.logger.Trace("finished destroy hooks", "end", end, "duration", end.Sub(start))
}()
}
var merr multierror.Error
for _, hook := range ar.runnerHooks {
h, ok := hook.(interfaces.RunnerDestroyHook)
if !ok {
continue
}
name := h.Name()
var start time.Time
if ar.logger.IsTrace() {
start = time.Now()
ar.logger.Trace("running destroy hook", "name", name, "start", start)
}
if err := h.Destroy(); err != nil {
merr.Errors = append(merr.Errors, fmt.Errorf("destroy hook %q failed: %v", name, err))
}
if ar.logger.IsTrace() {
end := time.Now()
ar.logger.Trace("finished destroy hooks", "name", name, "end", end, "duration", end.Sub(start))
}
}
return nil
}
package allocrunner
import (
"context"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocdir"
)
// allocDirHook creates and destroys the root directory and shared directories
// for an allocation.
type allocDirHook struct {
allocDir *allocdir.AllocDir
logger log.Logger
}
func newAllocDirHook(logger log.Logger, allocDir *allocdir.AllocDir) *allocDirHook {
ad := &allocDirHook{
allocDir: allocDir,
}
ad.logger = logger.Named(ad.Name())
return ad
}
func (h *allocDirHook) Name() string {
return "alloc_dir"
}
func (h *allocDirHook) Prerun(context.Context) error {
return h.allocDir.Build()
}
func (h *allocDirHook) Destroy() error {
return h.allocDir.Destroy()
}
package allocrunner
import (
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocwatcher"
clientconfig "github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/interfaces"
cstate "github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/shared/loader"
)
// Config holds the configuration for creating an allocation runner.
type Config struct {
// Logger is the logger for the allocation runner.
Logger log.Logger
// ClientConfig is the client's configuration.
ClientConfig *clientconfig.Config
// Alloc captures the allocation that should be run.
Alloc *structs.Allocation
// StateDB is used to store and restore state.
StateDB cstate.StateDB
// Consul is the Consul client used to register task services and checks
Consul consul.ConsulServiceAPI
// Vault is the Vault client to use to retrieve Vault tokens
Vault vaultclient.VaultClient
// StateUpdater is used to emit updated task state
StateUpdater interfaces.AllocStateHandler
// PrevAllocWatcher handles waiting on previous allocations and
// migrating their ephemeral disk when necessary.
PrevAllocWatcher allocwatcher.PrevAllocWatcher
// PluginLoader is used to load plugins.
PluginLoader loader.PluginCatalog
// PluginSingletonLoader is a plugin loader that will return singleton
// instances of the plugins.
PluginSingletonLoader loader.PluginCatalog
}
package allocrunner
import (
"context"
"fmt"
"sync"
"time"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allochealth"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/consul"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
)
// healthMutator is able to set/clear alloc health.
type healthSetter interface {
// Set health via the mutator
SetHealth(healthy, isDeploy bool, taskEvents map[string]*structs.TaskEvent)
// Clear health when the deployment ID changes
ClearHealth()
}
// allocHealthWatcherHook is responsible for watching an allocation's task
// status and (optionally) Consul health check status to determine if the
// allocation is healthy or unhealthy. Used by deployments and migrations.
type allocHealthWatcherHook struct {
healthSetter healthSetter
// consul client used to monitor health checks
consul consul.ConsulServiceAPI
// listener is given to trackers to listen for alloc updates and closed
// when the alloc is destroyed.
listener *cstructs.AllocListener
// hookLock is held by hook methods to prevent concurrent access by
// Update and synchronous hooks.
hookLock sync.Mutex
// watchDone is created before calling watchHealth and is closed when
// watchHealth exits. Must be passed into watchHealth to avoid races.
// Initialized already closed as Update may be called before Prerun.
watchDone chan struct{}
// ranOnce is set once Prerun or Update have run at least once. This
// prevents Prerun from running if an Update has already been
// processed. Must hold hookLock to access.
ranOnce bool
// cancelFn stops the health watching/setting goroutine. Wait on
// watchDone to block until the watcher exits.
cancelFn context.CancelFunc
// alloc set by new func or Update. Must hold hookLock to access.
alloc *structs.Allocation
// isDeploy is true if monitoring a deployment. Set in init(). Must
// hold hookLock to access.
isDeploy bool
logger log.Logger
}
func newAllocHealthWatcherHook(logger log.Logger, alloc *structs.Allocation, hs healthSetter,
listener *cstructs.AllocListener, consul consul.ConsulServiceAPI) interfaces.RunnerHook {
// Neither deployments nor migrations care about the health of
// non-service jobs so never watch their health
if alloc.Job.Type != structs.JobTypeService {
return noopAllocHealthWatcherHook{}
}
// Initialize watchDone with a closed chan in case Update runs before Prerun
closedDone := make(chan struct{})
close(closedDone)
h := &allocHealthWatcherHook{
alloc: alloc,
cancelFn: func() {}, // initialize to prevent nil func panics
watchDone: closedDone,
consul: consul,
healthSetter: hs,
listener: listener,
}
h.logger = logger.Named(h.Name())
return h
}
func (h *allocHealthWatcherHook) Name() string {
return "alloc_health_watcher"
}
// init starts the allochealth.Tracker and watchHealth goroutine on either
// Prerun or Update. Caller must set/update alloc and logger fields.
//
// Not threadsafe so the caller should lock since Updates occur concurrently.
func (h *allocHealthWatcherHook) init() error {
// No need to watch health as it's already set
if h.alloc.DeploymentStatus.HasHealth() {
return nil
}
tg := h.alloc.Job.LookupTaskGroup(h.alloc.TaskGroup)
if tg == nil {
return fmt.Errorf("task group %q does not exist in job %q", h.alloc.TaskGroup, h.alloc.Job.ID)
}
h.isDeploy = h.alloc.DeploymentID != ""
// No need to watch allocs for deployments that rely on operators
// manually setting health
if h.isDeploy && (tg.Update == nil || tg.Update.HealthCheck == structs.UpdateStrategyHealthCheck_Manual) {
return nil
}
// Define the deadline, health method, min healthy time from the
// deployment if this is a deployment; otherwise from the migration
// strategy.
deadline, useChecks, minHealthyTime := getHealthParams(time.Now(), tg, h.isDeploy)
// Create a context that is canceled when the tracker should shutdown
// or the deadline is reached.
ctx := context.Background()
ctx, h.cancelFn = context.WithDeadline(ctx, deadline)
// Create a new tracker, start it, and watch for health results.
tracker := allochealth.NewTracker(ctx, h.logger, h.alloc,
h.listener, h.consul, minHealthyTime, useChecks)
tracker.Start()
// Create a new done chan and start watching for health updates
h.watchDone = make(chan struct{})
go h.watchHealth(ctx, tracker, h.watchDone)
return nil
}
func (h *allocHealthWatcherHook) Prerun(context.Context) error {
h.hookLock.Lock()
defer h.hookLock.Unlock()
if h.ranOnce {
// An Update beat Prerun to running the watcher; noop
return nil
}
h.ranOnce = true
return h.init()
}
func (h *allocHealthWatcherHook) Update(req *interfaces.RunnerUpdateRequest) error {
h.hookLock.Lock()
defer h.hookLock.Unlock()
// Prevent Prerun from running after an Update
h.ranOnce = true
// Cancel the old watcher and create a new one
h.cancelFn()
// Wait until the watcher exits
<-h.watchDone
// Deployment has changed, reset status
if req.Alloc.DeploymentID != h.alloc.DeploymentID {
h.healthSetter.ClearHealth()
}
// Update alloc
h.alloc = req.Alloc
return h.init()
}
func (h *allocHealthWatcherHook) Destroy() error {
h.hookLock.Lock()
defer h.hookLock.Unlock()
h.cancelFn()
h.listener.Close()
// Wait until the watcher exits
<-h.watchDone
return nil
}
// watchHealth watches alloc health until it is set, the alloc is stopped, or
// the context is canceled. watchHealth will be canceled and restarted on
// Updates so calls are serialized with a lock.
func (h *allocHealthWatcherHook) watchHealth(ctx context.Context, tracker *allochealth.Tracker, done chan<- struct{}) {
defer close(done)
select {
case <-ctx.Done():
return
case <-tracker.AllocStoppedCh():
return
case healthy := <-tracker.HealthyCh():
// If this is an unhealthy deployment emit events for tasks
var taskEvents map[string]*structs.TaskEvent
if !healthy && h.isDeploy {
taskEvents = tracker.TaskEvents()
}
h.healthSetter.SetHealth(healthy, h.isDeploy, taskEvents)
}
}
// getHealthParams returns the health watcher parameters which vary based on
// whether this allocation is in a deployment or migration.
func getHealthParams(now time.Time, tg *structs.TaskGroup, isDeploy bool) (deadline time.Time, useChecks bool, minHealthyTime time.Duration) {
if isDeploy {
deadline = now.Add(tg.Update.HealthyDeadline)
minHealthyTime = tg.Update.MinHealthyTime
useChecks = tg.Update.HealthCheck == structs.UpdateStrategyHealthCheck_Checks
} else {
strategy := tg.Migrate
if strategy == nil {
// For backwards compat with pre-0.8 allocations that
// don't have a migrate strategy set.
strategy = structs.DefaultMigrateStrategy()
}
deadline = now.Add(strategy.HealthyDeadline)
minHealthyTime = strategy.MinHealthyTime
useChecks = strategy.HealthCheck == structs.MigrateStrategyHealthChecks
}
return
}
// noopAllocHealthWatcherHook is an empty hook implementation returned by
// newAllocHealthWatcherHook when an allocation will never need its health
// monitored.
type noopAllocHealthWatcherHook struct{}
func (noopAllocHealthWatcherHook) Name() string {
return "alloc_health_watcher"
}
package allocrunner
import (
"context"
"sync"
"testing"
"time"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/consul"
cstructs "github.com/hashicorp/nomad/client/structs"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// statically assert health hook implements the expected interfaces
var _ interfaces.RunnerPrerunHook = (*allocHealthWatcherHook)(nil)
var _ interfaces.RunnerUpdateHook = (*allocHealthWatcherHook)(nil)
var _ interfaces.RunnerDestroyHook = (*allocHealthWatcherHook)(nil)
// allocHealth is emitted to a chan whenever SetHealth is called
type allocHealth struct {
healthy bool
taskEvents map[string]*structs.TaskEvent
}
// mockHealthSetter implements healthSetter that stores health internally
type mockHealthSetter struct {
setCalls int
clearCalls int
healthy *bool
isDeploy *bool
taskEvents map[string]*structs.TaskEvent
mu sync.Mutex
healthCh chan allocHealth
}
// newMockHealthSetter returns a mock HealthSetter that emits all SetHealth
// calls on a buffered chan. Callers who do not need notifications of health
// changes may just create the struct directly.
func newMockHealthSetter() *mockHealthSetter {
return &mockHealthSetter{
healthCh: make(chan allocHealth, 1),
}
}
func (m *mockHealthSetter) SetHealth(healthy, isDeploy bool, taskEvents map[string]*structs.TaskEvent) {
m.mu.Lock()
defer m.mu.Unlock()
m.setCalls++
m.healthy = &healthy
m.isDeploy = &isDeploy
m.taskEvents = taskEvents
if m.healthCh != nil {
m.healthCh <- allocHealth{healthy, taskEvents}
}
}
func (m *mockHealthSetter) ClearHealth() {
m.mu.Lock()
defer m.mu.Unlock()
m.clearCalls++
m.healthy = nil
m.isDeploy = nil
m.taskEvents = nil
}
// TestHealthHook_PrerunDestroy asserts a health hook does not error if it is run and destroyed.
func TestHealthHook_PrerunDestroy(t *testing.T) {
t.Parallel()
require := require.New(t)
b := cstructs.NewAllocBroadcaster()
defer b.Close()
logger := testlog.HCLogger(t)
consul := consul.NewMockConsulServiceClient(t, logger)
hs := &mockHealthSetter{}
h := newAllocHealthWatcherHook(logger, mock.Alloc(), hs, b.Listen(), consul)
// Assert we implemented the right interfaces
prerunh, ok := h.(interfaces.RunnerPrerunHook)
require.True(ok)
_, ok = h.(interfaces.RunnerUpdateHook)
require.True(ok)
destroyh, ok := h.(interfaces.RunnerDestroyHook)
require.True(ok)
// Prerun
require.NoError(prerunh.Prerun(context.Background()))
// Assert isDeploy is false (other tests peek at isDeploy to determine
// if an Update applied)
ahw := h.(*allocHealthWatcherHook)
ahw.hookLock.Lock()
assert.False(t, ahw.isDeploy)
ahw.hookLock.Unlock()
// Destroy
require.NoError(destroyh.Destroy())
}
// TestHealthHook_PrerunUpdateDestroy asserts Updates may be applied concurrently.
func TestHealthHook_PrerunUpdateDestroy(t *testing.T) {
t.Parallel()
require := require.New(t)
alloc := mock.Alloc()
b := cstructs.NewAllocBroadcaster()
defer b.Close()
logger := testlog.HCLogger(t)
consul := consul.NewMockConsulServiceClient(t, logger)
hs := &mockHealthSetter{}
h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul).(*allocHealthWatcherHook)
// Prerun
require.NoError(h.Prerun(context.Background()))
// Update multiple times in a goroutine to mimic Client behavior
// (Updates are concurrent with alloc runner but are applied serially).
errs := make(chan error, 2)
go func() {
defer close(errs)
for i := 0; i < cap(errs); i++ {
alloc.AllocModifyIndex++
errs <- h.Update(&interfaces.RunnerUpdateRequest{Alloc: alloc.Copy()})
}
}()
for err := range errs {
assert.NoError(t, err)
}
// Destroy
require.NoError(h.Destroy())
}
// TestHealthHook_UpdatePrerunDestroy asserts that a hook may have Update
// called before Prerun.
func TestHealthHook_UpdatePrerunDestroy(t *testing.T) {
t.Parallel()
require := require.New(t)
alloc := mock.Alloc()
b := cstructs.NewAllocBroadcaster()
defer b.Close()
logger := testlog.HCLogger(t)
consul := consul.NewMockConsulServiceClient(t, logger)
hs := &mockHealthSetter{}
h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul).(*allocHealthWatcherHook)
// Set a DeploymentID to cause ClearHealth to be called
alloc.DeploymentID = uuid.Generate()
// Update in a goroutine to mimic Client behavior (Updates are
// concurrent with alloc runner).
errs := make(chan error, 1)
go func(alloc *structs.Allocation) {
errs <- h.Update(&interfaces.RunnerUpdateRequest{Alloc: alloc})
close(errs)
}(alloc.Copy())
for err := range errs {
assert.NoError(t, err)
}
// Prerun should be a noop
require.NoError(h.Prerun(context.Background()))
// Assert that the Update took effect by isDeploy being true
h.hookLock.Lock()
assert.True(t, h.isDeploy)
h.hookLock.Unlock()
// Destroy
require.NoError(h.Destroy())
}
// TestHealthHook_Destroy asserts that a hook may have only Destroy called.
func TestHealthHook_Destroy(t *testing.T) {
t.Parallel()
require := require.New(t)
b := cstructs.NewAllocBroadcaster()
defer b.Close()
logger := testlog.HCLogger(t)
consul := consul.NewMockConsulServiceClient(t, logger)
hs := &mockHealthSetter{}
h := newAllocHealthWatcherHook(logger, mock.Alloc(), hs, b.Listen(), consul).(*allocHealthWatcherHook)
// Destroy
require.NoError(h.Destroy())
}
// TestHealthHook_SetHealth asserts SetHealth is called when health status is
// set. Uses task state and health checks.
func TestHealthHook_SetHealth(t *testing.T) {
t.Parallel()
require := require.New(t)
alloc := mock.Alloc()
alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
task := alloc.Job.TaskGroups[0].Tasks[0]
// Synthesize running alloc and tasks
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc.TaskStates = map[string]*structs.TaskState{
task.Name: {
State: structs.TaskStateRunning,
StartedAt: time.Now(),
},
}
// Make Consul response
check := &consulapi.AgentCheck{
Name: task.Services[0].Checks[0].Name,
Status: consulapi.HealthPassing,
}
taskRegs := map[string]*agentconsul.TaskRegistration{
task.Name: {
Services: map[string]*agentconsul.ServiceRegistration{
task.Services[0].Name: {
Service: &consulapi.AgentService{
ID: "foo",
Service: task.Services[0].Name,
},
Checks: []*consulapi.AgentCheck{check},
},
},
},
}
b := cstructs.NewAllocBroadcaster()
defer b.Close()
logger := testlog.HCLogger(t)
// Don't reply on the first call
called := false
consul := consul.NewMockConsulServiceClient(t, logger)
consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) {
if !called {
called = true
return nil, nil
}
reg := &agentconsul.AllocRegistration{
Tasks: taskRegs,
}
return reg, nil
}
hs := newMockHealthSetter()
h := newAllocHealthWatcherHook(logger, alloc.Copy(), hs, b.Listen(), consul).(*allocHealthWatcherHook)
// Prerun
require.NoError(h.Prerun(context.Background()))
// Wait for health to be set (healthy)
select {
case <-time.After(5 * time.Second):
t.Fatalf("timeout waiting for health to be set")
case health := <-hs.healthCh:
require.True(health.healthy)
// Healthy allocs shouldn't emit task events
ev := health.taskEvents[task.Name]
require.Nilf(ev, "%#v", health.taskEvents)
}
// Destroy
require.NoError(h.Destroy())
}
// TestHealthHook_SystemNoop asserts that system jobs return the noop tracker.
func TestHealthHook_SystemNoop(t *testing.T) {
t.Parallel()
h := newAllocHealthWatcherHook(testlog.HCLogger(t), mock.SystemAlloc(), nil, nil, nil)
// Assert that it's the noop impl
_, ok := h.(noopAllocHealthWatcherHook)
require.True(t, ok)
// Assert the noop impl does not implement any hooks
_, ok = h.(interfaces.RunnerPrerunHook)
require.False(t, ok)
_, ok = h.(interfaces.RunnerUpdateHook)
require.False(t, ok)
_, ok = h.(interfaces.RunnerDestroyHook)
require.False(t, ok)
}
// TestHealthHook_BatchNoop asserts that batch jobs return the noop tracker.
func TestHealthHook_BatchNoop(t *testing.T) {
t.Parallel()
h := newAllocHealthWatcherHook(testlog.HCLogger(t), mock.BatchAlloc(), nil, nil, nil)
// Assert that it's the noop impl
_, ok := h.(noopAllocHealthWatcherHook)
require.True(t, ok)
}
package interfaces
import (
"github.com/hashicorp/nomad/client/allocrunner/state"
"github.com/hashicorp/nomad/nomad/structs"
cstructs "github.com/hashicorp/nomad/client/structs"
)
// AllocRunner is the interface for an allocation runner.
type AllocRunner interface {
// ID returns the ID of the allocation being run.
ID() string
// Run starts the runner and begins executing all the tasks as part of the
// allocation.
Run()
// State returns a copy of the runners state object
State() *state.State
TaskStateHandler
}
// TaskStateHandler exposes a handler to be called when a task's state changes
type TaskStateHandler interface {
// TaskStateUpdated is used to emit updated task state
TaskStateUpdated(task string, state *structs.TaskState)
}
// AllocStatsReporter gives access to the latest resource usage from the
// allocation
type AllocStatsReporter interface {
LatestAllocStats(taskFilter string) (*cstructs.AllocResourceUsage, error)
}
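As a sketch of the TaskStateHandler contract above, any type with a matching TaskStateUpdated method satisfies it (the allocation runner is the real implementation); the type below is purely illustrative:

package example

import (
	"fmt"

	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
	"github.com/hashicorp/nomad/nomad/structs"
)

// printStateHandler is a hypothetical TaskStateHandler that just prints updates.
type printStateHandler struct{}

var _ interfaces.TaskStateHandler = (*printStateHandler)(nil)

func (printStateHandler) TaskStateUpdated(task string, state *structs.TaskState) {
	fmt.Printf("task %q transitioned to %s (failed=%v)\n", task, state.State, state.Failed)
}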
package interfaces
import (
"context"
"github.com/hashicorp/nomad/client/allocrunner/state"
"github.com/hashicorp/nomad/nomad/structs"
)
// RunnerHook is a lifecycle hook into the life cycle of an allocation runner.
type RunnerHook interface {
Name() string
}
type RunnerPrerunHook interface {
RunnerHook
Prerun(context.Context) error
}
type RunnerPostrunHook interface {
RunnerHook
Postrun() error
}
type RunnerDestroyHook interface {
RunnerHook
Destroy() error
}
type RunnerUpdateHook interface {
RunnerHook
Update(*RunnerUpdateRequest) error
}
type RunnerUpdateRequest struct {
Alloc *structs.Allocation
}
// XXX Not sure yet
type RunnerHookFactory func(target HookTarget) (RunnerHook, error)
type HookTarget interface {
// State retrieves a copy of the target alloc runners state.
State() *state.State
}
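To illustrate the runner hook interfaces above: a hook implements Name plus whichever lifecycle methods it needs, and the alloc runner type-asserts for each phase (see prerun/postrun/destroy earlier in this PR). A hypothetical hook implementing only Prerun and Destroy (name and path are made up):

package example

import (
	"context"
	"os"

	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
)

// scratchDirHook is a hypothetical hook that creates a scratch directory
// before the allocation runs and removes it when the runner is destroyed.
type scratchDirHook struct {
	path string
}

// Statically assert which lifecycle interfaces this hook satisfies.
var _ interfaces.RunnerPrerunHook = (*scratchDirHook)(nil)
var _ interfaces.RunnerDestroyHook = (*scratchDirHook)(nil)

func (h *scratchDirHook) Name() string { return "scratch_dir" }

func (h *scratchDirHook) Prerun(context.Context) error {
	return os.MkdirAll(h.path, 0700)
}

func (h *scratchDirHook) Destroy() error {
	return os.RemoveAll(h.path)
}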
package interfaces
import (
"context"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
"github.com/hashicorp/nomad/client/driver/env"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
)
/*
Restart
+--------------------------------------------------------+
| |
| *Update |
| +-------+ |
| | | |
| | | |
| +---v-------+----+ |
+----v----+ | Running | +----+-----+ +--------------+
| | *Prestart |----------------| *Exited | | *Stop | |
| Pending +-------------> *Poststart run +---^-----------> Exited +-----------> Terminal |
| | | upon entering | | | | NoRestart | |
+---------+ | running | | +----------+ +--------------+
| | |
+--------+-------+ |
| |
+-----------+
*Kill
(forces terminal)
Link: http://stable.ascii-flow.appspot.com/#Draw4489375405966393064/1824429135
*/
// TaskHook is a lifecycle hook into the life cycle of a task runner.
type TaskHook interface {
Name() string
}
type TaskPrestartRequest struct {
// HookData is previously set data by the hook
HookData map[string]string
// Task is the task to run
Task *structs.Task
// Vault token may optionally be set if a Vault token is available
VaultToken string
// TaskDir contains the task's directory tree on the host
TaskDir *allocdir.TaskDir
// TaskEnv is the task's environment
TaskEnv *env.TaskEnv
}
type TaskPrestartResponse struct {
// Env is the environment variables to set for the task
Env map[string]string
// HookData allows the hook to emit data to be passed in the next time it is
// run
HookData map[string]string
// Done lets the hook indicate that it should only be run once
Done bool
}
type TaskPrestartHook interface {
TaskHook
// Prestart is called before the task is started.
Prestart(context.Context, *TaskPrestartRequest, *TaskPrestartResponse) error
}
// DriverStats is the interface implemented by DriverHandles to return task stats.
type DriverStats interface {
Stats() (*cstructs.TaskResourceUsage, error)
}
type TaskPoststartRequest struct {
// Exec hook (may be nil)
DriverExec interfaces.ScriptExecutor
// Network info (may be nil)
DriverNetwork *cstructs.DriverNetwork
// TaskEnv is the task's environment
TaskEnv *env.TaskEnv
// Stats collector
DriverStats DriverStats
}
type TaskPoststartResponse struct{}
type TaskPoststartHook interface {
TaskHook
// Poststart is called after the task has started.
Poststart(context.Context, *TaskPoststartRequest, *TaskPoststartResponse) error
}
type TaskKillRequest struct{}
type TaskKillResponse struct{}
type TaskKillHook interface {
TaskHook
// Kill is called when a task is going to be killed.
Kill(context.Context, *TaskKillRequest, *TaskKillResponse) error
}
type TaskExitedRequest struct{}
type TaskExitedResponse struct{}
type TaskExitedHook interface {
TaskHook
// Exited is called when a task exits and may or may not be restarted.
Exited(context.Context, *TaskExitedRequest, *TaskExitedResponse) error
}
type TaskUpdateRequest struct {
VaultToken string
// Alloc is the current version of the allocation (may have been
// updated since the hook was created)
Alloc *structs.Allocation
// TaskEnv is the task's environment
TaskEnv *env.TaskEnv
}
type TaskUpdateResponse struct{}
type TaskUpdateHook interface {
TaskHook
Update(context.Context, *TaskUpdateRequest, *TaskUpdateResponse) error
}
type TaskStopRequest struct{}
type TaskStopResponse struct{}
type TaskStopHook interface {
TaskHook
// Stop is called after the task has exited and will not be started again.
Stop(context.Context, *TaskStopRequest, *TaskStopResponse) error
}
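A similar sketch for the task hook interfaces above: a hypothetical prestart hook that injects one environment variable and marks itself Done so it only runs once (the request/response fields come straight from the definitions above; the hook itself is not part of this PR):

package example

import (
	"context"

	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
)

// envHook is a hypothetical TaskPrestartHook.
type envHook struct{}

var _ interfaces.TaskPrestartHook = (*envHook)(nil)

func (envHook) Name() string { return "example_env" }

func (envHook) Prestart(_ context.Context, req *interfaces.TaskPrestartRequest,
	resp *interfaces.TaskPrestartResponse) error {

	// Expose the task name to the task and only run this hook once.
	resp.Env = map[string]string{"EXAMPLE_TASK_NAME": req.Task.Name}
	resp.Done = true
	return nil
}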