Commit bba56450 authored by Alex Dadgar's avatar Alex Dadgar
Browse files

Deployments list

parent 818414bc
Showing with 281 additions and 0 deletions
+281 -0
......@@ -14,6 +14,56 @@ type Deployment struct {
srv *Server
}
// TODO http endpoint and api
// List returns the list of deployments in the system
func (d *Deployment) List(args *structs.DeploymentListRequest, reply *structs.DeploymentListResponse) error {
if done, err := d.srv.forward("Deployment.List", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "deployment", "List"}, time.Now())
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Capture all the deployments
var err error
var iter memdb.ResultIterator
if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = state.DeploymentsByIDPrefix(ws, prefix)
} else {
iter, err = state.Deployments(ws)
}
if err != nil {
return err
}
var deploys []*structs.Deployment
for {
raw := iter.Next()
if raw == nil {
break
}
deploy := raw.(*structs.Deployment)
deploys = append(deploys, deploy)
}
reply.Deployments = deploys
// Use the last index that affected the jobs table
index, err := state.Index("deployment")
if err != nil {
return err
}
reply.Index = index
// Set the query response
d.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return d.srv.blockingRPC(&opts)
}
// TODO http endpoint and api
// Allocations returns the list of allocations that are a part of the deployment
func (d *Deployment) Allocations(args *structs.DeploymentSpecificRequest, reply *structs.AllocListResponse) error {
......
......@@ -10,6 +10,127 @@ import (
"github.com/hashicorp/nomad/testutil"
)
func TestDeploymentEndpoint_List(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
deployment := mock.Deployment()
state := s1.fsm.State()
if err := state.UpsertDeployment(1000, deployment, false); err != nil {
t.Fatalf("err: %v", err)
}
// Lookup the deployments
get := &structs.DeploymentListRequest{
QueryOptions: structs.QueryOptions{Region: "global"},
}
var resp structs.DeploymentListResponse
if err := msgpackrpc.CallWithCodec(codec, "Deployment.List", get, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Index != 1000 {
t.Fatalf("Bad index: %d %d", resp.Index, 1000)
}
if len(resp.Deployments) != 1 {
t.Fatalf("bad: %#v", resp.Deployments)
}
if resp.Deployments[0].ID != deployment.ID {
t.Fatalf("bad: %#v", resp.Deployments[0])
}
// Lookup the deploys by prefix
get = &structs.DeploymentListRequest{
QueryOptions: structs.QueryOptions{Region: "global", Prefix: deployment.ID[:4]},
}
var resp2 structs.DeploymentListResponse
if err := msgpackrpc.CallWithCodec(codec, "Deployment.List", get, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
if resp2.Index != 1000 {
t.Fatalf("Bad index: %d %d", resp2.Index, 1000)
}
if len(resp2.Deployments) != 1 {
t.Fatalf("bad: %#v", resp2.Deployments)
}
if resp2.Deployments[0].ID != deployment.ID {
t.Fatalf("bad: %#v", resp2.Deployments[0])
}
}
func TestDeploymentEndpoint_List_Blocking(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
state := s1.fsm.State()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the deployment
deployment := mock.Deployment()
// Upsert alloc triggers watches
time.AfterFunc(100*time.Millisecond, func() {
if err := state.UpsertDeployment(3, deployment, false); err != nil {
t.Fatalf("err: %v", err)
}
})
req := &structs.DeploymentListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
MinQueryIndex: 1,
},
}
start := time.Now()
var resp structs.DeploymentListResponse
if err := msgpackrpc.CallWithCodec(codec, "Deployment.List", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 3 {
t.Fatalf("Bad index: %d %d", resp.Index, 3)
}
if len(resp.Deployments) != 1 || resp.Deployments[0].ID != deployment.ID {
t.Fatalf("bad: %#v", resp.Deployments)
}
// Deployment updates trigger watches
deployment2 := deployment.Copy()
deployment2.Status = structs.DeploymentStatusPaused
time.AfterFunc(100*time.Millisecond, func() {
if err := state.UpsertDeployment(5, deployment2, false); err != nil {
t.Fatalf("err: %v", err)
}
})
req.MinQueryIndex = 3
start = time.Now()
var resp2 structs.DeploymentListResponse
if err := msgpackrpc.CallWithCodec(codec, "Deployment.List", req, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp2)
}
if resp2.Index != 5 {
t.Fatalf("Bad index: %d %d", resp2.Index, 5)
}
if len(resp2.Deployments) != 1 || resp.Deployments[0].ID != deployment2.ID ||
resp2.Deployments[0].Status != structs.DeploymentStatusPaused {
t.Fatalf("bad: %#v", resp2.Deployments)
}
}
func TestDeploymentEndpoint_Allocations(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
......
......@@ -312,6 +312,19 @@ func (s *StateStore) Deployments(ws memdb.WatchSet) (memdb.ResultIterator, error
return iter, nil
}
func (s *StateStore) DeploymentsByIDPrefix(ws memdb.WatchSet, deploymentID string) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
// Walk the entire deployments table
iter, err := txn.Get("deployment", "id_prefix", deploymentID)
if err != nil {
return nil, err
}
ws.Add(iter.WatchCh())
return iter, nil
}
func (s *StateStore) DeploymentByID(ws memdb.WatchSet, deploymentID string) (*structs.Deployment, error) {
txn := s.db.Txn(false)
return s.deploymentByIDImpl(ws, deploymentID, txn)
......
......@@ -511,6 +511,92 @@ func TestStateStore_Deployments(t *testing.T) {
}
}
func TestStateStore_DeploymentsByIDPrefix(t *testing.T) {
state := testStateStore(t)
deploy := mock.Deployment()
deploy.ID = "11111111-662e-d0ab-d1c9-3e434af7bdb4"
err := state.UpsertDeployment(1000, deploy, false)
if err != nil {
t.Fatalf("err: %v", err)
}
// Create a watchset so we can test that getters don't cause it to fire
ws := memdb.NewWatchSet()
iter, err := state.DeploymentsByIDPrefix(ws, deploy.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
gatherDeploys := func(iter memdb.ResultIterator) []*structs.Deployment {
var deploys []*structs.Deployment
for {
raw := iter.Next()
if raw == nil {
break
}
deploy := raw.(*structs.Deployment)
deploys = append(deploys, deploy)
}
return deploys
}
deploys := gatherDeploys(iter)
if len(deploys) != 1 {
t.Fatalf("err: %v", err)
}
if watchFired(ws) {
t.Fatalf("bad")
}
iter, err = state.DeploymentsByIDPrefix(ws, "11")
if err != nil {
t.Fatalf("err: %v", err)
}
deploys = gatherDeploys(iter)
if len(deploys) != 1 {
t.Fatalf("err: %v", err)
}
deploy = mock.Deployment()
deploy.ID = "11222222-662e-d0ab-d1c9-3e434af7bdb4"
err = state.UpsertDeployment(1001, deploy, false)
if err != nil {
t.Fatalf("err: %v", err)
}
if !watchFired(ws) {
t.Fatalf("bad")
}
ws = memdb.NewWatchSet()
iter, err = state.DeploymentsByIDPrefix(ws, "11")
if err != nil {
t.Fatalf("err: %v", err)
}
deploys = gatherDeploys(iter)
if len(deploys) != 2 {
t.Fatalf("err: %v", err)
}
iter, err = state.DeploymentsByIDPrefix(ws, "1111")
if err != nil {
t.Fatalf("err: %v", err)
}
deploys = gatherDeploys(iter)
if len(deploys) != 1 {
t.Fatalf("err: %v", err)
}
if watchFired(ws) {
t.Fatalf("bad")
}
}
func TestStateStore_UpsertNode_Node(t *testing.T) {
state := testStateStore(t)
node := mock.Node()
......
......@@ -488,6 +488,11 @@ type GenericRequest struct {
QueryOptions
}
// DeploymentListRequest is used to list the deployments
type DeploymentListRequest struct {
QueryOptions
}
// DeploymentStatusUpdateRequest is used to update the status of a deployment as
// well as optionally creating an evaluation atomically.
type DeploymentStatusUpdateRequest struct {
......@@ -787,6 +792,12 @@ type AllocListResponse struct {
QueryMeta
}
// DeploymentListResponse is used for a list request
type DeploymentListResponse struct {
Deployments []*Deployment
QueryMeta
}
// EvalListResponse is used for a list request
type EvalListResponse struct {
Evaluations []*Evaluation
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment