"vendor/cloud.google.com/git@git.gitsec.cn:baidan/vault.git" did not exist on "d660b45ee413a3e344e9df285682e887a5f86ee2"
Commit 4cf84a11 authored by Alex Dadgar's avatar Alex Dadgar
Browse files

job deployment endpoint + api

parent 919c2f8d
Showing with 378 additions and 1 deletion
+378 -1
......@@ -138,8 +138,31 @@ func (j *Jobs) Allocations(jobID string, allAllocs bool, q *QueryOptions) ([]*Al
return resp, qm, nil
}
// Evaluations is used to query the evaluations associated with
// Deployments is used to query the deployments associated with the given job
// ID.
func (j *Jobs) Deployments(jobID string, q *QueryOptions) ([]*Deployment, *QueryMeta, error) {
var resp []*Deployment
qm, err := j.client.query("/v1/job/"+jobID+"/deployments", &resp, q)
if err != nil {
return nil, nil, err
}
sort.Sort(DeploymentIndexSort(resp))
return resp, qm, nil
}
// LatestDeployment is used to query for the latest deployment associated with
// the given job ID.
func (j *Jobs) LatestDeployment(jobID string, q *QueryOptions) (*Deployment, *QueryMeta, error) {
var resp *Deployment
qm, err := j.client.query("/v1/job/"+jobID+"/deployment", &resp, q)
if err != nil {
return nil, nil, err
}
return resp, qm, nil
}
// Evaluations is used to query the evaluations associated with the given job
// ID.
func (j *Jobs) Evaluations(jobID string, q *QueryOptions) ([]*Evaluation, *QueryMeta, error) {
var resp []*Evaluation
qm, err := j.client.query("/v1/job/"+jobID+"/evaluations", &resp, q)
......
......@@ -70,6 +70,12 @@ func (s *HTTPServer) JobSpecificRequest(resp http.ResponseWriter, req *http.Requ
case strings.HasSuffix(path, "/revert"):
jobName := strings.TrimSuffix(path, "/revert")
return s.jobRevert(resp, req, jobName)
case strings.HasSuffix(path, "/deployments"):
jobName := strings.TrimSuffix(path, "/deployments")
return s.jobDeployments(resp, req, jobName)
case strings.HasSuffix(path, "/deployment"):
jobName := strings.TrimSuffix(path, "/deployment")
return s.jobLatestDeployment(resp, req, jobName)
default:
return s.jobCRUD(resp, req, path)
}
......@@ -231,6 +237,51 @@ func (s *HTTPServer) jobEvaluations(resp http.ResponseWriter, req *http.Request,
return out.Evaluations, nil
}
func (s *HTTPServer) jobDeployments(resp http.ResponseWriter, req *http.Request,
jobName string) (interface{}, error) {
if req.Method != "GET" {
return nil, CodedError(405, ErrInvalidMethod)
}
args := structs.JobSpecificRequest{
JobID: jobName,
}
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
}
var out structs.DeploymentListResponse
if err := s.agent.RPC("Job.Deployments", &args, &out); err != nil {
return nil, err
}
setMeta(resp, &out.QueryMeta)
if out.Deployments == nil {
out.Deployments = make([]*structs.Deployment, 0)
}
return out.Deployments, nil
}
func (s *HTTPServer) jobLatestDeployment(resp http.ResponseWriter, req *http.Request,
jobName string) (interface{}, error) {
if req.Method != "GET" {
return nil, CodedError(405, ErrInvalidMethod)
}
args := structs.JobSpecificRequest{
JobID: jobName,
}
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
}
var out structs.SingleDeploymentResponse
if err := s.agent.RPC("Job.LatestDeployment", &args, &out); err != nil {
return nil, err
}
setMeta(resp, &out.QueryMeta)
return out.Deployment, nil
}
func (s *HTTPServer) jobCRUD(resp http.ResponseWriter, req *http.Request,
jobName string) (interface{}, error) {
switch req.Method {
......
......@@ -12,6 +12,7 @@ import (
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/assert"
)
func TestHTTP_JobsList(t *testing.T) {
......@@ -600,6 +601,82 @@ func TestHTTP_JobAllocations(t *testing.T) {
})
}
func TestHTTP_JobDeployments(t *testing.T) {
assert := assert.New(t)
httpTest(t, nil, func(s *TestServer) {
// Create the job
j := mock.Job()
args := structs.JobRegisterRequest{
Job: j,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.JobRegisterResponse
assert.Nil(s.Agent.RPC("Job.Register", &args, &resp), "JobRegister")
// Directly manipulate the state
state := s.Agent.server.State()
d := mock.Deployment()
d.JobID = j.ID
assert.Nil(state.UpsertDeployment(1000, d, false), "UpsertDeployment")
// Make the HTTP request
req, err := http.NewRequest("GET", "/v1/job/"+j.ID+"/deployments", nil)
assert.Nil(err, "HTTP")
respW := httptest.NewRecorder()
// Make the request
obj, err := s.Server.JobSpecificRequest(respW, req)
assert.Nil(err, "JobSpecificRequest")
// Check the response
deploys := obj.([]*structs.Deployment)
assert.Len(deploys, 1, "deployments")
assert.Equal(d.ID, deploys[0].ID, "deployment id")
assert.NotZero(respW.HeaderMap.Get("X-Nomad-Index"), "missing index")
assert.Equal("true", respW.HeaderMap.Get("X-Nomad-KnownLeader"), "missing known leader")
assert.NotZero(respW.HeaderMap.Get("X-Nomad-LastContact"), "missing last contact")
})
}
func TestHTTP_JobDeployment(t *testing.T) {
assert := assert.New(t)
httpTest(t, nil, func(s *TestServer) {
// Create the job
j := mock.Job()
args := structs.JobRegisterRequest{
Job: j,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.JobRegisterResponse
assert.Nil(s.Agent.RPC("Job.Register", &args, &resp), "JobRegister")
// Directly manipulate the state
state := s.Agent.server.State()
d := mock.Deployment()
d.JobID = j.ID
assert.Nil(state.UpsertDeployment(1000, d, false), "UpsertDeployment")
// Make the HTTP request
req, err := http.NewRequest("GET", "/v1/job/"+j.ID+"/deployment", nil)
assert.Nil(err, "HTTP")
respW := httptest.NewRecorder()
// Make the request
obj, err := s.Server.JobSpecificRequest(respW, req)
assert.Nil(err, "JobSpecificRequest")
// Check the response
out := obj.(*structs.Deployment)
assert.NotNil(out, "deployment")
assert.Equal(d.ID, out.ID, "deployment id")
assert.NotZero(respW.HeaderMap.Get("X-Nomad-Index"), "missing index")
assert.Equal("true", respW.HeaderMap.Get("X-Nomad-KnownLeader"), "missing known leader")
assert.NotZero(respW.HeaderMap.Get("X-Nomad-LastContact"), "missing last contact")
})
}
func TestHTTP_JobVersions(t *testing.T) {
httpTest(t, nil, func(s *TestServer) {
// Create the job
......
......@@ -3,6 +3,7 @@ package nomad
import (
"context"
"fmt"
"sort"
"strings"
"time"
......@@ -718,6 +719,81 @@ func (j *Job) Evaluations(args *structs.JobSpecificRequest,
return j.srv.blockingRPC(&opts)
}
// Deployments is used to list the deployments for a job
func (j *Job) Deployments(args *structs.JobSpecificRequest,
reply *structs.DeploymentListResponse) error {
if done, err := j.srv.forward("Job.Deployments", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "deployments"}, time.Now())
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Capture the deployments
deploys, err := state.DeploymentsByJobID(ws, args.JobID)
if err != nil {
return err
}
// Use the last index that affected the deployment table
index, err := state.Index("deployment")
if err != nil {
return err
}
reply.Index = index
reply.Deployments = deploys
// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return j.srv.blockingRPC(&opts)
}
// LatestDeployment is used to retrieve the latest deployment for a job
func (j *Job) LatestDeployment(args *structs.JobSpecificRequest,
reply *structs.SingleDeploymentResponse) error {
if done, err := j.srv.forward("Job.LatestDeployment", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "latest_deployment"}, time.Now())
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Capture the deployments
deploys, err := state.DeploymentsByJobID(ws, args.JobID)
if err != nil {
return err
}
// Use the last index that affected the deployment table
index, err := state.Index("deployment")
if err != nil {
return err
}
reply.Index = index
if len(deploys) > 0 {
sort.Slice(deploys, func(i, j int) bool {
return deploys[i].CreateIndex > deploys[j].CreateIndex
})
reply.Deployment = deploys[0]
}
// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return j.srv.blockingRPC(&opts)
}
// Plan is used to cause a dry-run evaluation of the Job and return the results
// with a potential diff containing annotations.
func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse) error {
......
......@@ -14,6 +14,7 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
)
func TestJobEndpoint_Register(t *testing.T) {
......@@ -2063,6 +2064,155 @@ func TestJobEndpoint_Evaluations_Blocking(t *testing.T) {
}
}
func TestJobEndpoint_Deployments(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
state := s1.fsm.State()
assert := assert.New(t)
// Create the register request
j := mock.Job()
d1 := mock.Deployment()
d2 := mock.Deployment()
d1.JobID = j.ID
d2.JobID = j.ID
assert.Nil(state.UpsertJob(1000, j), "UpsertJob")
assert.Nil(state.UpsertDeployment(1001, d1, false), "UpsertDeployment")
assert.Nil(state.UpsertDeployment(1002, d2, false), "UpsertDeployment")
// Lookup the jobs
get := &structs.JobSpecificRequest{
JobID: j.ID,
QueryOptions: structs.QueryOptions{Region: "global"},
}
var resp structs.DeploymentListResponse
assert.Nil(msgpackrpc.CallWithCodec(codec, "Job.Deployments", get, &resp), "RPC")
assert.EqualValues(1002, resp.Index, "response index")
assert.Len(resp.Deployments, 2, "deployments for job")
}
func TestJobEndpoint_Deployments_Blocking(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
state := s1.fsm.State()
assert := assert.New(t)
// Create the register request
j := mock.Job()
d1 := mock.Deployment()
d2 := mock.Deployment()
d2.JobID = j.ID
assert.Nil(state.UpsertJob(50, j), "UpsertJob")
// First upsert an unrelated eval
time.AfterFunc(100*time.Millisecond, func() {
assert.Nil(state.UpsertDeployment(100, d1, false), "UpsertDeployment")
})
// Upsert an eval for the job we are interested in later
time.AfterFunc(200*time.Millisecond, func() {
assert.Nil(state.UpsertDeployment(200, d2, false), "UpsertDeployment")
})
// Lookup the jobs
get := &structs.JobSpecificRequest{
JobID: d2.JobID,
QueryOptions: structs.QueryOptions{
Region: "global",
MinQueryIndex: 150,
},
}
var resp structs.DeploymentListResponse
start := time.Now()
assert.Nil(msgpackrpc.CallWithCodec(codec, "Job.Deployments", get, &resp), "RPC")
assert.EqualValues(200, resp.Index, "response index")
assert.Len(resp.Deployments, 1, "deployments for job")
assert.Equal(d2.ID, resp.Deployments[0], "returned deployment")
if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
}
func TestJobEndpoint_LatestDeployment(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
state := s1.fsm.State()
assert := assert.New(t)
// Create the register request
j := mock.Job()
d1 := mock.Deployment()
d2 := mock.Deployment()
d1.JobID = j.ID
d2.JobID = j.ID
d2.CreateIndex = d1.CreateIndex + 100
d2.ModifyIndex = d2.CreateIndex + 100
assert.Nil(state.UpsertJob(1000, j), "UpsertJob")
assert.Nil(state.UpsertDeployment(1001, d1, false), "UpsertDeployment")
assert.Nil(state.UpsertDeployment(1002, d2, false), "UpsertDeployment")
// Lookup the jobs
get := &structs.JobSpecificRequest{
JobID: j.ID,
QueryOptions: structs.QueryOptions{Region: "global"},
}
var resp structs.SingleDeploymentResponse
assert.Nil(msgpackrpc.CallWithCodec(codec, "Job.LatestDeployment", get, &resp), "RPC")
assert.EqualValues(1002, resp.Index, "response index")
assert.NotNil(resp.Deployment, "want a deployment")
assert.Equal(d2.ID, resp.Deployment.ID, "latest deployment for job")
}
func TestJobEndpoint_LatestDeployment_Blocking(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
state := s1.fsm.State()
assert := assert.New(t)
// Create the register request
j := mock.Job()
d1 := mock.Deployment()
d2 := mock.Deployment()
d2.JobID = j.ID
assert.Nil(state.UpsertJob(50, j), "UpsertJob")
// First upsert an unrelated eval
time.AfterFunc(100*time.Millisecond, func() {
assert.Nil(state.UpsertDeployment(100, d1, false), "UpsertDeployment")
})
// Upsert an eval for the job we are interested in later
time.AfterFunc(200*time.Millisecond, func() {
assert.Nil(state.UpsertDeployment(200, d2, false), "UpsertDeployment")
})
// Lookup the jobs
get := &structs.JobSpecificRequest{
JobID: d2.JobID,
QueryOptions: structs.QueryOptions{
Region: "global",
MinQueryIndex: 150,
},
}
var resp structs.SingleDeploymentResponse
start := time.Now()
assert.Nil(msgpackrpc.CallWithCodec(codec, "Job.LatestDeployment", get, &resp), "RPC")
assert.EqualValues(200, resp.Index, "response index")
assert.NotNil(resp.Deployment, "deployment for job")
assert.Equal(d2.ID, resp.Deployment.ID, "returned deployment")
if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
}
func TestJobEndpoint_Plan_WithDiff(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment