Unverified Commit 6a0eb7bd authored by Alex Dadgar's avatar Alex Dadgar Committed by GitHub
Browse files

Merge pull request #3756 from hashicorp/f-server-routing

Server routing of client RPCs
parents 0fd21fd4 b816d44f
Showing with 430 additions and 53 deletions
+430 -53
package nomad
import (
"fmt"
"time"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/yamux"
)
// nodeConnState is used to track connection information about a Nomad Client.
// The server keeps one entry per NodeID in Server.nodeConns; entries are
// created by addNodeConn and dropped by removeNodeConn.
type nodeConnState struct {
// Session holds the multiplexed yamux Session for dialing back. It is copied
// from the RPCContext handed to addNodeConn.
Session *yamux.Session
// Established is when the connection was established. addNodeConn sets it to
// time.Now() when the entry is stored, and Status.HasNodeConn reports it to
// callers asking whether this server has a connection to the node.
Established time.Time
}
// getNodeConn looks up the tracked connection state for nodeID. The boolean
// result reports whether an entry is present in s.nodeConns. The lookup is
// performed under the nodeConns read lock.
func (s *Server) getNodeConn(nodeID string) (*nodeConnState, bool) {
	s.nodeConnsLock.RLock()
	conn, found := s.nodeConns[nodeID]
	s.nodeConnsLock.RUnlock()

	return conn, found
}
// connectedNodes reports every node this server currently tracks a connection
// for. The result maps each NodeID to the time its connection was established
// and is a fresh map, so callers may modify it freely.
func (s *Server) connectedNodes() map[string]time.Time {
	s.nodeConnsLock.RLock()
	defer s.nodeConnsLock.RUnlock()

	established := make(map[string]time.Time, len(s.nodeConns))
	for id, conn := range s.nodeConns {
		established[id] = conn.Established
	}

	return established
}
// addNodeConn records the session carried by ctx as the connection for
// ctx.NodeID, stamping it with the current time. Any entry already stored for
// that node is overwritten. A nil ctx or an empty NodeID is a no-op.
func (s *Server) addNodeConn(ctx *RPCContext) {
	// Nothing to track unless the context identifies a node.
	if ctx == nil {
		return
	}
	if ctx.NodeID == "" {
		return
	}

	s.nodeConnsLock.Lock()
	defer s.nodeConnsLock.Unlock()

	conn := new(nodeConnState)
	conn.Session = ctx.Session
	conn.Established = time.Now()
	s.nodeConns[ctx.NodeID] = conn
}
// removeNodeConn forgets the connection tracked for ctx.NodeID. The entry is
// deleted by NodeID alone, regardless of which session is currently stored for
// that node. A nil ctx or an empty NodeID is a no-op.
func (s *Server) removeNodeConn(ctx *RPCContext) {
	// Only contexts that identify a node can have an entry to remove.
	if ctx != nil && ctx.NodeID != "" {
		s.nodeConnsLock.Lock()
		delete(s.nodeConns, ctx.NodeID)
		s.nodeConnsLock.Unlock()
	}
}
// serverWithNodeConn is used to determine which remote server has the most
// recent connection to the given node. The local server is not queried.
//
// Every other entry in s.localPeers is asked via the "Status.HasNodeConn" RPC
// and the peer reporting the latest Established time is returned. RPC
// failures are ignored as long as at least one peer reports a connection.
//
// ErrNoNodeConn is returned if all local peers could be queried but did not
// have a connection to the node. Otherwise if a connection could not be found
// and there were RPC errors, the accumulated errors are returned.
func (s *Server) serverWithNodeConn(nodeID string) (*serverParts, error) {
	// We skip ourselves.
	// NOTE(review): selfAddr is compared against the localPeers keys as plain
	// strings; confirm both sides render the address in the same form,
	// otherwise the local server is never skipped.
	selfAddr := s.LocalMember().Addr.String()

	// Snapshot the peers to query so that peerLock is not held across the
	// network round trips below. Holding a read lock while waiting on remote
	// servers would stall any writer of the peer set, and with it every
	// reader queued behind that writer.
	s.peerLock.RLock()
	targets := make([]*serverParts, 0, len(s.localPeers))
	for addr, server := range s.localPeers {
		if string(addr) == selfAddr {
			continue
		}
		targets = append(targets, server)
	}
	s.peerLock.RUnlock()

	// Build the request
	req := &structs.NodeSpecificRequest{
		NodeID: nodeID,
		QueryOptions: structs.QueryOptions{
			Region: s.config.Region,
		},
	}

	// mostRecentServer is the peer with the latest established connection
	// seen so far, and mostRecent is that connection's timestamp.
	var mostRecentServer *serverParts
	var mostRecent time.Time

	var rpcErr multierror.Error
	for _, server := range targets {
		// Make the RPC. req is already a pointer, so it is passed as is.
		var resp structs.NodeConnQueryResponse
		err := s.connPool.RPC(s.config.Region, server.Addr, server.MajorVersion,
			"Status.HasNodeConn", req, &resp)
		if err != nil {
			multierror.Append(&rpcErr, fmt.Errorf("failed querying server %q: %v", server.Addr.String(), err))
			continue
		}

		// The first connected peer is always a candidate, even if it reports
		// a zero Established time; later peers must be strictly newer.
		if resp.Connected && (mostRecentServer == nil || resp.Established.After(mostRecent)) {
			mostRecentServer = server
			mostRecent = resp.Established
		}
	}

	// Return an error if there is no route to the node.
	if mostRecentServer == nil {
		if err := rpcErr.ErrorOrNil(); err != nil {
			return nil, err
		}

		return nil, ErrNoNodeConn
	}

	return mostRecentServer, nil
}
package nomad
import (
"testing"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)
// TestServerWithNodeConn_NoPath checks that ErrNoNodeConn is reported when no
// server in a two server cluster has a connection to the requested node.
func TestServerWithNodeConn_NoPath(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// The second server is started with DevDisableBootstrap set and joined
	// to the first.
	noBootstrap := func(c *Config) {
		c.DevDisableBootstrap = true
	}
	s1 := TestServer(t, nil)
	defer s1.Shutdown()
	s2 := TestServer(t, noBootstrap)
	defer s2.Shutdown()
	TestJoin(t, s1, s2)
	testutil.WaitForLeader(t, s1.RPC)
	testutil.WaitForLeader(t, s2.RPC)

	// Ask for a freshly generated node ID that was never given a connection.
	unknownNode := uuid.Generate()
	target, err := s1.serverWithNodeConn(unknownNode)
	require.Nil(target)
	require.EqualError(err, ErrNoNodeConn.Error())
}
// TestServerWithNodeConn_Path checks that the server holding a connection to
// the node is returned when a different server looks for a route to it.
func TestServerWithNodeConn_Path(t *testing.T) {
	t.Parallel()
	require := require.New(t)
	s1 := TestServer(t, nil)
	defer s1.Shutdown()
	s2 := TestServer(t, func(c *Config) {
		c.DevDisableBootstrap = true
	})
	defer s2.Shutdown()
	TestJoin(t, s1, s2)
	testutil.WaitForLeader(t, s1.RPC)
	testutil.WaitForLeader(t, s2.RPC)

	// Create a fake connection for the node on server 2
	nodeID := uuid.Generate()
	s2.addNodeConn(&RPCContext{
		NodeID: nodeID,
	})

	// Check the error first so a failed lookup is reported as such instead of
	// as a nil server, then compare with the expected value first so failure
	// output labels the two sides correctly.
	srv, err := s1.serverWithNodeConn(nodeID)
	require.NoError(err)
	require.NotNil(srv)
	require.Equal(s2.config.RPCAddr.String(), srv.Addr.String())
}
// TestServerWithNodeConn_Path_Newest checks that when several servers have a
// connection to the node, the one whose connection was added last is chosen.
func TestServerWithNodeConn_Path_Newest(t *testing.T) {
	t.Parallel()
	require := require.New(t)
	s1 := TestServer(t, nil)
	defer s1.Shutdown()
	s2 := TestServer(t, func(c *Config) {
		c.DevDisableBootstrap = true
	})
	defer s2.Shutdown()
	s3 := TestServer(t, func(c *Config) {
		c.DevDisableBootstrap = true
	})
	defer s3.Shutdown()
	TestJoin(t, s1, s2, s3)
	testutil.WaitForLeader(t, s1.RPC)
	testutil.WaitForLeader(t, s2.RPC)
	testutil.WaitForLeader(t, s3.RPC)

	// Create a fake connection for the node on server 2 and 3. Server 3 is
	// added last, so its connection is expected to carry the later timestamp.
	// NOTE(review): this relies on the two back-to-back time.Now() calls
	// producing strictly increasing values.
	nodeID := uuid.Generate()
	s2.addNodeConn(&RPCContext{
		NodeID: nodeID,
	})
	s3.addNodeConn(&RPCContext{
		NodeID: nodeID,
	})

	// Check the error before using srv, and pass the expected value first so
	// failure output labels the two sides correctly.
	srv, err := s1.serverWithNodeConn(nodeID)
	require.NoError(err)
	require.NotNil(srv)
	require.Equal(s3.config.RPCAddr.String(), srv.Addr.String())
}
// TestServerWithNodeConn_PathAndErr checks that a route is still returned,
// without an error, when one server has a connection to the node and another
// server has had its RPC listener closed.
func TestServerWithNodeConn_PathAndErr(t *testing.T) {
	t.Parallel()
	require := require.New(t)
	s1 := TestServer(t, nil)
	defer s1.Shutdown()
	s2 := TestServer(t, func(c *Config) {
		c.DevDisableBootstrap = true
	})
	defer s2.Shutdown()
	s3 := TestServer(t, func(c *Config) {
		c.DevDisableBootstrap = true
	})
	defer s3.Shutdown()
	TestJoin(t, s1, s2, s3)
	testutil.WaitForLeader(t, s1.RPC)
	testutil.WaitForLeader(t, s2.RPC)
	testutil.WaitForLeader(t, s3.RPC)

	// Create a fake connection for the node on server 2
	nodeID := uuid.Generate()
	s2.addNodeConn(&RPCContext{
		NodeID: nodeID,
	})

	// Shutdown the RPC layer for server 3
	s3.rpcListener.Close()

	// Check the error before using srv, and pass the expected value first so
	// failure output labels the two sides correctly.
	srv, err := s1.serverWithNodeConn(nodeID)
	require.NoError(err)
	require.NotNil(srv)
	require.Equal(s2.config.RPCAddr.String(), srv.Addr.String())
}
// TestServerWithNodeConn_NoPathAndErr checks that a query error is surfaced
// when no server has a connection to the node and one server's RPC listener
// has been closed.
func TestServerWithNodeConn_NoPathAndErr(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Three server cluster; servers two and three are started with
	// DevDisableBootstrap set.
	noBootstrap := func(c *Config) {
		c.DevDisableBootstrap = true
	}
	s1 := TestServer(t, nil)
	defer s1.Shutdown()
	s2 := TestServer(t, noBootstrap)
	defer s2.Shutdown()
	s3 := TestServer(t, noBootstrap)
	defer s3.Shutdown()
	TestJoin(t, s1, s2, s3)
	testutil.WaitForLeader(t, s1.RPC)
	testutil.WaitForLeader(t, s2.RPC)
	testutil.WaitForLeader(t, s3.RPC)

	// Close the RPC listener of server 3.
	s3.rpcListener.Close()

	// No server was given a connection for this freshly generated node ID.
	unknownNode := uuid.Generate()
	target, err := s1.serverWithNodeConn(unknownNode)
	require.Nil(target)
	require.NotNil(err)
	require.Contains(err.Error(), "failed querying")
}
......@@ -57,7 +57,7 @@ func (s *ClientStats) Stats(args *structs.ClientStatsRequest, reply *structs.Cli
}
// Get the connection to the client
session, ok := s.srv.getNodeConn(args.NodeID)
state, ok := s.srv.getNodeConn(args.NodeID)
if !ok {
// Check if the node even exists
snap, err := s.srv.State().Snapshot()
......@@ -74,12 +74,21 @@ func (s *ClientStats) Stats(args *structs.ClientStatsRequest, reply *structs.Cli
return fmt.Errorf("Unknown node %q", args.NodeID)
}
// TODO Handle forwarding to other servers
return ErrNoNodeConn
// Determine the Server that has a connection to the node.
srv, err := s.srv.serverWithNodeConn(args.NodeID)
if err != nil {
return err
}
if srv == nil {
return ErrNoNodeConn
}
return s.srv.forwardServer(srv, "ClientStats.Stats", args, reply)
}
// Open a new session
stream, err := session.Open()
stream, err := state.Session.Open()
if err != nil {
return err
}
......
......@@ -7,6 +7,7 @@ import (
"github.com/hashicorp/nomad/client"
"github.com/hashicorp/nomad/client/config"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
......@@ -51,3 +52,68 @@ func TestClientStats_Stats_Local(t *testing.T) {
require.Nil(err)
require.NotNil(resp2.HostStats)
}
// TestClientStats_Stats_NoNode checks that ClientStats.Stats fails with an
// "Unknown node" error when asked about a node ID the server does not know.
func TestClientStats_Stats_NoNode(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a single server; no client is started in this test.
	s := TestServer(t, nil)
	defer s.Shutdown()
	codec := rpcClient(t, s)
	testutil.WaitForLeader(t, s.RPC)

	// Build a request for a freshly generated node ID.
	args := &cstructs.ClientStatsRequest{
		NodeID: uuid.Generate(),
		QueryOptions: structs.QueryOptions{
			Region: "global",
		},
	}

	// The call must fail and leave the reply without host stats.
	var reply cstructs.ClientStatsResponse
	err := msgpackrpc.CallWithCodec(codec, "ClientStats.Stats", args, &reply)
	require.Nil(reply.HostStats)
	require.NotNil(err)
	require.Contains(err.Error(), "Unknown node")
}
// TestClientStats_Stats_Remote checks that ClientStats.Stats returns host
// stats for a client in a two server cluster, where the client is configured
// to talk to the second server only.
func TestClientStats_Stats_Remote(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start two servers and join them
	s1 := TestServer(t, nil)
	defer s1.Shutdown()
	s2 := TestServer(t, func(c *Config) {
		c.DevDisableBootstrap = true
	})
	defer s2.Shutdown()
	TestJoin(t, s1, s2)
	testutil.WaitForLeader(t, s1.RPC)
	testutil.WaitForLeader(t, s2.RPC)

	// NOTE(review): the RPC below is sent to s2, which is also the server the
	// client connects to, so the request can be answered from s2's own
	// connection. Confirm whether the codec should target s1 so that the
	// server-to-server forwarding path is what gets exercised.
	codec := rpcClient(t, s2)

	// Start a client that only knows about server 2
	c := client.TestClient(t, func(c *config.Config) {
		c.Servers = []string{s2.config.RPCAddr.String()}
	})

	// Wait until server 2 tracks a connection for exactly one node
	testutil.WaitForResult(func() (bool, error) {
		nodes := s2.connectedNodes()
		return len(nodes) == 1, nil
	}, func(err error) {
		t.Fatalf("should have a client")
	})

	// Make the request for the client's node
	req := &cstructs.ClientStatsRequest{
		NodeID:       c.NodeID(),
		QueryOptions: structs.QueryOptions{Region: "global"},
	}

	// Fetch the response
	var resp cstructs.ClientStatsResponse
	err := msgpackrpc.CallWithCodec(codec, "ClientStats.Stats", req, &resp)
	require.Nil(err)
	require.NotNil(resp.HostStats)
}
......@@ -49,7 +49,7 @@ func TestClientEndpoint_Register(t *testing.T) {
// Check that we have the client connections
nodes := s1.connectedNodes()
require.Len(nodes, 1)
require.Equal(node.ID, nodes[0])
require.Contains(nodes, node.ID)
// Check for the node in the FSM
state := s1.fsm.State()
......@@ -330,7 +330,7 @@ func TestClientEndpoint_UpdateStatus(t *testing.T) {
// Check that we have the client connections
nodes := s1.connectedNodes()
require.Len(nodes, 1)
require.Equal(node.ID, nodes[0])
require.Contains(nodes, node.ID)
// Check for the node in the FSM
state := s1.fsm.State()
......@@ -1311,7 +1311,7 @@ func TestClientEndpoint_GetClientAllocs(t *testing.T) {
// Check that we have the client connections
nodes := s1.connectedNodes()
require.Len(nodes, 1)
require.Equal(node.ID, nodes[0])
require.Contains(nodes, node.ID)
// Lookup node with bad SecretID
get.SecretID = "foobarbaz"
......
......@@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io"
"math/rand"
......@@ -306,6 +307,15 @@ func (s *Server) forwardLeader(server *serverParts, method string, args interfac
return s.connPool.RPC(s.config.Region, server.Addr, server.MajorVersion, method, args, reply)
}
// forwardServer relays an RPC call to one specific server. The call is made
// through the connection pool using this server's configured region together
// with the target's address and major version. An error is returned without
// making any call when server is nil.
func (s *Server) forwardServer(server *serverParts, method string, args interface{}, reply interface{}) error {
	if server != nil {
		return s.connPool.RPC(s.config.Region, server.Addr, server.MajorVersion, method, args, reply)
	}

	// There is nothing to forward to without a target.
	return errors.New("must be given a valid server address")
}
// forwardRegion is used to forward an RPC call to a remote region, or fail if no servers
func (s *Server) forwardRegion(region, method string, args interface{}, reply interface{}) error {
// Bail if we can't find any servers
......
......@@ -30,7 +30,6 @@ import (
"github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb"
"github.com/hashicorp/serf/serf"
"github.com/hashicorp/yamux"
)
const (
......@@ -120,7 +119,7 @@ type Server struct {
// nodeConns is the set of multiplexed node connections we have keyed by
// NodeID
nodeConns map[string]*yamux.Session
nodeConns map[string]*nodeConnState
nodeConnsLock sync.RWMutex
// peers is used to track the known Nomad servers. This is
......@@ -271,7 +270,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, logger *log.Logg
connPool: pool.NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap),
logger: logger,
rpcServer: rpc.NewServer(),
nodeConns: make(map[string]*yamux.Session),
nodeConns: make(map[string]*nodeConnState),
peers: make(map[string][]*serverParts),
localPeers: make(map[raft.ServerAddress]*serverParts),
reconcileCh: make(chan serf.Member, 32),
......@@ -1154,49 +1153,6 @@ func (s *Server) RPC(method string, args interface{}, reply interface{}) error {
return codec.Err
}
// getNodeConn looks up the yamux session tracked for nodeID. The boolean
// result reports whether an entry is present in s.nodeConns. The lookup is
// performed under the nodeConns read lock.
func (s *Server) getNodeConn(nodeID string) (*yamux.Session, bool) {
	s.nodeConnsLock.RLock()
	sess, found := s.nodeConns[nodeID]
	s.nodeConnsLock.RUnlock()

	return sess, found
}
// connectedNodes lists the IDs of every node this server currently tracks a
// connection for. The slice is freshly allocated and its order follows map
// iteration, so it is unspecified.
func (s *Server) connectedNodes() []string {
	s.nodeConnsLock.RLock()
	defer s.nodeConnsLock.RUnlock()

	ids := make([]string, len(s.nodeConns))
	next := 0
	for id := range s.nodeConns {
		ids[next] = id
		next++
	}

	return ids
}
// addNodeConn records the session carried by ctx as the connection for
// ctx.NodeID, overwriting any session already stored for that node. A nil ctx
// or an empty NodeID is a no-op.
func (s *Server) addNodeConn(ctx *RPCContext) {
	// Nothing to track unless the context identifies a node.
	if ctx != nil && ctx.NodeID != "" {
		s.nodeConnsLock.Lock()
		defer s.nodeConnsLock.Unlock()

		s.nodeConns[ctx.NodeID] = ctx.Session
	}
}
// removeNodeConn forgets the session tracked for ctx.NodeID. The entry is
// deleted by NodeID alone. A nil ctx or an empty NodeID is a no-op.
func (s *Server) removeNodeConn(ctx *RPCContext) {
	// Only contexts that identify a node can have an entry to remove.
	if ctx != nil && ctx.NodeID != "" {
		s.nodeConnsLock.Lock()
		delete(s.nodeConns, ctx.NodeID)
		s.nodeConnsLock.Unlock()
	}
}
// Stats is used to return statistics for debugging and insight
// for various sub-systems
func (s *Server) Stats() map[string]map[string]string {
......
package nomad
import (
"errors"
"github.com/hashicorp/nomad/nomad/structs"
)
......@@ -104,3 +106,20 @@ func (s *Status) Members(args *structs.GenericRequest, reply *structs.ServerMemb
}
return nil
}
// HasNodeConn reports whether this server tracks a connection to the node
// named in args. When it does, reply.Connected is set and reply.Established
// carries the time the connection was established; otherwise reply is left
// untouched. An error is returned only when args.NodeID is empty.
func (s *Status) HasNodeConn(args *structs.NodeSpecificRequest, reply *structs.NodeConnQueryResponse) error {
	// A node must be named for the lookup to make sense.
	if args.NodeID == "" {
		return errors.New("Must provide the NodeID")
	}

	conn, connected := s.srv.getNodeConn(args.NodeID)
	if !connected {
		return nil
	}

	reply.Connected = true
	reply.Established = conn.Established
	return nil
}
......@@ -5,10 +5,12 @@ import (
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestStatusVersion(t *testing.T) {
......@@ -169,3 +171,37 @@ func TestStatusMembers_ACL(t *testing.T) {
assert.Len(out.Members, 1)
}
}
// TestStatus_HasClientConn exercises the Status.HasNodeConn RPC against a
// single server: a request without a node ID is rejected, an untracked node
// reports no connection, and a node with a fake connection reports one along
// with a non-zero establishment time.
func TestStatus_HasClientConn(t *testing.T) {
	t.Parallel()
	s1 := TestServer(t, nil)
	defer s1.Shutdown()
	codec := rpcClient(t, s1)
	require := require.New(t)

	const method = "Status.HasNodeConn"
	req := &structs.NodeSpecificRequest{
		QueryOptions: structs.QueryOptions{
			Region:     "global",
			AllowStale: true,
		},
	}

	// A request that names no node must fail.
	var noID structs.NodeConnQueryResponse
	require.NotNil(msgpackrpc.CallWithCodec(codec, method, req, &noID))

	// A freshly generated node ID has no connection on the server.
	req.NodeID = uuid.Generate()
	var untracked structs.NodeConnQueryResponse
	require.Nil(msgpackrpc.CallWithCodec(codec, method, req, &untracked))
	require.False(untracked.Connected)

	// Register a fake connection for that node and query again.
	s1.addNodeConn(&RPCContext{
		NodeID: req.NodeID,
	})
	var tracked structs.NodeConnQueryResponse
	require.Nil(msgpackrpc.CallWithCodec(codec, method, req, &tracked))
	require.True(tracked.Connected)
	require.NotZero(tracked.Established)
}
......@@ -1032,6 +1032,18 @@ type DeploymentUpdateResponse struct {
WriteMeta
}
// NodeConnQueryResponse is used to respond to a query of whether a server has
// a connection to a specific Node. It is the reply type of the
// Status.HasNodeConn RPC.
type NodeConnQueryResponse struct {
// Connected indicates whether a connection to the Client exists
Connected bool
// Established marks the time at which the connection was established.
// Status.HasNodeConn only fills it in when Connected is true; otherwise it
// is left at whatever value the caller supplied.
Established time.Time
// QueryMeta is embedded alongside the connection fields.
QueryMeta
}
const (
NodeStatusInit = "initializing"
NodeStatusReady = "ready"
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment