Unverified commit e27e323a authored by Drew Bailey, committed by GitHub

Merge pull request #6691 from hashicorp/f-nomad-monitor-leader

Nomad monitor - target remote servers
parents ed111c3b e46c4155
Showing 251 additions and 9 deletions
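This change teaches `nomad monitor` to stream logs from a specific remote server, or from whichever server currently holds leadership, instead of only the agent it is addressed to. A rough usage sketch, assuming the `-server-id` flag added in this PR and a default agent address:

```text
# Stream debug-level logs from the current leader
$ nomad monitor -log-level=debug -server-id=leader

# Stream logs from a named server (names as reported by `nomad server members`)
$ nomad monitor -log-level=debug -server-id=server1.global
```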
@@ -44,6 +44,9 @@ type MonitorRequest struct {
// NodeID is the node we want to track the logs of
NodeID string
// ServerID is the server we want to track the logs of
ServerID string
// PlainText disables base64 encoding.
PlainText bool
......
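A minimal sketch of how a caller might populate the extended request struct. The `cstructs` import path follows the alias used elsewhere in this diff, and the surrounding program is illustrative only:

```go
package main

import (
	"fmt"

	cstructs "github.com/hashicorp/nomad/client/structs"
)

func main() {
	// Target the current leader's logs. NodeID and ServerID are mutually
	// exclusive; the HTTP handler below rejects requests that set both.
	req := cstructs.MonitorRequest{
		ServerID:  "leader",
		LogLevel:  "warn",
		PlainText: true,
	}
	fmt.Printf("%+v\n", req)
}
```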
@@ -175,9 +175,6 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) (
return nil, CodedError(400, fmt.Sprintf("Unknown log level: %s", logLevel))
}
// Determine if we are targeting a server or client
nodeID := req.URL.Query().Get("node_id")
logJSON := false
logJSONStr := req.URL.Query().Get("log_json")
if logJSONStr != "" {
@@ -198,14 +195,21 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) (
plainText = parsed
}
nodeID := req.URL.Query().Get("node_id")
// Build the request and parse the ACL token
args := cstructs.MonitorRequest{
NodeID: nodeID,
ServerID: req.URL.Query().Get("server_id"),
LogLevel: logLevel,
LogJSON: logJSON,
PlainText: plainText,
}
// if node and server were requested return error
if args.NodeID != "" && args.ServerID != "" {
return nil, CodedError(400, "Cannot target node and server simultaneously")
}
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
// Make the RPC
......
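For illustration, a hedged sketch of calling the HTTP endpoint directly with the new `server_id` query parameter. The agent address and the newline-framed plain-text output are assumptions; error handling is kept minimal:

```go
package main

import (
	"bufio"
	"fmt"
	"net/http"
	"net/url"
)

func main() {
	// Build /v1/agent/monitor?log_level=debug&server_id=leader&plain=true.
	// Setting both node_id and server_id would be rejected with a 400.
	q := url.Values{}
	q.Set("log_level", "debug")
	q.Set("server_id", "leader")
	q.Set("plain", "true")

	resp, err := http.Get("http://127.0.0.1:4646/v1/agent/monitor?" + q.Encode())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Stream log lines until the agent closes the connection.
	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		fmt.Println(scanner.Text())
	}
}
```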
@@ -61,12 +61,14 @@ func (c *MonitorCommand) Run(args []string) int {
var logLevel string
var nodeID string
var serverID string
var logJSON bool
flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
flags.StringVar(&logLevel, "log-level", "", "")
flags.StringVar(&nodeID, "node-id", "", "")
flags.StringVar(&serverID, "server-id", "", "")
flags.BoolVar(&logJSON, "json", false, "")
if err := flags.Parse(args); err != nil {
@@ -90,6 +92,7 @@ func (c *MonitorCommand) Run(args []string) int {
params := map[string]string{
"log_level": logLevel,
"node_id": nodeID,
"server_id": serverID,
"log_json": strconv.FormatBool(logJSON),
}
......
@@ -61,11 +61,28 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {
// Targeting a node, forward request to node
if args.NodeID != "" {
m.forwardMonitor(conn, args, encoder, decoder)
m.forwardMonitorClient(conn, args, encoder, decoder)
// forwarded request has ended, return
return
}
currentServer := m.srv.serf.LocalMember().Name
var forwardServer bool
// Targeting a remote server which is not the leader and not this server
if args.ServerID != "" && args.ServerID != "leader" && args.ServerID != currentServer {
forwardServer = true
}
// Targeting leader and this server is not current leader
if args.ServerID == "leader" && !m.srv.IsLeader() {
forwardServer = true
}
if forwardServer {
m.forwardMonitorServer(conn, args, encoder, decoder)
return
}
// NodeID was empty, so monitor this current server
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
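The forwarding decision above reduces to a small predicate; a simplified, standalone restatement (names here are illustrative, not Nomad's):

```go
package main

import "fmt"

// shouldForwardToServer mirrors the checks in Agent.monitor: forward when a
// specific remote server is named, or when "leader" is requested and this
// server is not currently the leader.
func shouldForwardToServer(serverID, localName string, isLeader bool) bool {
	if serverID == "" {
		return false // no server target, monitor locally
	}
	if serverID == "leader" {
		return !isLeader
	}
	return serverID != localName
}

func main() {
	fmt.Println(shouldForwardToServer("leader", "s1.global", true))      // false: already the leader
	fmt.Println(shouldForwardToServer("leader", "s1.global", false))     // true: forward to the leader
	fmt.Println(shouldForwardToServer("s2.global", "s1.global", false))  // true: forward to the named server
	fmt.Println(shouldForwardToServer("s1.global", "s1.global", false))  // false: this server is the target
}
```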
@@ -168,7 +185,7 @@ OUTER:
}
}
func (m *Agent) forwardMonitor(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) {
func (m *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) {
nodeID := args.NodeID
snap, err := m.srv.State().Snapshot()
@@ -236,3 +253,56 @@ func (m *Agent) forwardMonitor(conn io.ReadWriteCloser, args cstructs.MonitorReq
structs.Bridge(conn, clientConn)
return
}
func (m *Agent) forwardMonitorServer(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) {
var target *serverParts
serverID := args.ServerID
// empty ServerID to prevent forwarding loop
args.ServerID = ""
if serverID == "leader" {
isLeader, remoteServer := m.srv.getLeader()
if !isLeader && remoteServer != nil {
target = remoteServer
}
if !isLeader && remoteServer == nil {
handleStreamResultError(structs.ErrNoLeader, helper.Int64ToPtr(400), encoder)
return
}
} else {
// See if the server ID is a known member
serfMembers := m.srv.Members()
for _, mem := range serfMembers {
if mem.Name == serverID {
if ok, srv := isNomadServer(mem); ok {
target = srv
}
}
}
}
// Unable to find a server
if target == nil {
err := fmt.Errorf("unknown nomad server %s", serverID)
handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
return
}
serverConn, err := m.srv.streamingRpc(target, "Agent.Monitor")
if err != nil {
handleStreamResultError(err, helper.Int64ToPtr(500), encoder)
return
}
defer serverConn.Close()
// Send the Request
outEncoder := codec.NewEncoder(serverConn, structs.MsgpackHandle)
if err := outEncoder.Encode(args); err != nil {
handleStreamResultError(err, helper.Int64ToPtr(500), encoder)
return
}
structs.Bridge(conn, serverConn)
return
}
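Target resolution in `forwardMonitorServer` can be read as a lookup over the serf member list plus a special case for `leader`. A condensed, dependency-free sketch of that shape (the types are placeholders, not Nomad's `serverParts`):

```go
package main

import (
	"errors"
	"fmt"
)

// member is a stand-in for a serf member known to this server.
type member struct {
	name string
	addr string
}

// resolveTarget picks where to forward: the current leader's address for
// "leader", otherwise the address of the member whose name matches serverID.
// An empty result corresponds to the 400 "unknown nomad server" error above.
func resolveTarget(serverID, leaderAddr string, members []member) (string, error) {
	if serverID == "leader" {
		if leaderAddr == "" {
			return "", errors.New("no cluster leader")
		}
		return leaderAddr, nil
	}
	for _, m := range members {
		if m.name == serverID {
			return m.addr, nil
		}
	}
	return "", fmt.Errorf("unknown nomad server %s", serverID)
}

func main() {
	members := []member{{"s1.global", "10.0.0.1:4647"}, {"s2.global", "10.0.0.2:4647"}}
	fmt.Println(resolveTarget("s2.global", "10.0.0.1:4647", members))
	fmt.Println(resolveTarget("missing.global", "10.0.0.1:4647", members))
}
```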
@@ -9,6 +9,7 @@ import (
"testing"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/client"
"github.com/hashicorp/nomad/client/config"
@@ -22,7 +23,7 @@ import (
"github.com/ugorji/go/codec"
)
func TestMonitor_Monitor_Remote_Server(t *testing.T) {
func TestMonitor_Monitor_Remote_Client(t *testing.T) {
t.Parallel()
require := require.New(t)
@@ -117,6 +118,156 @@ OUTER:
}
}
func TestMonitor_Monitor_RemoteServer(t *testing.T) {
t.Parallel()
// start servers
s1 := TestServer(t, nil)
defer s1.Shutdown()
s2 := TestServer(t, func(c *Config) {
c.DevDisableBootstrap = true
})
defer s2.Shutdown()
TestJoin(t, s1, s2)
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForLeader(t, s2.RPC)
// determine leader and nonleader
servers := []*Server{s1, s2}
var nonLeader *Server
var leader *Server
for _, s := range servers {
if !s.IsLeader() {
nonLeader = s
} else {
leader = s
}
}
cases := []struct {
desc string
serverID string
expectedLog string
logger hclog.InterceptLogger
origin *Server
}{
{
desc: "remote leader",
serverID: "leader",
expectedLog: "leader log",
logger: leader.logger,
origin: nonLeader,
},
{
desc: "remote server",
serverID: nonLeader.serf.LocalMember().Name,
expectedLog: "nonleader log",
logger: nonLeader.logger,
origin: leader,
},
{
desc: "serverID is current leader",
serverID: "leader",
expectedLog: "leader log",
logger: leader.logger,
origin: leader,
},
{
desc: "serverID is current server",
serverID: nonLeader.serf.LocalMember().Name,
expectedLog: "non leader log",
logger: nonLeader.logger,
origin: nonLeader,
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
require := require.New(t)
// send some specific logs
doneCh := make(chan struct{})
go func() {
for {
select {
case <-doneCh:
return
default:
tc.logger.Warn(tc.expectedLog)
time.Sleep(10 * time.Millisecond)
}
}
}()
req := cstructs.MonitorRequest{
LogLevel: "warn",
ServerID: tc.serverID,
}
handler, err := tc.origin.StreamingRpcHandler("Agent.Monitor")
require.Nil(err)
// create pipe
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *cstructs.StreamErrWrapper)
go handler(p2)
// Start decoder
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg cstructs.StreamErrWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %v", err)
}
streamMsg <- &msg
}
}()
// send request
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(encoder.Encode(req))
timeout := time.After(2 * time.Second)
received := ""
OUTER:
for {
select {
case <-timeout:
t.Fatal("timeout waiting for logs")
case err := <-errCh:
t.Fatal(err)
case msg := <-streamMsg:
if msg.Error != nil {
t.Fatalf("Got error: %v", msg.Error.Error())
}
var frame sframer.StreamFrame
err := json.Unmarshal(msg.Payload, &frame)
assert.NoError(t, err)
received += string(frame.Data)
if strings.Contains(received, tc.expectedLog) {
close(doneCh)
require.Nil(p2.Close())
break OUTER
}
}
}
})
}
}
func TestMonitor_MonitorServer(t *testing.T) {
t.Parallel()
require := require.New(t)
......
@@ -518,16 +518,20 @@ The table below shows this endpoint's support for
### Parameters
- `log-level` `(string: "info")` - Specifies a text string containing a log level
- `log_level` `(string: "info")` - Specifies a text string containing a log level
to filter on, such as `info`. Possible values include `trace`, `debug`,
`info`, `warn`, `error`
- `json` `(bool: false)` - Specifies if the log format for streamed logs
should be JSON.
- `node-id` `(string: "a57b2adb-1a30-2dda-8df0-25abb0881952")` - Specifies a text
- `node_id` `(string: "a57b2adb-1a30-2dda-8df0-25abb0881952")` - Specifies a text
string containing a node-id to target for streaming.
- `server_id` `(string: "server1.global")` - Specifies a text
string containing a server name or "leader" to target a specific remote server
or leader for streaming.
- `plain` `(bool: false)` - Specifies if the response should be JSON or
plaintext
@@ -535,7 +539,10 @@ The table below shows this endpoint's support for
```text
$ curl \
https://localhost:4646/v1/agent/monitor?log-level=debug&node-id=a57b2adb-1a30-2dda-8df0-25abb0881952
https://localhost:4646/v1/agent/monitor?log_level=debug&server_id=leader
$ curl \
https://localhost:4646/v1/agent/monitor?log_level=debug&node_id=a57b2adb-1a30-2dda-8df0-25abb0881952
```
### Sample Response
......
@@ -37,6 +37,10 @@ The monitor command also allows you to specify a single client node id to follow
- `-node-id`: Specifies the client node-id to stream logs from. If no
node-id is given the nomad server from the -address flag will be used.
- `-server-id`: Specifies the nomad server id to stream logs from. Accepts
server names from `nomad server members` and also a special `leader` option
which will target the current leader.
- `-json`: Stream logs in json format
## Examples
......