Commit 404626cd authored by Alex Dadgar's avatar Alex Dadgar Committed by GitHub
Browse files

Merge pull request #2575 from hashicorp/f-job-revert

Job revert
parents af5a6787 8d01a4b8
Showing with 458 additions and 22 deletions
+458 -22
......@@ -52,7 +52,7 @@ func (j *Jobs) Validate(job *Job, q *WriteOptions) (*JobValidateResponse, *Write
// of the evaluation, along with any errors encountered.
func (j *Jobs) Register(job *Job, q *WriteOptions) (string, *WriteMeta, error) {
var resp registerJobResponse
var resp JobRegisterResponse
req := &RegisterJobRequest{Job: job}
wm, err := j.client.write("/v1/jobs", req, &resp, q)
......@@ -65,7 +65,7 @@ func (j *Jobs) Register(job *Job, q *WriteOptions) (string, *WriteMeta, error) {
// EnforceRegister is used to register a job enforcing its job modify index.
func (j *Jobs) EnforceRegister(job *Job, modifyIndex uint64, q *WriteOptions) (string, *WriteMeta, error) {
var resp registerJobResponse
var resp JobRegisterResponse
req := &RegisterJobRequest{
Job: job,
......@@ -153,7 +153,7 @@ func (j *Jobs) Evaluations(jobID string, q *QueryOptions) ([]*Evaluation, *Query
// is deregistered and purged from the system versus still being queryable and
// eventually GC'ed from the system. Most callers should not specify purge.
func (j *Jobs) Deregister(jobID string, purge bool, q *WriteOptions) (string, *WriteMeta, error) {
var resp deregisterJobResponse
var resp JobDeregisterResponse
wm, err := j.client.delete(fmt.Sprintf("/v1/job/%v?purge=%t", jobID, purge), &resp, q)
if err != nil {
return "", nil, err
......@@ -163,7 +163,7 @@ func (j *Jobs) Deregister(jobID string, purge bool, q *WriteOptions) (string, *W
// ForceEvaluate is used to force-evaluate an existing job.
func (j *Jobs) ForceEvaluate(jobID string, q *WriteOptions) (string, *WriteMeta, error) {
var resp registerJobResponse
var resp JobRegisterResponse
wm, err := j.client.write("/v1/job/"+jobID+"/evaluate", nil, &resp, q)
if err != nil {
return "", nil, err
......@@ -223,6 +223,25 @@ func (j *Jobs) Dispatch(jobID string, meta map[string]string,
return &resp, wm, nil
}
// Revert is used to revert the given job to the passed version. If
// enforceVersion is set, the job is only reverted if the current version is at
// the passed version.
func (j *Jobs) Revert(jobID string, version uint64, enforcePriorVersion *uint64,
q *WriteOptions) (*JobRegisterResponse, *WriteMeta, error) {
var resp JobRegisterResponse
req := &JobRevertRequest{
JobID: jobID,
JobVersion: version,
EnforcePriorVersion: enforcePriorVersion,
}
wm, err := j.client.write("/v1/job/"+jobID+"/revert", req, &resp, q)
if err != nil {
return nil, nil, err
}
return &resp, wm, nil
}
// periodicForceResponse is used to deserialize a force response
type periodicForceResponse struct {
EvalID string
......@@ -539,6 +558,21 @@ type JobValidateResponse struct {
Error string
}
// JobRevertRequest is used to revert a job to a prior version.
type JobRevertRequest struct {
// JobID is the ID of the job being reverted
JobID string
// JobVersion the version to revert to.
JobVersion uint64
// EnforcePriorVersion if set will enforce that the job is at the given
// version before reverting.
EnforcePriorVersion *uint64
WriteRequest
}
// JobUpdateRequest is used to update a job
type JobRegisterRequest struct {
Job *Job
......@@ -558,14 +592,20 @@ type RegisterJobRequest struct {
JobModifyIndex uint64 `json:",omitempty"`
}
// registerJobResponse is used to deserialize a job response
type registerJobResponse struct {
EvalID string
// JobRegisterResponse is used to respond to a job registration
type JobRegisterResponse struct {
EvalID string
EvalCreateIndex uint64
JobModifyIndex uint64
QueryMeta
}
// deregisterJobResponse is used to decode a deregister response
type deregisterJobResponse struct {
EvalID string
// JobDeregisterResponse is used to respond to a job deregistration
type JobDeregisterResponse struct {
EvalID string
EvalCreateIndex uint64
JobModifyIndex uint64
QueryMeta
}
type JobPlanRequest struct {
......
......@@ -487,6 +487,54 @@ func TestJobs_EnforceRegister(t *testing.T) {
assertWriteMeta(t, wm)
}
func TestJobs_Revert(t *testing.T) {
c, s := makeClient(t, nil, nil)
defer s.Stop()
jobs := c.Jobs()
// Register twice
job := testJob()
eval, wm, err := jobs.Register(job, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if eval == "" {
t.Fatalf("missing eval id")
}
assertWriteMeta(t, wm)
eval, wm, err = jobs.Register(job, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if eval == "" {
t.Fatalf("missing eval id")
}
assertWriteMeta(t, wm)
// Fail revert at incorrect enforce
_, wm, err = jobs.Revert(*job.ID, 0, helper.Uint64ToPtr(10), nil)
if err == nil || !strings.Contains(err.Error(), "enforcing version") {
t.Fatalf("expected enforcement error: %v", err)
}
// Works at correct index
revertResp, wm, err := jobs.Revert(*job.ID, 0, helper.Uint64ToPtr(1), nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if revertResp.EvalID == "" {
t.Fatalf("missing eval id")
}
if revertResp.EvalCreateIndex == 0 {
t.Fatalf("bad eval create index")
}
if revertResp.JobModifyIndex == 0 {
t.Fatalf("bad job modify index")
}
assertWriteMeta(t, wm)
}
func TestJobs_Info(t *testing.T) {
c, s := makeClient(t, nil, nil)
defer s.Stop()
......
......@@ -67,6 +67,9 @@ func (s *HTTPServer) JobSpecificRequest(resp http.ResponseWriter, req *http.Requ
case strings.HasSuffix(path, "/versions"):
jobName := strings.TrimSuffix(path, "/versions")
return s.jobVersions(resp, req, jobName)
case strings.HasSuffix(path, "/revert"):
jobName := strings.TrimSuffix(path, "/revert")
return s.jobRevert(resp, req, jobName)
default:
return s.jobCRUD(resp, req, path)
}
......@@ -360,6 +363,35 @@ func (s *HTTPServer) jobVersions(resp http.ResponseWriter, req *http.Request,
return out.Versions, nil
}
func (s *HTTPServer) jobRevert(resp http.ResponseWriter, req *http.Request,
jobName string) (interface{}, error) {
if req.Method != "PUT" && req.Method != "POST" {
return nil, CodedError(405, ErrInvalidMethod)
}
var revertRequest structs.JobRevertRequest
if err := decodeBody(req, &revertRequest); err != nil {
return nil, CodedError(400, err.Error())
}
if revertRequest.JobID == "" {
return nil, CodedError(400, "JobID must be specified")
}
if revertRequest.JobID != jobName {
return nil, CodedError(400, "Job ID does not match")
}
s.parseRegion(req, &revertRequest.Region)
var out structs.JobRegisterResponse
if err := s.agent.RPC("Job.Revert", &revertRequest, &out); err != nil {
return nil, err
}
setMeta(resp, &out.QueryMeta)
return out, nil
}
func (s *HTTPServer) jobSummaryRequest(resp http.ResponseWriter, req *http.Request, name string) (interface{}, error) {
args := structs.JobSummaryRequest{
JobID: name,
......
......@@ -789,6 +789,56 @@ func TestHTTP_JobDispatch(t *testing.T) {
})
}
func TestHTTP_JobRevert(t *testing.T) {
httpTest(t, nil, func(s *TestServer) {
// Create the job and register it twice
job := mock.Job()
regReq := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var regResp structs.JobRegisterResponse
if err := s.Agent.RPC("Job.Register", &regReq, &regResp); err != nil {
t.Fatalf("err: %v", err)
}
if err := s.Agent.RPC("Job.Register", &regReq, &regResp); err != nil {
t.Fatalf("err: %v", err)
}
args := structs.JobRevertRequest{
JobID: job.ID,
JobVersion: 0,
WriteRequest: structs.WriteRequest{Region: "global"},
}
buf := encodeReq(args)
// Make the HTTP request
req, err := http.NewRequest("PUT", "/v1/job/"+job.ID+"/revert", buf)
if err != nil {
t.Fatalf("err: %v", err)
}
respW := httptest.NewRecorder()
// Make the request
obj, err := s.Server.JobSpecificRequest(respW, req)
if err != nil {
t.Fatalf("err: %v", err)
}
// Check the response
revertResp := obj.(structs.JobRegisterResponse)
if revertResp.EvalID == "" {
t.Fatalf("bad: %v", revertResp)
}
// Check for the index
if respW.HeaderMap.Get("X-Nomad-Index") == "" {
t.Fatalf("missing index")
}
})
}
func TestJobs_ApiJobToStructsJob(t *testing.T) {
apiJob := &api.Job{
Stop: helper.BoolToPtr(true),
......
......@@ -286,8 +286,8 @@ func (j *Job) Summary(args *structs.JobSummaryRequest,
}
// Validate validates a job
func (j *Job) Validate(args *structs.JobValidateRequest,
reply *structs.JobValidateResponse) error {
func (j *Job) Validate(args *structs.JobValidateRequest, reply *structs.JobValidateResponse) error {
defer metrics.MeasureSince([]string{"nomad", "job", "validate"}, time.Now())
if err := validateJob(args.Job); err != nil {
if merr, ok := err.(*multierror.Error); ok {
......@@ -300,10 +300,69 @@ func (j *Job) Validate(args *structs.JobValidateRequest,
reply.Error = err.Error()
}
}
reply.DriverConfigValidated = true
return nil
}
// Revert is used to revert the job to a prior version
func (j *Job) Revert(args *structs.JobRevertRequest, reply *structs.JobRegisterResponse) error {
if done, err := j.srv.forward("Job.Revert", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "revert"}, time.Now())
// Validate the arguments
if args.JobID == "" {
return fmt.Errorf("missing job ID for evaluation")
}
// Lookup the job by version
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
ws := memdb.NewWatchSet()
cur, err := snap.JobByID(ws, args.JobID)
if err != nil {
return err
}
if cur == nil {
return fmt.Errorf("job %q not found", args.JobID)
}
if args.JobVersion == cur.Version {
return fmt.Errorf("can't revert to current version")
}
jobV, err := snap.JobByIDAndVersion(ws, args.JobID, args.JobVersion)
if err != nil {
return err
}
if jobV == nil {
return fmt.Errorf("job %q at version %d not found", args.JobID, args.JobVersion)
}
// Build the register request
reg := &structs.JobRegisterRequest{
Job: jobV.Copy(),
WriteRequest: args.WriteRequest,
}
// If the request is enforcing the existing version do a check.
if args.EnforcePriorVersion != nil {
if cur.Version != *args.EnforcePriorVersion {
return fmt.Errorf("Current job has version %d; enforcing version %d", cur.Version, *args.EnforcePriorVersion)
}
reg.EnforceIndex = true
reg.JobModifyIndex = cur.JobModifyIndex
}
// Register the version.
return j.Register(reg, reply)
}
// Evaluate is used to force a job for re-evaluation
func (j *Job) Evaluate(args *structs.JobEvaluateRequest, reply *structs.JobRegisterResponse) error {
if done, err := j.srv.forward("Job.Evaluate", args, args, reply); done {
......
......@@ -9,6 +9,7 @@ import (
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
......@@ -687,6 +688,155 @@ func TestJobEndpoint_Register_Vault_Policies(t *testing.T) {
}
}
func TestJobEndpoint_Revert(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the initial register request
job := mock.Job()
job.Priority = 100
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
var resp structs.JobRegisterResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Index == 0 {
t.Fatalf("bad index: %d", resp.Index)
}
// Reregister again to get another version
job2 := job.Copy()
job2.Priority = 1
req = &structs.JobRegisterRequest{
Job: job2,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Index == 0 {
t.Fatalf("bad index: %d", resp.Index)
}
// Create revert request and enforcing it be at an incorrect version
revertReq := &structs.JobRevertRequest{
JobID: job.ID,
JobVersion: 0,
EnforcePriorVersion: helper.Uint64ToPtr(10),
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
err := msgpackrpc.CallWithCodec(codec, "Job.Revert", revertReq, &resp)
if err == nil || !strings.Contains(err.Error(), "enforcing version 10") {
t.Fatalf("expected enforcement error")
}
// Create revert request and enforcing it be at the current version
revertReq = &structs.JobRevertRequest{
JobID: job.ID,
JobVersion: 1,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
err = msgpackrpc.CallWithCodec(codec, "Job.Revert", revertReq, &resp)
if err == nil || !strings.Contains(err.Error(), "current version") {
t.Fatalf("expected current version err: %v", err)
}
// Create revert request and enforcing it be at version 1
revertReq = &structs.JobRevertRequest{
JobID: job.ID,
JobVersion: 0,
EnforcePriorVersion: helper.Uint64ToPtr(1),
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
if err := msgpackrpc.CallWithCodec(codec, "Job.Revert", revertReq, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Index == 0 {
t.Fatalf("bad index: %d", resp.Index)
}
if resp.EvalID == "" || resp.EvalCreateIndex == 0 {
t.Fatalf("bad created eval: %+v", resp)
}
if resp.JobModifyIndex == 0 {
t.Fatalf("bad job modify index: %d", resp.JobModifyIndex)
}
// Create revert request and don't enforce
revertReq = &structs.JobRevertRequest{
JobID: job.ID,
JobVersion: 0,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
if err := msgpackrpc.CallWithCodec(codec, "Job.Revert", revertReq, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Index == 0 {
t.Fatalf("bad index: %d", resp.Index)
}
if resp.EvalID == "" || resp.EvalCreateIndex == 0 {
t.Fatalf("bad created eval: %+v", resp)
}
if resp.JobModifyIndex == 0 {
t.Fatalf("bad job modify index: %d", resp.JobModifyIndex)
}
// Check that the job is at the correct version and that the eval was
// created
state := s1.fsm.State()
ws := memdb.NewWatchSet()
out, err := state.JobByID(ws, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("expected job")
}
if out.Priority != job.Priority {
t.Fatalf("priority mis-match")
}
if out.Version != 3 {
t.Fatalf("got version %d; want %d", out.Version, 3)
}
eout, err := state.EvalByID(ws, resp.EvalID)
if err != nil {
t.Fatalf("err: %v", err)
}
if eout == nil {
t.Fatalf("expected eval")
}
if eout.JobID != job.ID {
t.Fatalf("job id mis-match")
}
versions, err := state.JobVersionsByID(ws, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(versions) != 4 {
t.Fatalf("got %d versions; want %d", len(versions), 4)
}
}
func TestJobEndpoint_Evaluate(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
......
......@@ -528,7 +528,8 @@ func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *memdb
return nil
}
// JobByID is used to lookup a job by its ID
// JobByID is used to lookup a job by its ID. JobByID returns the current/latest job
// version.
func (s *StateStore) JobByID(ws memdb.WatchSet, id string) (*structs.Job, error) {
txn := s.db.Txn(false)
......@@ -602,6 +603,24 @@ func (s *StateStore) jobVersionByID(txn *memdb.Txn, ws *memdb.WatchSet, id strin
return all, nil
}
// JobByIDAndVersion returns the job identified by its ID and Version
func (s *StateStore) JobByIDAndVersion(ws memdb.WatchSet, id string, version uint64) (*structs.Job, error) {
txn := s.db.Txn(false)
watchCh, existing, err := txn.FirstWatch("job_versions", "id", id, version)
if err != nil {
return nil, err
}
ws.Add(watchCh)
if existing != nil {
job := existing.(*structs.Job)
return job, nil
}
return nil, nil
}
// Jobs returns an iterator over all the jobs
func (s *StateStore) Jobs(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
......
......@@ -433,6 +433,16 @@ func TestStateStore_UpsertJob_Job(t *testing.T) {
if a := allVersions[0]; a.ID != job.ID || a.Version != 0 {
t.Fatalf("bad: %v", a)
}
// Test the looking up the job by version returns the same results
vout, err := state.JobByIDAndVersion(ws, job.ID, 0)
if err != nil {
t.Fatalf("err: %v", err)
}
if !reflect.DeepEqual(out, vout) {
t.Fatalf("bad: %#v %#v", out, vout)
}
}
func TestStateStore_UpdateUpsertJob_Job(t *testing.T) {
......@@ -478,6 +488,9 @@ func TestStateStore_UpdateUpsertJob_Job(t *testing.T) {
if out.ModifyIndex != 1001 {
t.Fatalf("bad: %#v", out)
}
if out.Version != 1 {
t.Fatalf("bad: %#v", out)
}
index, err := state.Index("jobs")
if err != nil {
......@@ -487,6 +500,16 @@ func TestStateStore_UpdateUpsertJob_Job(t *testing.T) {
t.Fatalf("bad: %d", index)
}
// Test the looking up the job by version returns the same results
vout, err := state.JobByIDAndVersion(ws, job.ID, 1)
if err != nil {
t.Fatalf("err: %v", err)
}
if !reflect.DeepEqual(out, vout) {
t.Fatalf("bad: %#v %#v", out, vout)
}
// Test that the job summary remains the same if the job is updated but
// count remains same
summary, err := state.JobSummaryByID(ws, job.ID)
......
......@@ -294,17 +294,19 @@ type JobValidateRequest struct {
WriteRequest
}
// JobValidateResponse is the response from validate request
type JobValidateResponse struct {
// DriverConfigValidated indicates whether the agent validated the driver
// config
DriverConfigValidated bool
// JobRevertRequest is used to revert a job to a prior version.
type JobRevertRequest struct {
// JobID is the ID of the job being reverted
JobID string
// ValidationErrors is a list of validation errors
ValidationErrors []string
// JobVersion the version to revert to.
JobVersion uint64
// Error is a string version of any error that may have occured
Error string
// EnforcePriorVersion if set will enforce that the job is at the given
// version before reverting.
EnforcePriorVersion *uint64
WriteRequest
}
// NodeListRequest is used to parameterize a list request
......@@ -493,6 +495,19 @@ type JobDeregisterResponse struct {
QueryMeta
}
// JobValidateResponse is the response from validate request
type JobValidateResponse struct {
// DriverConfigValidated indicates whether the agent validated the driver
// config
DriverConfigValidated bool
// ValidationErrors is a list of validation errors
ValidationErrors []string
// Error is a string version of any error that may have occured
Error string
}
// NodeUpdateResponse is used to respond to a node update
type NodeUpdateResponse struct {
HeartbeatTTL time.Duration
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment