Commit c55f3ed0 authored by Mahmood Ali
Browse files

per-task restart policy

parent 10bdc6f3
Showing with 300 additions and 40 deletions
+300 -40
......@@ -157,9 +157,10 @@ func TestJobs_Canonicalize(t *testing.T) {
Migrate: DefaultMigrateStrategy(),
Tasks: []*Task{
{
KillTimeout: timeToPtr(5 * time.Second),
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
RestartPolicy: defaultServiceJobRestartPolicy(),
},
},
},
......@@ -222,9 +223,10 @@ func TestJobs_Canonicalize(t *testing.T) {
},
Tasks: []*Task{
{
KillTimeout: timeToPtr(5 * time.Second),
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
RestartPolicy: defaultBatchJobRestartPolicy(),
},
},
},
......@@ -316,10 +318,11 @@ func TestJobs_Canonicalize(t *testing.T) {
Migrate: DefaultMigrateStrategy(),
Tasks: []*Task{
{
Name: "task1",
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
Name: "task1",
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
RestartPolicy: defaultServiceJobRestartPolicy(),
},
},
},
......@@ -363,6 +366,10 @@ func TestJobs_Canonicalize(t *testing.T) {
"db": 6379,
}},
},
RestartPolicy: &RestartPolicy{
// inherit other values from TG
Attempts: intToPtr(20),
},
Resources: &Resources{
CPU: intToPtr(500),
MemoryMB: intToPtr(256),
......@@ -486,6 +493,12 @@ func TestJobs_Canonicalize(t *testing.T) {
"db": 6379,
}},
},
RestartPolicy: &RestartPolicy{
Interval: timeToPtr(5 * time.Minute),
Attempts: intToPtr(20),
Delay: timeToPtr(25 * time.Second),
Mode: stringToPtr("delay"),
},
Resources: &Resources{
CPU: intToPtr(500),
MemoryMB: intToPtr(256),
......@@ -712,10 +725,11 @@ func TestJobs_Canonicalize(t *testing.T) {
Migrate: DefaultMigrateStrategy(),
Tasks: []*Task{
{
Name: "task1",
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
Name: "task1",
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
RestartPolicy: defaultServiceJobRestartPolicy(),
},
},
},
......@@ -753,12 +767,187 @@ func TestJobs_Canonicalize(t *testing.T) {
AutoPromote: boolToPtr(false),
},
Migrate: DefaultMigrateStrategy(),
Tasks: []*Task{
{
Name: "task1",
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
RestartPolicy: defaultServiceJobRestartPolicy(),
},
},
},
},
},
},
{
name: "restart_merge",
input: &Job{
Name: stringToPtr("foo"),
ID: stringToPtr("bar"),
ParentID: stringToPtr("lol"),
TaskGroups: []*TaskGroup{
{
Name: stringToPtr("bar"),
RestartPolicy: &RestartPolicy{
Delay: timeToPtr(15 * time.Second),
Attempts: intToPtr(2),
Interval: timeToPtr(30 * time.Minute),
Mode: stringToPtr("fail"),
},
Tasks: []*Task{
{
Name: "task1",
RestartPolicy: &RestartPolicy{
Attempts: intToPtr(5),
Delay: timeToPtr(1 * time.Second),
},
},
},
},
{
Name: stringToPtr("baz"),
RestartPolicy: &RestartPolicy{
Delay: timeToPtr(20 * time.Second),
Attempts: intToPtr(2),
Interval: timeToPtr(30 * time.Minute),
Mode: stringToPtr("fail"),
},
Tasks: []*Task{
{
Name: "task1",
},
},
},
},
},
expected: &Job{
Namespace: stringToPtr(DefaultNamespace),
ID: stringToPtr("bar"),
Name: stringToPtr("foo"),
Region: stringToPtr("global"),
Type: stringToPtr("service"),
ParentID: stringToPtr("lol"),
Priority: intToPtr(50),
AllAtOnce: boolToPtr(false),
ConsulToken: stringToPtr(""),
VaultToken: stringToPtr(""),
Stop: boolToPtr(false),
Stable: boolToPtr(false),
Version: uint64ToPtr(0),
Status: stringToPtr(""),
StatusDescription: stringToPtr(""),
CreateIndex: uint64ToPtr(0),
ModifyIndex: uint64ToPtr(0),
JobModifyIndex: uint64ToPtr(0),
Update: &UpdateStrategy{
Stagger: timeToPtr(30 * time.Second),
MaxParallel: intToPtr(1),
HealthCheck: stringToPtr("checks"),
MinHealthyTime: timeToPtr(10 * time.Second),
HealthyDeadline: timeToPtr(5 * time.Minute),
ProgressDeadline: timeToPtr(10 * time.Minute),
AutoRevert: boolToPtr(false),
Canary: intToPtr(0),
AutoPromote: boolToPtr(false),
},
TaskGroups: []*TaskGroup{
{
Name: stringToPtr("bar"),
Count: intToPtr(1),
EphemeralDisk: &EphemeralDisk{
Sticky: boolToPtr(false),
Migrate: boolToPtr(false),
SizeMB: intToPtr(300),
},
RestartPolicy: &RestartPolicy{
Delay: timeToPtr(15 * time.Second),
Attempts: intToPtr(2),
Interval: timeToPtr(30 * time.Minute),
Mode: stringToPtr("fail"),
},
ReschedulePolicy: &ReschedulePolicy{
Attempts: intToPtr(0),
Interval: timeToPtr(0),
DelayFunction: stringToPtr("exponential"),
Delay: timeToPtr(30 * time.Second),
MaxDelay: timeToPtr(1 * time.Hour),
Unlimited: boolToPtr(true),
},
Update: &UpdateStrategy{
Stagger: timeToPtr(30 * time.Second),
MaxParallel: intToPtr(1),
HealthCheck: stringToPtr("checks"),
MinHealthyTime: timeToPtr(10 * time.Second),
HealthyDeadline: timeToPtr(5 * time.Minute),
ProgressDeadline: timeToPtr(10 * time.Minute),
AutoRevert: boolToPtr(false),
Canary: intToPtr(0),
AutoPromote: boolToPtr(false),
},
Migrate: DefaultMigrateStrategy(),
Tasks: []*Task{
{
Name: "task1",
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
RestartPolicy: &RestartPolicy{
Attempts: intToPtr(5),
Delay: timeToPtr(1 * time.Second),
Interval: timeToPtr(30 * time.Minute),
Mode: stringToPtr("fail"),
},
},
},
},
{
Name: stringToPtr("baz"),
Count: intToPtr(1),
EphemeralDisk: &EphemeralDisk{
Sticky: boolToPtr(false),
Migrate: boolToPtr(false),
SizeMB: intToPtr(300),
},
RestartPolicy: &RestartPolicy{
Delay: timeToPtr(20 * time.Second),
Attempts: intToPtr(2),
Interval: timeToPtr(30 * time.Minute),
Mode: stringToPtr("fail"),
},
ReschedulePolicy: &ReschedulePolicy{
Attempts: intToPtr(0),
Interval: timeToPtr(0),
DelayFunction: stringToPtr("exponential"),
Delay: timeToPtr(30 * time.Second),
MaxDelay: timeToPtr(1 * time.Hour),
Unlimited: boolToPtr(true),
},
Update: &UpdateStrategy{
Stagger: timeToPtr(30 * time.Second),
MaxParallel: intToPtr(1),
HealthCheck: stringToPtr("checks"),
MinHealthyTime: timeToPtr(10 * time.Second),
HealthyDeadline: timeToPtr(5 * time.Minute),
ProgressDeadline: timeToPtr(10 * time.Minute),
AutoRevert: boolToPtr(false),
Canary: intToPtr(0),
AutoPromote: boolToPtr(false),
},
Migrate: DefaultMigrateStrategy(),
Tasks: []*Task{
{
Name: "task1",
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
RestartPolicy: &RestartPolicy{
Delay: timeToPtr(20 * time.Second),
Attempts: intToPtr(2),
Interval: timeToPtr(30 * time.Minute),
Mode: stringToPtr("fail"),
},
},
},
},
......
......@@ -453,9 +453,6 @@ func (g *TaskGroup) Canonicalize(job *Job) {
if g.Scaling != nil {
g.Scaling.Canonicalize(*g.Count)
}
for _, t := range g.Tasks {
t.Canonicalize(g, job)
}
if g.EphemeralDisk == nil {
g.EphemeralDisk = DefaultEphemeralDisk()
} else {
......@@ -515,23 +512,9 @@ func (g *TaskGroup) Canonicalize(job *Job) {
var defaultRestartPolicy *RestartPolicy
switch *job.Type {
case "service", "system":
// These needs to be in sync with DefaultServiceJobRestartPolicy in
// in nomad/structs/structs.go
defaultRestartPolicy = &RestartPolicy{
Delay: timeToPtr(15 * time.Second),
Attempts: intToPtr(2),
Interval: timeToPtr(30 * time.Minute),
Mode: stringToPtr(RestartPolicyModeFail),
}
defaultRestartPolicy = defaultServiceJobRestartPolicy()
default:
// These needs to be in sync with DefaultBatchJobRestartPolicy in
// in nomad/structs/structs.go
defaultRestartPolicy = &RestartPolicy{
Delay: timeToPtr(15 * time.Second),
Attempts: intToPtr(3),
Interval: timeToPtr(24 * time.Hour),
Mode: stringToPtr(RestartPolicyModeFail),
}
defaultRestartPolicy = defaultBatchJobRestartPolicy()
}
if g.RestartPolicy != nil {
......@@ -539,6 +522,10 @@ func (g *TaskGroup) Canonicalize(job *Job) {
}
g.RestartPolicy = defaultRestartPolicy
for _, t := range g.Tasks {
t.Canonicalize(g, job)
}
for _, spread := range g.Spreads {
spread.Canonicalize()
}
......@@ -553,6 +540,28 @@ func (g *TaskGroup) Canonicalize(job *Job) {
}
}
// defaultServiceJobRestartPolicy returns a newly allocated restart policy
// used as the default for "service" and "system" jobs.
//
// The values must be kept in sync with DefaultServiceJobRestartPolicy in
// nomad/structs/structs.go.
func defaultServiceJobRestartPolicy() *RestartPolicy {
	policy := new(RestartPolicy)
	policy.Delay = timeToPtr(15 * time.Second)
	policy.Attempts = intToPtr(2)
	policy.Interval = timeToPtr(30 * time.Minute)
	policy.Mode = stringToPtr(RestartPolicyModeFail)
	return policy
}
// defaultBatchJobRestartPolicy returns a newly allocated restart policy
// used as the default for job types other than "service" and "system".
//
// The values must be kept in sync with DefaultBatchJobRestartPolicy in
// nomad/structs/structs.go.
func defaultBatchJobRestartPolicy() *RestartPolicy {
	policy := new(RestartPolicy)
	policy.Delay = timeToPtr(15 * time.Second)
	policy.Attempts = intToPtr(3)
	policy.Interval = timeToPtr(24 * time.Hour)
	policy.Mode = stringToPtr(RestartPolicyModeFail)
	return policy
}
// Constrain is used to add a constraint to a task group.
func (g *TaskGroup) Constrain(c *Constraint) *TaskGroup {
g.Constraints = append(g.Constraints, c)
......@@ -645,6 +654,7 @@ type Task struct {
Env map[string]string
Services []*Service
Resources *Resources
RestartPolicy *RestartPolicy
Meta map[string]string
KillTimeout *time.Duration `mapstructure:"kill_timeout"`
LogConfig *LogConfig `mapstructure:"logs"`
......@@ -697,6 +707,14 @@ func (t *Task) Canonicalize(tg *TaskGroup, job *Job) {
if t.CSIPluginConfig != nil {
t.CSIPluginConfig.Canonicalize()
}
if t.RestartPolicy == nil {
t.RestartPolicy = tg.RestartPolicy
} else {
tgrp := &RestartPolicy{}
*tgrp = *tg.RestartPolicy
tgrp.Merge(t.RestartPolicy)
t.RestartPolicy = tgrp
}
}
// TaskArtifact is used to download artifacts before running a task.
......
......@@ -340,12 +340,16 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
tr.taskResources = tres
// Build the restart tracker.
tg := tr.alloc.Job.LookupTaskGroup(tr.alloc.TaskGroup)
if tg == nil {
tr.logger.Error("alloc missing task group")
return nil, fmt.Errorf("alloc missing task group")
rp := config.Task.RestartPolicy
if rp == nil {
tg := tr.alloc.Job.LookupTaskGroup(tr.alloc.TaskGroup)
if tg == nil {
tr.logger.Error("alloc missing task group")
return nil, fmt.Errorf("alloc missing task group")
}
rp = tg.RestartPolicy
}
tr.restartTracker = restarts.NewRestartTracker(tg.RestartPolicy, tr.alloc.Job.Type, config.Task.Lifecycle)
tr.restartTracker = restarts.NewRestartTracker(rp, tr.alloc.Job.Type, config.Task.Lifecycle)
// Get the driver
if err := tr.initDriver(); err != nil {
......
......@@ -906,6 +906,15 @@ func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) {
structsTask.Affinities = ApiAffinitiesToStructs(apiTask.Affinities)
structsTask.CSIPluginConfig = ApiCSIPluginConfigToStructsCSIPluginConfig(apiTask.CSIPluginConfig)
if apiTask.RestartPolicy != nil {
structsTask.RestartPolicy = &structs.RestartPolicy{
Attempts: *apiTask.RestartPolicy.Attempts,
Interval: *apiTask.RestartPolicy.Interval,
Delay: *apiTask.RestartPolicy.Delay,
Mode: *apiTask.RestartPolicy.Mode,
}
}
if l := len(apiTask.VolumeMounts); l != 0 {
structsTask.VolumeMounts = make([]*structs.VolumeMount, l)
for i, mount := range apiTask.VolumeMounts {
......
......@@ -1674,7 +1674,12 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Weight: helper.Int8ToPtr(50),
},
},
RestartPolicy: &api.RestartPolicy{
Interval: helper.TimeToPtr(2 * time.Second),
Attempts: helper.IntToPtr(10),
Delay: helper.TimeToPtr(20 * time.Second),
Mode: helper.StringToPtr("delay"),
},
Services: []*api.Service{
{
Id: "id",
......@@ -2023,6 +2028,12 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Env: map[string]string{
"hello": "world",
},
RestartPolicy: &structs.RestartPolicy{
Interval: 2 * time.Second,
Attempts: 10,
Delay: 20 * time.Second,
Mode: "delay",
},
Services: []*structs.Service{
{
Name: "serviceA",
......@@ -2375,6 +2386,12 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
},
},
},
RestartPolicy: &structs.RestartPolicy{
Interval: 1 * time.Second,
Attempts: 5,
Delay: 10 * time.Second,
Mode: "delay",
},
Meta: map[string]string{
"lol": "code",
},
......
......@@ -66,6 +66,7 @@ func parseTask(item *ast.ObjectItem) (*api.Task, error) {
"logs",
"meta",
"resources",
"restart",
"service",
"shutdown_delay",
"template",
......@@ -94,6 +95,7 @@ func parseTask(item *ast.ObjectItem) (*api.Task, error) {
delete(m, "logs")
delete(m, "meta")
delete(m, "resources")
delete(m, "restart")
delete(m, "service")
delete(m, "template")
delete(m, "vault")
......@@ -215,6 +217,13 @@ func parseTask(item *ast.ObjectItem) (*api.Task, error) {
t.Resources = &r
}
// Parse restart policy
if o := listVal.Filter("restart"); len(o.Items) > 0 {
if err := parseRestartPolicy(&t.RestartPolicy, o); err != nil {
return nil, multierror.Prefix(err, "restart ->")
}
}
// If we have logs then parse that
if o := listVal.Filter("logs"); len(o.Items) > 0 {
if len(o.Items) > 1 {
......
......@@ -236,6 +236,9 @@ func TestParse(t *testing.T) {
Weight: helper.Int8ToPtr(25),
},
},
RestartPolicy: &api.RestartPolicy{
Attempts: helper.IntToPtr(10),
},
Services: []*api.Service{
{
Tags: []string{"foo", "bar"},
......
......@@ -178,6 +178,10 @@ job "binstore-storagelocker" {
destination = "/mnt/foo"
}
restart {
attempts = 10
}
logs {
max_files = 14
max_file_size = 101
......
......@@ -5095,7 +5095,7 @@ type TaskGroup struct {
// Scaling is the list of autoscaling policies for the TaskGroup
Scaling *ScalingPolicy
//RestartPolicy of a TaskGroup
// RestartPolicy of a TaskGroup
RestartPolicy *RestartPolicy
// Tasks are the collection of tasks that this task group needs to run
......@@ -5744,6 +5744,9 @@ type Task struct {
// Resources is the resources needed by this task
Resources *Resources
// RestartPolicy of the Task; Canonicalize falls back to the TaskGroup's policy when nil
RestartPolicy *RestartPolicy
// DispatchPayload configures how the task retrieves its input from a dispatch
DispatchPayload *DispatchPayloadConfig
......@@ -5884,6 +5887,10 @@ func (t *Task) Canonicalize(job *Job, tg *TaskGroup) {
t.Resources.Canonicalize()
}
if t.RestartPolicy == nil {
t.RestartPolicy = tg.RestartPolicy
}
// Set the default timeout if it is not specified.
if t.KillTimeout == 0 {
t.KillTimeout = DefaultKillTimeout
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment