Commit 15f7e03b authored by Michael Schurter and committed by Alex Dadgar

client: implement all-or-nothing alloc restoration

Restoring calls NewAR -> Restore -> Run

NewAR now calls NewTR
AR.Restore calls TR.Restore
AR.Run calls TR.Run
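
As a minimal sketch of this flow (using the allocrunnerv2 API from this diff; the arConf value and surrounding error handling are abbreviated):

```go
// Client-side restore path: create the runners, restore their state, then run.
ar, err := allocrunnerv2.NewAllocRunner(arConf) // NewAR -> NewTR for each task in the group
if err != nil {
	return err
}
if err := ar.Restore(); err != nil { // AR.Restore -> TR.Restore for every task runner
	return err
}
go ar.Run() // AR.Run -> TR.Run; only reached once all restores succeed
```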
parent 366bcd1b
Showing with 213 additions and 121 deletions
......@@ -13,6 +13,7 @@ import (
"github.com/hashicorp/nomad/client/allocrunnerv2/state"
"github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner"
"github.com/hashicorp/nomad/client/config"
clientstate "github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/nomad/structs"
......@@ -57,12 +58,18 @@ type allocRunner struct {
}
// NewAllocRunner returns a new allocation runner.
func NewAllocRunner(config *Config) *allocRunner {
func NewAllocRunner(config *Config) (*allocRunner, error) {
alloc := config.Alloc
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
return nil, fmt.Errorf("failed to lookup task group %q", alloc.TaskGroup)
}
ar := &allocRunner{
alloc: alloc,
clientConfig: config.ClientConfig,
vaultClient: config.Vault,
alloc: config.Alloc,
tasks: make(map[string]*taskrunner.TaskRunner),
tasks: make(map[string]*taskrunner.TaskRunner, len(tg.Tasks)),
waitCh: make(chan struct{}),
updateCh: make(chan *structs.Allocation),
stateDB: config.StateDB,
......@@ -70,15 +77,44 @@ func NewAllocRunner(config *Config) *allocRunner {
// Create alloc dir
//XXX update AllocDir to hc log
ar.allocDir = allocdir.NewAllocDir(nil, filepath.Join(config.ClientConfig.AllocDir, config.Alloc.ID))
ar.allocDir = allocdir.NewAllocDir(nil, filepath.Join(config.ClientConfig.AllocDir, alloc.ID))
// Create the logger based on the allocation ID
ar.logger = config.Logger.With("alloc_id", config.Alloc.ID)
ar.logger = config.Logger.With("alloc_id", alloc.ID)
// Initialize the runner's hooks.
ar.initRunnerHooks()
return ar
// Create the TaskRunners
if err := ar.initTaskRunners(tg.Tasks); err != nil {
return nil, err
}
return ar, nil
}
// initTaskRunners creates task runners but does *not* run them.
func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error {
for _, task := range tasks {
config := &taskrunner.Config{
Alloc: ar.alloc,
ClientConfig: ar.clientConfig,
Task: task,
TaskDir: ar.allocDir.NewTaskDir(task.Name),
Logger: ar.logger,
StateDB: ar.stateDB,
VaultClient: ar.vaultClient,
}
// Create, but do not Run, the task runner
tr, err := taskrunner.NewTaskRunner(config)
if err != nil {
return fmt.Errorf("failed creating runner for task %q: %v", task.Name, err)
}
ar.tasks[task.Name] = tr
}
return nil
}
func (ar *allocRunner) WaitCh() <-chan struct{} {
......@@ -91,7 +127,6 @@ func (ar *allocRunner) Run() {
// Close the wait channel
defer close(ar.waitCh)
var err error
var taskWaitCh <-chan struct{}
// Run the prestart hooks
......@@ -102,10 +137,7 @@ func (ar *allocRunner) Run() {
}
// Run the runners
taskWaitCh, err = ar.runImpl()
if err != nil {
ar.logger.Error("starting tasks failed", "error", err)
}
taskWaitCh = ar.runImpl()
for {
select {
......@@ -130,20 +162,9 @@ POST:
}
// runImpl is used to run the runners.
func (ar *allocRunner) runImpl() (<-chan struct{}, error) {
// Grab the task group
alloc := ar.Alloc()
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
// XXX Fail and exit
ar.logger.Error("failed to lookup task group", "task_group", alloc.TaskGroup)
return nil, fmt.Errorf("failed to lookup task group %q", alloc.TaskGroup)
}
for _, task := range tg.Tasks {
if err := ar.runTask(alloc, task); err != nil {
return nil, err
}
func (ar *allocRunner) runImpl() <-chan struct{} {
for _, task := range ar.tasks {
go task.Run()
}
// Return a combined WaitCh that is closed when all task runners have
......@@ -156,32 +177,7 @@ func (ar *allocRunner) runImpl() (<-chan struct{}, error) {
}
}()
return waitCh, nil
}
// runTask is used to run a task.
func (ar *allocRunner) runTask(alloc *structs.Allocation, task *structs.Task) error {
// Create the runner
config := &taskrunner.Config{
Alloc: alloc,
ClientConfig: ar.clientConfig,
Task: task,
TaskDir: ar.allocDir.NewTaskDir(task.Name),
Logger: ar.logger,
StateDB: ar.stateDB,
VaultClient: ar.vaultClient,
}
tr, err := taskrunner.NewTaskRunner(config)
if err != nil {
return err
}
// Start the runner
go tr.Run()
// Store the runner
ar.tasks[task.Name] = tr
return nil
return waitCh
}
// Alloc returns the current allocation being run by this runner.
......@@ -193,9 +189,29 @@ func (ar *allocRunner) Alloc() *structs.Allocation {
}
// SaveState persists the runner's allocation state to the state database. FIXME
//XXX
//XXX do we need to do periodic syncing? if Saving is only called *before* Run
// *and* within Run -- *and* Updates are applied within Run -- we may be able to
// skip quite a bit of locking? maybe?
func (ar *allocRunner) SaveState() error {
return nil
return ar.stateDB.Update(func(tx *bolt.Tx) error {
//XXX Track EvalID to only write alloc on change?
// Write the allocation
return clientstate.PutAllocation(tx, ar.Alloc())
})
}
// Restore state from database. Must be called after NewAllocRunner but before
// Run.
func (ar *allocRunner) Restore() error {
return ar.stateDB.View(func(tx *bolt.Tx) error {
// Restore task runners
for _, tr := range ar.tasks {
if err := tr.Restore(tx); err != nil {
return err
}
}
return nil
})
}
// Update the running allocation with a new version received from the server.
......
......@@ -5,6 +5,13 @@ import (
"github.com/hashicorp/nomad/helper"
)
var (
// taskRunnerStateAllKey holds all of a task runner's state. At the moment
// there is no need to split it.
//XXX refactor out of client/state and taskrunner
taskRunnerStateAllKey = []byte("simple-all")
)
// LocalState is Task state which is persisted for use when restarting Nomad
// agents.
type LocalState struct {
......
......@@ -18,7 +18,7 @@ import (
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/client/driver/env"
oldstate "github.com/hashicorp/nomad/client/state"
clientstate "github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/ugorji/go/codec"
......@@ -42,6 +42,7 @@ const (
var (
// taskRunnerStateAllKey holds all of a task runner's state. At the moment
// there is no need to split it.
//XXX refactor out of clientstate and new state
taskRunnerStateAllKey = []byte("simple-all")
)
......@@ -446,16 +447,15 @@ func (tr *TaskRunner) persistLocalState() error {
return nil
}
// Start the transaction.
return tr.stateDB.Batch(func(tx *bolt.Tx) error {
return tr.stateDB.Update(func(tx *bolt.Tx) error {
// Grab the task bucket
//XXX move into new state pkg
taskBkt, err := oldstate.GetTaskBucket(tx, tr.allocID, tr.taskName)
taskBkt, err := clientstate.GetTaskBucket(tx, tr.allocID, tr.taskName)
if err != nil {
return fmt.Errorf("failed to retrieve allocation bucket: %v", err)
}
if err := oldstate.PutData(taskBkt, taskRunnerStateAllKey, buf.Bytes()); err != nil {
if err := clientstate.PutData(taskBkt, taskRunnerStateAllKey, buf.Bytes()); err != nil {
return fmt.Errorf("failed to write task_runner state: %v", err)
}
......@@ -468,6 +468,24 @@ func (tr *TaskRunner) persistLocalState() error {
})
}
// Restore task runner state. Called by AllocRunner.Restore after NewTaskRunner
// but before Run.
func (tr *TaskRunner) Restore(tx *bolt.Tx) error {
bkt, err := clientstate.GetTaskBucket(tx, tr.allocID, tr.taskName)
if err != nil {
return fmt.Errorf("failed to get task %q bucket: %v", tr.taskName, err)
}
//XXX set persisted hash to avoid immediate write on first use?
var ls state.LocalState
if err := clientstate.GetObject(bkt, taskRunnerStateAllKey, &ls); err != nil {
return fmt.Errorf("failed to read task runner state: %v", err)
}
tr.localState = &ls
return nil
}
// SetState sets the task runner's allocation state.
func (tr *TaskRunner) SetState(state string, event *structs.TaskEvent) {
// Ensure the event is populated with human readable strings
......
......@@ -124,8 +124,9 @@ func (tr *TaskRunner) prestart() error {
}
tr.localStateLock.Unlock()
// Persist local state if the hook state has changed
// Store and persist local state if the hook state has changed
if !hookState.Equal(origHookState) {
tr.localState.Hooks[name] = hookState
if err := tr.persistLocalState(); err != nil {
return err
}
......
......@@ -32,7 +32,7 @@ import (
"github.com/hashicorp/nomad/client/allocrunnerv2"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/servers"
"github.com/hashicorp/nomad/client/state"
clientstate "github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/client/stats"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/command/agent/consul"
......@@ -110,6 +110,8 @@ type AllocRunner interface {
SaveState() error
Update(*structs.Allocation)
Alloc() *structs.Allocation
Restore() error
Run()
}
// Client is used to implement the client interaction with Nomad. Clients
......@@ -728,6 +730,7 @@ func (c *Client) restoreState() error {
return nil
}
//XXX REMOVED! make a note in backward compat / upgrading doc
// COMPAT: Remove in 0.7.0
// 0.6.0 transitioned from individual state files to a single bolt-db.
// The upgrade path is to:
......@@ -735,74 +738,72 @@ func (c *Client) restoreState() error {
// If so, restore from that and delete old state
// Restore using state database
// Allocs holds the IDs of the allocations being restored
var allocs []string
// Upgrading tracks whether this is a pre 0.6.0 upgrade path
var upgrading bool
// Scan the directory
allocDir := filepath.Join(c.config.StateDir, "alloc")
list, err := ioutil.ReadDir(allocDir)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to list alloc state: %v", err)
} else if err == nil && len(list) != 0 {
upgrading = true
for _, entry := range list {
allocs = append(allocs, entry.Name())
}
} else {
// Normal path
err := c.stateDB.View(func(tx *bolt.Tx) error {
allocs, err = state.GetAllAllocationIDs(tx)
if err != nil {
return fmt.Errorf("failed to list allocations: %v", err)
}
return nil
})
// Restore allocations
var allocs []*structs.Allocation
var err error
err = c.stateDB.View(func(tx *bolt.Tx) error {
allocs, err = clientstate.GetAllAllocations(tx)
if err != nil {
return err
return fmt.Errorf("failed to list allocations: %v", err)
}
return nil
})
if err != nil {
return err
}
// Load each alloc back
var mErr multierror.Error
for _, id := range allocs {
alloc := &structs.Allocation{ID: id}
// don't worry about blocking/migrating when restoring
watcher := allocrunner.NoopPrevAlloc{}
for _, alloc := range allocs {
//XXX FIXME create a root logger
logger := hclog.New(&hclog.LoggerOptions{
Name: "nomad",
Level: hclog.LevelFromString(c.configCopy.LogLevel),
TimeFormat: time.RFC3339,
})
c.configLock.RLock()
ar := allocrunner.NewAllocRunner(c.logger, c.configCopy.Copy(), c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, watcher)
arConf := &allocrunnerv2.Config{
Alloc: alloc,
Logger: logger,
ClientConfig: c.config,
StateDB: c.stateDB,
}
c.configLock.RUnlock()
c.allocLock.Lock()
c.allocs[id] = ar
c.allocLock.Unlock()
if err := ar.RestoreState(); err != nil {
c.logger.Printf("[ERR] client: failed to restore state for alloc %q: %v", id, err)
ar, err := allocrunnerv2.NewAllocRunner(arConf)
if err != nil {
c.logger.Printf("[ERR] client: failed to create alloc %q: %v", alloc.ID, err)
mErr.Errors = append(mErr.Errors, err)
} else {
go ar.Run()
if upgrading {
if err := ar.SaveState(); err != nil {
c.logger.Printf("[WARN] client: initial save state for alloc %q failed: %v", id, err)
}
}
continue
}
}
// Delete all the entries
if upgrading {
if err := os.RemoveAll(allocDir); err != nil {
// Restore state
if err := ar.Restore(); err != nil {
c.logger.Printf("[ERR] client: failed to restore alloc %q: %v", alloc.ID, err)
mErr.Errors = append(mErr.Errors, err)
continue
}
//XXX is this locking necessary?
c.allocLock.Lock()
c.allocs[alloc.ID] = ar
c.allocLock.Unlock()
}
return mErr.ErrorOrNil()
// Don't run any allocs if there were any failures
//XXX removing this check would switch from all-or-nothing restores to
// best-effort. went with all-or-nothing for now
if err := mErr.ErrorOrNil(); err != nil {
return err
}
// All allocs restored successfully, run them!
for _, ar := range c.allocs {
go ar.Run()
}
return nil
}
// saveState is used to snapshot our state into the data dir.
......@@ -1956,10 +1957,11 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error
// The long term fix is to pass in the config and node separately and then
// we don't have to do a copy.
//ar := allocrunner.NewAllocRunner(c.logger, c.configCopy.Copy(), c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, prevAlloc)
//XXX FIXME
//XXX FIXME create a root logger
logger := hclog.New(&hclog.LoggerOptions{
Name: "nomad",
Level: hclog.LevelFromString(c.configCopy.LogLevel),
Name: "nomad",
Level: hclog.LevelFromString(c.configCopy.LogLevel),
TimeFormat: time.RFC3339,
})
c.configLock.RLock()
......@@ -1972,12 +1974,15 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error
}
c.configLock.RUnlock()
ar := allocrunnerv2.NewAllocRunner(arConf)
ar, err := allocrunnerv2.NewAllocRunner(arConf)
if err != nil {
return err
}
// Store the alloc runner.
c.allocs[alloc.ID] = ar
//XXX(schmichael) Why do we do this?
// Initialize local state
if err := ar.SaveState(); err != nil {
c.logger.Printf("[WARN] client: initial save state for alloc %q failed: %v", alloc.ID, err)
}
......
......@@ -184,20 +184,65 @@ func DeleteTaskBucket(tx *bolt.Tx, allocID, taskName string) error {
return alloc.DeleteBucket(key)
}
func GetAllAllocationIDs(tx *bolt.Tx) ([]string, error) {
//XXX duplicated in arv2?!
var (
// The following are the key paths written to the state database
allocRunnerStateAllocKey = []byte("alloc")
)
type allocRunnerAllocState struct {
Alloc *structs.Allocation
}
func GetAllAllocations(tx *bolt.Tx) ([]*structs.Allocation, error) {
allocationsBkt := tx.Bucket(allocationsBucket)
if allocationsBkt == nil {
// No allocs
return nil, nil
}
var allocs []*structs.Allocation
// Create a cursor for iteration.
var allocIDs []string
c := allocationsBkt.Cursor()
// Iterate over all the buckets
// Iterate over all the allocation buckets
for k, _ := c.First(); k != nil; k, _ = c.Next() {
allocIDs = append(allocIDs, string(k))
allocID := string(k)
allocBkt := allocationsBkt.Bucket(k)
if allocBkt == nil {
//XXX merr?
return nil, fmt.Errorf("alloc %q missing", allocID)
}
var allocState allocRunnerAllocState
if err := GetObject(allocBkt, allocRunnerStateAllocKey, &allocState); err != nil {
//XXX merr?
return nil, fmt.Errorf("failed to restore alloc %q: %v", allocID, err)
}
allocs = append(allocs, allocState.Alloc)
}
return allocs, nil
}
// PutAllocation stores an allocation given a writable transaction.
func PutAllocation(tx *bolt.Tx, alloc *structs.Allocation) error {
// Retrieve the root allocations bucket
allocsBkt, err := tx.CreateBucketIfNotExists(allocationsBucket)
if err != nil {
return err
}
return allocIDs, nil
// Retrieve the specific allocations bucket
key := []byte(alloc.ID)
allocBkt, err := allocsBkt.CreateBucketIfNotExists(key)
if err != nil {
return err
}
allocState := allocRunnerAllocState{
Alloc: alloc,
}
return PutObject(allocBkt, allocRunnerStateAllocKey, &allocState)
}
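
For reference, a rough sketch of the persistence round trip these helpers enable (import paths assumed from the surrounding diff; bolt is the package vendored by Nomad at this commit):

```go
package main

import (
	bolt "github.com/boltdb/bolt"
	clientstate "github.com/hashicorp/nomad/client/state"
	"github.com/hashicorp/nomad/nomad/structs"
)

// saveAndRestore round-trips an allocation through the state database:
// PutAllocation is what allocRunner.SaveState calls inside an Update
// transaction; GetAllAllocations is what Client.restoreState calls
// inside a View transaction.
func saveAndRestore(db *bolt.DB, alloc *structs.Allocation) ([]*structs.Allocation, error) {
	// Write the allocation into its per-alloc bucket under the
	// allocations bucket, keyed by alloc.ID.
	if err := db.Update(func(tx *bolt.Tx) error {
		return clientstate.PutAllocation(tx, alloc)
	}); err != nil {
		return nil, err
	}

	// Read every persisted allocation back in a single read transaction.
	var allocs []*structs.Allocation
	err := db.View(func(tx *bolt.Tx) error {
		var ierr error
		allocs, ierr = clientstate.GetAllAllocations(tx)
		return ierr
	})
	return allocs, err
}
```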