Commit a0bcdfc3 authored by Mahmood Ali's avatar Mahmood Ali
Browse files

allow reuse of buffer in api

parent 92bc50dc
Showing with 22 additions and 33 deletions
+22 -33
......@@ -8,6 +8,7 @@ import (
"net"
"sort"
"strconv"
"sync"
"time"
"github.com/gorilla/websocket"
......@@ -77,10 +78,9 @@ func (a *Allocations) Exec(ctx context.Context,
ctx, cancelFn := context.WithCancel(ctx)
defer cancelFn()
inputCh := make(chan *ExecStreamingInput)
errCh := make(chan error, 1)
frames := a.execFrames(ctx, alloc, task, tty, command, inputCh, errCh, q)
sender, output := a.execFrames(ctx, alloc, task, tty, command, errCh, q)
select {
case err := <-errCh:
......@@ -91,18 +91,18 @@ func (a *Allocations) Exec(ctx context.Context,
// forwarding stdin
go func() {
bytes := make([]byte, 2048)
for {
if ctx.Err() != nil {
return
}
input := ExecStreamingInput{Stdin: &FileOperation{}}
bytes := make([]byte, 2048)
n, err := stdin.Read(bytes)
if err == io.EOF {
input.Stdin.Close = true
inputCh <- &input
sender(&input)
return
} else if err != nil {
errCh <- err
......@@ -110,7 +110,7 @@ func (a *Allocations) Exec(ctx context.Context,
}
input.Stdin.Data = bytes[:n]
inputCh <- &input
sender(&input)
}
}()
......@@ -127,11 +127,11 @@ func (a *Allocations) Exec(ctx context.Context,
continue
}
resizeInput.TTYSize = &size
inputCh <- &resizeInput
sender(&resizeInput)
// heartbeat message
case <-time.After(10 * time.Second):
inputCh <- &ExecStreamingInputHeartbeat
sender(&ExecStreamingInputHeartbeat)
}
}
......@@ -143,7 +143,7 @@ func (a *Allocations) Exec(ctx context.Context,
return -1, err
case <-ctx.Done():
return -1, ctx.Err()
case frame, ok := <-frames:
case frame, ok := <-output:
if !ok {
return -1, nil
}
......@@ -167,13 +167,12 @@ func (a *Allocations) Exec(ctx context.Context,
}
func (a *Allocations) execFrames(ctx context.Context, alloc *Allocation, task string, tty bool, command []string,
inputCh <-chan *ExecStreamingInput, errCh chan<- error,
q *QueryOptions) <-chan *ExecStreamingOutput {
errCh chan<- error, q *QueryOptions) (sendFn func(interface{}) error, output <-chan *ExecStreamingOutput) {
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
if err != nil {
errCh <- err
return nil
return nil, nil
}
if q == nil {
......@@ -186,7 +185,7 @@ func (a *Allocations) execFrames(ctx context.Context, alloc *Allocation, task st
commandBytes, err := json.Marshal(command)
if err != nil {
errCh <- fmt.Errorf("failed to marshal command: %s", err)
return nil
return nil, nil
}
q.Params["tty"] = strconv.FormatBool(tty)
......@@ -200,34 +199,16 @@ func (a *Allocations) execFrames(ctx context.Context, alloc *Allocation, task st
// There was a networking error when talking directly to the client.
if _, ok := err.(net.Error); !ok {
errCh <- err
return nil
return nil, nil
}
conn, _, err = a.client.websocket(reqPath, q)
if err != nil {
errCh <- err
return nil
return nil, nil
}
}
go func() {
for {
select {
case <-ctx.Done():
return
case input, ok := <-inputCh:
if !ok {
return
}
err := conn.WriteJSON(input)
if err != nil {
// TODO: handle this
}
}
}
}()
// Create the output channel
frames := make(chan *ExecStreamingOutput, 10)
......@@ -256,7 +237,15 @@ func (a *Allocations) execFrames(ctx context.Context, alloc *Allocation, task st
}
}()
return frames
var sendLock sync.Mutex
send := func(v interface{}) error {
sendLock.Lock()
defer sendLock.Unlock()
return conn.WriteJSON(v)
}
return send, frames
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment