Commit 5ca9b6eb authored by Mahmood Ali's avatar Mahmood Ali
Browse files

fifo: Use plain fifo file in Unix

This PR switches to using plain fifo files instead of golang structs
managed by containerd/fifo library.

The library main benefit is management of opening fifo files.  In Linux,
a reader `open()` request would block until a writer opens the file (and
vice-versa).  The library uses goroutines so that it's the first IO
operation that blocks.

This benefit isn't really useful for us: Given that logmon simply
streams output in a separate process, blocking of opening or first read
is effectively the same.

The library additionally makes further complications for managing state
and tracking read/write permission that seems overhead for our use,
compared to using a file directly.

Looking here, I made the following incidental changes:
* document that we do handle if fifo files are already created, as we
rely on that behavior for logmon restarts
* use type system to lock read vs write: currently, fifo library returns
`io.ReadWriteCloser` even if fifo is opened for writing only!
parent da430f86
Showing with 76 additions and 30 deletions
+76 -30
...@@ -15,6 +15,7 @@ import ( ...@@ -15,6 +15,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
// TestFIFO tests basic behavior, and that reader closes when writer closes
func TestFIFO(t *testing.T) { func TestFIFO(t *testing.T) {
require := require.New(t) require := require.New(t)
var path string var path string
...@@ -29,9 +30,11 @@ func TestFIFO(t *testing.T) { ...@@ -29,9 +30,11 @@ func TestFIFO(t *testing.T) {
path = filepath.Join(dir, "fifo") path = filepath.Join(dir, "fifo")
} }
reader, err := New(path) readerOpenFn, err := New(path)
require.NoError(err) require.NoError(err)
var reader io.ReadCloser
toWrite := [][]byte{ toWrite := [][]byte{
[]byte("abc\n"), []byte("abc\n"),
[]byte(""), []byte(""),
...@@ -45,7 +48,12 @@ func TestFIFO(t *testing.T) { ...@@ -45,7 +48,12 @@ func TestFIFO(t *testing.T) {
wait.Add(1) wait.Add(1)
go func() { go func() {
defer wait.Done() defer wait.Done()
io.Copy(&readBuf, reader)
reader, err = readerOpenFn()
require.NoError(err)
_, err = io.Copy(&readBuf, reader)
require.NoError(err)
}() }()
writer, err := Open(path) writer, err := Open(path)
...@@ -57,9 +65,9 @@ func TestFIFO(t *testing.T) { ...@@ -57,9 +65,9 @@ func TestFIFO(t *testing.T) {
} }
require.NoError(writer.Close()) require.NoError(writer.Close())
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
require.NoError(reader.Close())
wait.Wait() wait.Wait()
require.NoError(reader.Close())
expected := "abc\ndef\nnomad\n" expected := "abc\ndef\nnomad\n"
require.Equal(expected, readBuf.String()) require.Equal(expected, readBuf.String())
...@@ -67,6 +75,7 @@ func TestFIFO(t *testing.T) { ...@@ -67,6 +75,7 @@ func TestFIFO(t *testing.T) {
require.NoError(Remove(path)) require.NoError(Remove(path))
} }
// TestWriteClose asserts that when writer closes, subsequent Write() fails
func TestWriteClose(t *testing.T) { func TestWriteClose(t *testing.T) {
require := require.New(t) require := require.New(t)
var path string var path string
...@@ -81,15 +90,21 @@ func TestWriteClose(t *testing.T) { ...@@ -81,15 +90,21 @@ func TestWriteClose(t *testing.T) {
path = filepath.Join(dir, "fifo") path = filepath.Join(dir, "fifo")
} }
reader, err := New(path) readerOpenFn, err := New(path)
require.NoError(err) require.NoError(err)
var reader io.ReadCloser
var readBuf bytes.Buffer var readBuf bytes.Buffer
var wait sync.WaitGroup var wait sync.WaitGroup
wait.Add(1) wait.Add(1)
go func() { go func() {
defer wait.Done() defer wait.Done()
io.Copy(&readBuf, reader)
reader, err = readerOpenFn()
require.NoError(err)
_, err = io.Copy(&readBuf, reader)
require.NoError(err)
}() }()
writer, err := Open(path) writer, err := Open(path)
......
...@@ -3,23 +3,34 @@ ...@@ -3,23 +3,34 @@
package fifo package fifo
import ( import (
"context" "fmt"
"io" "io"
"os" "os"
"syscall"
cfifo "github.com/containerd/fifo" "golang.org/x/sys/unix"
) )
// New creates a fifo at the given path and returns an io.ReadWriteCloser for it // New creates a fifo at the given path, and returns an open function for reading.
// The fifo must not already exist // The fifo must not exist already, or that it's already a fifo file
func New(path string) (io.ReadWriteCloser, error) { //
return cfifo.OpenFifo(context.Background(), path, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0600) // It returns a reader open function that may block until a writer opens
// so it's advised to run it in a goroutine different from reader goroutine
func New(path string) (func() (io.ReadCloser, error), error) {
// create first
if err := mkfifo(path, 0600); err != nil && !os.IsExist(err) {
return nil, fmt.Errorf("error creating fifo %v: %v", path, err)
}
openFn := func() (io.ReadCloser, error) {
return os.OpenFile(path, unix.O_RDONLY, os.ModeNamedPipe)
}
return openFn, nil
} }
// Open opens a fifo that already exists and returns an io.ReadWriteCloser for it // Open opens a fifo file for reading, assuming it already exists, returns io.WriteCloser
func Open(path string) (io.ReadWriteCloser, error) { func Open(path string) (io.WriteCloser, error) {
return cfifo.OpenFifo(context.Background(), path, syscall.O_WRONLY|syscall.O_NONBLOCK, 0600) return os.OpenFile(path, unix.O_WRONLY, os.ModeNamedPipe)
} }
// Remove a fifo that already exists at a given path // Remove a fifo that already exists at a given path
...@@ -34,3 +45,7 @@ func IsClosedErr(err error) bool { ...@@ -34,3 +45,7 @@ func IsClosedErr(err error) bool {
} }
return false return false
} }
func mkfifo(path string, mode uint32) (err error) {
return unix.Mkfifo(path, mode)
}
...@@ -67,9 +67,9 @@ func (f *winFIFO) Close() error { ...@@ -67,9 +67,9 @@ func (f *winFIFO) Close() error {
return f.listener.Close() return f.listener.Close()
} }
// New creates a fifo at the given path and returns an io.ReadWriteCloser for it. The fifo // New creates a fifo at the given path and returns an io.ReadCloser open for it.
// must not already exist // The fifo must not already exist
func New(path string) (io.ReadWriteCloser, error) { func New(path string) (func() (io.ReadCloser, error), error) {
l, err := winio.ListenPipe(path, &winio.PipeConfig{ l, err := winio.ListenPipe(path, &winio.PipeConfig{
InputBufferSize: PipeBufferSize, InputBufferSize: PipeBufferSize,
OutputBufferSize: PipeBufferSize, OutputBufferSize: PipeBufferSize,
...@@ -78,13 +78,17 @@ func New(path string) (io.ReadWriteCloser, error) { ...@@ -78,13 +78,17 @@ func New(path string) (io.ReadWriteCloser, error) {
return nil, err return nil, err
} }
return &winFIFO{ openFn := func() (io.ReadCloser, error) {
listener: l, return &winFIFO{
}, nil listener: l,
}, nil
}
return openFn, nil
} }
// OpenWriter opens a fifo that already exists and returns an io.ReadWriteCloser for it // OpenWriter opens a fifo that already exists and returns an io.WriteCloser for it
func Open(path string) (io.ReadWriteCloser, error) { func Open(path string) (io.WriteCloser, error) {
return winio.DialPipe(path, nil) return winio.DialPipe(path, nil)
} }
......
...@@ -177,10 +177,12 @@ func NewTaskLogger(cfg *LogConfig, logger hclog.Logger) (*TaskLogger, error) { ...@@ -177,10 +177,12 @@ func NewTaskLogger(cfg *LogConfig, logger hclog.Logger) (*TaskLogger, error) {
// data will be copied from the reader to the rotator. // data will be copied from the reader to the rotator.
type logRotatorWrapper struct { type logRotatorWrapper struct {
fifoPath string fifoPath string
processOutReader io.ReadCloser
rotatorWriter *logging.FileRotator rotatorWriter *logging.FileRotator
hasFinishedCopied chan struct{} hasFinishedCopied chan struct{}
logger hclog.Logger logger hclog.Logger
processOutReader io.ReadCloser
opened chan struct{}
} }
// isRunning will return true until the reader is closed // isRunning will return true until the reader is closed
...@@ -197,29 +199,38 @@ func (l *logRotatorWrapper) isRunning() bool { ...@@ -197,29 +199,38 @@ func (l *logRotatorWrapper) isRunning() bool {
// processOutWriter to attach to the stdout or stderr of a process. // processOutWriter to attach to the stdout or stderr of a process.
func newLogRotatorWrapper(path string, logger hclog.Logger, rotator *logging.FileRotator) (*logRotatorWrapper, error) { func newLogRotatorWrapper(path string, logger hclog.Logger, rotator *logging.FileRotator) (*logRotatorWrapper, error) {
logger.Info("opening fifo", "path", path) logger.Info("opening fifo", "path", path)
f, err := fifo.New(path) fifoOpenFn, err := fifo.New(path)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create fifo for extracting logs: %v", err) return nil, fmt.Errorf("failed to create fifo for extracting logs: %v", err)
} }
wrap := &logRotatorWrapper{ wrap := &logRotatorWrapper{
fifoPath: path, fifoPath: path,
processOutReader: f,
rotatorWriter: rotator, rotatorWriter: rotator,
hasFinishedCopied: make(chan struct{}), hasFinishedCopied: make(chan struct{}),
opened: make(chan struct{}),
logger: logger, logger: logger,
} }
wrap.start() wrap.start(fifoOpenFn)
return wrap, nil return wrap, nil
} }
// start starts a goroutine that copies from the pipe into the rotator. This is // start starts a goroutine that copies from the pipe into the rotator. This is
// called by the constructor and not the user of the wrapper. // called by the constructor and not the user of the wrapper.
func (l *logRotatorWrapper) start() { func (l *logRotatorWrapper) start(readerOpenFn func() (io.ReadCloser, error)) {
go func() { go func() {
defer close(l.hasFinishedCopied) defer close(l.hasFinishedCopied)
_, err := io.Copy(l.rotatorWriter, l.processOutReader)
reader, err := readerOpenFn()
if err != nil {
return
}
l.processOutReader = reader
close(l.opened)
_, err = io.Copy(l.rotatorWriter, reader)
if err != nil { if err != nil {
l.logger.Error("copying got an error", "error", err)
// Close reader to propagate io error across pipe. // Close reader to propagate io error across pipe.
// Note that this may block until the process exits on // Note that this may block until the process exits on
// Windows due to // Windows due to
...@@ -227,7 +238,7 @@ func (l *logRotatorWrapper) start() { ...@@ -227,7 +238,7 @@ func (l *logRotatorWrapper) start() {
// or similar issues. Since this is already running in // or similar issues. Since this is already running in
// a goroutine its safe to block until the process is // a goroutine its safe to block until the process is
// force-killed. // force-killed.
l.processOutReader.Close() reader.Close()
} }
}() }()
return return
...@@ -249,6 +260,7 @@ func (l *logRotatorWrapper) Close() { ...@@ -249,6 +260,7 @@ func (l *logRotatorWrapper) Close() {
closeDone := make(chan struct{}) closeDone := make(chan struct{})
go func() { go func() {
defer close(closeDone) defer close(closeDone)
<-l.opened
err := l.processOutReader.Close() err := l.processOutReader.Close()
if err != nil && !strings.Contains(err.Error(), "file already closed") { if err != nil && !strings.Contains(err.Error(), "file already closed") {
l.logger.Warn("error closing read-side of process output pipe", "err", err) l.logger.Warn("error closing read-side of process output pipe", "err", err)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment