Commit 1957d726 authored by Alex Dadgar's avatar Alex Dadgar
Browse files

Job endpoint handles periodic jobs

parent 326bc4d4
Branches unavailable v1.4.3 v1.4.2 v1.4.1 v1.4.0 v1.4.0-rc.1 v1.4.0-beta.1 v1.3.8 v1.3.7 v1.3.6 v1.3.5 v1.3.4 v1.3.3 v1.3.2 v1.3.1 v1.3.0 v1.3.0-rc.1 v1.3.0-beta.1 v1.2.15 v1.2.14 v1.2.13 v1.2.12 v1.2.11 v1.2.10 v1.2.9 v1.2.8 v1.2.7 v1.2.6 v1.2.5 v1.2.4 v1.2.3 v1.2.2 v1.2.1 v1.2.0 v1.2.0-rc1 v1.2.0-beta1 v1.1.18 v1.1.17 v1.1.16 v1.1.15 v1.1.14 v1.1.13 v1.1.12 v1.1.11 v1.1.10 v1.1.9 v1.1.8 v1.1.7 v1.1.6 v1.1.5 v1.1.4 v1.1.3 v1.1.2 v1.1.1 v1.1.0 v1.1.0-rc1 v1.1.0-beta1 v1.0.18 v1.0.17 v1.0.16 v1.0.15 v1.0.14 v1.0.13 v1.0.12 v1.0.11 v1.0.10 v1.0.9 v1.0.8 v1.0.7 v1.0.6 v1.0.5 v1.0.4 v1.0.3 v1.0.2 v1.0.1 v1.0.0 v1.0.0-rc1 v1.0.0-beta3 v1.0.0-beta2 v0.12.12 v0.12.11 v0.12.10 v0.12.9 v0.12.8 v0.12.7 v0.12.6 v0.12.5 v0.12.4 v0.12.4-rc1 v0.12.3 v0.12.2 v0.12.1 v0.12.0 v0.12.0-rc1 v0.12.0-beta2 v0.12.0-beta1 v0.11.8 v0.11.7 v0.11.6 v0.11.5 v0.11.4 v0.11.3 v0.11.2 v0.11.1 v0.11.0 v0.11.0-rc1 v0.11.0-beta2 v0.11.0-beta1 v0.10.9 v0.10.8 v0.10.7 v0.10.6 v0.10.5 v0.10.4 v0.10.4-rc1 v0.10.3 v0.10.2 v0.10.2-rc1 v0.10.1 v0.10.0 v0.10.0-rc1 v0.10.0-connect1 v0.10.0-beta1 v0.9.7 v0.9.6 v0.9.5 v0.9.4 v0.9.4-rc1 v0.9.3 v0.9.2 v0.9.2-rc1 v0.9.1 v0.9.1-rc1 v0.9.0 v0.9.0-rc2 v0.9.0-rc1 v0.9.0-beta3 v0.9.0-beta2 v0.9.0-beta1 v0.8.7 v0.8.7-rc1 v0.8.6 v0.8.5 v0.8.4 v0.8.4-rc1 v0.8.3 v0.8.2 v0.8.1 v0.8.0 v0.8.0-rc1 v0.7.1 v0.7.1-rc1 v0.7.1-rc1+pro v0.7.1-rc1+ent v0.7.0 v0.7.0+pro v0.7.0+ent v0.7.0-rc3 v0.7.0-rc2 v0.7.0-rc1 v0.7.0-beta1 v0.6.3 v0.6.3-rc1 v0.6.2 v0.6.1 v0.6.0 v0.6.0-rc2 v0.6.0-rc1 v0.5.6 v0.5.6-rc1 v0.5.5 v0.5.5-rc2 v0.5.5-rc1 v0.5.4 v0.5.3 v0.5.3-rc1 v0.5.2 v0.5.2-rc1 v0.5.1 v0.5.1-rc2 v0.5.1-rc1 v0.5.0 v0.5.0-rc2 v0.5.0-rc1 v0.4.3 v0.4.2 v0.4.1 v0.4.1-rc1 v0.4.0 v0.4.0-rc2 v0.4.0-rc1 v0.3.2 v0.3.2-rc2 v0.3.2-rc1 v0.3.1 v0.3.0 v0.3.0-rc2 v0.3rc1 nightly
No related merge requests found
Showing with 178 additions and 4 deletions
+178 -4
......@@ -49,6 +49,14 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
return err
}
// Populate the reply with job information
reply.JobModifyIndex = index
// If the job is periodic, we don't create an eval.
if args.Job.IsPeriodic() {
return nil
}
// Create a new evaluation
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
......@@ -73,10 +81,9 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
return err
}
// Setup the reply
// Populate the reply with eval information
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
reply.JobModifyIndex = index
reply.Index = evalIndex
return nil
}
......@@ -116,6 +123,10 @@ func (j *Job) Evaluate(args *structs.JobEvaluateRequest, reply *structs.JobRegis
return fmt.Errorf("job not found")
}
if job.IsPeriodic() {
return fmt.Errorf("can't evaluate periodic job")
}
// Create a new evaluation
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
......@@ -153,6 +164,24 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
}
defer metrics.MeasureSince([]string{"nomad", "job", "deregister"}, time.Now())
// Validate the arguments
if args.JobID == "" {
return fmt.Errorf("missing job ID for evaluation")
}
// Lookup the job
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
job, err := snap.JobByID(args.JobID)
if err != nil {
return err
}
if job == nil {
return fmt.Errorf("job not found")
}
// Commit this update via Raft
_, index, err := j.srv.raftApply(structs.JobDeregisterRequestType, args)
if err != nil {
......@@ -160,6 +189,14 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
return err
}
// Populate the reply with job information
reply.JobModifyIndex = index
// If the job is periodic, we don't create an eval.
if job.IsPeriodic() {
return nil
}
// Create a new evaluation
// XXX: The job priority / type is strange for this, since it's not a high
// priority even if the job was. The scheduler itself also doesn't matter,
......@@ -185,10 +222,9 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
return err
}
// Setup the reply
// Populate the reply with eval information
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
reply.JobModifyIndex = index
reply.Index = evalIndex
return nil
}
......
......@@ -233,6 +233,55 @@ func TestJobEndpoint_Register_GC_Set(t *testing.T) {
}
}
func TestJobEndpoint_Register_Periodic(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request for a periodic job.
job := mock.Job()
job.Type = structs.JobTypeBatch
job.Periodic = &structs.PeriodicConfig{Enabled: true}
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
var resp structs.JobRegisterResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.JobModifyIndex == 0 {
t.Fatalf("bad index: %d", resp.Index)
}
// Check for the node in the FSM
state := s1.fsm.State()
out, err := state.JobByID(job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("expected job")
}
if out.CreateIndex != resp.JobModifyIndex {
t.Fatalf("index mis-match")
}
serviceName := out.TaskGroups[0].Tasks[0].Services[0].Name
expectedServiceName := "web-frontend"
if serviceName != expectedServiceName {
t.Fatalf("Expected Service Name: %s, Actual: %s", expectedServiceName, serviceName)
}
if resp.EvalID != "" {
t.Fatalf("Register created an eval for a periodic job")
}
}
func TestJobEndpoint_Evaluate(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
......@@ -304,6 +353,44 @@ func TestJobEndpoint_Evaluate(t *testing.T) {
}
}
func TestJobEndpoint_Evaluate_Periodic(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
job := mock.Job()
job.Type = structs.JobTypeBatch
job.Periodic = &structs.PeriodicConfig{Enabled: true}
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
var resp structs.JobRegisterResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.JobModifyIndex == 0 {
t.Fatalf("bad index: %d", resp.Index)
}
// Force a re-evaluation
reEval := &structs.JobEvaluateRequest{
JobID: job.ID,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
if err := msgpackrpc.CallWithCodec(codec, "Job.Evaluate", reEval, &resp); err == nil {
t.Fatal("expect an err")
}
}
func TestJobEndpoint_Deregister(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
......@@ -380,6 +467,57 @@ func TestJobEndpoint_Deregister(t *testing.T) {
}
}
func TestJobEndpoint_Deregister_Periodic(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
job := mock.Job()
job.Type = structs.JobTypeBatch
job.Periodic = &structs.PeriodicConfig{Enabled: true}
reg := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
var resp structs.JobRegisterResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", reg, &resp); err != nil {
t.Fatalf("err: %v", err)
}
// Deregister
dereg := &structs.JobDeregisterRequest{
JobID: job.ID,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp2 structs.JobDeregisterResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
if resp2.JobModifyIndex == 0 {
t.Fatalf("bad index: %d", resp2.Index)
}
// Check for the node in the FSM
state := s1.fsm.State()
out, err := state.JobByID(job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("unexpected job")
}
if resp.EvalID != "" {
t.Fatalf("Deregister created an eval for a periodic job")
}
}
func TestJobEndpoint_GetJob(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment