Commit b0edca94 authored by Michael Schurter's avatar Michael Schurter
Browse files

core: remove all traces of unused protocol version

Nomad inherited protocol version numbering configuration from Consul and
Serf, but unlike those projects Nomad has never used it. Nomad's
`protocol_version` has always been `1`.

While the code is effectively unused and therefore poses no runtime
risks to leave, I felt like removing it was best because:

1. Nomad's RPC subsystem has been able to evolve extensively without
   needing to increment the version number.
2. Nomad's HTTP API has evolved extensively without increment
   `API{Major,Minor}Version`. If we want to version the HTTP API in the
   future, I doubt this is the mechanism we would choose.
3. The presence of the `server.protocol_version` configuration
   parameter is confusing since `server.raft_protocol` *is* an important
   parameter for operators to consider. Even more confusing is that
   there is a distinct Serf protocol version which is included in `nomad
   server members` output under the heading `Protocol`. `raft_protocol`
   is the *only* protocol version relevant to Nomad developers and
   operators. The other protocol versions are either deadcode or have
   never changed (Serf).
4. If we were to need to version the RPC, HTTP API, or Serf protocols, I
   don't think these configuration parameters and variables are the best
   choice. If we come to that point we should choose a versioning scheme
   based on the use case and modern best practices -- not this 6+ year
   old dead code.
parent 189806f6
Showing with 30 additions and 153 deletions
+30 -153
......@@ -749,18 +749,6 @@ func (c *Client) secretNodeID() string {
return c.config.Node.SecretID
}
// RPCMajorVersion returns the structs.ApiMajorVersion supported by the
// client.
func (c *Client) RPCMajorVersion() int {
return structs.ApiMajorVersion
}
// RPCMinorVersion returns the structs.ApiMinorVersion supported by the
// client.
func (c *Client) RPCMinorVersion() int {
return structs.ApiMinorVersion
}
// Shutdown is used to tear down the client
func (c *Client) Shutdown() error {
c.shutdownLock.Lock()
......@@ -2773,7 +2761,7 @@ DISCOLOOP:
continue
}
var peers []string
if err := c.connPool.RPC(region, addr, c.RPCMajorVersion(), "Status.Peers", rpcargs, &peers); err != nil {
if err := c.connPool.RPC(region, addr, "Status.Peers", rpcargs, &peers); err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
......
......@@ -74,7 +74,7 @@ TRY:
}
// Make the request.
rpcErr := c.connPool.RPC(c.Region(), server.Addr, c.RPCMajorVersion(), method, args, reply)
rpcErr := c.connPool.RPC(c.Region(), server.Addr, method, args, reply)
if rpcErr == nil {
c.fireRpcRetryWatcher()
......@@ -427,7 +427,7 @@ func resolveServer(s string) (net.Addr, error) {
// a potential error.
func (c *Client) Ping(srv net.Addr) error {
var reply struct{}
err := c.connPool.RPC(c.Region(), srv, c.RPCMajorVersion(), "Status.Ping", struct{}{}, &reply)
err := c.connPool.RPC(c.Region(), srv, "Status.Ping", struct{}{}, &reply)
return err
}
......
......@@ -191,9 +191,6 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
if agentConfig.Server.DataDir != "" {
conf.DataDir = agentConfig.Server.DataDir
}
if agentConfig.Server.ProtocolVersion != 0 {
conf.ProtocolVersion = uint8(agentConfig.Server.ProtocolVersion)
}
if agentConfig.Server.RaftProtocol != 0 {
conf.RaftConfig.ProtocolVersion = raft.ProtocolVersion(agentConfig.Server.RaftProtocol)
}
......
......@@ -413,6 +413,12 @@ func (c *Command) isValidConfig(config, cmdConfig *Config) bool {
}
}
// ProtocolVersion has never been used. Warn if it is set as someone
// has probably made a mistake.
if config.Server.ProtocolVersion != 0 {
c.agent.logger.Warn("Please remove deprecated protocol_version field from config.")
}
return true
}
......
......@@ -385,7 +385,9 @@ type ServerConfig struct {
// ProtocolVersion is the protocol version to speak. This must be between
// ProtocolVersionMin and ProtocolVersionMax.
ProtocolVersion int `hcl:"protocol_version"`
//
// Deprecated: This has never been used and will emit a warning if nonzero.
ProtocolVersion int `hcl:"protocol_version" json:"-"`
// RaftProtocol is the Raft protocol version to speak. This must be from [1-3].
RaftProtocol int `hcl:"raft_protocol"`
......
......@@ -95,7 +95,6 @@ var basicConfig = &Config{
AuthoritativeRegion: "foobar",
BootstrapExpect: 5,
DataDir: "/tmp/data",
ProtocolVersion: 3,
RaftProtocol: 3,
RaftMultiplier: helper.IntToPtr(4),
NumSchedulers: helper.IntToPtr(2),
......@@ -494,7 +493,6 @@ func TestConfig_Parse(t *testing.T) {
}
actual = oldDefault.Merge(actual)
//panic(fmt.Sprintf("first: %+v \n second: %+v", actual.TLSConfig, tc.Result.TLSConfig))
require.EqualValues(tc.Result, removeHelperAttributes(actual))
})
}
......
......@@ -106,7 +106,6 @@ server {
authoritative_region = "foobar"
bootstrap_expect = 5
data_dir = "/tmp/data"
protocol_version = 3
raft_protocol = 3
num_schedulers = 2
enabled_schedulers = ["test"]
......
......@@ -277,7 +277,6 @@
"node_gc_threshold": "12h",
"non_voting_server": true,
"num_schedulers": 2,
"protocol_version": 3,
"raft_protocol": 3,
"raft_multiplier": 4,
"redundancy_zone": "foo",
......
......@@ -48,7 +48,6 @@ type Conn struct {
addr net.Addr
session *yamux.Session
lastUsed time.Time
version int
pool *ConnPool
......@@ -278,7 +277,7 @@ func (p *ConnPool) SetConnListener(l chan<- *Conn) {
// Acquire is used to get a connection that is
// pooled or to return a new connection
func (p *ConnPool) acquire(region string, addr net.Addr, version int) (*Conn, error) {
func (p *ConnPool) acquire(region string, addr net.Addr) (*Conn, error) {
// Check to see if there's a pooled connection available. This is up
// here since it should the vastly more common case than the rest
// of the code here.
......@@ -305,7 +304,7 @@ func (p *ConnPool) acquire(region string, addr net.Addr, version int) (*Conn, er
// If we are the lead thread, make the new connection and then wake
// everybody else up to see if we got it.
if isLeadThread {
c, err := p.getNewConn(region, addr, version)
c, err := p.getNewConn(region, addr)
p.Lock()
delete(p.limiter, addr.String())
close(wait)
......@@ -349,7 +348,7 @@ func (p *ConnPool) acquire(region string, addr net.Addr, version int) (*Conn, er
}
// getNewConn is used to return a new connection
func (p *ConnPool) getNewConn(region string, addr net.Addr, version int) (*Conn, error) {
func (p *ConnPool) getNewConn(region string, addr net.Addr) (*Conn, error) {
// Try to dial the conn
conn, err := net.DialTimeout("tcp", addr.String(), 10*time.Second)
if err != nil {
......@@ -404,7 +403,6 @@ func (p *ConnPool) getNewConn(region string, addr net.Addr, version int) (*Conn,
session: session,
clients: list.New(),
lastUsed: time.Now(),
version: version,
pool: p,
}
return c, nil
......@@ -429,12 +427,12 @@ func (p *ConnPool) clearConn(conn *Conn) {
}
}
// getClient is used to get a usable client for an address and protocol version
func (p *ConnPool) getRPCClient(region string, addr net.Addr, version int) (*Conn, *StreamClient, error) {
// getClient is used to get a usable client for an address
func (p *ConnPool) getRPCClient(region string, addr net.Addr) (*Conn, *StreamClient, error) {
retries := 0
START:
// Try to get a conn first
conn, err := p.acquire(region, addr, version)
conn, err := p.acquire(region, addr)
if err != nil {
return nil, nil, fmt.Errorf("failed to get conn: %v", err)
}
......@@ -457,8 +455,8 @@ START:
// StreamingRPC is used to make an streaming RPC call. Callers must
// close the connection when done.
func (p *ConnPool) StreamingRPC(region string, addr net.Addr, version int) (net.Conn, error) {
conn, err := p.acquire(region, addr, version)
func (p *ConnPool) StreamingRPC(region string, addr net.Addr) (net.Conn, error) {
conn, err := p.acquire(region, addr)
if err != nil {
return nil, fmt.Errorf("failed to get conn: %v", err)
}
......@@ -477,9 +475,9 @@ func (p *ConnPool) StreamingRPC(region string, addr net.Addr, version int) (net.
}
// RPC is used to make an RPC call to a remote host
func (p *ConnPool) RPC(region string, addr net.Addr, version int, method string, args interface{}, reply interface{}) error {
func (p *ConnPool) RPC(region string, addr net.Addr, method string, args interface{}, reply interface{}) error {
// Get a usable client
conn, sc, err := p.getRPCClient(region, addr, version)
conn, sc, err := p.getRPCClient(region, addr)
if err != nil {
return fmt.Errorf("rpc error: %w", err)
}
......
......@@ -8,7 +8,6 @@ import (
"github.com/hashicorp/nomad/helper/freeport"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
......@@ -50,7 +49,7 @@ func TestConnPool_ConnListener(t *testing.T) {
pool.SetConnListener(c)
// Make an RPC
_, err = pool.acquire("test", addr, structs.ApiMajorVersion)
_, err = pool.acquire("test", addr)
require.Nil(err)
// Assert we get a connection.
......
......@@ -188,8 +188,7 @@ func (s *Server) serverWithNodeConn(nodeID, region string) (*serverParts, error)
// Make the RPC
var resp structs.NodeConnQueryResponse
err := s.connPool.RPC(s.config.Region, server.Addr, server.MajorVersion,
"Status.HasNodeConn", &req, &resp)
err := s.connPool.RPC(s.config.Region, server.Addr, "Status.HasNodeConn", &req, &resp)
if err != nil {
multierror.Append(&rpcErr, fmt.Errorf("failed querying server %q: %v", server.Addr.String(), err))
continue
......
package nomad
import (
"fmt"
"io"
"net"
"os"
......@@ -27,23 +26,6 @@ const (
DefaultSerfPort = 4648
)
// These are the protocol versions that Nomad can understand
const (
ProtocolVersionMin uint8 = 1
ProtocolVersionMax = 1
)
// ProtocolVersionMap is the mapping of Nomad protocol versions
// to Serf protocol versions. We mask the Serf protocols using
// our own protocol version.
var protocolVersionMap map[uint8]uint8
func init() {
protocolVersionMap = map[uint8]uint8{
1: 4,
}
}
func DefaultRPCAddr() *net.TCPAddr {
return &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 4647}
}
......@@ -93,10 +75,6 @@ type Config struct {
// Logger is the logger used by the server.
Logger log.InterceptLogger
// ProtocolVersion is the protocol version to speak. This must be between
// ProtocolVersionMin and ProtocolVersionMax.
ProtocolVersion uint8
// RPCAddr is the RPC address used by Nomad. This should be reachable
// by the other servers and clients
RPCAddr *net.TCPAddr
......@@ -370,18 +348,6 @@ type Config struct {
DeploymentQueryRateLimit float64
}
// CheckVersion is used to check if the ProtocolVersion is valid
func (c *Config) CheckVersion() error {
if c.ProtocolVersion < ProtocolVersionMin {
return fmt.Errorf("Protocol version '%d' too low. Must be in range: [%d, %d]",
c.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
} else if c.ProtocolVersion > ProtocolVersionMax {
return fmt.Errorf("Protocol version '%d' too high. Must be in range: [%d, %d]",
c.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
}
return nil
}
// DefaultConfig returns the default configuration. Only used as the basis for
// merging agent or test parameters.
func DefaultConfig() *Config {
......@@ -396,7 +362,6 @@ func DefaultConfig() *Config {
Datacenter: DefaultDC,
NodeName: hostname,
NodeID: uuid.Generate(),
ProtocolVersion: ProtocolVersionMax,
RaftConfig: raft.DefaultConfig(),
RaftTimeout: 10 * time.Second,
LogOutput: os.Stderr,
......
......@@ -267,8 +267,6 @@ func (n *Node) constructNodeServerInfoResponse(snap *state.StateSnapshot, reply
reply.Servers = append(reply.Servers,
&structs.NodeServerInfo{
RPCAdvertiseAddr: v.RPCAddr.String(),
RPCMajorVersion: int32(v.MajorVersion),
RPCMinorVersion: int32(v.MinorVersion),
Datacenter: v.Datacenter,
})
}
......
......@@ -612,7 +612,7 @@ func (r *rpcHandler) forwardLeader(server *serverParts, method string, args inte
if server == nil {
return structs.ErrNoLeader
}
return r.connPool.RPC(r.config.Region, server.Addr, server.MajorVersion, method, args, reply)
return r.connPool.RPC(r.config.Region, server.Addr, method, args, reply)
}
// forwardServer is used to forward an RPC call to a particular server
......@@ -621,7 +621,7 @@ func (r *rpcHandler) forwardServer(server *serverParts, method string, args inte
if server == nil {
return errors.New("must be given a valid server address")
}
return r.connPool.RPC(r.config.Region, server.Addr, server.MajorVersion, method, args, reply)
return r.connPool.RPC(r.config.Region, server.Addr, method, args, reply)
}
func (r *rpcHandler) findRegionServer(region string) (*serverParts, error) {
......@@ -648,7 +648,7 @@ func (r *rpcHandler) forwardRegion(region, method string, args interface{}, repl
// Forward to remote Nomad
metrics.IncrCounter([]string{"nomad", "rpc", "cross-region", region}, 1)
return r.connPool.RPC(region, server.Addr, server.MajorVersion, method, args, reply)
return r.connPool.RPC(region, server.Addr, method, args, reply)
}
func (r *rpcHandler) getServer(region, serverID string) (*serverParts, error) {
......@@ -676,7 +676,7 @@ func (r *rpcHandler) getServer(region, serverID string) (*serverParts, error) {
// initial handshake, returning the connection or an error. It is the callers
// responsibility to close the connection if there is no returned error.
func (r *rpcHandler) streamingRpc(server *serverParts, method string) (net.Conn, error) {
c, err := r.connPool.StreamingRPC(r.config.Region, server.Addr, server.MajorVersion)
c, err := r.connPool.StreamingRPC(r.config.Region, server.Addr)
if err != nil {
return nil, err
}
......
......@@ -164,7 +164,7 @@ func (s *Server) maybeBootstrap() {
// Retry with exponential backoff to get peer status from this server
for attempt := uint(0); attempt < maxPeerRetries; attempt++ {
if err := s.connPool.RPC(s.config.Region, server.Addr, server.MajorVersion,
if err := s.connPool.RPC(s.config.Region, server.Addr,
"Status.Peers", req, &peers); err != nil {
nextRetry := (1 << attempt) * peerRetryBase
s.logger.Error("failed to confirm peer status", "peer", server.Name, "error", err, "retry", nextRetry)
......
......@@ -289,10 +289,6 @@ type endpoints struct {
// NewServer is used to construct a new Nomad server from the
// configuration, potentially returning an error
func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntries consul.ConfigAPI, consulACLs consul.ACLsAPI) (*Server, error) {
// Check the protocol version
if err := config.CheckVersion(); err != nil {
return nil, err
}
// Create an eval broker
evalBroker, err := NewEvalBroker(
......@@ -1380,8 +1376,6 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
conf.Tags["role"] = "nomad"
conf.Tags["region"] = s.config.Region
conf.Tags["dc"] = s.config.Datacenter
conf.Tags["vsn"] = fmt.Sprintf("%d", structs.ApiMajorVersion)
conf.Tags["mvn"] = fmt.Sprintf("%d", structs.ApiMinorVersion)
conf.Tags["build"] = s.config.Build
conf.Tags["raft_vsn"] = fmt.Sprintf("%d", s.config.RaftConfig.ProtocolVersion)
conf.Tags["id"] = s.config.NodeID
......@@ -1415,7 +1409,6 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
return nil, err
}
}
conf.ProtocolVersion = protocolVersionMap[s.config.ProtocolVersion]
conf.RejoinAfterLeave = true
// LeavePropagateDelay is used to make sure broadcasted leave intents propagate
// This value was tuned using https://www.serf.io/docs/internals/simulator.html to
......
......@@ -43,7 +43,7 @@ func NewStatsFetcher(logger log.Logger, pool *pool.ConnPool, region string) *Sta
func (f *StatsFetcher) fetch(server *serverParts, replyCh chan *autopilot.ServerStats) {
var args struct{}
var reply autopilot.ServerStats
err := f.pool.RPC(f.region, server.Addr, server.MajorVersion, "Status.RaftStats", &args, &reply)
err := f.pool.RPC(f.region, server.Addr, "Status.RaftStats", &args, &reply)
if err != nil {
f.logger.Warn("failed retrieving server health", "server", server.Name, "error", err)
} else {
......
......@@ -17,23 +17,6 @@ type Status struct {
logger log.Logger
}
// Version is used to allow clients to determine the capabilities
// of the server
func (s *Status) Version(args *structs.GenericRequest, reply *structs.VersionResponse) error {
if done, err := s.srv.forward("Status.Version", args, args, reply); done {
return err
}
conf := s.srv.config
reply.Build = conf.Build
reply.Versions = map[string]int{
structs.ProtocolVersion: int(conf.ProtocolVersion),
structs.APIMajorVersion: structs.ApiMajorVersion,
structs.APIMinorVersion: structs.ApiMinorVersion,
}
return nil
}
// Ping is used to just check for connectivity
func (s *Status) Ping(args struct{}, reply *struct{}) error {
return nil
......
......@@ -13,38 +13,6 @@ import (
"github.com/stretchr/testify/require"
)
func TestStatusVersion(t *testing.T) {
t.Parallel()
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
codec := rpcClient(t, s1)
arg := &structs.GenericRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
AllowStale: true,
},
}
var out structs.VersionResponse
if err := msgpackrpc.CallWithCodec(codec, "Status.Version", arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
if out.Build == "" {
t.Fatalf("bad: %#v", out)
}
if out.Versions[structs.ProtocolVersion] != ProtocolVersionMax {
t.Fatalf("bad: %#v", out)
}
if out.Versions[structs.APIMajorVersion] != structs.ApiMajorVersion {
t.Fatalf("bad: %#v", out)
}
if out.Versions[structs.APIMinorVersion] != structs.ApiMinorVersion {
t.Fatalf("bad: %#v", out)
}
}
func TestStatusPing(t *testing.T) {
t.Parallel()
......
......@@ -122,21 +122,6 @@ const (
// methods directly that require an FSM MessageType
MsgTypeTestSetup MessageType = IgnoreUnknownTypeFlag
// ApiMajorVersion is returned as part of the Status.Version request.
// It should be incremented anytime the APIs are changed in a way
// that would break clients for sane client versioning.
ApiMajorVersion = 1
// ApiMinorVersion is returned as part of the Status.Version request.
// It should be incremented anytime the APIs are changed to allow
// for sane client versioning. Minor changes should be compatible
// within the major version.
ApiMinorVersion = 1
ProtocolVersion = "protocol"
APIMajorVersion = "api.major"
APIMinorVersion = "api.minor"
GetterModeAny = "any"
GetterModeFile = "file"
GetterModeDir = "dir"
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment