Unverified Commit 33ba36ac authored by Drew Bailey's avatar Drew Bailey
Browse files

log-json -> json


fix typo command/agent/monitor/monitor.go
Co-Authored-By: default avatarChris Baker <1675087+cgbaker@users.noreply.github.com>

Update command/agent/monitor/monitor.go
Co-Authored-By: default avatarChris Baker <1675087+cgbaker@users.noreply.github.com>

address feedback, lock to prevent send on closed channel

fix lock/unlock for dropped messages
parent bb2a7f43
No related merge requests found
Showing with 30 additions and 20 deletions
+30 -20
......@@ -5,7 +5,6 @@ import (
"context"
"errors"
"io"
"strings"
"time"
"github.com/hashicorp/nomad/command/agent/monitor"
......@@ -159,9 +158,7 @@ OUTER:
}
if streamErr != nil {
// Nothing to do as conn is closed
if streamErr == io.EOF || strings.Contains(streamErr.Error(), "closed") {
return
}
handleStreamResultError(streamErr, helper.Int64ToPtr(500), encoder)
return
}
}
......@@ -12,12 +12,22 @@ import (
// InterceptLogger and SinkAdapter. It allows streaming of logs
// at a different log level than what is set on the logger.
type Monitor struct {
// sync.Mutex protects droppedCount and logCh
sync.Mutex
sink log.SinkAdapter
logger log.InterceptLogger
logCh chan []byte
droppedCount int
bufSize int
sink log.SinkAdapter
// logger is the logger we will be monitoring
logger log.InterceptLogger
logCh chan []byte
// droppedCount is the current count of messages
// that were dropped from the logCh buffer.
// only access under lock
droppedCount int
bufSize int
// droppedDuration is the amount of time we should
// wait to check for dropped messages. Defaults
// to 3 seconds
droppedDuration time.Duration
}
......@@ -38,9 +48,9 @@ func New(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) *Monitor
return sw
}
// Start registers a sink on the monitors logger and starts sending
// Start registers a sink on the monitor's logger and starts sending
// received log messages over the returned channel. A non-nil
// sopCh can be used to deregister the sink and stop log streaming
// stopCh can be used to de-register the sink and stop log streaming
func (d *Monitor) Start(stopCh <-chan struct{}) <-chan []byte {
d.logger.RegisterSink(d.sink)
......@@ -77,14 +87,16 @@ func (d *Monitor) Start(stopCh <-chan struct{}) <-chan []byte {
break LOOP
case <-time.After(d.droppedDuration):
d.Lock()
defer d.Unlock()
// Check if there have been any dropped messages.
if d.droppedCount > 0 {
dropped := fmt.Sprintf("[WARN] Monitor dropped %d logs during monitor request\n", d.droppedCount)
select {
// Try sending dropped message count to logCh in case
// there is room in the buffer now.
case d.logCh <- []byte(dropped):
default:
// Make room for dropped message
// Drop a log message to make room for "Monitor dropped.." message
select {
case <-d.logCh:
d.droppedCount++
......@@ -95,6 +107,7 @@ func (d *Monitor) Start(stopCh <-chan struct{}) <-chan []byte {
}
d.droppedCount = 0
}
d.Unlock()
}
}
}()
......@@ -105,6 +118,9 @@ func (d *Monitor) Start(stopCh <-chan struct{}) <-chan []byte {
// Write attempts to send latest log to logCh
// it drops the log if channel is unavailable to receive
func (d *Monitor) Write(p []byte) (n int, err error) {
d.Lock()
defer d.Unlock()
bytes := make([]byte, len(p))
copy(bytes, p)
......
......@@ -39,7 +39,7 @@ Monitor Specific Options:
-node-id <node-id>
Sets the specific node to monitor
-log-json <true|false>
-json
Sets log output to JSON format
`
return strings.TrimSpace(helpText)
......
......@@ -7,7 +7,6 @@ import (
"fmt"
"io"
"net"
"strings"
"time"
log "github.com/hashicorp/go-hclog"
......@@ -163,10 +162,8 @@ OUTER:
}
if streamErr != nil {
// Nothing to do as conn is closed
if streamErr == io.EOF || strings.Contains(streamErr.Error(), "closed") {
return
}
handleStreamResultError(streamErr, helper.Int64ToPtr(500), encoder)
return
}
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment