Commit da77767f authored by Alex Dadgar's avatar Alex Dadgar

Logs over RPC w/ lots to touch up

parent 721fb650
Showing with 1554 additions and 1056 deletions
......@@ -2,6 +2,7 @@ package allocdir
import (
"archive/tar"
"context"
"fmt"
"io"
"io/ioutil"
......@@ -10,11 +11,10 @@ import (
"path/filepath"
"time"
"gopkg.in/tomb.v1"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hpcloud/tail/watch"
tomb "gopkg.in/tomb.v1"
)
const (
......@@ -90,8 +90,8 @@ type AllocDirFS interface {
Stat(path string) (*AllocFileInfo, error)
ReadAt(path string, offset int64) (io.ReadCloser, error)
Snapshot(w io.Writer) error
BlockUntilExists(path string, t *tomb.Tomb) (chan error, error)
ChangeEvents(path string, curOffset int64, t *tomb.Tomb) (*watch.FileChanges, error)
BlockUntilExists(ctx context.Context, path string) (chan error, error)
ChangeEvents(ctx context.Context, path string, curOffset int64) (*watch.FileChanges, error)
}
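// A minimal caller sketch for the new context-based signature. The
// waitForFile helper and the five-second timeout are illustrative and not
// part of this change; only BlockUntilExists comes from the interface above.
package logsdemo

import (
    "context"
    "fmt"
    "time"
)

// fsWaiter is the subset of AllocDirFS this sketch needs.
type fsWaiter interface {
    BlockUntilExists(ctx context.Context, path string) (chan error, error)
}

// waitForFile blocks until path exists, the watch errors, or the timeout
// expires.
func waitForFile(fs fsWaiter, path string) error {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    ch, err := fs.BlockUntilExists(ctx, path)
    if err != nil {
        return fmt.Errorf("failed to watch %q: %v", path, err)
    }
    select {
    case err := <-ch:
        return err
    case <-ctx.Done():
        return ctx.Err()
    }
}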
// NewAllocDir initializes the AllocDir struct with allocDir as base path for
......@@ -411,8 +411,8 @@ func (d *AllocDir) ReadAt(path string, offset int64) (io.ReadCloser, error) {
}
// BlockUntilExists blocks until the passed file relative to the allocation
// directory exists. The block can be cancelled with the passed tomb.
func (d *AllocDir) BlockUntilExists(path string, t *tomb.Tomb) (chan error, error) {
// directory exists. The block can be cancelled with the passed context.
func (d *AllocDir) BlockUntilExists(ctx context.Context, path string) (chan error, error) {
if escapes, err := structs.PathEscapesAllocDir("", path); err != nil {
return nil, fmt.Errorf("Failed to check if path escapes alloc directory: %v", err)
} else if escapes {
......@@ -423,6 +423,11 @@ func (d *AllocDir) BlockUntilExists(path string, t *tomb.Tomb) (chan error, erro
p := filepath.Join(d.AllocDir, path)
watcher := getFileWatcher(p)
returnCh := make(chan error, 1)
t := &tomb.Tomb{}
go func() {
<-ctx.Done()
t.Kill(nil)
}()
go func() {
returnCh <- watcher.BlockUntilExists(t)
close(returnCh)
......@@ -431,15 +436,21 @@ func (d *AllocDir) BlockUntilExists(path string, t *tomb.Tomb) (chan error, erro
}
// ChangeEvents watches for changes to the passed path relative to the
// allocation directory. The offset should be the last read offset. The tomb is
// allocation directory. The offset should be the last read offset. The context is
// used to clean up the watch.
func (d *AllocDir) ChangeEvents(path string, curOffset int64, t *tomb.Tomb) (*watch.FileChanges, error) {
func (d *AllocDir) ChangeEvents(ctx context.Context, path string, curOffset int64) (*watch.FileChanges, error) {
if escapes, err := structs.PathEscapesAllocDir("", path); err != nil {
return nil, fmt.Errorf("Failed to check if path escapes alloc directory: %v", err)
} else if escapes {
return nil, fmt.Errorf("Path escapes the alloc directory")
}
t := &tomb.Tomb{}
go func() {
<-ctx.Done()
t.Kill(nil)
}()
// Get the path relative to the alloc directory
p := filepath.Join(d.AllocDir, path)
watcher := getFileWatcher(p)
......
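// The context-to-tomb bridge above appears in both BlockUntilExists and
// ChangeEvents. A factored-out sketch; the tombFromContext name is
// hypothetical and not part of this change.
package allocdir

import (
    "context"

    tomb "gopkg.in/tomb.v1"
)

// tombFromContext returns a tomb that is killed once ctx is done, letting
// context-based callers drive the tomb-based tail watchers.
func tombFromContext(ctx context.Context) *tomb.Tomb {
    t := &tomb.Tomb{}
    go func() {
        <-ctx.Done()
        t.Kill(nil)
    }()
    return t
}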
......@@ -3,6 +3,7 @@ package allocdir
import (
"archive/tar"
"bytes"
"context"
"io"
"io/ioutil"
"log"
......@@ -12,8 +13,6 @@ import (
"syscall"
"testing"
tomb "gopkg.in/tomb.v1"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/testutil"
"github.com/hashicorp/nomad/nomad/structs"
......@@ -314,13 +313,12 @@ func TestAllocDir_EscapeChecking(t *testing.T) {
}
// BlockUntilExists
tomb := tomb.Tomb{}
if _, err := d.BlockUntilExists("../foo", &tomb); err == nil || !strings.Contains(err.Error(), "escapes") {
if _, err := d.BlockUntilExists(context.Background(), "../foo"); err == nil || !strings.Contains(err.Error(), "escapes") {
t.Fatalf("BlockUntilExists of escaping path didn't error: %v", err)
}
// ChangeEvents
if _, err := d.ChangeEvents("../foo", 0, &tomb); err == nil || !strings.Contains(err.Error(), "escapes") {
if _, err := d.ChangeEvents(context.Background(), "../foo", 0); err == nil || !strings.Contains(err.Error(), "escapes") {
t.Fatalf("ChangeEvents of escaping path didn't error: %v", err)
}
}
......
package client
import (
"bytes"
"context"
"fmt"
"io"
"math"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"syscall"
"time"
metrics "github.com/armon/go-metrics"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/client/structs"
nstructs "github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/client/allocdir"
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hpcloud/tail/watch"
"github.com/ugorji/go/codec"
)
var (
allocIDNotPresentErr = fmt.Errorf("must provide a valid alloc id")
fileNameNotPresentErr = fmt.Errorf("must provide a file name")
taskNotPresentErr = fmt.Errorf("must provide task name")
logTypeNotPresentErr = fmt.Errorf("must provide log type (stdout/stderr)")
clientNotRunning = fmt.Errorf("node is not running a Nomad Client")
invalidOrigin = fmt.Errorf("origin must be start or end")
)
const (
// streamFrameSize is the maximum number of bytes to send in a single frame
streamFrameSize = 64 * 1024
// streamHeartbeatRate is the rate at which a heartbeat will occur to detect
// a closed connection without sending any additional data
streamHeartbeatRate = 1 * time.Second
// streamBatchWindow is the window in which file content is batched before
// being flushed if the frame size has not been hit.
streamBatchWindow = 200 * time.Millisecond
// nextLogCheckRate is the rate at which we check for a log entry greater
// than what we are watching for. This is to handle the case in which logs
// rotate faster than we can detect and we have to rely on a normal
// directory listing.
nextLogCheckRate = 100 * time.Millisecond
// deleteEvent and truncateEvent are the file events that can be sent in a
// StreamFrame
deleteEvent = "file deleted"
truncateEvent = "file truncated"
// OriginStart and OriginEnd are the available parameters for the origin
// argument when streaming a file. They respectively offset from the start
// and end of a file.
OriginStart = "start"
OriginEnd = "end"
)
// FileSystem endpoint is used for accessing the logs and filesystem of
......@@ -15,20 +70,611 @@ type FileSystem struct {
c *Client
}
func (f *FileSystem) Register() {
f.c.streamingRpcs.Register("FileSystem.Logs", f.Logs)
}
func (f *FileSystem) handleStreamResultError(err error, code *int64, encoder *codec.Encoder) {
// Nothing to do as the conn is closed
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
if sendErr := encoder.Encode(&cstructs.StreamErrWrapper{
Error: cstructs.NewRpcError(err, code),
}); sendErr != nil {
f.c.logger.Printf("[WARN] client.fs_endpoint: failed to send error %q: %v", err, sendErr)
}
}
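// Each response on the wire is a msgpack-encoded StreamErrWrapper. A minimal
// client-side reader sketch; readStream is an illustrative name and not part
// of this commit.
package logsdemo

import (
    "io"

    cstructs "github.com/hashicorp/nomad/client/structs"
    "github.com/hashicorp/nomad/nomad/structs"
    "github.com/ugorji/go/codec"
)

// readStream drains StreamErrWrapper frames from conn, writing payloads to
// out until EOF or a wrapped error arrives.
func readStream(conn io.Reader, out io.Writer) error {
    dec := codec.NewDecoder(conn, structs.MsgpackHandle)
    for {
        var wrapper cstructs.StreamErrWrapper
        if err := dec.Decode(&wrapper); err != nil {
            if err == io.EOF {
                return nil
            }
            return err
        }
        if wrapper.Error != nil {
            return wrapper.Error
        }
        if _, err := out.Write(wrapper.Payload); err != nil {
            return err
        }
    }
}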
// Stats is used to retrieve the Clients stats.
func (fs *FileSystem) Logs(args *structs.ClientStatsRequest, reply *structs.ClientStatsResponse) error {
defer metrics.MeasureSince([]string{"client", "client_stats", "stats"}, time.Now())
func (f *FileSystem) Logs(conn io.ReadWriteCloser) {
defer metrics.MeasureSince([]string{"client", "file_system", "logs"}, time.Now())
defer conn.Close()
f.c.logger.Printf("--------- FileSystem: Logs called")
// Decode the arguments
var req cstructs.FsLogsRequest
decoder := codec.NewDecoder(conn, structs.MsgpackHandle)
encoder := codec.NewEncoder(conn, structs.MsgpackHandle)
if err := decoder.Decode(&req); err != nil {
f.handleStreamResultError(err, helper.Int64ToPtr(500), encoder)
return
}
f.c.logger.Printf("--------- FileSystem: Read request: %+v", req)
// Check node read permissions
if aclObj, err := fs.c.ResolveToken(args.AuthToken); err != nil {
return err
if aclObj, err := f.c.ResolveToken(req.QueryOptions.AuthToken); err != nil {
f.handleStreamResultError(err, nil, encoder)
return
} else if aclObj != nil {
readfs := aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilityReadFS)
logs := aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilityReadLogs)
readfs := aclObj.AllowNsOp(req.QueryOptions.Namespace, acl.NamespaceCapabilityReadFS)
logs := aclObj.AllowNsOp(req.QueryOptions.Namespace, acl.NamespaceCapabilityReadLogs)
if !readfs && !logs {
return nstructs.ErrPermissionDenied
f.handleStreamResultError(structs.ErrPermissionDenied, nil, encoder)
return
}
}
// Validate the arguments
if req.AllocID == "" {
f.handleStreamResultError(allocIDNotPresentErr, helper.Int64ToPtr(400), encoder)
return
}
if req.Task == "" {
f.handleStreamResultError(taskNotPresentErr, helper.Int64ToPtr(400), encoder)
return
}
switch req.LogType {
case "stdout", "stderr":
default:
f.handleStreamResultError(logTypeNotPresentErr, helper.Int64ToPtr(400), encoder)
return
}
switch req.Origin {
case "start", "end":
case "":
req.Origin = "start"
default:
f.handleStreamResultError(invalidOrigin, helper.Int64ToPtr(400), encoder)
return
}
fs, err := f.c.GetAllocFS(req.AllocID)
if err != nil {
var code *int64
if strings.Contains(err.Error(), "unknown allocation") {
code = helper.Int64ToPtr(404)
} else {
code = helper.Int64ToPtr(500)
}
f.handleStreamResultError(err, code, encoder)
return
}
alloc, err := f.c.GetClientAlloc(req.AllocID)
if err != nil {
var code *int64
if strings.Contains(err.Error(), "unknown allocation") {
code = helper.Int64ToPtr(404)
} else {
code = helper.Int64ToPtr(500)
}
f.handleStreamResultError(err, code, encoder)
return
}
// Check that the task is there
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
f.handleStreamResultError(fmt.Errorf("Failed to lookup task group for allocation"),
helper.Int64ToPtr(500), encoder)
return
} else if taskStruct := tg.LookupTask(req.Task); taskStruct == nil {
f.handleStreamResultError(
fmt.Errorf("task group %q does not have task with name %q", alloc.TaskGroup, req.Task),
helper.Int64ToPtr(400),
encoder)
return
}
state, ok := alloc.TaskStates[req.Task]
if !ok || state.StartedAt.IsZero() {
f.handleStreamResultError(fmt.Errorf("task %q not started yet. No logs available", req.Task),
helper.Int64ToPtr(404), encoder)
return
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
frames := make(chan *sframer.StreamFrame, 32)
errCh := make(chan error)
var buf bytes.Buffer
frameCodec := codec.NewEncoder(&buf, structs.JsonHandle)
// Start streaming
go func() {
if err := f.logs(ctx, req.Follow, req.PlainText,
req.Offset, req.Origin, req.Task, req.LogType, fs, frames); err != nil {
select {
case errCh <- err:
case <-ctx.Done():
}
f.c.logger.Printf("--------- FileSystem: logs finished, err \"%v\" sent", err)
} else {
f.c.logger.Printf("--------- FileSystem: logs finished")
}
}()
var streamErr error
OUTER:
for {
select {
case streamErr = <-errCh:
break OUTER
case frame, ok := <-frames:
if !ok {
f.c.logger.Printf("--------- FileSystem: Frame stream closed")
break OUTER
} else if frame == nil {
f.c.logger.Printf("--------- FileSystem: Got nil frame")
continue
}
f.c.logger.Printf("--------- FileSystem: Got frame w/ payload size %d", len(frame.Data))
var resp cstructs.StreamErrWrapper
if req.PlainText {
resp.Payload = frame.Data
} else {
if err = frameCodec.Encode(frame); err != nil {
streamErr = err
break OUTER
}
resp.Payload = buf.Bytes()
f.c.logger.Printf("--------- FileSystem: filled payload with %d bytes", len(resp.Payload))
buf.Reset()
}
if err := encoder.Encode(resp); err != nil {
streamErr = err
break OUTER
}
f.c.logger.Printf("--------- FileSystem: Sent frame with payload of size: %d", len(resp.Payload))
}
}
if streamErr != nil {
f.c.logger.Printf("--------- FileSystem: Logs finished w/ err: %v", streamErr)
f.handleStreamResultError(streamErr, helper.Int64ToPtr(500), encoder)
return
}
f.c.logger.Printf("--------- FileSystem: Logs finished with no error")
}
func (f *FileSystem) logs(ctx context.Context, follow, plain bool, offset int64,
origin, task, logType string,
fs allocdir.AllocDirFS, frames chan<- *sframer.StreamFrame) error {
// Create the framer
framer := sframer.NewStreamFramer(frames, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run()
defer framer.Destroy()
// Path to the logs
logPath := filepath.Join(allocdir.SharedAllocName, allocdir.LogDirName)
// nextIdx is the next index to read logs from
var nextIdx int64
switch origin {
case "start":
nextIdx = 0
case "end":
nextIdx = math.MaxInt64
offset *= -1
default:
return invalidOrigin
}
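// Example: a request with Origin "end" and Offset 100 reaches this point as
// nextIdx = math.MaxInt64 and offset = -100, so findClosest below picks the
// newest log file and starts reading 100 bytes before its end.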
for {
// Logic for picking next file is:
// 1) List log files
// 2) Pick log file closest to desired index
// 3) Open log file at correct offset
// 3a) No error, read contents
// 3b) If file doesn't exist, goto 1 as it may have been rotated out
entries, err := fs.List(logPath)
if err != nil {
return fmt.Errorf("failed to list entries: %v", err)
}
// If we are not following logs, determine the max index for the logs we are
// interested in so we can stop there.
maxIndex := int64(math.MaxInt64)
if !follow {
_, idx, _, err := findClosest(entries, maxIndex, 0, task, logType)
if err != nil {
return err
}
maxIndex = idx
}
logEntry, idx, openOffset, err := findClosest(entries, nextIdx, offset, task, logType)
if err != nil {
return err
}
var eofCancelCh chan error
exitAfter := false
if !follow && idx > maxIndex {
// Exceeded what was there initially so return
return nil
} else if !follow && idx == maxIndex {
// At the end
eofCancelCh = make(chan error)
close(eofCancelCh)
exitAfter = true
} else {
eofCancelCh = blockUntilNextLog(ctx, fs, logPath, task, logType, idx+1)
}
p := filepath.Join(logPath, logEntry.Name)
err = f.stream(ctx, openOffset, p, fs, framer, eofCancelCh)
// Check if the context is cancelled
select {
case <-ctx.Done():
return nil
default:
}
if err != nil {
// Check if there was an error where the file does not exist. That means
// it got rotated out from under us.
if os.IsNotExist(err) {
continue
}
// Check if the connection was closed
if err == syscall.EPIPE {
return nil
}
return fmt.Errorf("failed to stream %q: %v", p, err)
}
if exitAfter {
return nil
}
// defensively check to make sure StreamFramer hasn't stopped
// running to avoid tight loops with goroutine leaks as in
// #3342
select {
case <-framer.ExitCh():
err := parseFramerErr(framer.Err())
if err == syscall.EPIPE {
// EPIPE just means the connection was closed
return nil
}
return err
default:
}
// Since we successfully streamed, update the overall offset/idx.
offset = int64(0)
nextIdx = idx + 1
}
}
// stream is the internal method to stream the content of a file. eofCancelCh is
// used to cancel the stream if triggered while at EOF. If the connection is
// broken, an EPIPE error is returned.
func (f *FileSystem) stream(ctx context.Context, offset int64, path string,
fs allocdir.AllocDirFS, framer *sframer.StreamFramer,
eofCancelCh chan error) error {
// Get the reader
file, err := fs.ReadAt(path, offset)
if err != nil {
return err
}
defer file.Close()
// Create a child context to cancel watch events
waitCtx, cancel := context.WithCancel(ctx)
defer cancel()
// Create a variable to allow setting the last event
var lastEvent string
// Only create the file change watcher once. But we need to do it after we
// read and reach EOF.
var changes *watch.FileChanges
// Start streaming the data
data := make([]byte, streamFrameSize)
OUTER:
for {
// Read up to the max frame size
n, readErr := file.Read(data)
// Update the offset
offset += int64(n)
// Return non-EOF errors
if readErr != nil && readErr != io.EOF {
return readErr
}
// Send the frame
if n != 0 || lastEvent != "" {
if err := framer.Send(path, lastEvent, data[:n], offset); err != nil {
return parseFramerErr(err)
}
}
// Clear the last event
if lastEvent != "" {
lastEvent = ""
}
// Just keep reading
if readErr == nil {
continue
}
// If EOF is hit, wait for a change to the file
if changes == nil {
changes, err = fs.ChangeEvents(waitCtx, path, offset)
if err != nil {
return err
}
}
for {
select {
case <-changes.Modified:
continue OUTER
case <-changes.Deleted:
return parseFramerErr(framer.Send(path, deleteEvent, nil, offset))
case <-changes.Truncated:
// Close the current reader
if err := file.Close(); err != nil {
return err
}
// Get a new reader at offset zero
offset = 0
var err error
file, err = fs.ReadAt(path, offset)
if err != nil {
return err
}
defer file.Close()
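// Note that this deferred Close accumulates once per truncation and only
// runs when stream returns; the explicit Close above handles the reader it
// replaces.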
// Store the last event
lastEvent = truncateEvent
continue OUTER
case <-framer.ExitCh():
return parseFramerErr(framer.Err())
case <-ctx.Done():
return nil
case err, ok := <-eofCancelCh:
if !ok {
return nil
}
return err
}
}
}
}
// blockUntilNextLog returns a channel that will have data sent when the next
// log index or anything greater is created.
func blockUntilNextLog(ctx context.Context, fs allocdir.AllocDirFS, logPath, task, logType string, nextIndex int64) chan error {
nextPath := filepath.Join(logPath, fmt.Sprintf("%s.%s.%d", task, logType, nextIndex))
next := make(chan error, 1)
go func() {
eofCancelCh, err := fs.BlockUntilExists(ctx, nextPath)
if err != nil {
next <- err
close(next)
return
}
ticker := time.NewTicker(nextLogCheckRate)
defer ticker.Stop()
scanCh := ticker.C
for {
select {
case <-ctx.Done():
next <- nil
close(next)
return
case err := <-eofCancelCh:
next <- err
close(next)
return
case <-scanCh:
entries, err := fs.List(logPath)
if err != nil {
next <- fmt.Errorf("failed to list entries: %v", err)
close(next)
return
}
indexes, err := logIndexes(entries, task, logType)
if err != nil {
next <- err
close(next)
return
}
// Scan and see if there are any entries larger than what we are
// waiting for.
for _, entry := range indexes {
if entry.idx >= nextIndex {
next <- nil
close(next)
return
}
}
}
}
}()
return next
}
// indexTuple and indexTupleArray are used to find the correct log entry to
// start streaming logs from
type indexTuple struct {
idx int64
entry *allocdir.AllocFileInfo
}
type indexTupleArray []indexTuple
func (a indexTupleArray) Len() int { return len(a) }
func (a indexTupleArray) Less(i, j int) bool { return a[i].idx < a[j].idx }
func (a indexTupleArray) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// logIndexes takes a set of entries and returns an indexTupleArray of
// the desired log file entries. If the indexes could not be determined, an
// error is returned.
func logIndexes(entries []*allocdir.AllocFileInfo, task, logType string) (indexTupleArray, error) {
var indexes []indexTuple
prefix := fmt.Sprintf("%s.%s.", task, logType)
for _, entry := range entries {
if entry.IsDir {
continue
}
// If nothing was trimmed, then it is not a match
idxStr := strings.TrimPrefix(entry.Name, prefix)
if idxStr == entry.Name {
continue
}
// Convert to an int
idx, err := strconv.Atoi(idxStr)
if err != nil {
return nil, fmt.Errorf("failed to convert %q to a log index: %v", idxStr, err)
}
indexes = append(indexes, indexTuple{idx: int64(idx), entry: entry})
}
return indexTupleArray(indexes), nil
}
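// For example, with entries named "web.stdout.0", "web.stdout.1" and
// "web.stderr.0", logIndexes(entries, "web", "stdout") returns tuples for
// indexes 0 and 1 and skips the stderr file. Log files follow the
// task.logType.index naming that blockUntilNextLog constructs above.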
// findClosest takes a list of entries, the desired log index and desired log
// offset (which can be negative, treated as offset from end), task name and log
// type and returns the log entry, the log index, the offset to read from and a
// potential error.
func findClosest(entries []*allocdir.AllocFileInfo, desiredIdx, desiredOffset int64,
task, logType string) (*allocdir.AllocFileInfo, int64, int64, error) {
// Build the matching indexes
indexes, err := logIndexes(entries, task, logType)
if err != nil {
return nil, 0, 0, err
}
if len(indexes) == 0 {
return nil, 0, 0, fmt.Errorf("log entry for task %q and log type %q not found", task, logType)
}
// Binary search the indexes to get the desiredIdx
sort.Sort(indexes)
i := sort.Search(len(indexes), func(i int) bool { return indexes[i].idx >= desiredIdx })
l := len(indexes)
if i == l {
// Use the last index if the number is bigger than all of them.
i = l - 1
}
// Get to the correct offset
offset := desiredOffset
idx := int64(i)
for {
s := indexes[idx].entry.Size
// Base case
if offset == 0 {
break
} else if offset < 0 {
// Going backwards
if newOffset := s + offset; newOffset >= 0 {
// Current file works
offset = newOffset
break
} else if idx == 0 {
// Already at the first file
offset = 0
break
} else {
// Try the file before
offset = newOffset
idx -= 1
continue
}
} else {
// Going forward
if offset <= s {
// Current file works
break
} else if idx == int64(l-1) {
// Already at the end
offset = s
break
} else {
// Try the next file
offset = offset - s
idx += 1
continue
}
}
}
return indexes[idx].entry, indexes[idx].idx, offset, nil
}
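// Worked example (illustrative): with web.stdout.0 of size 100 and
// web.stdout.1 of size 50, findClosest(entries, math.MaxInt64, -120, "web",
// "stdout") starts at the newest file, needs 120 bytes back, borrows the
// remaining 70 from web.stdout.0, and returns that entry with offset 30.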
// parseFramerErr normalizes framer errors: those caused by the connection
// being closed are mapped to syscall.EPIPE; all others are returned unchanged.
func parseFramerErr(err error) error {
if err == nil {
return nil
}
errMsg := err.Error()
if strings.Contains(errMsg, io.ErrClosedPipe.Error()) {
// The pipe check is for tests
return syscall.EPIPE
}
// The connection was closed by our peer
if strings.Contains(errMsg, syscall.EPIPE.Error()) || strings.Contains(errMsg, syscall.ECONNRESET.Error()) {
return syscall.EPIPE
}
// Windows version of ECONNRESET
//XXX(schmichael) I could find no existing error or constant to
// compare this against.
if strings.Contains(errMsg, "forcibly closed") {
return syscall.EPIPE
}
return err
}
package framer
import (
"bytes"
"fmt"
"sync"
"time"
)
var (
// HeartbeatStreamFrame is the StreamFrame to send as a heartbeat, avoiding
// creating many instances of the empty StreamFrame
HeartbeatStreamFrame = &StreamFrame{}
)
// StreamFrame is used to frame data of a file when streaming
type StreamFrame struct {
// Offset is the offset the data was read from
Offset int64 `json:",omitempty"`
// Data is the read data
Data []byte `json:",omitempty"`
// File is the file that the data was read from
File string `json:",omitempty"`
// FileEvent is the last file event that occurred that could cause the
// stream's position to change or end
FileEvent string `json:",omitempty"`
}
// IsHeartbeat returns whether the frame is a heartbeat frame
func (s *StreamFrame) IsHeartbeat() bool {
return s.Offset == 0 && len(s.Data) == 0 && s.File == "" && s.FileEvent == ""
}
func (s *StreamFrame) Clear() {
s.Offset = 0
s.Data = nil
s.File = ""
s.FileEvent = ""
}
func (s *StreamFrame) IsCleared() bool {
if s.Offset != 0 {
return false
} else if s.Data != nil {
return false
} else if s.File != "" {
return false
} else if s.FileEvent != "" {
return false
} else {
return true
}
}
// StreamFramer is used to buffer and send frames as well as heartbeat.
type StreamFramer struct {
out chan<- *StreamFrame
frameSize int
heartbeat *time.Ticker
flusher *time.Ticker
shutdownCh chan struct{}
exitCh chan struct{}
// The mutex protects everything below
l sync.Mutex
// The current working frame
f StreamFrame
data *bytes.Buffer
// Captures whether the framer is running and any error that occurred to
// cause it to stop.
running bool
err error
}
// NewStreamFramer creates a new stream framer that will output StreamFrames to
// the passed output channel.
func NewStreamFramer(out chan<- *StreamFrame,
heartbeatRate, batchWindow time.Duration, frameSize int) *StreamFramer {
// Create the heartbeat and flush ticker
heartbeat := time.NewTicker(heartbeatRate)
flusher := time.NewTicker(batchWindow)
return &StreamFramer{
out: out,
frameSize: frameSize,
heartbeat: heartbeat,
flusher: flusher,
data: bytes.NewBuffer(make([]byte, 0, 2*frameSize)),
shutdownCh: make(chan struct{}),
exitCh: make(chan struct{}),
}
}
// Destroy is used to clean up the StreamFramer and flush any pending frames
func (s *StreamFramer) Destroy() {
s.l.Lock()
close(s.shutdownCh)
s.heartbeat.Stop()
s.flusher.Stop()
running := s.running
s.l.Unlock()
// Ensure things were flushed
if running {
<-s.exitCh
}
close(s.out)
}
// Run starts a long lived goroutine that handles sending data as well as
// heartbeating
func (s *StreamFramer) Run() {
s.l.Lock()
defer s.l.Unlock()
if s.running {
return
}
s.running = true
go s.run()
}
// ExitCh returns a channel that will be closed when the run loop terminates.
func (s *StreamFramer) ExitCh() <-chan struct{} {
return s.exitCh
}
// Err returns the error that caused the StreamFramer to exit
func (s *StreamFramer) Err() error {
s.l.Lock()
defer s.l.Unlock()
return s.err
}
// run is the internal run method. It exits if Destroy is called or an error
// occurs, in which case the exit channel is closed.
func (s *StreamFramer) run() {
var err error
defer func() {
s.l.Lock()
s.running = false
s.err = err
s.l.Unlock()
close(s.exitCh)
}()
OUTER:
for {
select {
case <-s.shutdownCh:
break OUTER
case <-s.flusher.C:
// Skip if there is nothing to flush
s.l.Lock()
if s.f.IsCleared() {
s.l.Unlock()
continue
}
// Read the data for the frame, and send it
s.f.Data = s.readData()
err = s.send(&s.f)
s.f.Clear()
s.l.Unlock()
if err != nil {
return
}
case <-s.heartbeat.C:
// Send a heartbeat frame
if err = s.send(HeartbeatStreamFrame); err != nil {
return
}
}
}
s.l.Lock()
if !s.f.IsCleared() {
s.f.Data = s.readData()
err = s.send(&s.f)
s.f.Clear()
}
s.l.Unlock()
}
// send takes a StreamFrame, encodes and sends it
func (s *StreamFramer) send(f *StreamFrame) error {
sending := *f
f.Data = nil
select {
case s.out <- &sending:
return nil
case <-s.exitCh:
return nil
}
}
// readData is a helper which reads the buffered data, returning up to
// frameSize bytes. Must be called with the lock held. The returned value is
// invalid on the next read or write into the StreamFramer buffer.
func (s *StreamFramer) readData() []byte {
// Compute the amount to read from the buffer
size := s.data.Len()
if size > s.frameSize {
size = s.frameSize
}
if size == 0 {
return nil
}
d := s.data.Next(size)
return d
}
// Send creates and sends a StreamFrame based on the passed parameters. An error
// is returned if the run routine hasn't run or encountered an error. Send is
// asynchronous and does not block for the data to be transferred.
func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) error {
s.l.Lock()
defer s.l.Unlock()
// If we are not running, return the error that caused us to not run or
// indicated that it was never started.
if !s.running {
if s.err != nil {
return s.err
}
return fmt.Errorf("StreamFramer not running")
}
// Check if not mergeable
if !s.f.IsCleared() && (s.f.File != file || s.f.FileEvent != fileEvent) {
// Flush the old frame
s.f.Data = s.readData()
select {
case <-s.exitCh:
return nil
default:
}
err := s.send(&s.f)
s.f.Clear()
if err != nil {
return err
}
}
// Store the new data as the current frame.
if s.f.IsCleared() {
s.f.Offset = offset
s.f.File = file
s.f.FileEvent = fileEvent
}
// Write the data to the buffer
s.data.Write(data)
// Handle the delete case in which there is no data
force := false
if s.data.Len() == 0 && s.f.FileEvent != "" {
force = true
}
// Flush till we are under the max frame size
for s.data.Len() >= s.frameSize || force {
// Clear
if force {
force = false
}
// Create a new frame to send it
s.f.Data = s.readData()
select {
case <-s.exitCh:
return nil
default:
}
if err := s.send(&s.f); err != nil {
return err
}
// Update the offset
s.f.Offset += int64(len(s.f.Data))
}
if s.data.Len() == 0 {
s.f.Clear()
}
return nil
}
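// A minimal usage sketch for the channel-based framer; the consumer
// goroutine stands in for the RPC encoder loop, and demoFramer is an
// illustrative name.
package framer

import "time"

func demoFramer() {
    out := make(chan *StreamFrame, 8)
    sf := NewStreamFramer(out, 1*time.Second, 200*time.Millisecond, 64*1024)
    sf.Run()

    // Drain frames until Destroy flushes pending data and closes out.
    done := make(chan struct{})
    go func() {
        defer close(done)
        for range out {
            // Encode and send each frame over the connection here.
        }
    }()

    _ = sf.Send("web.stdout.0", "", []byte("hello"), 0)
    sf.Destroy()
    <-done
}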
package framer
import (
"io"
)
type WriteCloseChecker struct {
io.WriteCloser
Closed bool
}
func (w *WriteCloseChecker) Close() error {
w.Closed = true
return w.WriteCloser.Close()
}
/*
// This test checks that even if the frame size has not been hit, a flush will
// periodically occur.
func TestStreamFramer_Flush(t *testing.T) {
// Create the stream framer
r, w := io.Pipe()
wrappedW := &WriteCloseChecker{WriteCloser: w}
hRate, bWindow := 100*time.Millisecond, 100*time.Millisecond
sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 100)
sf.Run()
// Create a decoder
dec := codec.NewDecoder(r, structs.JsonHandle)
f := "foo"
fe := "bar"
d := []byte{0xa}
o := int64(10)
// Start the reader
resultCh := make(chan struct{})
go func() {
for {
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
t.Fatalf("failed to decode")
}
if frame.IsHeartbeat() {
continue
}
if reflect.DeepEqual(frame.Data, d) && frame.Offset == o && frame.File == f && frame.FileEvent == fe {
resultCh <- struct{}{}
return
}
}
}()
// Write only 1 byte so we do not hit the frame size
if err := sf.Send(f, fe, d, o); err != nil {
t.Fatalf("Send() failed %v", err)
}
select {
case <-resultCh:
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * bWindow):
t.Fatalf("failed to flush")
}
// Close the reader and wait. This should cause the runner to exit
if err := r.Close(); err != nil {
t.Fatalf("failed to close reader")
}
select {
case <-sf.ExitCh():
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * hRate):
t.Fatalf("exit channel should close")
}
sf.Destroy()
if !wrappedW.Closed {
t.Fatalf("writer not closed")
}
}
// This test checks that frames will be batched till the frame size is hit (in
// the case that is before the flush).
func TestStreamFramer_Batch(t *testing.T) {
// Create the stream framer
r, w := io.Pipe()
wrappedW := &WriteCloseChecker{WriteCloser: w}
// Ensure the batch window doesn't get hit
hRate, bWindow := 100*time.Millisecond, 500*time.Millisecond
sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 3)
sf.Run()
// Create a decoder
dec := codec.NewDecoder(r, structs.JsonHandle)
f := "foo"
fe := "bar"
d := []byte{0xa, 0xb, 0xc}
o := int64(10)
// Start the reader
resultCh := make(chan struct{})
go func() {
for {
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
t.Fatalf("failed to decode")
}
if frame.IsHeartbeat() {
continue
}
if reflect.DeepEqual(frame.Data, d) && frame.Offset == o && frame.File == f && frame.FileEvent == fe {
resultCh <- struct{}{}
return
}
}
}()
// Write only 1 byte so we do not hit the frame size
if err := sf.Send(f, fe, d[:1], o); err != nil {
t.Fatalf("Send() failed %v", err)
}
// Ensure we didn't get any data
select {
case <-resultCh:
t.Fatalf("Got data before frame size reached")
case <-time.After(bWindow / 2):
}
// Write the rest so we hit the frame size
if err := sf.Send(f, fe, d[1:], o); err != nil {
t.Fatalf("Send() failed %v", err)
}
// Ensure we get data
select {
case <-resultCh:
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * bWindow):
t.Fatalf("Did not receive data after batch size reached")
}
// Close the reader and wait. This should cause the runner to exit
if err := r.Close(); err != nil {
t.Fatalf("failed to close reader")
}
select {
case <-sf.ExitCh():
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * hRate):
t.Fatalf("exit channel should close")
}
sf.Destroy()
if !wrappedW.Closed {
t.Fatalf("writer not closed")
}
}
func TestStreamFramer_Heartbeat(t *testing.T) {
// Create the stream framer
r, w := io.Pipe()
wrappedW := &WriteCloseChecker{WriteCloser: w}
hRate, bWindow := 100*time.Millisecond, 100*time.Millisecond
sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 100)
sf.Run()
// Create a decoder
dec := codec.NewDecoder(r, structs.JsonHandle)
// Start the reader
resultCh := make(chan struct{})
go func() {
for {
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
t.Fatalf("failed to decode")
}
if frame.IsHeartbeat() {
resultCh <- struct{}{}
return
}
}
}()
select {
case <-resultCh:
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * hRate):
t.Fatalf("failed to heartbeat")
}
// Close the reader and wait. This should cause the runner to exit
if err := r.Close(); err != nil {
t.Fatalf("failed to close reader")
}
select {
case <-sf.ExitCh():
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * hRate):
t.Fatalf("exit channel should close")
}
sf.Destroy()
if !wrappedW.Closed {
t.Fatalf("writer not closed")
}
}
// This test checks that frames are received in order
func TestStreamFramer_Order(t *testing.T) {
// Create the stream framer
r, w := io.Pipe()
wrappedW := &WriteCloseChecker{WriteCloser: w}
// Ensure the batch window doesn't get hit
hRate, bWindow := 100*time.Millisecond, 10*time.Millisecond
sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 10)
sf.Run()
// Create a decoder
dec := codec.NewDecoder(r, structs.JsonHandle)
files := []string{"1", "2", "3", "4", "5"}
input := bytes.NewBuffer(make([]byte, 0, 100000))
for i := 0; i <= 1000; i++ {
str := strconv.Itoa(i) + ","
input.WriteString(str)
}
expected := bytes.NewBuffer(make([]byte, 0, 100000))
for range files {
expected.Write(input.Bytes())
}
receivedBuf := bytes.NewBuffer(make([]byte, 0, 100000))
// Start the reader
resultCh := make(chan struct{})
go func() {
for {
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
t.Fatalf("failed to decode")
}
if frame.IsHeartbeat() {
continue
}
receivedBuf.Write(frame.Data)
if reflect.DeepEqual(expected, receivedBuf) {
resultCh <- struct{}{}
return
}
}
}()
// Send the data
b := input.Bytes()
shards := 10
each := len(b) / shards
for _, f := range files {
for i := 0; i < shards; i++ {
l, r := each*i, each*(i+1)
if i == shards-1 {
r = len(b)
}
if err := sf.Send(f, "", b[l:r], 0); err != nil {
t.Fatalf("Send() failed %v", err)
}
}
}
// Ensure we get data
select {
case <-resultCh:
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * bWindow):
if !reflect.DeepEqual(expected, receivedBuf) {
got := receivedBuf.String()
want := expected.String()
t.Fatalf("Got %v; want %v", got, want)
}
}
// Close the reader and wait. This should cause the runner to exit
if err := r.Close(); err != nil {
t.Fatalf("failed to close reader")
}
select {
case <-sf.ExitCh():
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * hRate):
t.Fatalf("exit channel should close")
}
sf.Destroy()
if !wrappedW.Closed {
t.Fatalf("writer not closed")
}
}
// This test checks that frames are received in order
func TestStreamFramer_Order_PlainText(t *testing.T) {
// Create the stream framer
r, w := io.Pipe()
wrappedW := &WriteCloseChecker{WriteCloser: w}
// Ensure the batch window doesn't get hit
hRate, bWindow := 100*time.Millisecond, 10*time.Millisecond
sf := NewStreamFramer(wrappedW, true, hRate, bWindow, 10)
sf.Run()
files := []string{"1", "2", "3", "4", "5"}
input := bytes.NewBuffer(make([]byte, 0, 100000))
for i := 0; i <= 1000; i++ {
str := strconv.Itoa(i) + ","
input.WriteString(str)
}
expected := bytes.NewBuffer(make([]byte, 0, 100000))
for range files {
expected.Write(input.Bytes())
}
receivedBuf := bytes.NewBuffer(make([]byte, 0, 100000))
// Start the reader
resultCh := make(chan struct{})
go func() {
OUTER:
for {
if _, err := receivedBuf.ReadFrom(r); err != nil {
if strings.Contains(err.Error(), "closed pipe") {
resultCh <- struct{}{}
return
}
t.Fatalf("bad read: %v", err)
}
if expected.Len() != receivedBuf.Len() {
continue
}
expectedBytes := expected.Bytes()
actualBytes := receivedBuf.Bytes()
for i, e := range expectedBytes {
if a := actualBytes[i]; a != e {
continue OUTER
}
}
resultCh <- struct{}{}
return
}
}()
// Send the data
b := input.Bytes()
shards := 10
each := len(b) / shards
for _, f := range files {
for i := 0; i < shards; i++ {
l, r := each*i, each*(i+1)
if i == shards-1 {
r = len(b)
}
if err := sf.Send(f, "", b[l:r], 0); err != nil {
t.Fatalf("Send() failed %v", err)
}
}
}
// Ensure we get data
select {
case <-resultCh:
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * bWindow):
if expected.Len() != receivedBuf.Len() {
t.Fatalf("Got %v; want %v", expected.Len(), receivedBuf.Len())
}
expectedBytes := expected.Bytes()
actualBytes := receivedBuf.Bytes()
for i, e := range expectedBytes {
if a := actualBytes[i]; a != e {
t.Fatalf("Index %d; Got %q; want %q", i, a, e)
}
}
}
// Close the reader and wait. This should cause the runner to exit
if err := r.Close(); err != nil {
t.Fatalf("failed to close reader")
}
sf.Destroy()
if !wrappedW.Closed {
t.Fatalf("writer not closed")
}
}
*/
......@@ -19,6 +19,7 @@ import (
// rpcEndpoints holds the RPC endpoints
type rpcEndpoints struct {
ClientStats *ClientStats
FileSystem *FileSystem
}
// ClientRPC is used to make a local, client only RPC call
......@@ -34,6 +35,12 @@ func (c *Client) ClientRPC(method string, args interface{}, reply interface{}) e
return codec.Err
}
// ClientStreamingRpcHandler is used to make a local, client only streaming RPC
// call.
func (c *Client) ClientStreamingRpcHandler(method string) (structs.StreamingRpcHandler, error) {
return c.streamingRpcs.GetHandler(method)
}
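// An illustrative local invocation of a registered streaming handler; conn
// stands in for one end of a pipe or network connection, and
// serveLogsLocally is a hypothetical helper.
package client

import "io"

// serveLogsLocally looks up the streaming handler registered as
// "FileSystem.Logs" and runs it over conn. The handler closes conn when the
// stream ends.
func serveLogsLocally(c *Client, conn io.ReadWriteCloser) error {
    handler, err := c.ClientStreamingRpcHandler("FileSystem.Logs")
    if err != nil {
        return err
    }
    handler(conn)
    return nil
}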
// RPC is used to forward an RPC call to a nomad server, or fail if no servers.
func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
// Invoke the RPCHandler if it exists
......@@ -101,6 +108,10 @@ func canRetry(args interface{}, err error) bool {
func (c *Client) setupClientRpc() {
// Initialize the RPC handlers
c.endpoints.ClientStats = &ClientStats{c}
c.endpoints.FileSystem = &FileSystem{c}
// Initialize the streaming RPC handlers.
c.endpoints.FileSystem.Register()
// Create the RPC Server
c.rpcServer = rpc.NewServer()
......
......@@ -9,6 +9,23 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)
// RpcError is used for serializing errors with a potential error code
type RpcError struct {
Message string
Code *int64
}
func NewRpcError(err error, code *int64) *RpcError {
return &RpcError{
Message: err.Error(),
Code: code,
}
}
func (r *RpcError) Error() string {
return r.Message
}
// ClientStatsRequest is used to request stats about a Node.
type ClientStatsRequest struct {
NodeID string
......@@ -21,6 +38,42 @@ type ClientStatsResponse struct {
structs.QueryMeta
}
// FsLogsRequest is the initial request for accessing allocation logs.
type FsLogsRequest struct {
// AllocID is the allocation to stream logs from
AllocID string
// Task is the task to stream logs from
Task string
// LogType indicates whether "stderr" or "stdout" should be streamed
LogType string
// Offset is the offset to start streaming data at.
Offset int64
// Origin can either be "start" or "end" and determines where the offset is
// applied.
Origin string
// PlainText disables base64 encoding.
PlainText bool
// Follow follows logs.
Follow bool
structs.QueryOptions
}
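// Illustrative request values: follow stdout starting 1 KiB back from the
// end of the log. The alloc ID is a placeholder; with Origin "end", the
// handler negates Offset and treats it as a distance back from the end.
package logsdemo

import (
    cstructs "github.com/hashicorp/nomad/client/structs"
)

func buildLogsRequest(allocID string) cstructs.FsLogsRequest {
    return cstructs.FsLogsRequest{
        AllocID: allocID, // placeholder alloc ID supplied by the caller
        Task:    "web",
        LogType: "stdout",
        Origin:  "end",
        Offset:  1024,
        Follow:  true,
    }
}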
// StreamErrWrapper is used to serialize output of a stream of a file or logs.
type StreamErrWrapper struct {
// Error stores any error that may have occurred.
Error *RpcError
// Payload is the streamed data for this frame
Payload []byte
}
// MemoryStats holds memory usage related stats
type MemoryStats struct {
RSS uint64
......
package agent
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"log"
"math"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"reflect"
"runtime"
"strconv"
"strings"
"testing"
"time"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/ugorji/go/codec"
)
type WriteCloseChecker struct {
io.WriteCloser
Closed bool
}
func (w *WriteCloseChecker) Close() error {
w.Closed = true
return w.WriteCloser.Close()
}
func TestAllocDirFS_List_MissingParams(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
......@@ -229,402 +225,7 @@ func TestAllocDirFS_Logs_ACL(t *testing.T) {
})
}
type WriteCloseChecker struct {
io.WriteCloser
Closed bool
}
func (w *WriteCloseChecker) Close() error {
w.Closed = true
return w.WriteCloser.Close()
}
// This test checks, that even if the frame size has not been hit, a flush will
// periodically occur.
func TestStreamFramer_Flush(t *testing.T) {
// Create the stream framer
r, w := io.Pipe()
wrappedW := &WriteCloseChecker{WriteCloser: w}
hRate, bWindow := 100*time.Millisecond, 100*time.Millisecond
sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 100)
sf.Run()
// Create a decoder
dec := codec.NewDecoder(r, structs.JsonHandle)
f := "foo"
fe := "bar"
d := []byte{0xa}
o := int64(10)
// Start the reader
resultCh := make(chan struct{})
go func() {
for {
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
t.Fatalf("failed to decode")
}
if frame.IsHeartbeat() {
continue
}
if reflect.DeepEqual(frame.Data, d) && frame.Offset == o && frame.File == f && frame.FileEvent == fe {
resultCh <- struct{}{}
return
}
}
}()
// Write only 1 byte so we do not hit the frame size
if err := sf.Send(f, fe, d, o); err != nil {
t.Fatalf("Send() failed %v", err)
}
select {
case <-resultCh:
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * bWindow):
t.Fatalf("failed to flush")
}
// Close the reader and wait. This should cause the runner to exit
if err := r.Close(); err != nil {
t.Fatalf("failed to close reader")
}
select {
case <-sf.ExitCh():
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * hRate):
t.Fatalf("exit channel should close")
}
sf.Destroy()
if !wrappedW.Closed {
t.Fatalf("writer not closed")
}
}
// This test checks that frames will be batched till the frame size is hit (in
// the case that is before the flush).
func TestStreamFramer_Batch(t *testing.T) {
// Create the stream framer
r, w := io.Pipe()
wrappedW := &WriteCloseChecker{WriteCloser: w}
// Ensure the batch window doesn't get hit
hRate, bWindow := 100*time.Millisecond, 500*time.Millisecond
sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 3)
sf.Run()
// Create a decoder
dec := codec.NewDecoder(r, structs.JsonHandle)
f := "foo"
fe := "bar"
d := []byte{0xa, 0xb, 0xc}
o := int64(10)
// Start the reader
resultCh := make(chan struct{})
go func() {
for {
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
t.Fatalf("failed to decode")
}
if frame.IsHeartbeat() {
continue
}
if reflect.DeepEqual(frame.Data, d) && frame.Offset == o && frame.File == f && frame.FileEvent == fe {
resultCh <- struct{}{}
return
}
}
}()
// Write only 1 byte so we do not hit the frame size
if err := sf.Send(f, fe, d[:1], o); err != nil {
t.Fatalf("Send() failed %v", err)
}
// Ensure we didn't get any data
select {
case <-resultCh:
t.Fatalf("Got data before frame size reached")
case <-time.After(bWindow / 2):
}
// Write the rest so we hit the frame size
if err := sf.Send(f, fe, d[1:], o); err != nil {
t.Fatalf("Send() failed %v", err)
}
// Ensure we get data
select {
case <-resultCh:
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * bWindow):
t.Fatalf("Did not receive data after batch size reached")
}
// Close the reader and wait. This should cause the runner to exit
if err := r.Close(); err != nil {
t.Fatalf("failed to close reader")
}
select {
case <-sf.ExitCh():
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * hRate):
t.Fatalf("exit channel should close")
}
sf.Destroy()
if !wrappedW.Closed {
t.Fatalf("writer not closed")
}
}
func TestStreamFramer_Heartbeat(t *testing.T) {
// Create the stream framer
r, w := io.Pipe()
wrappedW := &WriteCloseChecker{WriteCloser: w}
hRate, bWindow := 100*time.Millisecond, 100*time.Millisecond
sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 100)
sf.Run()
// Create a decoder
dec := codec.NewDecoder(r, structs.JsonHandle)
// Start the reader
resultCh := make(chan struct{})
go func() {
for {
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
t.Fatalf("failed to decode")
}
if frame.IsHeartbeat() {
resultCh <- struct{}{}
return
}
}
}()
select {
case <-resultCh:
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * hRate):
t.Fatalf("failed to heartbeat")
}
// Close the reader and wait. This should cause the runner to exit
if err := r.Close(); err != nil {
t.Fatalf("failed to close reader")
}
select {
case <-sf.ExitCh():
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * hRate):
t.Fatalf("exit channel should close")
}
sf.Destroy()
if !wrappedW.Closed {
t.Fatalf("writer not closed")
}
}
// This test checks that frames are received in order
func TestStreamFramer_Order(t *testing.T) {
// Create the stream framer
r, w := io.Pipe()
wrappedW := &WriteCloseChecker{WriteCloser: w}
// Ensure the batch window doesn't get hit
hRate, bWindow := 100*time.Millisecond, 10*time.Millisecond
sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 10)
sf.Run()
// Create a decoder
dec := codec.NewDecoder(r, structs.JsonHandle)
files := []string{"1", "2", "3", "4", "5"}
input := bytes.NewBuffer(make([]byte, 0, 100000))
for i := 0; i <= 1000; i++ {
str := strconv.Itoa(i) + ","
input.WriteString(str)
}
expected := bytes.NewBuffer(make([]byte, 0, 100000))
for range files {
expected.Write(input.Bytes())
}
receivedBuf := bytes.NewBuffer(make([]byte, 0, 100000))
// Start the reader
resultCh := make(chan struct{})
go func() {
for {
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
t.Fatalf("failed to decode")
}
if frame.IsHeartbeat() {
continue
}
receivedBuf.Write(frame.Data)
if reflect.DeepEqual(expected, receivedBuf) {
resultCh <- struct{}{}
return
}
}
}()
// Send the data
b := input.Bytes()
shards := 10
each := len(b) / shards
for _, f := range files {
for i := 0; i < shards; i++ {
l, r := each*i, each*(i+1)
if i == shards-1 {
r = len(b)
}
if err := sf.Send(f, "", b[l:r], 0); err != nil {
t.Fatalf("Send() failed %v", err)
}
}
}
// Ensure we get data
select {
case <-resultCh:
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * bWindow):
if reflect.DeepEqual(expected, receivedBuf) {
got := receivedBuf.String()
want := expected.String()
t.Fatalf("Got %v; want %v", got, want)
}
}
// Close the reader and wait. This should cause the runner to exit
if err := r.Close(); err != nil {
t.Fatalf("failed to close reader")
}
select {
case <-sf.ExitCh():
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * hRate):
t.Fatalf("exit channel should close")
}
sf.Destroy()
if !wrappedW.Closed {
t.Fatalf("writer not closed")
}
}
// This test checks that frames are received in order
func TestStreamFramer_Order_PlainText(t *testing.T) {
// Create the stream framer
r, w := io.Pipe()
wrappedW := &WriteCloseChecker{WriteCloser: w}
// Ensure the batch window doesn't get hit
hRate, bWindow := 100*time.Millisecond, 10*time.Millisecond
sf := NewStreamFramer(wrappedW, true, hRate, bWindow, 10)
sf.Run()
files := []string{"1", "2", "3", "4", "5"}
input := bytes.NewBuffer(make([]byte, 0, 100000))
for i := 0; i <= 1000; i++ {
str := strconv.Itoa(i) + ","
input.WriteString(str)
}
expected := bytes.NewBuffer(make([]byte, 0, 100000))
for range files {
expected.Write(input.Bytes())
}
receivedBuf := bytes.NewBuffer(make([]byte, 0, 100000))
// Start the reader
resultCh := make(chan struct{})
go func() {
OUTER:
for {
if _, err := receivedBuf.ReadFrom(r); err != nil {
if strings.Contains(err.Error(), "closed pipe") {
resultCh <- struct{}{}
return
}
t.Fatalf("bad read: %v", err)
}
if expected.Len() != receivedBuf.Len() {
continue
}
expectedBytes := expected.Bytes()
actualBytes := receivedBuf.Bytes()
for i, e := range expectedBytes {
if a := actualBytes[i]; a != e {
continue OUTER
}
}
resultCh <- struct{}{}
return
}
}()
// Send the data
b := input.Bytes()
shards := 10
each := len(b) / shards
for _, f := range files {
for i := 0; i < shards; i++ {
l, r := each*i, each*(i+1)
if i == shards-1 {
r = len(b)
}
if err := sf.Send(f, "", b[l:r], 0); err != nil {
t.Fatalf("Send() failed %v", err)
}
}
}
// Ensure we get data
select {
case <-resultCh:
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * bWindow):
if expected.Len() != receivedBuf.Len() {
t.Fatalf("Got %v; want %v", expected.Len(), receivedBuf.Len())
}
expectedBytes := expected.Bytes()
actualBytes := receivedBuf.Bytes()
for i, e := range expectedBytes {
if a := actualBytes[i]; a != e {
t.Fatalf("Index %d; Got %q; want %q", i, a, e)
}
}
}
// Close the reader and wait. This should cause the runner to exit
if err := r.Close(); err != nil {
t.Fatalf("failed to close reader")
}
sf.Destroy()
if !wrappedW.Closed {
t.Fatalf("writer not closed")
}
}
/*
func TestHTTP_Stream_MissingParams(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
......@@ -693,7 +294,7 @@ func TestHTTP_Stream_NoFile(t *testing.T) {
ad := tempAllocDir(t)
defer os.RemoveAll(ad.AllocDir)
framer := NewStreamFramer(nopWriteCloser{ioutil.Discard}, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer := sframer.NewStreamFramer(nopWriteCloser{ioutil.Discard}, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run()
defer framer.Destroy()
......@@ -731,7 +332,7 @@ func TestHTTP_Stream_Modify(t *testing.T) {
go func() {
var collected []byte
for {
var frame StreamFrame
var frame sframer.StreamFrame
if err := dec.Decode(&frame); err != nil {
t.Fatalf("failed to decode: %v", err)
}
......@@ -753,7 +354,7 @@ func TestHTTP_Stream_Modify(t *testing.T) {
t.Fatalf("write failed: %v", err)
}
framer := NewStreamFramer(w, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer := sframer.NewStreamFramer(w, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run()
defer framer.Destroy()
......@@ -809,7 +410,7 @@ func TestHTTP_Stream_Truncate(t *testing.T) {
go func() {
var collected []byte
for {
var frame StreamFrame
var frame sframer.StreamFrame
if err := dec.Decode(&frame); err != nil {
t.Fatalf("failed to decode: %v", err)
}
......@@ -835,7 +436,7 @@ func TestHTTP_Stream_Truncate(t *testing.T) {
t.Fatalf("write failed: %v", err)
}
framer := NewStreamFramer(w, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer := sframer.NewStreamFramer(w, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run()
defer framer.Destroy()
......@@ -918,7 +519,7 @@ func TestHTTP_Stream_Delete(t *testing.T) {
deleteCh := make(chan struct{})
go func() {
for {
var frame StreamFrame
var frame sframer.StreamFrame
if err := dec.Decode(&frame); err != nil {
t.Fatalf("failed to decode: %v", err)
}
......@@ -939,7 +540,7 @@ func TestHTTP_Stream_Delete(t *testing.T) {
t.Fatalf("write failed: %v", err)
}
framer := NewStreamFramer(wrappedW, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer := sframer.NewStreamFramer(wrappedW, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run()
// Start streaming
......@@ -1010,7 +611,7 @@ func TestHTTP_Logs_NoFollow(t *testing.T) {
resultCh := make(chan struct{})
go func() {
for {
var frame StreamFrame
var frame sframer.StreamFrame
if err := dec.Decode(&frame); err != nil {
if err == io.EOF {
t.Logf("EOF")
......@@ -1098,7 +699,7 @@ func TestHTTP_Logs_Follow(t *testing.T) {
fullResultCh := make(chan struct{})
go func() {
for {
var frame StreamFrame
var frame sframer.StreamFrame
if err := dec.Decode(&frame); err != nil {
if err == io.EOF {
t.Logf("EOF")
......@@ -1213,7 +814,7 @@ func BenchmarkHTTP_Logs_Follow(t *testing.B) {
fullResultCh := make(chan struct{})
go func() {
for {
var frame StreamFrame
var frame sframer.StreamFrame
if err := dec.Decode(&frame); err != nil {
if err == io.EOF {
t.Logf("EOF")
......@@ -1468,3 +1069,4 @@ func TestLogs_findClosest(t *testing.T) {
}
}
}
*/
......@@ -28,10 +28,14 @@ func IsErrUnknownMethod(err error) bool {
type StreamingRpcHeader struct {
// Method is the name of the method to invoke.
Method string
// QueryOptions and WriteRequest provide metadata about the RPC request.
QueryOptions *QueryOptions
WriteRequest *WriteRequest
}
// StreamingRpcHandler defines the handler for a streaming RPC.
type StreamingRpcHandler func(io.ReadWriter)
type StreamingRpcHandler func(conn io.ReadWriteCloser)
// StreamingRpcRegistery is used to add and retrieve handlers
type StreamingRpcRegistery struct {
......