Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
小 白蛋
Nomad
Commits
e22393ae
Commit
e22393ae
authored
8 years ago
by
Alex Dadgar
Browse files
Options
Download
Email Patches
Plain Diff
Restore state + upgrade path
parent
85a81f47
No related merge requests found
Changes
6
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
client/alloc_runner.go
+90
-30
client/alloc_runner.go
client/client.go
+54
-19
client/client.go
client/state_database.go
+118
-23
client/state_database.go
client/task_runner.go
+50
-13
client/task_runner.go
client/util.go
+4
-6
client/util.go
demo/vagrant/README.md
+0
-0
demo/vagrant/README.md
with
316 additions
and
91 deletions
+316
-91
client/alloc_runner.go
+
90
-
30
View file @
e22393ae
...
...
@@ -88,6 +88,7 @@ type AllocRunner struct {
allocDirPersisted
bool
}
// COMPAT: Remove in 0.7.0
// allocRunnerState is used to snapshot the state of the alloc runner
type
allocRunnerState
struct
{
Version
string
...
...
@@ -149,8 +150,10 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB,
return
ar
}
// stateFilePath returns the path to our state file
func
(
r
*
AllocRunner
)
stateFilePath
()
string
{
// pre060StateFilePath returns the path to our state file that would have been
// written pre v0.6.0
// COMPAT: Remove in 0.7.0
func
(
r
*
AllocRunner
)
pre060StateFilePath
()
string
{
r
.
allocLock
.
Lock
()
defer
r
.
allocLock
.
Unlock
()
path
:=
filepath
.
Join
(
r
.
config
.
StateDir
,
"alloc"
,
r
.
alloc
.
ID
,
"state.json"
)
...
...
@@ -159,29 +162,72 @@ func (r *AllocRunner) stateFilePath() string {
// RestoreState is used to restore the state of the alloc runner
func
(
r
*
AllocRunner
)
RestoreState
()
error
{
// Load the snapshot
// Check if the old snapshot is there
oldPath
:=
r
.
pre060StateFilePath
()
var
snap
allocRunnerState
if
err
:=
restoreState
(
r
.
stateFilePath
(),
&
snap
);
err
!=
nil
{
return
err
}
if
err
:=
pre060RestoreState
(
oldPath
,
&
snap
);
err
==
nil
{
// Restore fields
r
.
alloc
=
snap
.
Alloc
r
.
allocDir
=
snap
.
AllocDir
r
.
allocClientStatus
=
snap
.
AllocClientStatus
r
.
allocClientDescription
=
snap
.
AllocClientDescription
if
r
.
alloc
!=
nil
{
r
.
taskStates
=
snap
.
Alloc
.
TaskStates
}
// #2132 Upgrade path: if snap.AllocDir is nil, try to convert old
// Context struct to new AllocDir struct
if
snap
.
AllocDir
==
nil
&&
snap
.
Context
!=
nil
{
r
.
logger
.
Printf
(
"[DEBUG] client: migrating state snapshot for alloc %q"
,
r
.
alloc
.
ID
)
snap
.
AllocDir
=
allocdir
.
NewAllocDir
(
r
.
logger
,
snap
.
Context
.
AllocDir
.
AllocDir
)
for
taskName
:=
range
snap
.
Context
.
AllocDir
.
TaskDirs
{
snap
.
AllocDir
.
NewTaskDir
(
taskName
)
// #2132 Upgrade path: if snap.AllocDir is nil, try to convert old
// Context struct to new AllocDir struct
if
snap
.
AllocDir
==
nil
&&
snap
.
Context
!=
nil
{
r
.
logger
.
Printf
(
"[DEBUG] client: migrating state snapshot for alloc %q"
,
r
.
alloc
.
ID
)
snap
.
AllocDir
=
allocdir
.
NewAllocDir
(
r
.
logger
,
snap
.
Context
.
AllocDir
.
AllocDir
)
for
taskName
:=
range
snap
.
Context
.
AllocDir
.
TaskDirs
{
snap
.
AllocDir
.
NewTaskDir
(
taskName
)
}
}
}
// XXX needs to be updated and handle the upgrade path
// Delete the old state
os
.
RemoveAll
(
oldPath
)
}
else
if
!
os
.
IsNotExist
(
err
)
{
// Something corrupt in the old state file
return
err
}
else
{
// We are doing a normal restore
err
:=
r
.
stateDB
.
View
(
func
(
tx
*
bolt
.
Tx
)
error
{
bkt
,
err
:=
getAllocationBucket
(
tx
,
r
.
alloc
.
ID
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to get allocation bucket: %v"
,
err
)
}
// Restore fields
r
.
alloc
=
snap
.
Alloc
r
.
allocDir
=
snap
.
AllocDir
r
.
allocClientStatus
=
snap
.
AllocClientStatus
r
.
allocClientDescription
=
snap
.
AllocClientDescription
// Get the state objects
var
mutable
allocRunnerMutableState
var
immutable
allocRunnerImmutableState
var
allocDir
allocdir
.
AllocDir
if
err
:=
getObject
(
bkt
,
allocRunnerStateImmutableKey
,
&
immutable
);
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to read alloc runner immutable state: %v"
,
err
)
}
if
err
:=
getObject
(
bkt
,
allocRunnerStateMutableKey
,
&
mutable
);
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to read alloc runner mutable state: %v"
,
err
)
}
if
err
:=
getObject
(
bkt
,
allocRunnerStateAllocDirKey
,
&
allocDir
);
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to read alloc runner alloc_dir state: %v"
,
err
)
}
// Populate the fields
r
.
alloc
=
immutable
.
Alloc
r
.
allocDir
=
&
allocDir
r
.
allocClientStatus
=
mutable
.
AllocClientStatus
r
.
allocClientDescription
=
mutable
.
AllocClientDescription
r
.
taskStates
=
mutable
.
TaskStates
return
nil
})
if
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to read allocation state: %v"
,
err
)
}
}
var
snapshotErrors
multierror
.
Error
if
r
.
alloc
==
nil
{
...
...
@@ -194,11 +240,17 @@ func (r *AllocRunner) RestoreState() error {
return
e
}
r
.
taskStates
=
snap
.
Alloc
.
TaskStates
tg
:=
r
.
alloc
.
Job
.
LookupTaskGroup
(
r
.
alloc
.
TaskGroup
)
if
tg
==
nil
{
return
fmt
.
Errorf
(
"restored allocation doesn't contain task group %q"
,
r
.
alloc
.
TaskGroup
)
}
// Restore the task runners
var
mErr
multierror
.
Error
for
name
,
state
:=
range
r
.
taskStates
{
for
_
,
task
:=
range
tg
.
Tasks
{
name
:=
task
.
Name
state
:=
r
.
taskStates
[
name
]
// Mark the task as restored.
r
.
restored
[
name
]
=
struct
{}{}
...
...
@@ -210,7 +262,6 @@ func (r *AllocRunner) RestoreState() error {
return
err
}
task
:=
&
structs
.
Task
{
Name
:
name
}
tr
:=
NewTaskRunner
(
r
.
logger
,
r
.
config
,
r
.
stateDB
,
r
.
setTaskState
,
td
,
r
.
Alloc
(),
task
,
r
.
vaultClient
,
r
.
consulClient
)
r
.
tasks
[
name
]
=
tr
...
...
@@ -231,11 +282,6 @@ func (r *AllocRunner) RestoreState() error {
return
mErr
.
ErrorOrNil
()
}
// GetAllocDir returns the alloc dir for the alloc runner
func
(
r
*
AllocRunner
)
GetAllocDir
()
*
allocdir
.
AllocDir
{
return
r
.
allocDir
}
// SaveState is used to snapshot the state of the alloc runner
// if the fullSync is marked as false only the state of the Alloc Runner
// is snapshotted. If fullSync is marked as true, we snapshot
...
...
@@ -330,7 +376,12 @@ func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner) error {
// DestroyState is used to cleanup after ourselves
func
(
r
*
AllocRunner
)
DestroyState
()
error
{
return
os
.
RemoveAll
(
filepath
.
Dir
(
r
.
stateFilePath
()))
return
r
.
stateDB
.
Update
(
func
(
tx
*
bolt
.
Tx
)
error
{
if
err
:=
deleteAllocationBucket
(
tx
,
r
.
alloc
.
ID
);
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to delete allocation bucket: %v"
,
err
)
}
return
nil
})
}
// DestroyContext is used to destroy the context
...
...
@@ -338,6 +389,11 @@ func (r *AllocRunner) DestroyContext() error {
return
r
.
allocDir
.
Destroy
()
}
// GetAllocDir returns the alloc dir for the alloc runner
func
(
r
*
AllocRunner
)
GetAllocDir
()
*
allocdir
.
AllocDir
{
return
r
.
allocDir
}
// copyTaskStates returns a copy of the passed task states.
func
copyTaskStates
(
states
map
[
string
]
*
structs
.
TaskState
)
map
[
string
]
*
structs
.
TaskState
{
copy
:=
make
(
map
[
string
]
*
structs
.
TaskState
,
len
(
states
))
...
...
@@ -543,7 +599,7 @@ func (r *AllocRunner) Run() {
go
r
.
dirtySyncState
()
// Find the task group to run in the allocation
alloc
:=
r
.
a
lloc
alloc
:=
r
.
A
lloc
()
tg
:=
alloc
.
Job
.
LookupTaskGroup
(
alloc
.
TaskGroup
)
if
tg
==
nil
{
r
.
logger
.
Printf
(
"[ERR] client: alloc '%s' for missing task group '%s'"
,
alloc
.
ID
,
alloc
.
TaskGroup
)
...
...
@@ -629,6 +685,10 @@ OUTER:
for
_
,
tr
:=
range
runners
{
tr
.
Update
(
update
)
}
if
err
:=
r
.
syncStatus
();
err
!=
nil
{
r
.
logger
.
Printf
(
"[WARN] client: failed to sync status upon receiving alloc update: %v"
,
err
)
}
case
<-
r
.
destroyCh
:
taskDestroyEvent
=
structs
.
NewTaskEvent
(
structs
.
TaskKilled
)
break
OUTER
...
...
This diff is collapsed.
Click to expand it.
client/client.go
+
54
-
19
View file @
e22393ae
...
...
@@ -608,27 +608,56 @@ func (c *Client) restoreState() error {
return
nil
}
// XXX Needs to be updated and handle the upgrade case
// COMPAT: Remove in 0.7.0
// 0.6.0 transistioned from individual state files to a single bolt-db.
// The upgrade path is to:
// Check if old state exists
// If so, restore from that and delete old state
// Restore using state database
// Allocs holds the IDs of the allocations being restored
var
allocs
[]
string
// Upgrading tracks whether this is a pre 0.6.0 upgrade path
var
upgrading
bool
// Scan the directory
list
,
err
:=
ioutil
.
ReadDir
(
filepath
.
Join
(
c
.
config
.
StateDir
,
"alloc"
))
if
err
!=
nil
&&
os
.
IsNotExist
(
err
)
{
return
nil
}
else
if
err
!=
nil
{
allocDir
:=
filepath
.
Join
(
c
.
config
.
StateDir
,
"alloc"
)
list
,
err
:=
ioutil
.
ReadDir
(
allocDir
)
if
err
!=
nil
&&
!
os
.
IsNotExist
(
err
)
{
return
fmt
.
Errorf
(
"failed to list alloc state: %v"
,
err
)
}
else
if
err
==
nil
&&
len
(
list
)
!=
0
{
upgrading
=
true
for
_
,
entry
:=
range
list
{
allocs
=
append
(
allocs
,
entry
.
Name
())
}
}
else
{
// Normal path
err
:=
c
.
stateDB
.
View
(
func
(
tx
*
bolt
.
Tx
)
error
{
allocs
,
err
=
getAllAllocationIDs
(
tx
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to list allocations: %v"
,
err
)
}
return
nil
})
if
err
!=
nil
{
return
err
}
}
// Load each alloc back
var
mErr
multierror
.
Error
for
_
,
entry
:=
range
list
{
id
:=
entry
.
Name
()
for
_
,
id
:=
range
allocs
{
alloc
:=
&
structs
.
Allocation
{
ID
:
id
}
c
.
configLock
.
RLock
()
ar
:=
NewAllocRunner
(
c
.
logger
,
c
.
configCopy
,
c
.
stateDB
,
c
.
updateAllocStatus
,
alloc
,
c
.
vaultClient
,
c
.
consulService
)
c
.
configLock
.
RUnlock
()
c
.
allocLock
.
Lock
()
c
.
allocs
[
id
]
=
ar
c
.
allocLock
.
Unlock
()
if
err
:=
ar
.
RestoreState
();
err
!=
nil
{
c
.
logger
.
Printf
(
"[ERR] client: failed to restore state for alloc %s: %v"
,
id
,
err
)
mErr
.
Errors
=
append
(
mErr
.
Errors
,
err
)
...
...
@@ -636,6 +665,14 @@ func (c *Client) restoreState() error {
go
ar
.
Run
()
}
}
// Delete all the entries
if
upgrading
{
if
err
:=
os
.
RemoveAll
(
allocDir
);
err
!=
nil
{
mErr
.
Errors
=
append
(
mErr
.
Errors
,
err
)
}
}
return
mErr
.
ErrorOrNil
()
}
...
...
@@ -653,10 +690,9 @@ func (c *Client) saveState(blocking bool) error {
runners
:=
c
.
getAllocRunners
()
wg
.
Add
(
len
(
runners
))
for
id
,
ar
:=
range
c
.
getAllocRunners
()
{
go
func
()
{
local
:=
ar
err
:=
local
.
SaveState
()
for
id
,
ar
:=
range
runners
{
go
func
(
id
string
,
ar
*
AllocRunner
)
{
err
:=
ar
.
SaveState
()
if
err
!=
nil
{
c
.
logger
.
Printf
(
"[ERR] client: failed to save state for alloc %s: %v"
,
id
,
err
)
l
.
Lock
()
...
...
@@ -664,7 +700,7 @@ func (c *Client) saveState(blocking bool) error {
l
.
Unlock
()
}
wg
.
Done
()
}()
}(
id
,
ar
)
}
if
blocking
{
...
...
@@ -1505,8 +1541,7 @@ func (c *Client) runAllocs(update *allocUpdates) {
// Remove the old allocations
for
_
,
remove
:=
range
diff
.
removed
{
if
err
:=
c
.
removeAlloc
(
remove
);
err
!=
nil
{
c
.
logger
.
Printf
(
"[ERR] client: failed to remove alloc '%s': %v"
,
remove
.
ID
,
err
)
c
.
logger
.
Printf
(
"[ERR] client: failed to remove alloc '%s': %v"
,
remove
.
ID
,
err
)
}
}
...
...
@@ -1581,11 +1616,6 @@ func (c *Client) runAllocs(update *allocUpdates) {
add
.
ID
,
err
)
}
}
// Persist our state
if
err
:=
c
.
saveState
(
false
);
err
!=
nil
{
c
.
logger
.
Printf
(
"[ERR] client: failed to save state: %v"
,
err
)
}
}
// blockForRemoteAlloc blocks until the previous allocation of an allocation has
...
...
@@ -1934,6 +1964,11 @@ func (c *Client) addAlloc(alloc *structs.Allocation, prevAllocDir *allocdir.Allo
ar
:=
NewAllocRunner
(
c
.
logger
,
c
.
configCopy
,
c
.
stateDB
,
c
.
updateAllocStatus
,
alloc
,
c
.
vaultClient
,
c
.
consulService
)
ar
.
SetPreviousAllocDir
(
prevAllocDir
)
c
.
configLock
.
RUnlock
()
if
err
:=
ar
.
SaveState
();
err
!=
nil
{
c
.
logger
.
Printf
(
"[WARN] client: initial save state for alloc %q failed: %v"
,
alloc
.
ID
,
err
)
}
go
ar
.
Run
()
// Store the alloc runner.
...
...
This diff is collapsed.
Click to expand it.
client/state_database.go
+
118
-
23
View file @
e22393ae
...
...
@@ -55,24 +55,54 @@ func putData(bkt *bolt.Bucket, key, value []byte) error {
return
nil
}
func
getObject
(
bkt
*
bolt
.
Bucket
,
key
[]
byte
,
obj
interface
{})
error
{
// Get the data
data
:=
bkt
.
Get
(
key
)
if
data
==
nil
{
return
fmt
.
Errorf
(
"no data at key %v"
,
string
(
key
))
}
// Deserialize the object
if
err
:=
codec
.
NewDecoderBytes
(
data
,
structs
.
MsgpackHandle
)
.
Decode
(
obj
);
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to decode data into passed object: %v"
,
err
)
}
return
nil
}
// getAllocationBucket returns the bucket used to persist state about a
// particular allocation. If the root allocation bucket or the specific
// allocation bucket doesn't exist, it will be created.
// allocation bucket doesn't exist, it will be created as long as the
// transaction is writable.
func
getAllocationBucket
(
tx
*
bolt
.
Tx
,
allocID
string
)
(
*
bolt
.
Bucket
,
error
)
{
if
!
tx
.
Writable
()
{
return
nil
,
fmt
.
Errorf
(
"transaction must be writable"
)
}
var
err
error
w
:=
tx
.
Writable
()
// Retrieve the root allocations bucket
allocations
,
err
:=
tx
.
CreateBucketIfNotExists
(
allocationsBucket
)
if
err
!=
nil
{
return
nil
,
err
allocations
:=
tx
.
Bucket
(
allocationsBucket
)
if
allocations
==
nil
{
if
!
w
{
return
nil
,
fmt
.
Errorf
(
"Allocations bucket doesn't exist and transaction is not writable"
)
}
allocations
,
err
=
tx
.
CreateBucket
(
allocationsBucket
)
if
err
!=
nil
{
return
nil
,
err
}
}
// Retrieve the specific allocations bucket
alloc
,
err
:=
allocations
.
CreateBucketIfNotExists
([]
byte
(
allocID
))
if
err
!=
nil
{
return
nil
,
err
key
:=
[]
byte
(
allocID
)
alloc
:=
allocations
.
Bucket
(
key
)
if
alloc
==
nil
{
if
!
w
{
return
nil
,
fmt
.
Errorf
(
"Allocation bucket doesn't exist and transaction is not writable"
)
}
alloc
,
err
=
allocations
.
CreateBucket
(
key
)
if
err
!=
nil
{
return
nil
,
err
}
}
return
alloc
,
nil
...
...
@@ -80,29 +110,94 @@ func getAllocationBucket(tx *bolt.Tx, allocID string) (*bolt.Bucket, error) {
// getTaskBucket returns the bucket used to persist state about a
// particular task. If the root allocation bucket, the specific
// allocation or task bucket doesn't exist, they will be created.
// allocation or task bucket doesn't exist, they will be created as long as the
// transaction is writable.
func
getTaskBucket
(
tx
*
bolt
.
Tx
,
allocID
,
taskName
string
)
(
*
bolt
.
Bucket
,
error
)
{
alloc
,
err
:=
getAllocationBucket
(
tx
,
allocID
)
if
err
!=
nil
{
return
nil
,
err
}
// Retrieve the specific task bucket
w
:=
tx
.
Writable
()
key
:=
[]
byte
(
taskName
)
task
:=
alloc
.
Bucket
(
key
)
if
task
==
nil
{
if
!
w
{
return
nil
,
fmt
.
Errorf
(
"Task bucket doesn't exist and transaction is not writable"
)
}
task
,
err
=
alloc
.
CreateBucket
(
key
)
if
err
!=
nil
{
return
nil
,
err
}
}
return
task
,
nil
}
// deleteAllocationBucket is used to delete an allocation bucket if it exists.
func
deleteAllocationBucket
(
tx
*
bolt
.
Tx
,
allocID
string
)
error
{
if
!
tx
.
Writable
()
{
return
nil
,
fmt
.
Errorf
(
"transaction must be writable"
)
return
fmt
.
Errorf
(
"transaction must be writable"
)
}
// Retrieve the root allocations bucket
allocations
,
err
:=
tx
.
CreateBucketIfNotExists
(
allocationsBucket
)
if
err
!=
nil
{
return
nil
,
err
allocations
:=
tx
.
Bucket
(
allocationsBucket
)
if
allocations
==
nil
{
return
nil
}
// Check if the bucket exists
key
:=
[]
byte
(
allocID
)
if
allocBkt
:=
allocations
.
Bucket
(
key
);
allocBkt
==
nil
{
return
nil
}
return
allocations
.
DeleteBucket
(
key
)
}
// deleteTaskBucket is used to delete a task bucket if it exists.
func
deleteTaskBucket
(
tx
*
bolt
.
Tx
,
allocID
,
taskName
string
)
error
{
if
!
tx
.
Writable
()
{
return
fmt
.
Errorf
(
"transaction must be writable"
)
}
// Retrieve the root allocations bucket
allocations
:=
tx
.
Bucket
(
allocationsBucket
)
if
allocations
==
nil
{
return
nil
}
// Retrieve the specific allocations bucket
alloc
,
err
:=
allocations
.
CreateBucketIfNotExists
([]
byte
(
allocID
))
if
err
!
=
nil
{
return
nil
,
err
alloc
:=
allocations
.
Bucket
([]
byte
(
allocID
))
if
alloc
=
=
nil
{
return
nil
}
//
Retrieve the specific task bucket
task
,
err
:=
alloc
.
CreateBucketIfNotExists
(
[]
byte
(
taskName
)
)
if
err
!
=
nil
{
return
nil
,
err
//
Check if the bucket exists
key
:=
[]
byte
(
taskName
)
if
taskBkt
:=
alloc
.
Bucket
(
key
);
taskBkt
=
=
nil
{
return
nil
}
return
task
,
nil
return
alloc
.
DeleteBucket
(
key
)
}
func
getAllAllocationIDs
(
tx
*
bolt
.
Tx
)
([]
string
,
error
)
{
allocationsBkt
:=
tx
.
Bucket
(
allocationsBucket
)
if
allocationsBkt
==
nil
{
return
nil
,
nil
}
// Create a cursor for iteration.
var
allocIDs
[]
string
c
:=
allocationsBkt
.
Cursor
()
// Iterate over all the buckets
for
k
,
_
:=
c
.
First
();
k
!=
nil
;
k
,
_
=
c
.
Next
()
{
allocIDs
=
append
(
allocIDs
,
string
(
k
))
}
return
allocIDs
,
nil
}
This diff is collapsed.
Click to expand it.
client/task_runner.go
+
50
-
13
View file @
e22393ae
...
...
@@ -249,34 +249,61 @@ func (r *TaskRunner) WaitCh() <-chan struct{} {
return
r
.
waitCh
}
// stateFilePath returns the path to our state file
func
(
r
*
TaskRunner
)
stateFilePath
()
string
{
// pre060StateFilePath returns the path to our state file that would have been
// written pre v0.6.0
// COMPAT: Remove in 0.7.0
func
(
r
*
TaskRunner
)
pre060StateFilePath
()
string
{
// Get the MD5 of the task name
hashVal
:=
md5
.
Sum
([]
byte
(
r
.
task
.
Name
))
hashHex
:=
hex
.
EncodeToString
(
hashVal
[
:
])
dirName
:=
fmt
.
Sprintf
(
"task-%s"
,
hashHex
)
// Generate the path
path
:=
filepath
.
Join
(
r
.
config
.
StateDir
,
"alloc"
,
r
.
alloc
.
ID
,
dirName
,
"state.json"
)
return
path
return
filepath
.
Join
(
r
.
config
.
StateDir
,
"alloc"
,
r
.
alloc
.
ID
,
dirName
,
"state.json"
)
}
// RestoreState is used to restore our state
func
(
r
*
TaskRunner
)
RestoreState
()
error
{
// XXX needs to be updated and handle the upgrade path
// COMPAT: Remove in 0.7.0
// 0.6.0 transistioned from individual state files to a single bolt-db.
// The upgrade path is to:
// Check if old state exists
// If so, restore from that and delete old state
// Restore using state database
// Load the snapshot
var
snap
taskRunnerState
if
err
:=
restoreState
(
r
.
stateFilePath
(),
&
snap
);
err
!=
nil
{
// Check if the old snapshot is there
oldPath
:=
r
.
pre060StateFilePath
()
if
err
:=
pre060RestoreState
(
oldPath
,
&
snap
);
err
==
nil
{
// Delete the old state
os
.
RemoveAll
(
oldPath
)
}
else
if
!
os
.
IsNotExist
(
err
)
{
// Something corrupt in the old state file
return
err
}
else
{
// We are doing a normal restore
err
:=
r
.
stateDB
.
View
(
func
(
tx
*
bolt
.
Tx
)
error
{
bkt
,
err
:=
getTaskBucket
(
tx
,
r
.
alloc
.
ID
,
r
.
task
.
Name
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to get task bucket: %v"
,
err
)
}
if
err
:=
getObject
(
bkt
,
taskRunnerStateAllKey
,
&
snap
);
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to read task runner state: %v"
,
err
)
}
return
nil
})
if
err
!=
nil
{
return
err
}
}
// Restore fields
// Restore fields
from the snapshot
r
.
artifactsDownloaded
=
snap
.
ArtifactDownloaded
r
.
taskDirBuilt
=
snap
.
TaskDirBuilt
r
.
payloadRendered
=
snap
.
PayloadRendered
r
.
setCreatedResources
(
snap
.
CreatedResources
)
if
err
:=
r
.
setTaskEnv
();
err
!=
nil
{
...
...
@@ -333,14 +360,13 @@ func (r *TaskRunner) RestoreState() error {
r
.
running
=
true
r
.
runningLock
.
Unlock
()
}
return
nil
}
// SaveState is used to snapshot our state
func
(
r
*
TaskRunner
)
SaveState
()
error
{
// XXX needs to be updated
r
.
persistLock
.
Lock
()
snap
:=
taskRunnerState
{
Version
:
r
.
config
.
Version
,
ArtifactDownloaded
:
r
.
artifactsDownloaded
,
...
...
@@ -355,6 +381,7 @@ func (r *TaskRunner) SaveState() error {
}
r
.
handleLock
.
Unlock
()
// If nothing has changed avoid the write
h
:=
snap
.
Hash
()
if
bytes
.
Equal
(
h
,
r
.
persistedHash
)
{
r
.
persistLock
.
Unlock
()
...
...
@@ -394,11 +421,21 @@ func (r *TaskRunner) DestroyState() error {
r
.
persistLock
.
Lock
()
defer
r
.
persistLock
.
Unlock
()
return
os
.
RemoveAll
(
r
.
stateFilePath
())
return
r
.
stateDB
.
Update
(
func
(
tx
*
bolt
.
Tx
)
error
{
if
err
:=
deleteTaskBucket
(
tx
,
r
.
alloc
.
ID
,
r
.
task
.
Name
);
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to delete task bucket: %v"
,
err
)
}
return
nil
})
}
// setState is used to update the state of the task runner
func
(
r
*
TaskRunner
)
setState
(
state
string
,
event
*
structs
.
TaskEvent
)
{
// Persist our state to disk.
if
err
:=
r
.
SaveState
();
err
!=
nil
{
r
.
logger
.
Printf
(
"[ERR] client: failed to save state of Task Runner for task %q: %v"
,
r
.
task
.
Name
,
err
)
}
// Indicate the task has been updated.
r
.
updater
(
r
.
task
.
Name
,
state
,
event
)
}
...
...
This diff is collapsed.
Click to expand it.
client/util.go
+
4
-
6
View file @
e22393ae
...
...
@@ -106,14 +106,12 @@ func persistState(path string, data interface{}) error {
return
nil
}
// restoreState is used to read back in the persisted state
func
restoreState
(
path
string
,
data
interface
{})
error
{
// pre060RestoreState is used to read back in the persisted state for pre v0.6.0
// state
func
pre060RestoreState
(
path
string
,
data
interface
{})
error
{
buf
,
err
:=
ioutil
.
ReadFile
(
path
)
if
err
!=
nil
{
if
os
.
IsNotExist
(
err
)
{
return
nil
}
return
fmt
.
Errorf
(
"failed to read state: %v"
,
err
)
return
err
}
if
err
:=
json
.
Unmarshal
(
buf
,
data
);
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to decode state: %v"
,
err
)
...
...
This diff is collapsed.
Click to expand it.
demo/vagrant/README.md
+
0
-
0
View file @
e22393ae
No preview for this file type
This diff is collapsed.
Click to expand it.
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment
Menu
Projects
Groups
Snippets
Help