Commit 223e9e12 authored by Alex Dadgar's avatar Alex Dadgar
Browse files

Fix followers not creating periodic launch

Fix an issue in which periodic launches wouldn't be made on followers.
parent 82cbb70b
Showing with 181 additions and 69 deletions
+181 -69
......@@ -351,8 +351,7 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
// We always add the job to the periodic dispatcher because there is the
// possibility that the periodic spec was removed and then we should stop
// tracking it.
added, err := n.periodicDispatcher.Add(req.Job)
if err != nil {
if err := n.periodicDispatcher.Add(req.Job); err != nil {
n.logger.Printf("[ERR] nomad.fsm: periodicDispatcher.Add failed: %v", err)
return err
}
......@@ -360,12 +359,12 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
// Create a watch set
ws := memdb.NewWatchSet()
// If it is periodic, record the time it was inserted. This is necessary for
// recovering during leader election. It is possible that from the time it
// is added to when it was suppose to launch, leader election occurs and the
// job was not launched. In this case, we use the insertion time to
// determine if a launch was missed.
if added {
// If it is an active periodic job, record the time it was inserted. This is
// necessary for recovering during leader election. It is possible that
// between the time the job is added and the time it was supposed to launch,
// a leader election occurs and the job is not launched. In this case, we use
// the insertion time to determine if a launch was missed.
if req.Job.IsPeriodicActive() {
prevLaunch, err := n.state.PeriodicLaunchByID(ws, req.Namespace, req.Job.ID)
if err != nil {
n.logger.Printf("[ERR] nomad.fsm: PeriodicLaunchByID failed: %v", err)
......
......@@ -331,6 +331,65 @@ func TestFSM_RegisterJob(t *testing.T) {
}
}
// TestFSM_RegisterPeriodicJob_NonLeader applies a registration for a periodic
// job to an FSM whose periodic dispatcher has been disabled. It checks that
// the job is stored, that the dispatcher is not tracking it, and that a
// periodic launch time was nevertheless recorded in state.
func TestFSM_RegisterPeriodicJob_NonLeader(t *testing.T) {
	t.Parallel()
	fsm := testFSM(t)

	// Turn the dispatcher off before anything is applied.
	fsm.periodicDispatcher.SetEnabled(false)

	periodicJob := mock.PeriodicJob()
	registerReq := structs.JobRegisterRequest{
		Job:          periodicJob,
		WriteRequest: structs.WriteRequest{Namespace: periodicJob.Namespace},
	}

	encoded, err := structs.Encode(structs.JobRegisterRequestType, registerReq)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	if resp := fsm.Apply(makeLog(encoded)); resp != nil {
		t.Fatalf("resp: %v", resp)
	}

	// The job must be present in the state store with the expected index.
	ws := memdb.NewWatchSet()
	stored, err := fsm.State().JobByID(ws, registerReq.Namespace, registerReq.Job.ID)
	switch {
	case err != nil:
		t.Fatalf("err: %v", err)
	case stored == nil:
		t.Fatalf("not found!")
	case stored.CreateIndex != 1:
		t.Fatalf("bad index: %d", stored.CreateIndex)
	}

	// The disabled dispatcher must not be tracking the job.
	key := structs.NamespacedID{ID: periodicJob.ID, Namespace: periodicJob.Namespace}
	if _, tracked := fsm.periodicDispatcher.tracked[key]; tracked {
		t.Fatal("job added to periodic runner")
	}

	// A non-zero launch time must still have been recorded for the job.
	launch, err := fsm.State().PeriodicLaunchByID(ws, registerReq.Namespace, registerReq.Job.ID)
	switch {
	case err != nil:
		t.Fatalf("err: %v", err)
	case launch == nil:
		t.Fatalf("not found!")
	case launch.Launch.IsZero():
		t.Fatalf("bad launch time: %v", launch.Launch)
	}
}
func TestFSM_RegisterJob_BadNamespace(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
......
......@@ -360,14 +360,12 @@ func (s *Server) restorePeriodicDispatcher() error {
continue
}
added, err := s.periodicDispatcher.Add(job)
if err != nil {
if err := s.periodicDispatcher.Add(job); err != nil {
return err
}
// We did not add the job to the tracker, this can be for a variety of
// reasons, but it means that we do not need to force run it.
if !added {
// We do not need to force run the job since it isn't active.
if !job.IsPeriodicActive() {
continue
}
......@@ -375,9 +373,13 @@ func (s *Server) restorePeriodicDispatcher() error {
// the time the periodic job was added. Otherwise it has the last launch
// time of the periodic job.
launch, err := s.fsm.State().PeriodicLaunchByID(ws, job.Namespace, job.ID)
if err != nil || launch == nil {
if err != nil {
return fmt.Errorf("failed to get periodic launch time: %v", err)
}
if launch == nil {
return fmt.Errorf("no recorded periodic launch time for job %q in namespace %q",
job.ID, job.Namespace)
}
// nextLaunch is the next launch that should occur.
nextLaunch := job.Periodic.Next(launch.Launch.In(job.Periodic.GetLocation()))
......
......@@ -192,18 +192,18 @@ func (p *PeriodicDispatch) Tracked() []*structs.Job {
// Add begins tracking of a periodic job. If it is already tracked, it acts as
// an update to the job's periodic spec. The method returns any error that may
// have occurred.
func (p *PeriodicDispatch) Add(job *structs.Job) (added bool, err error) {
func (p *PeriodicDispatch) Add(job *structs.Job) error {
p.l.Lock()
defer p.l.Unlock()
// Do nothing if not enabled
if !p.enabled {
return false, nil
return nil
}
// If we were tracking a job and it has been disabled, made non-periodic,
// stopped or is parameterized, remove it
disabled := !job.IsPeriodic() || !job.Periodic.Enabled || job.Stopped() || job.IsParameterized()
disabled := !job.IsPeriodicActive()
tuple := structs.NamespacedID{
ID: job.ID,
......@@ -216,7 +216,7 @@ func (p *PeriodicDispatch) Add(job *structs.Job) (added bool, err error) {
}
// If the job is disabled and we aren't tracking it, do nothing.
return false, nil
return nil
}
// Add or update the job.
......@@ -224,12 +224,12 @@ func (p *PeriodicDispatch) Add(job *structs.Job) (added bool, err error) {
next := job.Periodic.Next(time.Now().In(job.Periodic.GetLocation()))
if tracked {
if err := p.heap.Update(job, next); err != nil {
return false, fmt.Errorf("failed to update job %q (%s) launch time: %v", job.ID, job.Namespace, err)
return fmt.Errorf("failed to update job %q (%s) launch time: %v", job.ID, job.Namespace, err)
}
p.logger.Printf("[DEBUG] nomad.periodic: updated periodic job %q (%s)", job.ID, job.Namespace)
} else {
if err := p.heap.Push(job, next); err != nil {
return false, fmt.Errorf("failed to add job %v: %v", job.ID, err)
return fmt.Errorf("failed to add job %v: %v", job.ID, err)
}
p.logger.Printf("[DEBUG] nomad.periodic: registered periodic job %q (%s)", job.ID, job.Namespace)
}
......@@ -240,7 +240,7 @@ func (p *PeriodicDispatch) Add(job *structs.Job) (added bool, err error) {
default:
}
return true, nil
return nil
}
// Remove stops tracking the passed job. If the job is not tracked, it is a
......
......@@ -77,7 +77,7 @@ func TestPeriodicEndpoint_Force_ACL(t *testing.T) {
job := mock.PeriodicJob()
job.Periodic.ProhibitOverlap = true // Shouldn't affect anything.
assert.Nil(state.UpsertJob(100, job))
_, err := s1.periodicDispatcher.Add(job)
err := s1.periodicDispatcher.Add(job)
assert.Nil(err)
// Force launch it.
......
......@@ -116,8 +116,8 @@ func TestPeriodicDispatch_SetEnabled(t *testing.T) {
// Enable and track something
p.SetEnabled(true)
job := mock.PeriodicJob()
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
tracked := p.Tracked()
......@@ -130,10 +130,8 @@ func TestPeriodicDispatch_Add_NonPeriodic(t *testing.T) {
t.Parallel()
p, _ := testPeriodicDispatcher()
job := mock.Job()
if added, err := p.Add(job); err != nil {
if err := p.Add(job); err != nil {
t.Fatalf("Add of non-periodic job failed: %v; expect no-op", err)
} else if added {
t.Fatalf("Add of non-periodic job happened, expect no-op")
}
tracked := p.Tracked()
......@@ -147,8 +145,8 @@ func TestPeriodicDispatch_Add_Periodic_Parameterized(t *testing.T) {
p, _ := testPeriodicDispatcher()
job := mock.PeriodicJob()
job.ParameterizedJob = &structs.ParameterizedJobConfig{}
if added, err := p.Add(job); err != nil || added {
t.Fatalf("Add of periodic parameterized job failed: %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add of periodic parameterized job failed: %v", err)
}
tracked := p.Tracked()
......@@ -162,8 +160,8 @@ func TestPeriodicDispatch_Add_Periodic_Stopped(t *testing.T) {
p, _ := testPeriodicDispatcher()
job := mock.PeriodicJob()
job.Stop = true
if added, err := p.Add(job); err != nil || added {
t.Fatalf("Add of stopped periodic job failed: %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add of stopped periodic job failed: %v", err)
}
tracked := p.Tracked()
......@@ -176,8 +174,8 @@ func TestPeriodicDispatch_Add_UpdateJob(t *testing.T) {
t.Parallel()
p, _ := testPeriodicDispatcher()
job := mock.PeriodicJob()
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
tracked := p.Tracked()
......@@ -187,8 +185,8 @@ func TestPeriodicDispatch_Add_UpdateJob(t *testing.T) {
// Update the job and add it again.
job.Periodic.Spec = "foo"
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed: %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
tracked = p.Tracked()
......@@ -208,13 +206,9 @@ func TestPeriodicDispatch_Add_Remove_Namespaced(t *testing.T) {
job := mock.PeriodicJob()
job2 := mock.PeriodicJob()
job2.Namespace = "test"
added, err := p.Add(job)
assert.Nil(err)
assert.True(added)
assert.Nil(p.Add(job))
added, err = p.Add(job2)
assert.Nil(err)
assert.True(added)
assert.Nil(p.Add(job2))
assert.Len(p.Tracked(), 2)
......@@ -227,8 +221,8 @@ func TestPeriodicDispatch_Add_RemoveJob(t *testing.T) {
t.Parallel()
p, _ := testPeriodicDispatcher()
job := mock.PeriodicJob()
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
tracked := p.Tracked()
......@@ -238,8 +232,8 @@ func TestPeriodicDispatch_Add_RemoveJob(t *testing.T) {
// Update the job to be non-periodic and add it again.
job.Periodic = nil
if added, err := p.Add(job); err != nil || added {
t.Fatalf("Add failed %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
tracked = p.Tracked()
......@@ -256,15 +250,15 @@ func TestPeriodicDispatch_Add_TriggersUpdate(t *testing.T) {
job := testPeriodicJob(time.Now().Add(10 * time.Second))
// Add it.
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
// Update it to be sooner and re-add.
expected := time.Now().Round(1 * time.Second).Add(1 * time.Second)
job.Periodic.Spec = fmt.Sprintf("%d", expected.Unix())
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
// Check that nothing is created.
......@@ -304,8 +298,8 @@ func TestPeriodicDispatch_Remove_Tracked(t *testing.T) {
p, _ := testPeriodicDispatcher()
job := mock.PeriodicJob()
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
tracked := p.Tracked()
......@@ -331,8 +325,8 @@ func TestPeriodicDispatch_Remove_TriggersUpdate(t *testing.T) {
job := testPeriodicJob(time.Now().Add(1 * time.Second))
// Add it.
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
// Remove the job.
......@@ -370,8 +364,8 @@ func TestPeriodicDispatch_ForceRun_Tracked(t *testing.T) {
job := testPeriodicJob(time.Now().Add(10 * time.Second))
// Add it.
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
// ForceRun the job
......@@ -402,8 +396,8 @@ func TestPeriodicDispatch_Run_DisallowOverlaps(t *testing.T) {
job.Periodic.ProhibitOverlap = true
// Add it.
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
time.Sleep(3 * time.Second)
......@@ -431,8 +425,8 @@ func TestPeriodicDispatch_Run_Multiple(t *testing.T) {
job := testPeriodicJob(launch1, launch2)
// Add it.
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
time.Sleep(3 * time.Second)
......@@ -463,11 +457,11 @@ func TestPeriodicDispatch_Run_SameTime(t *testing.T) {
job2 := testPeriodicJob(launch)
// Add them.
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
if added, err := p.Add(job2); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
if err := p.Add(job2); err != nil {
t.Fatalf("Add failed %v", err)
}
if l := len(p.Tracked()); l != 2 {
......@@ -503,11 +497,11 @@ func TestPeriodicDispatch_Run_SameID_Different_Namespace(t *testing.T) {
job2.Namespace = "test"
// Add them.
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
if added, err := p.Add(job2); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
if err := p.Add(job2); err != nil {
t.Fatalf("Add failed %v", err)
}
if l := len(p.Tracked()); l != 2 {
......@@ -587,8 +581,8 @@ func TestPeriodicDispatch_Complex(t *testing.T) {
shuffle(toDelete)
for _, job := range jobs {
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
}
......
......@@ -1939,6 +1939,12 @@ func (j *Job) IsPeriodic() bool {
return j.Periodic != nil
}
// IsPeriodicActive reports whether the job is an active periodic job that will
// create child jobs: it must have a periodic configuration that is enabled,
// and it must be neither stopped nor parameterized.
func (j *Job) IsPeriodicActive() bool {
	// Without an enabled periodic configuration the job is not active.
	if !j.IsPeriodic() || !j.Periodic.Enabled {
		return false
	}
	// Stopped and parameterized jobs are never considered active.
	if j.Stopped() || j.IsParameterized() {
		return false
	}
	return true
}
// IsParameterized returns whether a job is parameterized job.
func (j *Job) IsParameterized() bool {
return j.ParameterizedJob != nil
......
......@@ -637,6 +637,58 @@ func TestJob_IsPeriodic(t *testing.T) {
}
}
// TestJob_IsPeriodicActive checks Job.IsPeriodicActive against a table of
// jobs. Every inactive case disables exactly one condition so that each clause
// of IsPeriodicActive is exercised on its own.
func TestJob_IsPeriodicActive(t *testing.T) {
	cases := []struct {
		job    *Job
		active bool
	}{
		// Enabled periodic job: active.
		{
			job: &Job{
				Type: JobTypeService,
				Periodic: &PeriodicConfig{
					Enabled: true,
				},
			},
			active: true,
		},
		// Periodic configuration present but disabled.
		{
			job: &Job{
				Type: JobTypeService,
				Periodic: &PeriodicConfig{
					Enabled: false,
				},
			},
			active: false,
		},
		// Enabled periodic job that has been stopped.
		{
			job: &Job{
				Type: JobTypeService,
				Periodic: &PeriodicConfig{
					Enabled: true,
				},
				Stop: true,
			},
			active: false,
		},
		// Enabled periodic job that is also parameterized. The periodic
		// configuration must be enabled here; otherwise the case would be
		// inactive because of Enabled and would not test the parameterized
		// check at all.
		{
			job: &Job{
				Type: JobTypeService,
				Periodic: &PeriodicConfig{
					Enabled: true,
				},
				ParameterizedJob: &ParameterizedJobConfig{},
			},
			active: false,
		},
		// Job without any periodic configuration.
		{
			job: &Job{
				Type: JobTypeService,
			},
			active: false,
		},
	}

	for i, c := range cases {
		if act := c.job.IsPeriodicActive(); act != c.active {
			t.Fatalf("case %d failed: got %v; want %v", i, act, c.active)
		}
	}
}
func TestJob_SystemJob_Validate(t *testing.T) {
j := testJob()
j.Type = JobTypeSystem
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment