Unverified Commit efacb1e0 authored by Drew Bailey's avatar Drew Bailey
Browse files

retry copy if copy receives EOF but still running

parent a7291fc6
Branches unavailable
No related merge requests found
Showing with 99 additions and 14 deletions
+99 -14
package logmon package logmon
import ( import (
"context"
"fmt" "fmt"
"io" "io"
"os" "os"
...@@ -177,6 +178,8 @@ func NewTaskLogger(cfg *LogConfig, logger hclog.Logger) (*TaskLogger, error) { ...@@ -177,6 +178,8 @@ func NewTaskLogger(cfg *LogConfig, logger hclog.Logger) (*TaskLogger, error) {
// log rotator data. The processOutWriter should be attached to the process and // log rotator data. The processOutWriter should be attached to the process and
// data will be copied from the reader to the rotator. // data will be copied from the reader to the rotator.
type logRotatorWrapper struct { type logRotatorWrapper struct {
retryCtx context.Context
retryCancel context.CancelFunc
fifoPath string fifoPath string
rotatorWriter io.WriteCloser rotatorWriter io.WriteCloser
hasFinishedCopied chan struct{} hasFinishedCopied chan struct{}
...@@ -217,8 +220,11 @@ func newLogRotatorWrapper(path string, logger hclog.Logger, rotator io.WriteClos ...@@ -217,8 +220,11 @@ func newLogRotatorWrapper(path string, logger hclog.Logger, rotator io.WriteClos
logger.Error("failed to create FIFO", "stat_error", serr, "create_err", err) logger.Error("failed to create FIFO", "stat_error", serr, "create_err", err)
return nil, fmt.Errorf("failed to create fifo for extracting logs: %v", err) return nil, fmt.Errorf("failed to create fifo for extracting logs: %v", err)
} }
ctx, cancel := context.WithCancel(context.Background())
wrap := &logRotatorWrapper{ wrap := &logRotatorWrapper{
retryCtx: ctx,
retryCancel: cancel,
fifoPath: path, fifoPath: path,
rotatorWriter: rotator, rotatorWriter: rotator,
hasFinishedCopied: make(chan struct{}), hasFinishedCopied: make(chan struct{}),
...@@ -235,6 +241,11 @@ func newLogRotatorWrapper(path string, logger hclog.Logger, rotator io.WriteClos ...@@ -235,6 +241,11 @@ func newLogRotatorWrapper(path string, logger hclog.Logger, rotator io.WriteClos
func (l *logRotatorWrapper) start(openFn func() (io.ReadCloser, error)) { func (l *logRotatorWrapper) start(openFn func() (io.ReadCloser, error)) {
go func() { go func() {
defer close(l.hasFinishedCopied) defer close(l.hasFinishedCopied)
RETRY_COPY:
if l.retryCtx.Err() != nil {
l.logger.Warn("fifo context cancelled, exiting")
return
}
reader, err := openFn() reader, err := openFn()
if err != nil { if err != nil {
...@@ -256,6 +267,21 @@ func (l *logRotatorWrapper) start(openFn func() (io.ReadCloser, error)) { ...@@ -256,6 +267,21 @@ func (l *logRotatorWrapper) start(openFn func() (io.ReadCloser, error)) {
// force-killed. // force-killed.
reader.Close() reader.Close()
} }
// Close reader
reader.Close()
// Check if the context has been closed
// return if it has, otherwise wait before
// restarting copy
select {
case <-l.retryCtx.Done():
return
case <-time.After(1 * time.Second):
}
// reset openCompleted
l.openCompleted = make(chan struct{})
goto RETRY_COPY
}() }()
return return
} }
...@@ -263,6 +289,9 @@ func (l *logRotatorWrapper) start(openFn func() (io.ReadCloser, error)) { ...@@ -263,6 +289,9 @@ func (l *logRotatorWrapper) start(openFn func() (io.ReadCloser, error)) {
// Close closes the rotator and the process writer to ensure that the Wait // Close closes the rotator and the process writer to ensure that the Wait
// command exits. // command exits.
func (l *logRotatorWrapper) Close() { func (l *logRotatorWrapper) Close() {
// Cancel the retryable context
l.retryCancel()
// Wait up to the close tolerance before we force close // Wait up to the close tolerance before we force close
select { select {
case <-l.hasFinishedCopied: case <-l.hasFinishedCopied:
......
...@@ -131,16 +131,10 @@ func TestLogmon_Start_restart_flusheslogs(t *testing.T) { ...@@ -131,16 +131,10 @@ func TestLogmon_Start_restart_flusheslogs(t *testing.T) {
}) })
require.True(impl.tl.IsRunning()) require.True(impl.tl.IsRunning())
// Close stdout and assert that logmon no longer writes to the file // Close stdout and stderr
require.NoError(stdout.Close()) require.NoError(stdout.Close())
require.NoError(stderr.Close()) require.NoError(stderr.Close())
testutil.WaitForResult(func() (bool, error) {
return !impl.tl.IsRunning(), fmt.Errorf("logmon is still running")
}, func(err error) {
require.NoError(err)
})
stdout, err = fifo.OpenWriter(stdoutFifoPath) stdout, err = fifo.OpenWriter(stdoutFifoPath)
require.NoError(err) require.NoError(err)
stderr, err = fifo.OpenWriter(stderrFifoPath) stderr, err = fifo.OpenWriter(stderrFifoPath)
...@@ -234,16 +228,10 @@ func TestLogmon_Start_restart(t *testing.T) { ...@@ -234,16 +228,10 @@ func TestLogmon_Start_restart(t *testing.T) {
}) })
require.True(impl.tl.IsRunning()) require.True(impl.tl.IsRunning())
// Close stdout and assert that logmon no longer writes to the file // Close stdout and assert that logmon no longer writes to the file
require.NoError(stdout.Close()) require.NoError(stdout.Close())
require.NoError(stderr.Close()) require.NoError(stderr.Close())
testutil.WaitForResult(func() (bool, error) {
return !impl.tl.IsRunning(), fmt.Errorf("logmon is still running")
}, func(err error) {
require.NoError(err)
})
// Start logmon again and assert that it can receive logs again // Start logmon again and assert that it can receive logs again
require.NoError(lm.Start(cfg)) require.NoError(lm.Start(cfg))
...@@ -267,6 +255,74 @@ func TestLogmon_Start_restart(t *testing.T) { ...@@ -267,6 +255,74 @@ func TestLogmon_Start_restart(t *testing.T) {
}) })
} }
// TestLogmon_Start_restart_Close asserts that the task logger stops running
// once it has been closed: it starts logmon, writes one line through the
// stdout fifo, closes both fifo writers, calls Close on the task logger and
// then waits for IsRunning to report false.
func TestLogmon_Start_restart_Close(t *testing.T) {
	req := require.New(t)

	tmpDir, err := ioutil.TempDir("", "nomadtest")
	req.NoError(err)
	defer os.RemoveAll(tmpDir)

	// Pick the fifo locations: fixed pipe names on Windows, files inside the
	// temporary directory everywhere else.
	var outFifo, errFifo string
	switch runtime.GOOS {
	case "windows":
		outFifo = "//./pipe/test-restart.stdout"
		errFifo = "//./pipe/test-restart.stderr"
	default:
		outFifo = filepath.Join(tmpDir, "stdout.fifo")
		errFifo = filepath.Join(tmpDir, "stderr.fifo")
	}

	logCfg := &LogConfig{
		LogDir:        tmpDir,
		StdoutLogFile: "stdout",
		StdoutFifo:    outFifo,
		StderrLogFile: "stderr",
		StderrFifo:    errFifo,
		MaxFiles:      2,
		MaxFileSizeMB: 1,
	}

	mon := NewLogMon(testlog.HCLogger(t))
	monImpl, isImpl := mon.(*logmonImpl)
	req.True(isImpl)

	req.NoError(mon.Start(logCfg))

	outWriter, err := fifo.OpenWriter(outFifo)
	req.NoError(err)
	errWriter, err := fifo.OpenWriter(errFifo)
	req.NoError(err)

	// Push a single line through stdout and wait until it shows up in the
	// first stdout log file.
	const payload = "test\n"
	_, err = outWriter.Write([]byte(payload))
	req.NoError(err)

	stdoutLog := filepath.Join(tmpDir, "stdout.0")
	testutil.WaitForResult(func() (bool, error) {
		contents, readErr := ioutil.ReadFile(stdoutLog)
		if readErr != nil {
			return false, readErr
		}
		got := string(contents)
		return got == payload, fmt.Errorf("unexpected stdout %q", got)
	}, func(waitErr error) {
		req.NoError(waitErr)
	})

	req.True(monImpl.tl.IsRunning())

	// Close both fifo writers.
	req.NoError(outWriter.Close())
	req.NoError(errWriter.Close())

	// Close the task logger itself.
	monImpl.tl.Close()

	// The task logger must eventually report that it is no longer running.
	testutil.WaitForResult(func() (bool, error) {
		return !monImpl.tl.IsRunning(), fmt.Errorf("logmon is still running")
	}, func(waitErr error) {
		req.NoError(waitErr)
	})
}
// panicWriter panics on use // panicWriter panics on use
type panicWriter struct{} type panicWriter struct{}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment