Commit 5cfda487 authored by Mahmood Ali's avatar Mahmood Ali
Browse files

add basic alloc endpoint

parent 4c3e28f7
Branches unavailable
No related merge requests found
Showing with 169 additions and 0 deletions
+169 -0
package nomad
import (
"encoding/json"
"fmt"
"io"
"net"
"strconv"
"strings"
"testing"
"time"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/client"
"github.com/hashicorp/nomad/client/config"
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
nstructs "github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/kr/pretty"
"github.com/stretchr/testify/require"
"github.com/ugorji/go/codec"
)
func TestClientAllocations_GarbageCollectAll_Local(t *testing.T) {
......@@ -785,3 +794,163 @@ func TestClientAllocations_Stats_Remote(t *testing.T) {
require.Nil(err)
require.NotNil(resp.Stats)
}
// TestAlloc_ExecStreaming asserts that exec task requests are forwarded
// to appropriate server or remote regions
func TestAlloc_ExecStreaming(t *testing.T) {
t.Parallel()
////// Nomad clusters topology - not specific to test
localServer := TestServer(t, nil)
defer localServer.Shutdown()
remoteServer := TestServer(t, func(c *Config) {
c.DevDisableBootstrap = true
})
defer remoteServer.Shutdown()
remoteRegionServer := TestServer(t, func(c *Config) {
c.Region = "two"
})
defer remoteRegionServer.Shutdown()
TestJoin(t, localServer, remoteServer)
TestJoin(t, localServer, remoteRegionServer)
testutil.WaitForLeader(t, localServer.RPC)
testutil.WaitForLeader(t, remoteServer.RPC)
testutil.WaitForLeader(t, remoteRegionServer.RPC)
c, cleanup := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{localServer.config.RPCAddr.String()}
})
defer cleanup()
// Wait for the client to connect
testutil.WaitForResult(func() (bool, error) {
nodes := remoteServer.connectedNodes()
return len(nodes) == 1, nil
}, func(err error) {
require.NoError(t, err, "failed to have a client")
})
// Force remove the connection locally in case it exists
remoteServer.nodeConnsLock.Lock()
delete(remoteServer.nodeConns, c.NodeID())
remoteServer.nodeConnsLock.Unlock()
///// Start task
a := mock.BatchAlloc()
a.NodeID = c.NodeID()
a.Job.Type = structs.JobTypeBatch
a.Job.TaskGroups[0].Count = 1
a.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"run_for": "20s",
"exec_command": map[string]interface{}{
"run_for": "1ms",
"stdout_string": "expected output",
"exit_code": 3,
},
}
// Upsert the allocation
localState := localServer.State()
require.Nil(t, localState.UpsertJob(999, a.Job))
require.Nil(t, localState.UpsertAllocs(1003, []*structs.Allocation{a}))
remoteState := remoteServer.State()
require.Nil(t, remoteState.UpsertJob(999, a.Job))
require.Nil(t, remoteState.UpsertAllocs(1003, []*structs.Allocation{a}))
testutil.WaitForRunning(t, localServer.RPC, a.Job)
///////// Actually run query now
cases := []struct {
name string
rpc func(string) (structs.StreamingRpcHandler, error)
}{
{"client", c.StreamingRpcHandler},
{"local_server", localServer.StreamingRpcHandler},
{"remote_server", remoteServer.StreamingRpcHandler},
{"remote_region", remoteRegionServer.StreamingRpcHandler},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
// Make the request
req := &cstructs.AllocExecRequest{
AllocID: a.ID,
Task: a.Job.TaskGroups[0].Tasks[0].Name,
Tty: true,
Cmd: []string{"placeholder command"},
QueryOptions: nstructs.QueryOptions{Region: "global"},
}
// Get the handler
handler, err := tc.rpc("Allocations.Exec")
require.Nil(t, err)
// Create a pipe
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
frames := make(chan *sframer.StreamFrame)
// Start the handler
go handler(p2)
go decodeFrames(t, p1, frames, errCh)
// Send the request
encoder := codec.NewEncoder(p1, nstructs.MsgpackHandle)
require.Nil(t, encoder.Encode(req))
timeout := time.After(3 * time.Second)
OUTER:
for {
select {
case <-timeout:
require.FailNow(t, "timed out before getting exit code")
case err := <-errCh:
require.NoError(t, err)
case f := <-frames:
if f.FileEvent == "exit-code" {
code, err := strconv.Atoi(string(f.Data))
require.NoError(t, err)
require.Equal(t, 3, code)
break OUTER
}
}
}
})
}
}
func decodeFrames(t *testing.T, p1 net.Conn, frames chan<- *sframer.StreamFrame, errCh chan<- error) {
// Start the decoder
decoder := codec.NewDecoder(p1, nstructs.MsgpackHandle)
for {
var msg cstructs.StreamErrWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
t.Logf("received error decoding: %#v", err)
errCh <- fmt.Errorf("error decoding: %v", err)
return
}
if msg.Error != nil {
errCh <- msg.Error
continue
}
var frame sframer.StreamFrame
json.Unmarshal(msg.Payload, &frame)
t.Logf("received message: %#v", msg)
frames <- &frame
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment