Unverified Commit 80dcae72 authored by James Rasell's avatar James Rasell Committed by GitHub
Browse files

core: allow setting and propagation of eval priority on job de/registration (#11532)

This change modifies the Nomad job register and deregister RPCs to
accept an updated option set which includes eval priority. This
param is optional and overrides the use of the job priority to set
the eval priority.

In order to ensure all evaluations as a result of the request use
the same eval priority, the priority is shared to the
allocReconciler and deploymentWatcher. This creates a new
distinction between eval priority and job priority.

The Nomad agent HTTP API has been modified to allow setting the
eval priority on job update and delete. To keep consistency with
the current v1 API, job update accepts this as a payload param;
job delete accepts this as a query param.

Any user supplied value is validated within the agent HTTP handler
removing the need to pass invalid requests to the server.

The register and deregister opts functions now allow setting
the eval priority on requests.

The change includes a small change to the DeregisterOpts function
which handles nil opts. This brings the function inline with the
RegisterOpts.
Showing with 854 additions and 44 deletions
+854 -44
```release-note:improvement
core: allow setting and propagation of eval priority on job de/registration
```
......@@ -91,6 +91,7 @@ type RegisterOptions struct {
ModifyIndex uint64
PolicyOverride bool
PreserveCounts bool
EvalPriority int
}
// Register is used to register a new job. It returns the ID
......@@ -105,8 +106,8 @@ func (j *Jobs) EnforceRegister(job *Job, modifyIndex uint64, q *WriteOptions) (*
return j.RegisterOpts(job, &opts, q)
}
// Register is used to register a new job. It returns the ID
// of the evaluation, along with any errors encountered.
// RegisterOpts is used to register a new job with the passed RegisterOpts. It
// returns the ID of the evaluation, along with any errors encountered.
func (j *Jobs) RegisterOpts(job *Job, opts *RegisterOptions, q *WriteOptions) (*JobRegisterResponse, *WriteMeta, error) {
// Format the request
req := &JobRegisterRequest{
......@@ -119,6 +120,7 @@ func (j *Jobs) RegisterOpts(job *Job, opts *RegisterOptions, q *WriteOptions) (*
}
req.PolicyOverride = opts.PolicyOverride
req.PreserveCounts = opts.PreserveCounts
req.EvalPriority = opts.EvalPriority
}
var resp JobRegisterResponse
......@@ -290,14 +292,31 @@ type DeregisterOptions struct {
// If Global is set to true, all regions of a multiregion job will be
// stopped.
Global bool
// EvalPriority is an optional priority to use on any evaluation created as
// a result on this job deregistration. This value must be between 1-100
// inclusively, where a larger value corresponds to a higher priority. This
// is useful when an operator wishes to push through a job deregistration
// in busy clusters with a large evaluation backlog.
EvalPriority int
}
// DeregisterOpts is used to remove an existing job. See DeregisterOptions
// for parameters.
func (j *Jobs) DeregisterOpts(jobID string, opts *DeregisterOptions, q *WriteOptions) (string, *WriteMeta, error) {
var resp JobDeregisterResponse
wm, err := j.client.delete(fmt.Sprintf("/v1/job/%v?purge=%t&global=%t",
url.PathEscape(jobID), opts.Purge, opts.Global), &resp, q)
// The base endpoint to add query params to.
endpoint := "/v1/job/" + url.PathEscape(jobID)
// Protect against nil opts. url.Values expects a string, and so using
// fmt.Sprintf is the best way to do this.
if opts != nil {
endpoint += fmt.Sprintf("?purge=%t&global=%t&eval_priority=%v",
opts.Purge, opts.Global, opts.EvalPriority)
}
wm, err := j.client.delete(endpoint, &resp, q)
if err != nil {
return "", nil, err
}
......@@ -1170,6 +1189,14 @@ type JobRegisterRequest struct {
PolicyOverride bool `json:",omitempty"`
PreserveCounts bool `json:",omitempty"`
// EvalPriority is an optional priority to use on any evaluation created as
// a result on this job registration. This value must be between 1-100
// inclusively, where a larger value corresponds to a higher priority. This
// is useful when an operator wishes to push through a job registration in
// busy clusters with a large evaluation backlog. This avoids needing to
// change the job priority which also impacts preemption.
EvalPriority int `json:",omitempty"`
WriteRequest
}
......
......@@ -187,6 +187,60 @@ func TestJobs_Register_NoPreserveCounts(t *testing.T) {
require.Equal(3, status.TaskGroups["group3"].Desired) // new => as specified
}
func TestJobs_Register_EvalPriority(t *testing.T) {
t.Parallel()
requireAssert := require.New(t)
c, s := makeClient(t, nil, nil)
defer s.Stop()
// Listing jobs before registering returns nothing
listResp, _, err := c.Jobs().List(nil)
requireAssert.Nil(err)
requireAssert.Len(listResp, 0)
// Create a job and register it with an eval priority.
job := testJob()
registerResp, wm, err := c.Jobs().RegisterOpts(job, &RegisterOptions{EvalPriority: 99}, nil)
requireAssert.Nil(err)
requireAssert.NotNil(registerResp)
requireAssert.NotEmpty(registerResp.EvalID)
assertWriteMeta(t, wm)
// Check the created job evaluation has a priority that matches our desired
// value.
evalInfo, _, err := c.Evaluations().Info(registerResp.EvalID, nil)
requireAssert.NoError(err)
requireAssert.Equal(99, evalInfo.Priority)
}
func TestJobs_Register_NoEvalPriority(t *testing.T) {
t.Parallel()
requireAssert := require.New(t)
c, s := makeClient(t, nil, nil)
defer s.Stop()
// Listing jobs before registering returns nothing
listResp, _, err := c.Jobs().List(nil)
requireAssert.Nil(err)
requireAssert.Len(listResp, 0)
// Create a job and register it with an eval priority.
job := testJob()
registerResp, wm, err := c.Jobs().RegisterOpts(job, nil, nil)
requireAssert.Nil(err)
requireAssert.NotNil(registerResp)
requireAssert.NotEmpty(registerResp.EvalID)
assertWriteMeta(t, wm)
// Check the created job evaluation has a priority that matches the job
// priority.
evalInfo, _, err := c.Evaluations().Info(registerResp.EvalID, nil)
requireAssert.NoError(err)
requireAssert.Equal(*job.Priority, evalInfo.Priority)
}
func TestJobs_Validate(t *testing.T) {
t.Parallel()
c, s := makeClient(t, nil, nil)
......@@ -1628,6 +1682,68 @@ func TestJobs_Deregister(t *testing.T) {
}
}
// TestJobs_Deregister_EvalPriority checks that an eval priority supplied via
// DeregisterOptions is carried through to the evaluation created by the job
// deregistration.
func TestJobs_Deregister_EvalPriority(t *testing.T) {
	t.Parallel()
	requireAssert := require.New(t)

	c, s := makeClient(t, nil, nil)
	defer s.Stop()

	// Listing jobs before registering returns nothing
	listResp, _, err := c.Jobs().List(nil)
	requireAssert.Nil(err)
	requireAssert.Len(listResp, 0)

	// Create a job and register it.
	job := testJob()
	registerResp, wm, err := c.Jobs().Register(job, nil)
	requireAssert.Nil(err)
	requireAssert.NotNil(registerResp)
	requireAssert.NotEmpty(registerResp.EvalID)
	assertWriteMeta(t, wm)

	// Deregister the job with an eval priority.
	evalID, _, err := c.Jobs().DeregisterOpts(*job.ID, &DeregisterOptions{EvalPriority: 97}, nil)
	requireAssert.NoError(err)
	// Fix: requireAssert is already bound to t; passing t as the first
	// argument made NotEmpty assert on t itself (always non-empty) rather
	// than on the returned evaluation ID.
	requireAssert.NotEmpty(evalID)

	// Lookup the eval and check the priority on it.
	evalInfo, _, err := c.Evaluations().Info(evalID, nil)
	requireAssert.NoError(err)
	requireAssert.Equal(97, evalInfo.Priority)
}
// TestJobs_Deregister_NoEvalPriority checks that when no eval priority
// override is supplied on deregistration, the created evaluation inherits the
// job's own priority.
func TestJobs_Deregister_NoEvalPriority(t *testing.T) {
	t.Parallel()
	requireAssert := require.New(t)

	c, s := makeClient(t, nil, nil)
	defer s.Stop()

	// Listing jobs before registering returns nothing
	listResp, _, err := c.Jobs().List(nil)
	requireAssert.Nil(err)
	requireAssert.Len(listResp, 0)

	// Create a job and register it.
	job := testJob()
	registerResp, wm, err := c.Jobs().Register(job, nil)
	requireAssert.Nil(err)
	requireAssert.NotNil(registerResp)
	requireAssert.NotEmpty(registerResp.EvalID)
	assertWriteMeta(t, wm)

	// Deregister the job without an eval priority override.
	evalID, _, err := c.Jobs().DeregisterOpts(*job.ID, &DeregisterOptions{}, nil)
	requireAssert.NoError(err)
	// Fix: requireAssert is already bound to t; passing t as the first
	// argument made NotEmpty assert on t itself (always non-empty) rather
	// than on the returned evaluation ID.
	requireAssert.NotEmpty(evalID)

	// Lookup the eval and check the priority on it matches the job priority.
	evalInfo, _, err := c.Evaluations().Info(evalID, nil)
	requireAssert.NoError(err)
	requireAssert.Equal(*job.Priority, evalInfo.Priority)
}
func TestJobs_ForceEvaluate(t *testing.T) {
t.Parallel()
c, s := makeClient(t, nil, nil)
......
......@@ -687,6 +687,19 @@ func parseBool(req *http.Request, field string) (*bool, error) {
return nil, nil
}
// parseInt parses a query parameter to a int or returns (nil, nil) if the
// parameter is not present.
func parseInt(req *http.Request, field string) (*int, error) {
if str := req.URL.Query().Get(field); str != "" {
param, err := strconv.Atoi(str)
if err != nil {
return nil, fmt.Errorf("Failed to parse value of %q (%v) as a int: %v", field, str, err)
}
return &param, nil
}
return nil, nil
}
// parseToken is used to parse the X-Nomad-Token param
func (s *HTTPServer) parseToken(req *http.Request, token *string) {
if other := req.Header.Get("X-Nomad-Token"); other != "" {
......
......@@ -574,6 +574,53 @@ func TestParseBool(t *testing.T) {
}
}
// Test_parseInt exercises parseInt across absent, valid, and malformed
// query-parameter values.
func Test_parseInt(t *testing.T) {
	t.Parallel()

	cases := []struct {
		Input    string // raw query-param value appended to the URL
		Expected *int   // expected parsed result; nil when the param is absent
		Err      bool   // whether a parse error is expected
	}{
		{
			Input:    "",
			Expected: nil,
		},
		{
			Input:    "13",
			Expected: helper.IntToPtr(13),
		},
		{
			Input:    "99",
			Expected: helper.IntToPtr(99),
		},
		{
			Input: "ten",
			Err:   true,
		},
	}

	for i := range cases {
		tc := cases[i]
		t.Run("Input-"+tc.Input, func(t *testing.T) {
			// Build a minimal request whose URL carries the test-case value.
			testURL, err := url.Parse("http://localhost/foo?eval_priority=" + tc.Input)
			require.NoError(t, err)
			req := &http.Request{
				URL: testURL,
			}

			result, err := parseInt(req, "eval_priority")
			if tc.Err {
				require.Error(t, err)
				require.Nil(t, result)
			} else {
				require.NoError(t, err)
				require.Equal(t, tc.Expected, result)
			}
		})
	}
}
func TestParsePagination(t *testing.T) {
t.Parallel()
s := makeHTTPServer(t, nil)
......
......@@ -390,6 +390,15 @@ func (s *HTTPServer) jobUpdate(resp http.ResponseWriter, req *http.Request,
}
}
// Validate the evaluation priority if the user supplied a non-default
// value. It's more efficient to do it here, within the agent rather than
// sending a bad request for the server to reject.
if args.EvalPriority != 0 {
if err := validateEvalPriorityOpt(args.EvalPriority); err != nil {
return nil, err
}
}
sJob, writeReq := s.apiJobAndRequestToStructs(args.Job, req, args.WriteRequest)
regReq := structs.JobRegisterRequest{
Job: sJob,
......@@ -397,6 +406,7 @@ func (s *HTTPServer) jobUpdate(resp http.ResponseWriter, req *http.Request,
JobModifyIndex: args.JobModifyIndex,
PolicyOverride: args.PolicyOverride,
PreserveCounts: args.PreserveCounts,
EvalPriority: args.EvalPriority,
WriteRequest: *writeReq,
}
......@@ -411,6 +421,9 @@ func (s *HTTPServer) jobUpdate(resp http.ResponseWriter, req *http.Request,
func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request,
jobName string) (interface{}, error) {
args := structs.JobDeregisterRequest{JobID: jobName}
// Identify the purge query param and parse.
purgeStr := req.URL.Query().Get("purge")
var purgeBool bool
if purgeStr != "" {
......@@ -420,7 +433,9 @@ func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request,
return nil, fmt.Errorf("Failed to parse value of %q (%v) as a bool: %v", "purge", purgeStr, err)
}
}
args.Purge = purgeBool
// Identify the global query param and parse.
globalStr := req.URL.Query().Get("global")
var globalBool bool
if globalStr != "" {
......@@ -430,12 +445,24 @@ func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request,
return nil, fmt.Errorf("Failed to parse value of %q (%v) as a bool: %v", "global", globalStr, err)
}
}
args.Global = globalBool
args := structs.JobDeregisterRequest{
JobID: jobName,
Purge: purgeBool,
Global: globalBool,
// Parse the eval priority from the request URL query if present.
evalPriority, err := parseInt(req, "eval_priority")
if err != nil {
return nil, err
}
// Validate the evaluation priority if the user supplied a non-default
// value. It's more efficient to do it here, within the agent rather than
// sending a bad request for the server to reject.
if evalPriority != nil && *evalPriority > 0 {
if err := validateEvalPriorityOpt(*evalPriority); err != nil {
return nil, err
}
args.EvalPriority = *evalPriority
}
s.parseWriteRequest(req, &args.WriteRequest)
var out structs.JobDeregisterResponse
......@@ -1661,3 +1688,12 @@ func ApiSpreadToStructs(a1 *api.Spread) *structs.Spread {
}
return ret
}
// validateEvalPriorityOpt ensures the supplied evaluation priority override
// value is within acceptable bounds (1-100 inclusive). It returns a coded
// 400 error so the agent can reject bad input before forwarding to the
// server.
func validateEvalPriorityOpt(priority int) HTTPCodedError {
	if priority < 1 || priority > 100 {
		// Use the named status constant rather than the bare 400 literal.
		return CodedError(http.StatusBadRequest, "Eval priority must be between 1 and 100 inclusively")
	}
	return nil
}
......@@ -4,6 +4,7 @@ import (
"net/http"
"net/http/httptest"
"reflect"
"strconv"
"testing"
"time"
......@@ -596,6 +597,101 @@ func TestHTTP_JobUpdate(t *testing.T) {
})
}
func TestHTTP_JobUpdate_EvalPriority(t *testing.T) {
t.Parallel()
testCases := []struct {
inputEvalPriority int
expectedError bool
name string
}{
{
inputEvalPriority: 95,
expectedError: false,
name: "valid input eval priority",
},
{
inputEvalPriority: 99999999999,
expectedError: true,
name: "invalid input eval priority",
},
{
inputEvalPriority: 0,
expectedError: false,
name: "no input eval priority",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
httpTest(t, nil, func(s *TestAgent) {
// Create the job
job := MockJob()
args := api.JobRegisterRequest{
Job: job,
WriteRequest: api.WriteRequest{
Region: "global",
Namespace: api.DefaultNamespace,
},
}
// Add our eval priority query param if set.
if tc.inputEvalPriority > 0 {
args.EvalPriority = tc.inputEvalPriority
}
buf := encodeReq(args)
// Make the HTTP request
req, err := http.NewRequest("PUT", "/v1/job/"+*job.ID, buf)
assert.Nil(t, err)
respW := httptest.NewRecorder()
// Make the request
obj, err := s.Server.JobSpecificRequest(respW, req)
if tc.expectedError {
assert.NotNil(t, err)
return
} else {
assert.Nil(t, err)
}
// Check the response
regResp := obj.(structs.JobRegisterResponse)
assert.NotEmpty(t, regResp.EvalID)
assert.NotEmpty(t, respW.Result().Header.Get("X-Nomad-Index"))
// Check the job is registered
getReq := structs.JobSpecificRequest{
JobID: *job.ID,
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: structs.DefaultNamespace,
},
}
var getResp structs.SingleJobResponse
assert.Nil(t, s.Agent.RPC("Job.GetJob", &getReq, &getResp))
assert.NotNil(t, getResp.Job)
// Check the evaluation that resulted from the job register.
evalInfoReq, err := http.NewRequest("GET", "/v1/evaluation/"+regResp.EvalID, nil)
assert.Nil(t, err)
respW.Flush()
evalRaw, err := s.Server.EvalSpecificRequest(respW, evalInfoReq)
assert.Nil(t, err)
evalRespObj := evalRaw.(*structs.Evaluation)
if tc.inputEvalPriority > 0 {
assert.Equal(t, tc.inputEvalPriority, evalRespObj.Priority)
} else {
assert.Equal(t, *job.Priority, evalRespObj.Priority)
}
})
})
}
}
func TestHTTP_JobUpdateRegion(t *testing.T) {
t.Parallel()
......@@ -797,6 +893,117 @@ func TestHTTP_JobDelete(t *testing.T) {
})
}
func TestHTTP_JobDelete_EvalPriority(t *testing.T) {
t.Parallel()
testCases := []struct {
inputEvalPriority int
expectedError bool
name string
}{
{
inputEvalPriority: 95,
expectedError: false,
name: "valid input eval priority",
},
{
inputEvalPriority: 99999999999,
expectedError: true,
name: "invalid input eval priority",
},
{
inputEvalPriority: 0,
expectedError: false,
name: "no input eval priority",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
httpTest(t, nil, func(s *TestAgent) {
// Create the job
job := MockJob()
args := api.JobRegisterRequest{
Job: job,
WriteRequest: api.WriteRequest{
Region: "global",
Namespace: api.DefaultNamespace,
},
}
buf := encodeReq(args)
// Make the HTTP request
regReq, err := http.NewRequest("PUT", "/v1/job/"+*job.ID, buf)
assert.Nil(t, err)
respW := httptest.NewRecorder()
// Make the request
obj, err := s.Server.JobSpecificRequest(respW, regReq)
assert.Nil(t, err)
// Check the response
regResp := obj.(structs.JobRegisterResponse)
assert.NotEmpty(t, regResp.EvalID)
assert.NotEmpty(t, respW.Result().Header.Get("X-Nomad-Index"))
// Check the job is registered
getReq := structs.JobSpecificRequest{
JobID: *job.ID,
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: structs.DefaultNamespace,
},
}
var getResp structs.SingleJobResponse
assert.Nil(t, s.Agent.RPC("Job.GetJob", &getReq, &getResp))
assert.NotNil(t, getResp.Job)
// Delete the job.
deleteReq, err := http.NewRequest("DELETE", "/v1/job/"+*job.ID+"?purge=true", nil)
assert.Nil(t, err)
respW.Flush()
// Add our eval priority query param if set.
if tc.inputEvalPriority > 0 {
q := deleteReq.URL.Query()
q.Add("eval_priority", strconv.Itoa(tc.inputEvalPriority))
deleteReq.URL.RawQuery = q.Encode()
}
// Make the request
obj, err = s.Server.JobSpecificRequest(respW, deleteReq)
if tc.expectedError {
assert.NotNil(t, err)
return
} else {
assert.Nil(t, err)
}
// Check the response
dereg := obj.(structs.JobDeregisterResponse)
assert.NotEmpty(t, dereg.EvalID)
assert.NotEmpty(t, respW.Result().Header.Get("X-Nomad-Index"))
// Check the evaluation that resulted from the job register.
evalInfoReq, err := http.NewRequest("GET", "/v1/evaluation/"+dereg.EvalID, nil)
assert.Nil(t, err)
respW.Flush()
evalRaw, err := s.Server.EvalSpecificRequest(respW, evalInfoReq)
assert.Nil(t, err)
evalRespObj := evalRaw.(*structs.Evaluation)
if tc.inputEvalPriority > 0 {
assert.Equal(t, tc.inputEvalPriority, evalRespObj.Priority)
} else {
assert.Equal(t, *job.Priority, evalRespObj.Priority)
}
})
})
}
}
func TestHTTP_Job_ScaleTaskGroup(t *testing.T) {
t.Parallel()
......
......@@ -86,6 +86,10 @@ Run Options:
the evaluation ID will be printed to the screen, which can be used to
examine the evaluation using the eval-status command.
-eval-priority
Override the priority of the evaluations produced as a result of this job
submission. By default, this is set to the priority of the job.
-hcl1
Parses the job file as HCLv1.
......@@ -152,6 +156,7 @@ func (c *JobRunCommand) AutocompleteFlags() complete.Flags {
"-hcl2-strict": complete.PredictNothing,
"-var": complete.PredictAnything,
"-var-file": complete.PredictFiles("*.var"),
"-eval-priority": complete.PredictNothing,
})
}
......@@ -165,6 +170,7 @@ func (c *JobRunCommand) Run(args []string) int {
var detach, verbose, output, override, preserveCounts, hcl2Strict bool
var checkIndexStr, consulToken, consulNamespace, vaultToken, vaultNamespace string
var varArgs, varFiles flaghelper.StringFlag
var evalPriority int
flagSet := c.Meta.FlagSet(c.Name(), FlagSetClient)
flagSet.Usage = func() { c.Ui.Output(c.Help()) }
......@@ -182,6 +188,7 @@ func (c *JobRunCommand) Run(args []string) int {
flagSet.StringVar(&vaultNamespace, "vault-namespace", "", "")
flagSet.Var(&varArgs, "var", "")
flagSet.Var(&varFiles, "var-file", "")
flagSet.IntVar(&evalPriority, "eval-priority", 0, "")
if err := flagSet.Parse(args); err != nil {
return 1
......@@ -282,13 +289,15 @@ func (c *JobRunCommand) Run(args []string) int {
}
// Set the register options
opts := &api.RegisterOptions{}
opts := &api.RegisterOptions{
PolicyOverride: override,
PreserveCounts: preserveCounts,
EvalPriority: evalPriority,
}
if enforce {
opts.EnforceIndex = true
opts.ModifyIndex = checkIndex
}
opts.PolicyOverride = override
opts.PreserveCounts = preserveCounts
// Submit the job
resp, _, err := client.Jobs().RegisterOpts(job, opts, nil)
......
......@@ -39,6 +39,10 @@ Stop Options:
screen, which can be used to examine the evaluation using the eval-status
command.
-eval-priority
Override the priority of the evaluations produced as a result of this job
deregistration. By default, this is set to the priority of the job.
-purge
Purge is used to stop the job and purge it from the system. If not set, the
job will still be queryable and will be purged by the garbage collector.
......@@ -63,11 +67,12 @@ func (c *JobStopCommand) Synopsis() string {
func (c *JobStopCommand) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient),
complete.Flags{
"-detach": complete.PredictNothing,
"-purge": complete.PredictNothing,
"-global": complete.PredictNothing,
"-yes": complete.PredictNothing,
"-verbose": complete.PredictNothing,
"-detach": complete.PredictNothing,
"-eval-priority": complete.PredictNothing,
"-purge": complete.PredictNothing,
"-global": complete.PredictNothing,
"-yes": complete.PredictNothing,
"-verbose": complete.PredictNothing,
})
}
......@@ -90,6 +95,7 @@ func (c *JobStopCommand) Name() string { return "job stop" }
func (c *JobStopCommand) Run(args []string) int {
var detach, purge, verbose, global, autoYes bool
var evalPriority int
flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
......@@ -98,6 +104,7 @@ func (c *JobStopCommand) Run(args []string) int {
flags.BoolVar(&global, "global", false, "")
flags.BoolVar(&autoYes, "yes", false, "")
flags.BoolVar(&purge, "purge", false, "")
flags.IntVar(&evalPriority, "eval-priority", 0, "")
if err := flags.Parse(args); err != nil {
return 1
......@@ -192,7 +199,7 @@ func (c *JobStopCommand) Run(args []string) int {
}
// Invoke the stop
opts := &api.DeregisterOptions{Purge: purge, Global: global}
opts := &api.DeregisterOptions{Purge: purge, Global: global, EvalPriority: evalPriority}
wq := &api.WriteOptions{Namespace: jobs[0].JobSummary.Namespace}
evalID, _, err := client.Jobs().DeregisterOpts(*job.ID, opts, wq)
if err != nil {
......
......@@ -10,12 +10,12 @@ import (
_ "github.com/hashicorp/nomad/e2e/affinities"
_ "github.com/hashicorp/nomad/e2e/clientstate"
_ "github.com/hashicorp/nomad/e2e/connect"
_ "github.com/hashicorp/nomad/e2e/consul"
_ "github.com/hashicorp/nomad/e2e/consultemplate"
_ "github.com/hashicorp/nomad/e2e/csi"
_ "github.com/hashicorp/nomad/e2e/deployment"
_ "github.com/hashicorp/nomad/e2e/eval_priority"
_ "github.com/hashicorp/nomad/e2e/events"
_ "github.com/hashicorp/nomad/e2e/example"
_ "github.com/hashicorp/nomad/e2e/isolation"
......
......@@ -12,7 +12,24 @@ import (
// Register registers a jobspec from a file but with a unique ID.
// The caller is responsible for recording that ID for later cleanup.
func Register(jobID, jobFilePath string) error {
cmd := exec.Command("nomad", "job", "run", "-detach", "-")
return register(jobID, jobFilePath, exec.Command("nomad", "job", "run", "-detach", "-"))
}
// RegisterWithArgs registers a jobspec from a file but with a unique ID. The
// optional args are inserted before the trailing "-" on the run command. The
// caller is responsible for recording that ID for later cleanup.
func RegisterWithArgs(jobID, jobFilePath string, args ...string) error {
	// Build "job run -detach [args...] -" with variadic appends rather than
	// an element-by-element copy loop.
	baseArgs := append([]string{"job", "run", "-detach"}, args...)
	baseArgs = append(baseArgs, "-")
	return register(jobID, jobFilePath, exec.Command("nomad", baseArgs...))
}
func register(jobID, jobFilePath string, cmd *exec.Cmd) error {
stdin, err := cmd.StdinPipe()
if err != nil {
return fmt.Errorf("could not open stdin?: %w", err)
......
package eval_priority
import (
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/e2e/e2eutil"
"github.com/hashicorp/nomad/e2e/framework"
"github.com/hashicorp/nomad/helper/uuid"
)
type EvalPriorityTest struct {
framework.TC
jobIDs []string
}
func init() {
framework.AddSuites(&framework.TestSuite{
Component: "EvalPriority",
CanRunLocal: true,
Cases: []framework.TestCase{
new(EvalPriorityTest),
},
})
}
func (tc *EvalPriorityTest) BeforeAll(f *framework.F) {
e2eutil.WaitForLeader(f.T(), tc.Nomad())
e2eutil.WaitForNodesReady(f.T(), tc.Nomad(), 1)
}
func (tc *EvalPriorityTest) AfterEach(f *framework.F) {
for _, id := range tc.jobIDs {
_, _, err := tc.Nomad().Jobs().Deregister(id, true, nil)
f.NoError(err)
}
tc.jobIDs = []string{}
_, err := e2eutil.Command("nomad", "system", "gc")
f.NoError(err)
}
// TestEvalPrioritySet performs a test which registers, updates, and
// deregisters a job, setting the eval priority on every call.
func (tc *EvalPriorityTest) TestEvalPrioritySet(f *framework.F) {
// Generate a jobID and attempt to register the job using the eval
// priority. In case there is a problem found here and the job registers,
// we need to ensure it gets cleaned up.
jobID := "test-eval-priority-" + uuid.Generate()[0:8]
f.NoError(e2eutil.RegisterWithArgs(jobID, "eval_priority/inputs/thirteen_job_priority.nomad",
"-eval-priority=80"))
tc.jobIDs = append(tc.jobIDs, jobID)
// Wait for the deployment to finish.
f.NoError(e2eutil.WaitForLastDeploymentStatus(jobID, "default", "successful", nil))
// Pull the job evaluation list from the API and ensure that this didn't
// error and contains two evals.
//
// Eval 1: the job registration eval.
// Eval 2: the deployment watcher eval.
registerEvals, _, err := tc.Nomad().Jobs().Evaluations(jobID, nil)
f.NoError(err)
f.Len(registerEvals, 2, "job expected to have two evals")
// seenEvals tracks the evaluations we have tested for priority equality
// against our expected value. This allows us to easily perform multiple
// checks with confidence.
seenEvals := map[string]bool{}
// All evaluations should have the priority set to the overridden priority.
for _, eval := range registerEvals {
f.Equal(80, eval.Priority)
seenEvals[eval.ID] = true
}
// Update the job image and set an eval priority higher than the job
// priority.
f.NoError(e2eutil.RegisterWithArgs(jobID, "eval_priority/inputs/thirteen_job_priority.nomad",
"-eval-priority=7", "-var", "image=busybox:1.34"))
f.NoError(e2eutil.WaitForLastDeploymentStatus(jobID, "default", "successful",
&e2eutil.WaitConfig{Retries: 200}))
// Pull the latest list of evaluations for the job which will include those
// as a result of the job update.
updateEvals, _, err := tc.Nomad().Jobs().Evaluations(jobID, nil)
f.NoError(err)
f.NotNil(updateEvals, "expected non-nil evaluation list response")
f.NotEmpty(updateEvals, "expected non-empty evaluation list response")
// Iterate the evals, ignoring those we have already seen and check their
// priority is as expected.
for _, eval := range updateEvals {
if ok := seenEvals[eval.ID]; ok {
continue
}
f.Equal(7, eval.Priority)
seenEvals[eval.ID] = true
}
// Deregister the job using an increased priority.
deregOpts := api.DeregisterOptions{EvalPriority: 100, Purge: true}
deregEvalID, _, err := tc.Nomad().Jobs().DeregisterOpts(jobID, &deregOpts, nil)
f.NoError(err)
f.NotEmpty(deregEvalID, "expected non-empty evaluation ID")
// Detail the deregistration evaluation and check its priority.
evalInfo, _, err := tc.Nomad().Evaluations().Info(deregEvalID, nil)
f.NoError(err)
f.Equal(100, evalInfo.Priority)
// If the job was successfully purged, clear the test suite state.
if err == nil {
tc.jobIDs = []string{}
}
}
// TestEvalPriorityNotSet performs a test which registers, updates, and
// deregisters a job without ever setting the eval priority.
func (tc *EvalPriorityTest) TestEvalPriorityNotSet(f *framework.F) {
// Generate a jobID and attempt to register the job using the eval
// priority. In case there is a problem found here and the job registers,
// we need to ensure it gets cleaned up.
jobID := "test-eval-priority-" + uuid.Generate()[0:8]
f.NoError(e2eutil.Register(jobID, "eval_priority/inputs/thirteen_job_priority.nomad"))
tc.jobIDs = append(tc.jobIDs, jobID)
// Wait for the deployment to finish.
f.NoError(e2eutil.WaitForLastDeploymentStatus(jobID, "default", "successful", nil))
// Pull the job evaluation list from the API and ensure that this didn't
// error and contains two evals.
//
// Eval 1: the job registration eval.
// Eval 2: the deployment watcher eval.
registerEvals, _, err := tc.Nomad().Jobs().Evaluations(jobID, nil)
f.NoError(err)
f.Len(registerEvals, 2, "job expected to have two evals")
// seenEvals tracks the evaluations we have tested for priority equality
// against our expected value. This allows us to easily perform multiple
// checks with confidence.
seenEvals := map[string]bool{}
// All evaluations should have the priority set to the job priority.
for _, eval := range registerEvals {
f.Equal(13, eval.Priority)
seenEvals[eval.ID] = true
}
// Update the job image without setting an eval priority.
f.NoError(e2eutil.RegisterWithArgs(jobID, "eval_priority/inputs/thirteen_job_priority.nomad",
"-var", "image=busybox:1.34"))
f.NoError(e2eutil.WaitForLastDeploymentStatus(jobID, "default", "successful",
&e2eutil.WaitConfig{Retries: 200}))
// Pull the latest list of evaluations for the job which will include those
// as a result of the job update.
updateEvals, _, err := tc.Nomad().Jobs().Evaluations(jobID, nil)
f.NoError(err)
f.NotNil(updateEvals, "expected non-nil evaluation list response")
f.NotEmpty(updateEvals, "expected non-empty evaluation list response")
// Iterate the evals, ignoring those we have already seen and check their
// priority is as expected.
for _, eval := range updateEvals {
if ok := seenEvals[eval.ID]; ok {
continue
}
f.Equal(13, eval.Priority)
seenEvals[eval.ID] = true
}
// Deregister the job without setting an eval priority.
deregOpts := api.DeregisterOptions{Purge: true}
deregEvalID, _, err := tc.Nomad().Jobs().DeregisterOpts(jobID, &deregOpts, nil)
f.NoError(err)
f.NotEmpty(deregEvalID, "expected non-empty evaluation ID")
// Detail the deregistration evaluation and check its priority.
evalInfo, _, err := tc.Nomad().Evaluations().Info(deregEvalID, nil)
f.NoError(err)
f.Equal(13, evalInfo.Priority)
// If the job was successfully purged, clear the test suite state.
if err == nil {
tc.jobIDs = []string{}
}
}
# Input job for the eval priority e2e suite. The explicit priority of 13 is a
# known, non-default value which the tests assert evaluations inherit when no
# eval priority override is supplied.
variable "image" { default = "busybox:1" }

job "networking" {
  datacenters = ["dc1", "dc2"]
  priority    = 13

  constraint {
    attribute = "${attr.kernel.name}"
    value     = "linux"
  }

  group "bridged" {
    # A single long-sleeping task is enough to drive a deployment and the
    # evaluations the suite inspects.
    task "sleep" {
      driver = "docker"

      config {
        image   = var.image
        command = "/bin/sleep"
        args    = ["300"]
      }
    }
  }
}
......@@ -824,10 +824,20 @@ func (w *deploymentWatcher) createBatchedUpdate(allowReplacements []string, forI
// getEval returns an evaluation suitable for the deployment
func (w *deploymentWatcher) getEval() *structs.Evaluation {
now := time.Now().UTC().UnixNano()
// During a server upgrade it's possible we end up with deployments created
// on the previous version that are then "watched" on a leader that's on
// the new version. This would result in an eval with its priority set to
// zero which would be bad. This therefore protects against that.
priority := w.d.EvalPriority
if priority == 0 {
priority = w.j.Priority
}
return &structs.Evaluation{
ID: uuid.Generate(),
Namespace: w.j.Namespace,
Priority: w.j.Priority,
Priority: priority,
Type: w.j.Type,
TriggeredBy: structs.EvalTriggerDeploymentWatcher,
JobID: w.j.ID,
......
......@@ -357,10 +357,18 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
// If the job is periodic or parameterized, we don't create an eval.
if !(args.Job.IsPeriodic() || args.Job.IsParameterized()) {
// Initially set the eval priority to that of the job priority. If the
// user supplied an eval priority override, we subsequently use this.
evalPriority := args.Job.Priority
if args.EvalPriority > 0 {
evalPriority = args.EvalPriority
}
eval = &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Priority: args.Job.Priority,
Priority: evalPriority,
Type: args.Job.Type,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: args.Job.ID,
......@@ -829,22 +837,23 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
// priority even if the job was.
now := time.Now().UnixNano()
// Set our default priority initially, but update this to that configured
// within the job if possible. It is reasonable from a user perspective
// that jobs with a higher priority have their deregister evaluated before
// those of a lower priority.
//
// Alternatively, the previous behaviour was to set the eval priority to
// the default value. Jobs with a lower than default register priority
// would therefore have their deregister eval priorities higher than
// expected.
priority := structs.JobDefaultPriority
if job != nil {
priority = job.Priority
}
// If the job is periodic or parameterized, we don't create an eval.
if job == nil || !(job.IsPeriodic() || job.IsParameterized()) {
// The evaluation priority is determined by several factors. It
// defaults to the job default priority and is overridden by the
// priority set on the job specification.
//
// If the user supplied an eval priority override, we subsequently
// use this.
priority := structs.JobDefaultPriority
if job != nil {
priority = job.Priority
}
if args.EvalPriority > 0 {
priority = args.EvalPriority
}
eval = &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
......
......@@ -169,6 +169,38 @@ func TestJobEndpoint_Register_PreserveCounts(t *testing.T) {
require.Equal(2, out.TaskGroups[1].Count) // should be as in job spec
}
// TestJobEndpoint_Register_EvalPriority asserts that an eval priority
// supplied on the job register request is carried through to the evaluation
// created by the RPC, rather than the eval inheriting the job priority.
func TestJobEndpoint_Register_EvalPriority(t *testing.T) {
	t.Parallel()
	must := require.New(t)

	s1, cleanupS1 := TestServer(t, func(c *Config) { c.NumSchedulers = 0 })
	defer cleanupS1()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)

	// Build a mock job to register.
	job := mock.Job()
	job.TaskGroups[0].Name = "group1"
	job.Canonicalize()

	// Register the job, overriding the eval priority.
	registerReq := &structs.JobRegisterRequest{
		Job:          job,
		EvalPriority: 99,
		WriteRequest: structs.WriteRequest{
			Region:    "global",
			Namespace: job.Namespace,
		},
	}
	var registerResp structs.JobRegisterResponse
	must.NoError(msgpackrpc.CallWithCodec(codec, "Job.Register", registerReq, &registerResp))

	// Pull the eval out of state, and check its priority matches the
	// override rather than the job priority.
	out, err := s1.fsm.State().EvalsByJob(nil, job.Namespace, job.ID)
	must.NoError(err)
	must.Len(out, 1)
	must.Equal(99, out[0].Priority)
}
func TestJobEndpoint_Register_Connect(t *testing.T) {
t.Parallel()
require := require.New(t)
......@@ -3365,6 +3397,48 @@ func TestJobEndpoint_Deregister_Nonexistent(t *testing.T) {
}
}
// TestJobEndpoint_Deregister_EvalPriority asserts that an eval priority
// supplied on the job deregister request is carried through to the
// evaluation created by the RPC.
func TestJobEndpoint_Deregister_EvalPriority(t *testing.T) {
	t.Parallel()
	requireAssert := require.New(t)

	s1, cleanupS1 := TestServer(t, func(c *Config) {
		c.NumSchedulers = 0 // Prevent automatic dequeue
	})
	defer cleanupS1()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)

	// Create a mock job so there is something to deregister.
	job := mock.Job()
	job.Canonicalize()

	// Register the job.
	requireAssert.NoError(msgpackrpc.CallWithCodec(codec, "Job.Register", &structs.JobRegisterRequest{
		Job: job,
		WriteRequest: structs.WriteRequest{
			Region:    "global",
			Namespace: job.Namespace,
		},
	}, &structs.JobRegisterResponse{}))

	// Create the deregister request with an eval priority override.
	deregReq := &structs.JobDeregisterRequest{
		JobID: job.ID,
		WriteRequest: structs.WriteRequest{
			Region:    "global",
			Namespace: job.Namespace,
		},
		EvalPriority: 99,
	}
	var deregResp structs.JobDeregisterResponse
	requireAssert.NoError(msgpackrpc.CallWithCodec(codec, "Job.Deregister", deregReq, &deregResp))

	// Grab the eval from the state, and check its priority is as expected.
	// EvalByID returns a nil eval on a lookup miss without an error, so
	// guard with NotNil to fail cleanly instead of panicking on the
	// dereference below.
	out, err := s1.fsm.State().EvalByID(nil, deregResp.EvalID)
	requireAssert.NoError(err)
	requireAssert.NotNil(out)
	requireAssert.Equal(99, out.Priority)
}
func TestJobEndpoint_Deregister_Periodic(t *testing.T) {
t.Parallel()
......
......@@ -6781,7 +6781,7 @@ func TestStateStore_UpsertDeploymentStatusUpdate_Successful(t *testing.T) {
}
// Insert a deployment
d := structs.NewDeployment(job)
d := structs.NewDeployment(job, 50)
if err := state.UpsertDeployment(2, d); err != nil {
t.Fatalf("bad: %v", err)
}
......
......@@ -586,6 +586,14 @@ type JobRegisterRequest struct {
// PolicyOverride is set when the user is attempting to override any policies
PolicyOverride bool
// EvalPriority is an optional priority to use on any evaluation created as
// a result on this job registration. This value must be between 1-100
// inclusively, where a larger value corresponds to a higher priority. This
// is useful when an operator wishes to push through a job registration in
// busy clusters with a large evaluation backlog. This avoids needing to
// change the job priority which also impacts preemption.
EvalPriority int
// Eval is the evaluation that is associated with the job registration
Eval *Evaluation
......@@ -606,6 +614,13 @@ type JobDeregisterRequest struct {
// deregistered. It is ignored for single-region jobs.
Global bool
// EvalPriority is an optional priority to use on any evaluation created as
// a result on this job deregistration. This value must be between 1-100
// inclusively, where a larger value corresponds to a higher priority. This
// is useful when an operator wishes to push through a job deregistration
// in busy clusters with a large evaluation backlog.
EvalPriority int
// Eval is the evaluation to create that's associated with job deregister
Eval *Evaluation
......@@ -8847,12 +8862,18 @@ type Deployment struct {
// status.
StatusDescription string
// EvalPriority tracks the priority of the evaluation which lead to the
// creation of this Deployment object. Any additional evaluations created
// as a result of this deployment can therefore inherit this value, which
// is not guaranteed to be that of the job priority parameter.
EvalPriority int
CreateIndex uint64
ModifyIndex uint64
}
// NewDeployment creates a new deployment given the job.
func NewDeployment(job *Job) *Deployment {
func NewDeployment(job *Job, evalPriority int) *Deployment {
return &Deployment{
ID: uuid.Generate(),
Namespace: job.Namespace,
......@@ -8865,6 +8886,7 @@ func NewDeployment(job *Job) *Deployment {
Status: DeploymentStatusRunning,
StatusDescription: DeploymentStatusDescriptionRunning,
TaskGroups: make(map[string]*DeploymentState, len(job.TaskGroups)),
EvalPriority: evalPriority,
}
}
......
......@@ -351,7 +351,7 @@ func (s *GenericScheduler) computeJobAllocs() error {
reconciler := NewAllocReconciler(s.logger,
genericAllocUpdateFn(s.ctx, s.stack, s.eval.ID),
s.batch, s.eval.JobID, s.job, s.deployment, allocs, tainted, s.eval.ID)
s.batch, s.eval.JobID, s.job, s.deployment, allocs, tainted, s.eval.ID, s.eval.Priority)
results := reconciler.Compute()
s.logger.Debug("reconciled current state with desired state", "results", log.Fmt("%#v", results))
......
......@@ -73,8 +73,10 @@ type allocReconciler struct {
// existingAllocs is non-terminal existing allocations
existingAllocs []*structs.Allocation
// evalID is the ID of the evaluation that triggered the reconciler
evalID string
// evalID and evalPriority is the ID and Priority of the evaluation that
// triggered the reconciler.
evalID string
evalPriority int
// now is the time used when determining rescheduling eligibility
// defaults to time.Now, and overidden in unit tests
......@@ -160,7 +162,8 @@ func (r *reconcileResults) Changes() int {
// the changes required to bring the cluster state inline with the declared jobspec
func NewAllocReconciler(logger log.Logger, allocUpdateFn allocUpdateType, batch bool,
jobID string, job *structs.Job, deployment *structs.Deployment,
existingAllocs []*structs.Allocation, taintedNodes map[string]*structs.Node, evalID string) *allocReconciler {
existingAllocs []*structs.Allocation, taintedNodes map[string]*structs.Node, evalID string,
evalPriority int) *allocReconciler {
return &allocReconciler{
logger: logger.Named("reconciler"),
allocUpdateFn: allocUpdateFn,
......@@ -171,6 +174,7 @@ func NewAllocReconciler(logger log.Logger, allocUpdateFn allocUpdateType, batch
existingAllocs: existingAllocs,
taintedNodes: taintedNodes,
evalID: evalID,
evalPriority: evalPriority,
now: time.Now(),
result: &reconcileResults{
desiredTGUpdates: make(map[string]*structs.DesiredUpdates),
......@@ -555,7 +559,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
if !existingDeployment && !strategy.IsEmpty() && dstate.DesiredTotal != 0 && (!hadRunning || updatingSpec) {
// A previous group may have made the deployment already
if a.deployment == nil {
a.deployment = structs.NewDeployment(a.job)
a.deployment = structs.NewDeployment(a.job, a.evalPriority)
// in multiregion jobs, most deployments start in a pending state
if a.job.IsMultiregion() && !(a.job.IsPeriodic() && a.job.IsParameterized()) {
a.deployment.Status = structs.DeploymentStatusPending
......@@ -942,7 +946,7 @@ func (a *allocReconciler) handleDelayedLost(rescheduleLater []*delayedReschedule
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: a.job.Namespace,
Priority: a.job.Priority,
Priority: a.evalPriority,
Type: a.job.Type,
TriggeredBy: structs.EvalTriggerRetryFailedAlloc,
JobID: a.job.ID,
......@@ -963,7 +967,7 @@ func (a *allocReconciler) handleDelayedLost(rescheduleLater []*delayedReschedule
eval = &structs.Evaluation{
ID: uuid.Generate(),
Namespace: a.job.Namespace,
Priority: a.job.Priority,
Priority: a.evalPriority,
Type: a.job.Type,
TriggeredBy: structs.EvalTriggerRetryFailedAlloc,
JobID: a.job.ID,
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment