Commit 0eeb7862 authored by Alex Dadgar's avatar Alex Dadgar
Browse files

Stats Endpoint

parent f3b262bb
Showing with 252 additions and 55 deletions
+252 -55
......@@ -6,6 +6,7 @@ import (
"io/ioutil"
"log"
"net"
"net/rpc"
"os"
"path/filepath"
"strconv"
......@@ -157,6 +158,10 @@ type Client struct {
// clientACLResolver holds the ACL resolution state
clientACLResolver
// rpcServer is used to serve RPCs by the local agent.
rpcServer *rpc.Server
endpoints rpcEndpoints
// baseLabels are used when emitting tagged metrics. All client metrics will
// have these tags, and optionally more.
baseLabels []metrics.Label
......@@ -202,6 +207,9 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
return nil, fmt.Errorf("failed to initialize client: %v", err)
}
// Setup the clients RPC server
c.setupClientRpc()
// Initialize the ACL state
if err := c.clientACLResolver.init(); err != nil {
return nil, fmt.Errorf("failed to initialize ACL state: %v", err)
......@@ -446,35 +454,6 @@ func (c *Client) Shutdown() error {
return c.saveState()
}
// RPC is used to forward an RPC call to a nomad server, or fail if no servers.
func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
// Invoke the RPCHandler if it exists
if c.config.RPCHandler != nil {
return c.config.RPCHandler.RPC(method, args, reply)
}
servers := c.servers.all()
if len(servers) == 0 {
return noServersErr
}
var mErr multierror.Error
for _, s := range servers {
// Make the RPC request
if err := c.connPool.RPC(c.Region(), s.addr, c.RPCMajorVersion(), method, args, reply); err != nil {
errmsg := fmt.Errorf("RPC failed to server %s: %v", s.addr, err)
mErr.Errors = append(mErr.Errors, errmsg)
c.logger.Printf("[DEBUG] client: %v", errmsg)
c.servers.failed(s)
continue
}
c.servers.good(s)
return nil
}
return mErr.ErrorOrNil()
}
// Stats is used to return statistics for debugging and insight
// for various sub-systems
func (c *Client) Stats() map[string]map[string]string {
......@@ -2227,19 +2206,3 @@ func (c *Client) allAllocs() map[string]*structs.Allocation {
}
return allocs
}
// resolveServer given a sever's address as a string, return it's resolved
// net.Addr or an error.
func resolveServer(s string) (net.Addr, error) {
const defaultClientPort = "4647" // default client RPC port
host, port, err := net.SplitHostPort(s)
if err != nil {
if strings.Contains(err.Error(), "missing port") {
host = s
port = defaultClientPort
} else {
return nil, err
}
}
return net.ResolveTCPAddr("tcp", net.JoinHostPort(host, port))
}
package client
import (
"time"
metrics "github.com/armon/go-metrics"
"github.com/hashicorp/nomad/client/structs"
nstructs "github.com/hashicorp/nomad/nomad/structs"
)
// ClientStats endpoint is used for retrieving stats about a client
type ClientStats struct {
c *Client
}
// Stats is used to retrieve the Clients stats.
func (s *ClientStats) Stats(args *structs.ClientStatsRequest, reply *structs.ClientStatsResponse) error {
defer metrics.MeasureSince([]string{"nomad", "client", "client_stats", "stats"}, time.Now())
// Check node read permissions
if aclObj, err := s.c.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowNodeRead() {
return nstructs.ErrPermissionDenied
}
clientStats := s.c.StatsReporter()
reply.HostStats = clientStats.LatestHostStats()
return nil
}
package client
import (
"testing"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/mock"
nstructs "github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
func TestClientStats_Stats(t *testing.T) {
t.Parallel()
require := require.New(t)
client := testClient(t, nil)
req := &structs.ClientStatsRequest{}
var resp structs.ClientStatsResponse
require.Nil(client.ClientRPC("ClientStats.Stats", &req, &resp))
require.NotNil(resp.HostStats)
require.NotNil(resp.HostStats.AllocDirStats)
require.NotZero(resp.HostStats.Uptime)
}
func TestClientStats_Stats_ACL(t *testing.T) {
t.Parallel()
require := require.New(t)
server, addr, root := testACLServer(t, nil)
defer server.Shutdown()
client := testClient(t, func(c *config.Config) {
c.Servers = []string{addr}
c.ACLEnabled = true
})
defer client.Shutdown()
// Try request without a token and expect failure
{
req := &structs.ClientStatsRequest{}
var resp structs.ClientStatsResponse
err := client.ClientRPC("ClientStats.Stats", &req, &resp)
require.NotNil(err)
require.EqualError(err, nstructs.ErrPermissionDenied.Error())
}
// Try request with an invalid token and expect failure
{
token := mock.CreatePolicyAndToken(t, server.State(), 1005, "invalid", mock.NodePolicy(acl.PolicyDeny))
req := &structs.ClientStatsRequest{}
req.AuthToken = token.SecretID
var resp structs.ClientStatsResponse
err := client.ClientRPC("ClientStats.Stats", &req, &resp)
require.NotNil(err)
require.EqualError(err, nstructs.ErrPermissionDenied.Error())
}
// Try request with a valid token
{
token := mock.CreatePolicyAndToken(t, server.State(), 1007, "valid", mock.NodePolicy(acl.PolicyRead))
req := &structs.ClientStatsRequest{}
req.AuthToken = token.SecretID
var resp structs.ClientStatsResponse
err := client.ClientRPC("ClientStats.Stats", &req, &resp)
require.Nil(err)
require.NotNil(resp.HostStats)
}
// Try request with a management token
{
req := &structs.ClientStatsRequest{}
req.AuthToken = root.SecretID
var resp structs.ClientStatsResponse
err := client.ClientRPC("ClientStats.Stats", &req, &resp)
require.Nil(err)
require.NotNil(resp.HostStats)
}
}
......@@ -17,6 +17,7 @@ import (
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/nomad/mock"
......@@ -120,7 +121,7 @@ func testClient(t *testing.T, cb func(c *config.Config)) *Client {
cb(conf)
}
logger := log.New(conf.LogOutput, "", log.LstdFlags)
logger := testlog.Logger(t)
catalog := consul.NewMockCatalog(logger)
mockService := newMockConsulServiceClient()
mockService.logger = logger
......
package client
import (
"fmt"
"net"
"net/rpc"
"strings"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/helper/codec"
)
// rpcEndpoints holds the RPC endpoints
type rpcEndpoints struct {
ClientStats *ClientStats
}
// ClientRPC is used to make a local, client only RPC call
func (c *Client) ClientRPC(method string, args interface{}, reply interface{}) error {
codec := &codec.InmemCodec{
Method: method,
Args: args,
Reply: reply,
}
if err := c.rpcServer.ServeRequest(codec); err != nil {
return err
}
return codec.Err
}
// RPC is used to forward an RPC call to a nomad server, or fail if no servers.
func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
// Invoke the RPCHandler if it exists
if c.config.RPCHandler != nil {
return c.config.RPCHandler.RPC(method, args, reply)
}
servers := c.servers.all()
if len(servers) == 0 {
return noServersErr
}
var mErr multierror.Error
for _, s := range servers {
// Make the RPC request
if err := c.connPool.RPC(c.Region(), s.addr, c.RPCMajorVersion(), method, args, reply); err != nil {
errmsg := fmt.Errorf("RPC failed to server %s: %v", s.addr, err)
mErr.Errors = append(mErr.Errors, errmsg)
c.logger.Printf("[DEBUG] client: %v", errmsg)
c.servers.failed(s)
continue
}
c.servers.good(s)
return nil
}
return mErr.ErrorOrNil()
}
// setupClientRpc is used to setup the Client's RPC endpoints
func (c *Client) setupClientRpc() {
// Initialize the RPC handlers
c.endpoints.ClientStats = &ClientStats{c}
// Create the RPC Server
c.rpcServer = rpc.NewServer()
// Register the endpoints with the RPC server
c.setupClientRpcServer(c.rpcServer)
}
// setupClientRpcServer is used to populate a client RPC server with endpoints.
func (c *Client) setupClientRpcServer(server *rpc.Server) {
// Register the endpoints
server.Register(c.endpoints.ClientStats)
}
// resolveServer given a sever's address as a string, return it's resolved
// net.Addr or an error.
func resolveServer(s string) (net.Addr, error) {
const defaultClientPort = "4647" // default client RPC port
host, port, err := net.SplitHostPort(s)
if err != nil {
if strings.Contains(err.Error(), "missing port") {
host = s
port = defaultClientPort
} else {
return nil, err
}
}
return net.ResolveTCPAddr("tcp", net.JoinHostPort(host, port))
}
......@@ -93,7 +93,12 @@ func NewHostStatsCollector(logger *log.Logger, allocDir string) *HostStatsCollec
func (h *HostStatsCollector) Collect() error {
h.hostStatsLock.Lock()
defer h.hostStatsLock.Unlock()
return h.collectLocked()
}
// collectLocked collects stats related to resource usage of the host but should
// be called with the lock held.
func (h *HostStatsCollector) collectLocked() error {
hs := &HostStats{Timestamp: time.Now().UTC().UnixNano()}
// Determine up-time
......@@ -185,6 +190,13 @@ func (h *HostStatsCollector) collectDiskStats() ([]*DiskStats, error) {
func (h *HostStatsCollector) Stats() *HostStats {
h.hostStatsLock.RLock()
defer h.hostStatsLock.RUnlock()
if h.hostStats == nil {
if err := h.collectLocked(); err != nil {
h.logger.Printf("[WARN] client: error fetching host resource usage stats: %v", err)
}
}
return h.hostStats
}
......
......@@ -4,8 +4,23 @@ import (
"crypto/md5"
"io"
"strconv"
"github.com/hashicorp/nomad/client/stats"
"github.com/hashicorp/nomad/nomad/structs"
)
// ClientStatsRequest is used to request stats about a Node.
type ClientStatsRequest struct {
NodeID string
structs.QueryOptions
}
// ClientStatsResponse is used to return statistics about a node.
type ClientStatsResponse struct {
HostStats *stats.HostStats
structs.QueryMeta
}
// MemoryStats holds memory usage related stats
type MemoryStats struct {
RSS uint64
......
......@@ -3,7 +3,7 @@ package agent
import (
"net/http"
"github.com/hashicorp/nomad/nomad/structs"
cstructs "github.com/hashicorp/nomad/client/structs"
)
func (s *HTTPServer) ClientStatsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
......@@ -11,16 +11,15 @@ func (s *HTTPServer) ClientStatsRequest(resp http.ResponseWriter, req *http.Requ
return nil, clientNotRunning
}
var secret string
s.parseToken(req, &secret)
// Parse the ACL token
var args cstructs.ClientStatsRequest
s.parseToken(req, &args.AuthToken)
// Check node read permissions
if aclObj, err := s.agent.Client().ResolveToken(secret); err != nil {
// Make the RPC
var reply cstructs.ClientStatsResponse
if err := s.agent.Client().ClientRPC("ClientStats.Stats", &args, &reply); err != nil {
return nil, err
} else if aclObj != nil && !aclObj.AllowNodeRead() {
return nil, structs.ErrPermissionDenied
}
clientStats := s.agent.client.StatsReporter()
return clientStats.LatestHostStats(), nil
return reply.HostStats, nil
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment