Commit bf8a7428 authored by Mahmood Ali's avatar Mahmood Ali
Browse files

executor: scaffolding for executor grpc handling

Prepare executor to handle streaming exec API calls that reuse drivers protobuf
structs.
parent f6969b3d
Showing with 226 additions and 78 deletions
+226 -78
......@@ -13,7 +13,9 @@ import (
hclog "github.com/hashicorp/go-hclog"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/drivers/shared/executor/proto"
"github.com/hashicorp/nomad/helper/pluginutils/grpcutils"
"github.com/hashicorp/nomad/plugins/drivers"
dproto "github.com/hashicorp/nomad/plugins/drivers/proto"
)
var _ Executor = (*grpcExecutorClient)(nil)
......@@ -181,3 +183,62 @@ func (c *grpcExecutorClient) Exec(deadline time.Time, cmd string, args []string)
return resp.Output, int(resp.ExitCode), nil
}
func (d *grpcExecutorClient) ExecStreaming(ctx context.Context,
command []string,
tty bool,
execStream drivers.ExecTaskStream) error {
stream, err := d.client.ExecStreaming(ctx)
if err != nil {
return grpcutils.HandleGrpcErr(err, d.doneCtx)
}
err = stream.Send(&dproto.ExecTaskStreamingRequest{
Setup: &dproto.ExecTaskStreamingRequest_Setup{
Command: command,
Tty: tty,
},
})
if err != nil {
return grpcutils.HandleGrpcErr(err, d.doneCtx)
}
errCh := make(chan error, 1)
go func() {
for {
m, err := execStream.Recv()
if err == io.EOF {
return
} else if err != nil {
errCh <- err
return
}
if err := stream.Send(m); err != nil {
errCh <- err
return
}
}
}()
for {
select {
case err := <-errCh:
return err
default:
}
m, err := stream.Recv()
if err == io.EOF {
return nil
} else if err != nil {
return err
}
if err := execStream.Send(m); err != nil {
return err
}
}
}
......@@ -77,6 +77,9 @@ type Executor interface {
// Exec executes the given command and args inside the executor context
// and returns the output and exit code.
Exec(deadline time.Time, cmd string, args []string) ([]byte, int, error)
ExecStreaming(ctx context.Context, cmd []string, tty bool,
stream drivers.ExecTaskStream) error
}
// ExecCommand holds the user command, args, and other isolation related
......
This diff is collapsed.
......@@ -14,6 +14,7 @@ service Executor {
rpc Stats(StatsRequest) returns (stream StatsResponse) {}
rpc Signal(SignalRequest) returns (SignalResponse) {}
rpc Exec(ExecRequest) returns (ExecResponse) {}
rpc ExecStreaming(stream hashicorp.nomad.plugins.drivers.proto.ExecTaskStreamingRequest) returns (stream hashicorp.nomad.plugins.drivers.proto.ExecTaskStreamingResponse) {}
}
message LaunchRequest {
......
package executor
import (
"fmt"
"syscall"
"time"
......@@ -154,3 +155,18 @@ func (s *grpcExecutorServer) Exec(ctx context.Context, req *proto.ExecRequest) (
ExitCode: int32(exit),
}, nil
}
func (s *grpcExecutorServer) ExecStreaming(server proto.Executor_ExecStreamingServer) error {
msg, err := server.Recv()
if err != nil {
return fmt.Errorf("failed to receive initial message: %v", err)
}
if msg.Setup == nil {
return fmt.Errorf("first message should always be setup")
}
return s.impl.ExecStreaming(server.Context(),
msg.Setup.Command, msg.Setup.Tty,
server)
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment