Unverified commit 61a3b73d, authored by Mahmood Ali and committed by GitHub

drivers: Capture exit code when task is killed (#10494)

This commit ensures Nomad captures the task exit code more reliably, even when the task is killed. The issue affects the `raw_exec` driver in particular, as noted in https://github.com/hashicorp/nomad/issues/10430 .

We fix this by ensuring that the TaskRunner calls `driver.WaitTask` only once. The TaskRunner monitors task completion by calling `driver.WaitTask`, which should return the task exit code on completion; however, it can also return a "context canceled" error if the agent or executor is shut down.

Previously, when a task was being stopped, the killTask path made two WaitTask calls, and the second one would occasionally return "context canceled" because of a race between task shutdown and the second wait, depending on the driver and how quickly it shuts down after the task completes.

By making a single WaitTask call and consistently waiting on the task, we ensure the exit code is captured reliably before the executor is shut down or the contexts expire.

I opted to chang...
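As a rough illustration of the approach (a standalone sketch, not the TaskRunner code; it uses `os/exec` and made-up names such as `exitResult`), a single goroutine waits on the process and publishes the exit state on one channel, and both the normal-completion path and the kill path read from that same channel:

```go
// Illustrative sketch (not Nomad code): one goroutine waits on the process and
// publishes its exit state on a single channel; both the completion path and
// the kill path consume that same channel, so the exit code is never lost to a
// second, racy wait call. Unix-only because of the syscall.WaitStatus use.
package main

import (
	"fmt"
	"os/exec"
	"syscall"
	"time"
)

type exitResult struct {
	code   int
	signal int
}

func main() {
	cmd := exec.Command("sleep", "1000")
	if err := cmd.Start(); err != nil {
		panic(err)
	}

	// The only place we wait on the task -- the analogue of the single
	// driver.WaitTask call.
	resultCh := make(chan exitResult, 1)
	go func() {
		_ = cmd.Wait()
		res := exitResult{code: cmd.ProcessState.ExitCode()}
		if ws, ok := cmd.ProcessState.Sys().(syscall.WaitStatus); ok && ws.Signaled() {
			res.signal = int(ws.Signal())
			res.code = 128 + res.signal // shell convention, e.g. 143 for SIGTERM
		}
		resultCh <- res
	}()

	// Simulate a kill request arriving while the task is still running.
	select {
	case res := <-resultCh:
		fmt.Println("task completed on its own:", res)
	case <-time.After(500 * time.Millisecond):
		// Kill path: signal the task, then keep reading from the SAME
		// result channel instead of issuing a second wait.
		_ = cmd.Process.Signal(syscall.SIGTERM)
		res := <-resultCh
		fmt.Printf("task killed: exit code %d, signal %d\n", res.code, res.signal)
	}
}
```

The real change threads the driver's result channel (`resultCh`) from the run loop into `handleKill` and `killTask`, as shown in the diff below.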
Showing with 91 additions and 20 deletions
@@ -36,7 +36,7 @@ func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, fai
     }

     // Kill the task using an exponential backoff in-case of failures.
-    if err := tr.killTask(handle); err != nil {
+    if _, err := tr.killTask(handle, waitCh); err != nil {
         // We couldn't successfully destroy the resource created.
         tr.logger.Error("failed to kill task. Resources may have been leaked", "error", err)
     }
@@ -569,7 +569,7 @@ MAIN:
         case <-tr.killCtx.Done():
             // We can go through the normal should restart check since
             // the restart tracker knowns it is killed
-            result = tr.handleKill()
+            result = tr.handleKill(resultCh)
         case <-tr.shutdownCtx.Done():
             // TaskRunner was told to exit immediately
             return
@@ -616,7 +616,7 @@ MAIN:
     // that should be terminal, so if the handle still exists we should
     // kill it here.
     if tr.getDriverHandle() != nil {
-        if result = tr.handleKill(); result != nil {
+        if result = tr.handleKill(nil); result != nil {
             tr.emitExitResultEvent(result)
         }
@@ -883,7 +883,7 @@ func (tr *TaskRunner) initDriver() error {
 // handleKill is used to handle the a request to kill a task. It will return
 // the handle exit result if one is available and store any error in the task
 // runner killErr value.
-func (tr *TaskRunner) handleKill() *drivers.ExitResult {
+func (tr *TaskRunner) handleKill(resultCh <-chan *drivers.ExitResult) *drivers.ExitResult {
     // Run the pre killing hooks
     tr.preKill()
@@ -892,7 +892,12 @@ func (tr *TaskRunner) handleKill() *drivers.ExitResult {
     // before waiting to kill task
     if delay := tr.Task().ShutdownDelay; delay != 0 {
         tr.logger.Debug("waiting before killing task", "shutdown_delay", delay)
-        time.Sleep(delay)
+
+        select {
+        case result := <-resultCh:
+            return result
+        case <-time.After(delay):
+        }
     }

     // Tell the restart tracker that the task has been killed so it doesn't
@@ -900,35 +905,48 @@ func (tr *TaskRunner) handleKill() *drivers.ExitResult {
     tr.restartTracker.SetKilled()

     // Check it is running
+    select {
+    case result := <-resultCh:
+        return result
+    default:
+    }
+
     handle := tr.getDriverHandle()
     if handle == nil {
         return nil
     }

     // Kill the task using an exponential backoff in-case of failures.
-    killErr := tr.killTask(handle)
+    result, killErr := tr.killTask(handle, resultCh)
     if killErr != nil {
         // We couldn't successfully destroy the resource created.
         tr.logger.Error("failed to kill task. Resources may have been leaked", "error", killErr)
         tr.setKillErr(killErr)
     }

+    if result != nil {
+        return result
+    }
+
     // Block until task has exited.
-    waitCh, err := handle.WaitCh(tr.shutdownCtx)
+    if resultCh == nil {
+        var err error
+        resultCh, err = handle.WaitCh(tr.shutdownCtx)

-    // The error should be nil or TaskNotFound, if it's something else then a
-    // failure in the driver or transport layer occurred
-    if err != nil {
-        if err == drivers.ErrTaskNotFound {
+        // The error should be nil or TaskNotFound, if it's something else then a
+        // failure in the driver or transport layer occurred
+        if err != nil {
+            if err == drivers.ErrTaskNotFound {
+                return nil
+            }
+            tr.logger.Error("failed to wait on task. Resources may have been leaked", "error", err)
+            tr.setKillErr(killErr)
             return nil
         }
-        tr.logger.Error("failed to wait on task. Resources may have been leaked", "error", err)
-        tr.setKillErr(killErr)
-        return nil
     }

     select {
-    case result := <-waitCh:
+    case result := <-resultCh:
         return result
     case <-tr.shutdownCtx.Done():
         return nil
@@ -938,14 +956,14 @@ func (tr *TaskRunner) handleKill() *drivers.ExitResult {
 // killTask kills the task handle. In the case that killing fails,
 // killTask will retry with an exponential backoff and will give up at a
 // given limit. Returns an error if the task could not be killed.
-func (tr *TaskRunner) killTask(handle *DriverHandle) error {
+func (tr *TaskRunner) killTask(handle *DriverHandle, resultCh <-chan *drivers.ExitResult) (*drivers.ExitResult, error) {
     // Cap the number of times we attempt to kill the task.
     var err error
     for i := 0; i < killFailureLimit; i++ {
         if err = handle.Kill(); err != nil {
             if err == drivers.ErrTaskNotFound {
                 tr.logger.Warn("couldn't find task to kill", "task_id", handle.ID())
-                return nil
+                return nil, nil
             }
             // Calculate the new backoff
             backoff := (1 << (2 * uint64(i))) * killBackoffBaseline
@@ -954,13 +972,17 @@
             }

             tr.logger.Error("failed to kill task", "backoff", backoff, "error", err)
-            time.Sleep(backoff)
+            select {
+            case result := <-resultCh:
+                return result, nil
+            case <-time.After(backoff):
+            }
         } else {
             // Kill was successful
-            return nil
+            return nil, nil
         }
     }

-    return err
+    return nil, err
 }

 // persistLocalState persists local state to disk synchronously.
@@ -202,6 +202,55 @@ func TestTaskRunner_BuildTaskConfig_CPU_Memory(t *testing.T) {
     }
 }

+// TestTaskRunner_Stop_ExitCode asserts that the exit code is captured on a task, even if it's stopped
+func TestTaskRunner_Stop_ExitCode(t *testing.T) {
+    ctestutil.ExecCompatible(t)
+    t.Parallel()
+
+    alloc := mock.BatchAlloc()
+    alloc.Job.TaskGroups[0].Count = 1
+    task := alloc.Job.TaskGroups[0].Tasks[0]
+    task.KillSignal = "SIGTERM"
+    task.Driver = "raw_exec"
+    task.Config = map[string]interface{}{
+        "command": "/bin/sleep",
+        "args":    []string{"1000"},
+    }
+
+    conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
+    defer cleanup()
+
+    // Run the first TaskRunner
+    tr, err := NewTaskRunner(conf)
+    require.NoError(t, err)
+    go tr.Run()
+    defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
+
+    // Wait for it to be running
+    testWaitForTaskToStart(t, tr)
+
+    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+    defer cancel()
+
+    err = tr.Kill(ctx, structs.NewTaskEvent("shutdown"))
+    require.NoError(t, err)
+
+    var exitEvent *structs.TaskEvent
+    state := tr.TaskState()
+    for _, e := range state.Events {
+        if e.Type == structs.TaskTerminated {
+            exitEvent = e
+            break
+        }
+    }
+    require.NotNilf(t, exitEvent, "exit event not found: %v", state.Events)
+    require.Equal(t, 143, exitEvent.ExitCode)
+    require.Equal(t, 15, exitEvent.Signal)
+}
+
 // TestTaskRunner_Restore_Running asserts restoring a running task does not
 // rerun the task.
 func TestTaskRunner_Restore_Running(t *testing.T) {
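For context on the assertions in `TestTaskRunner_Stop_ExitCode`: by the usual Unix convention, a process terminated by a signal is reported with exit code 128 plus the signal number, so SIGTERM (signal 15) corresponds to exit code 143. A trivial check of those numbers:

```go
// Quick check of the values asserted in TestTaskRunner_Stop_ExitCode:
// SIGTERM is signal 15, and 128 + 15 = 143.
package main

import (
	"fmt"
	"syscall"
)

func main() {
	sig := int(syscall.SIGTERM) // 15 on Linux
	fmt.Println("signal:", sig, "exit code:", 128+sig)
}
```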