Commit c86a3bb8 authored by Michael Schurter's avatar Michael Schurter
Browse files

client: fix data races in config handling (#14139)

Before this change, Client had 2 copies of the config object: config and configCopy. There was no guidance around which to use where (other than configCopy's comment to pass it to alloc runners), both are shared among goroutines and mutated in data racy ways. At least at one point I think the idea was to have `config` be mutable and then grab a lock to overwrite `configCopy`'s pointer atomically. This would have allowed alloc runners to read their config copies in data race safe ways, but this isn't how the current implementation worked.

This change takes the following approach to safely handling configs in the client:

1. `Client.config` is the only copy of the config and all access must go through the `Client.configLock` mutex
2. Since the mutex *only protects the config pointer itself and not fields inside the Config struct:* all config mutation must be done on a *copy* of the config, and then Client's config pointer is overwritten while the mutex is acquired. Alloc runners and other goroutines with the old config pointer will not see config updates.
3. Deep copying is implemented on the Config struct to satisfy the previous approach. The TLS Keyloader is an exception because it has its own internal locking to support mutating in place. An unfortunate complication but one I couldn't find a way to untangle in a timely fashion.
4. To facilitate deep copying I made an *internally backward incompatible API change:* our `helper/funcs` used to turn containers (slices and maps) with 0 elements into nils. This probably saves a few memory allocations but makes it very easy to cause panics. Since my new config handling approach uses more copying, it became very difficult to ensure all code that used containers on configs could handle nils properly. Since this code has caused panics in the past, I fixed it: nil containers are copied as nil, but 0-element containers properly return a new 0-element container. No more "downgrading to nil!"
parent e993aa6e
Showing with 597 additions and 309 deletions
+597 -309
......@@ -81,7 +81,7 @@ func (c *Client) ResolveSecretToken(secretID string) (*structs.ACLToken, error)
func (c *Client) resolveTokenAndACL(secretID string) (*acl.ACL, *structs.ACLToken, error) {
// Fast-path if ACLs are disabled
if !c.config.ACLEnabled {
if !c.GetConfig().ACLEnabled {
return nil, nil, nil
}
defer metrics.MeasureSince([]string{"client", "acl", "resolve_token"}, time.Now())
......@@ -127,7 +127,7 @@ func (c *Client) resolveTokenValue(secretID string) (*structs.ACLToken, error) {
raw, ok := c.tokenCache.Get(secretID)
if ok {
cached := raw.(*cachedACLValue)
if cached.Age() <= c.config.ACLTokenTTL {
if cached.Age() <= c.GetConfig().ACLTokenTTL {
return cached.Token, nil
}
}
......@@ -179,7 +179,7 @@ func (c *Client) resolvePolicies(secretID string, policies []string) ([]*structs
// Check if the cached value is valid or expired
cached := raw.(*cachedACLValue)
if cached.Age() <= c.config.ACLPolicyTTL {
if cached.Age() <= c.GetConfig().ACLPolicyTTL {
out = append(out, cached.Policy)
} else {
expired = append(expired, cached.Policy)
......
......@@ -42,7 +42,7 @@ func (a *Agent) Profile(args *structs.AgentPprofRequest, reply *structs.AgentPpr
}
// If ACLs are disabled, EnableDebug must be enabled
if aclObj == nil && !a.c.config.EnableDebug {
if aclObj == nil && !a.c.GetConfig().EnableDebug {
return structs.ErrPermissionDenied
}
......@@ -218,7 +218,7 @@ func (a *Agent) Host(args *structs.HostDataRequest, reply *structs.HostDataRespo
return err
}
if (aclObj != nil && !aclObj.AllowAgentRead()) ||
(aclObj == nil && !a.c.config.EnableDebug) {
(aclObj == nil && !a.c.GetConfig().EnableDebug) {
return structs.ErrPermissionDenied
}
......
This diff is collapsed.
......@@ -192,63 +192,55 @@ func TestClient_Fingerprint_Periodic(t *testing.T) {
})
defer cleanup()
node := c1.config.Node
{
// Ensure the mock driver is registered on the client
testutil.WaitForResult(func() (bool, error) {
c1.configLock.Lock()
defer c1.configLock.Unlock()
// Ensure the mock driver is registered on the client
testutil.WaitForResult(func() (bool, error) {
node := c1.Node()
// assert that the driver is set on the node attributes
mockDriverInfoAttr := node.Attributes["driver.mock_driver"]
if mockDriverInfoAttr == "" {
return false, fmt.Errorf("mock driver is empty when it should be set on the node attributes")
}
// assert that the driver is set on the node attributes
mockDriverInfoAttr := node.Attributes["driver.mock_driver"]
if mockDriverInfoAttr == "" {
return false, fmt.Errorf("mock driver is empty when it should be set on the node attributes")
}
mockDriverInfo := node.Drivers["mock_driver"]
mockDriverInfo := node.Drivers["mock_driver"]
// assert that the Driver information for the node is also set correctly
if mockDriverInfo == nil {
return false, fmt.Errorf("mock driver is nil when it should be set on node Drivers")
}
if !mockDriverInfo.Detected {
return false, fmt.Errorf("mock driver should be set as detected")
}
if !mockDriverInfo.Healthy {
return false, fmt.Errorf("mock driver should be set as healthy")
}
if mockDriverInfo.HealthDescription == "" {
return false, fmt.Errorf("mock driver description should not be empty")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
// assert that the Driver information for the node is also set correctly
if mockDriverInfo == nil {
return false, fmt.Errorf("mock driver is nil when it should be set on node Drivers")
}
if !mockDriverInfo.Detected {
return false, fmt.Errorf("mock driver should be set as detected")
}
if !mockDriverInfo.Healthy {
return false, fmt.Errorf("mock driver should be set as healthy")
}
if mockDriverInfo.HealthDescription == "" {
return false, fmt.Errorf("mock driver description should not be empty")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
{
testutil.WaitForResult(func() (bool, error) {
c1.configLock.Lock()
defer c1.configLock.Unlock()
mockDriverInfo := node.Drivers["mock_driver"]
// assert that the Driver information for the node is also set correctly
if mockDriverInfo == nil {
return false, fmt.Errorf("mock driver is nil when it should be set on node Drivers")
}
if mockDriverInfo.Detected {
return false, fmt.Errorf("mock driver should not be set as detected")
}
if mockDriverInfo.Healthy {
return false, fmt.Errorf("mock driver should not be set as healthy")
}
if mockDriverInfo.HealthDescription == "" {
return false, fmt.Errorf("mock driver description should not be empty")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
testutil.WaitForResult(func() (bool, error) {
mockDriverInfo := c1.Node().Drivers["mock_driver"]
// assert that the Driver information for the node is also set correctly
if mockDriverInfo == nil {
return false, fmt.Errorf("mock driver is nil when it should be set on node Drivers")
}
if mockDriverInfo.Detected {
return false, fmt.Errorf("mock driver should not be set as detected")
}
if mockDriverInfo.Healthy {
return false, fmt.Errorf("mock driver should not be set as healthy")
}
if mockDriverInfo.HealthDescription == "" {
return false, fmt.Errorf("mock driver description should not be empty")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
// TestClient_MixedTLS asserts that when a server is running with TLS enabled
......@@ -1117,17 +1109,18 @@ func TestClient_UpdateNodeFromDevicesAccumulates(t *testing.T) {
})
// initial check
conf := client.GetConfig()
expectedResources := &structs.NodeResources{
// computed through test client initialization
Networks: client.configCopy.Node.NodeResources.Networks,
NodeNetworks: client.configCopy.Node.NodeResources.NodeNetworks,
Disk: client.configCopy.Node.NodeResources.Disk,
Networks: conf.Node.NodeResources.Networks,
NodeNetworks: conf.Node.NodeResources.NodeNetworks,
Disk: conf.Node.NodeResources.Disk,
// injected
Cpu: structs.NodeCpuResources{
CpuShares: 123,
ReservableCpuCores: client.configCopy.Node.NodeResources.Cpu.ReservableCpuCores,
TotalCpuCores: client.configCopy.Node.NodeResources.Cpu.TotalCpuCores,
ReservableCpuCores: conf.Node.NodeResources.Cpu.ReservableCpuCores,
TotalCpuCores: conf.Node.NodeResources.Cpu.TotalCpuCores,
},
Memory: structs.NodeMemoryResources{MemoryMB: 1024},
Devices: []*structs.NodeDeviceResource{
......@@ -1138,7 +1131,7 @@ func TestClient_UpdateNodeFromDevicesAccumulates(t *testing.T) {
},
}
assert.EqualValues(t, expectedResources, client.configCopy.Node.NodeResources)
assert.EqualValues(t, expectedResources, conf.Node.NodeResources)
// overrides of values
......@@ -1159,17 +1152,19 @@ func TestClient_UpdateNodeFromDevicesAccumulates(t *testing.T) {
},
})
conf = client.GetConfig()
expectedResources2 := &structs.NodeResources{
// computed through test client initialization
Networks: client.configCopy.Node.NodeResources.Networks,
NodeNetworks: client.configCopy.Node.NodeResources.NodeNetworks,
Disk: client.configCopy.Node.NodeResources.Disk,
Networks: conf.Node.NodeResources.Networks,
NodeNetworks: conf.Node.NodeResources.NodeNetworks,
Disk: conf.Node.NodeResources.Disk,
// injected
Cpu: structs.NodeCpuResources{
CpuShares: 123,
ReservableCpuCores: client.configCopy.Node.NodeResources.Cpu.ReservableCpuCores,
TotalCpuCores: client.configCopy.Node.NodeResources.Cpu.TotalCpuCores,
ReservableCpuCores: conf.Node.NodeResources.Cpu.ReservableCpuCores,
TotalCpuCores: conf.Node.NodeResources.Cpu.TotalCpuCores,
},
Memory: structs.NodeMemoryResources{MemoryMB: 2048},
Devices: []*structs.NodeDeviceResource{
......@@ -1184,7 +1179,7 @@ func TestClient_UpdateNodeFromDevicesAccumulates(t *testing.T) {
},
}
assert.EqualValues(t, expectedResources2, client.configCopy.Node.NodeResources)
assert.EqualValues(t, expectedResources2, conf.Node.NodeResources)
}
......
......@@ -13,6 +13,7 @@ import (
"github.com/hashicorp/consul-template/config"
"github.com/hashicorp/nomad/client/lib/cgutil"
"github.com/hashicorp/nomad/command/agent/host"
"golang.org/x/exp/slices"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/state"
......@@ -698,8 +699,11 @@ func (rc *RetryConfig) ToConsulTemplate() (*config.RetryConfig, error) {
}
func (c *Config) Copy() *Config {
nc := new(Config)
*nc = *c
if c == nil {
return nil
}
nc := *c
nc.Node = nc.Node.Copy()
nc.Servers = helper.CopySliceString(nc.Servers)
nc.Options = helper.CopyMapStringString(nc.Options)
......@@ -707,12 +711,9 @@ func (c *Config) Copy() *Config {
nc.ConsulConfig = c.ConsulConfig.Copy()
nc.VaultConfig = c.VaultConfig.Copy()
nc.TemplateConfig = c.TemplateConfig.Copy()
if c.ReservableCores != nil {
nc.ReservableCores = make([]uint16, len(c.ReservableCores))
copy(nc.ReservableCores, c.ReservableCores)
}
nc.ReservableCores = slices.Clone(c.ReservableCores)
nc.Artifact = c.Artifact.Copy()
return nc
return &nc
}
// DefaultConfig returns the default configuration
......
......@@ -22,10 +22,11 @@ func TestDriverManager_Fingerprint_Run(t *testing.T) {
testClient, cleanup := TestClient(t, nil)
defer cleanup()
conf := testClient.GetConfig()
dm := drivermanager.New(&drivermanager.Config{
Logger: testClient.logger,
Loader: testClient.config.PluginSingletonLoader,
PluginConfig: testClient.configCopy.NomadPluginConfig(),
Loader: conf.PluginSingletonLoader,
PluginConfig: conf.NomadPluginConfig(),
Updater: testClient.updateNodeFromDriver,
EventHandlerFactory: testClient.GetTaskEventHandler,
State: testClient.stateDB,
......@@ -35,7 +36,7 @@ func TestDriverManager_Fingerprint_Run(t *testing.T) {
defer dm.Shutdown()
testutil.WaitForResult(func() (bool, error) {
node := testClient.configCopy.Node
node := testClient.Node()
d, ok := node.Drivers["mock_driver"]
if !ok {
......@@ -73,10 +74,11 @@ func TestDriverManager_Fingerprint_Periodic(t *testing.T) {
})
defer cleanup()
conf := testClient.GetConfig()
dm := drivermanager.New(&drivermanager.Config{
Logger: testClient.logger,
Loader: testClient.config.PluginSingletonLoader,
PluginConfig: testClient.configCopy.NomadPluginConfig(),
Loader: conf.PluginSingletonLoader,
PluginConfig: conf.NomadPluginConfig(),
Updater: testClient.updateNodeFromDriver,
EventHandlerFactory: testClient.GetTaskEventHandler,
State: testClient.stateDB,
......@@ -134,10 +136,11 @@ func TestDriverManager_NodeAttributes_Run(t *testing.T) {
})
defer cleanup()
conf := testClient.GetConfig()
dm := drivermanager.New(&drivermanager.Config{
Logger: testClient.logger,
Loader: testClient.config.PluginSingletonLoader,
PluginConfig: testClient.configCopy.NomadPluginConfig(),
Loader: conf.PluginSingletonLoader,
PluginConfig: conf.NomadPluginConfig(),
Updater: testClient.updateNodeFromDriver,
EventHandlerFactory: testClient.GetTaskEventHandler,
State: testClient.stateDB,
......
......@@ -41,18 +41,20 @@ SEND_BATCH:
c.configLock.Lock()
defer c.configLock.Unlock()
newConfig := c.config.Copy()
// csi updates
var csiChanged bool
c.batchNodeUpdates.batchCSIUpdates(func(name string, info *structs.CSIInfo) {
if c.updateNodeFromCSIControllerLocked(name, info) {
if c.config.Node.CSIControllerPlugins[name].UpdateTime.IsZero() {
c.config.Node.CSIControllerPlugins[name].UpdateTime = time.Now()
if c.updateNodeFromCSIControllerLocked(name, info, newConfig.Node) {
if newConfig.Node.CSIControllerPlugins[name].UpdateTime.IsZero() {
newConfig.Node.CSIControllerPlugins[name].UpdateTime = time.Now()
}
csiChanged = true
}
if c.updateNodeFromCSINodeLocked(name, info) {
if c.config.Node.CSINodePlugins[name].UpdateTime.IsZero() {
c.config.Node.CSINodePlugins[name].UpdateTime = time.Now()
if c.updateNodeFromCSINodeLocked(name, info, newConfig.Node) {
if newConfig.Node.CSINodePlugins[name].UpdateTime.IsZero() {
newConfig.Node.CSINodePlugins[name].UpdateTime = time.Now()
}
csiChanged = true
}
......@@ -61,10 +63,10 @@ SEND_BATCH:
// driver node updates
var driverChanged bool
c.batchNodeUpdates.batchDriverUpdates(func(driver string, info *structs.DriverInfo) {
if c.updateNodeFromDriverLocked(driver, info) {
c.config.Node.Drivers[driver] = info
if c.config.Node.Drivers[driver].UpdateTime.IsZero() {
c.config.Node.Drivers[driver].UpdateTime = time.Now()
if c.applyNodeUpdatesFromDriver(driver, info, newConfig.Node) {
newConfig.Node.Drivers[driver] = info
if newConfig.Node.Drivers[driver].UpdateTime.IsZero() {
newConfig.Node.Drivers[driver].UpdateTime = time.Now()
}
driverChanged = true
}
......@@ -80,7 +82,8 @@ SEND_BATCH:
// only update the node if changes occurred
if driverChanged || devicesChanged || csiChanged {
c.updateNodeLocked()
c.config = newConfig
c.updateNode()
}
close(c.fpInitialized)
......@@ -92,24 +95,27 @@ func (c *Client) updateNodeFromCSI(name string, info *structs.CSIInfo) {
c.configLock.Lock()
defer c.configLock.Unlock()
newConfig := c.config.Copy()
changed := false
if c.updateNodeFromCSIControllerLocked(name, info) {
if c.config.Node.CSIControllerPlugins[name].UpdateTime.IsZero() {
c.config.Node.CSIControllerPlugins[name].UpdateTime = time.Now()
if c.updateNodeFromCSIControllerLocked(name, info, newConfig.Node) {
if newConfig.Node.CSIControllerPlugins[name].UpdateTime.IsZero() {
newConfig.Node.CSIControllerPlugins[name].UpdateTime = time.Now()
}
changed = true
}
if c.updateNodeFromCSINodeLocked(name, info) {
if c.config.Node.CSINodePlugins[name].UpdateTime.IsZero() {
c.config.Node.CSINodePlugins[name].UpdateTime = time.Now()
if c.updateNodeFromCSINodeLocked(name, info, newConfig.Node) {
if newConfig.Node.CSINodePlugins[name].UpdateTime.IsZero() {
newConfig.Node.CSINodePlugins[name].UpdateTime = time.Now()
}
changed = true
}
if changed {
c.updateNodeLocked()
c.config = newConfig
c.updateNode()
}
}
......@@ -119,7 +125,7 @@ func (c *Client) updateNodeFromCSI(name string, info *structs.CSIInfo) {
//
// It is safe to call for all CSI Updates, but will only perform changes when
// a ControllerInfo field is present.
func (c *Client) updateNodeFromCSIControllerLocked(name string, info *structs.CSIInfo) bool {
func (c *Client) updateNodeFromCSIControllerLocked(name string, info *structs.CSIInfo, node *structs.Node) bool {
var changed bool
if info.ControllerInfo == nil {
return false
......@@ -127,15 +133,15 @@ func (c *Client) updateNodeFromCSIControllerLocked(name string, info *structs.CS
i := info.Copy()
i.NodeInfo = nil
oldController, hadController := c.config.Node.CSIControllerPlugins[name]
oldController, hadController := node.CSIControllerPlugins[name]
if !hadController {
// If the controller info has not yet been set, do that here
changed = true
c.config.Node.CSIControllerPlugins[name] = i
node.CSIControllerPlugins[name] = i
} else {
// The controller info has already been set, fix it up
if !oldController.Equal(i) {
c.config.Node.CSIControllerPlugins[name] = i
node.CSIControllerPlugins[name] = i
changed = true
}
......@@ -162,7 +168,7 @@ func (c *Client) updateNodeFromCSIControllerLocked(name string, info *structs.CS
//
// It is safe to call for all CSI Updates, but will only perform changes when
// a NodeInfo field is present.
func (c *Client) updateNodeFromCSINodeLocked(name string, info *structs.CSIInfo) bool {
func (c *Client) updateNodeFromCSINodeLocked(name string, info *structs.CSIInfo, node *structs.Node) bool {
var changed bool
if info.NodeInfo == nil {
return false
......@@ -170,15 +176,15 @@ func (c *Client) updateNodeFromCSINodeLocked(name string, info *structs.CSIInfo)
i := info.Copy()
i.ControllerInfo = nil
oldNode, hadNode := c.config.Node.CSINodePlugins[name]
oldNode, hadNode := node.CSINodePlugins[name]
if !hadNode {
// If the Node info has not yet been set, do that here
changed = true
c.config.Node.CSINodePlugins[name] = i
node.CSINodePlugins[name] = i
} else {
// The node info has already been set, fix it up
if !oldNode.Equal(info) {
c.config.Node.CSINodePlugins[name] = i
node.CSINodePlugins[name] = i
changed = true
}
......@@ -205,30 +211,33 @@ func (c *Client) updateNodeFromDriver(name string, info *structs.DriverInfo) {
c.configLock.Lock()
defer c.configLock.Unlock()
if c.updateNodeFromDriverLocked(name, info) {
c.config.Node.Drivers[name] = info
if c.config.Node.Drivers[name].UpdateTime.IsZero() {
c.config.Node.Drivers[name].UpdateTime = time.Now()
newConfig := c.config.Copy()
if c.applyNodeUpdatesFromDriver(name, info, newConfig.Node) {
newConfig.Node.Drivers[name] = info
if newConfig.Node.Drivers[name].UpdateTime.IsZero() {
newConfig.Node.Drivers[name].UpdateTime = time.Now()
}
c.updateNodeLocked()
c.config = newConfig
c.updateNode()
}
}
// updateNodeFromDriverLocked makes the changes to the node from a driver update
// but does not send the update to the server. c.configLock must be held before
// calling this func
func (c *Client) updateNodeFromDriverLocked(name string, info *structs.DriverInfo) bool {
// applyNodeUpdatesFromDriver applies changes to the passed in node. true is
// returned if the node has changed.
func (c *Client) applyNodeUpdatesFromDriver(name string, info *structs.DriverInfo, node *structs.Node) bool {
var hasChanged bool
hadDriver := c.config.Node.Drivers[name] != nil
hadDriver := node.Drivers[name] != nil
if !hadDriver {
// If the driver info has not yet been set, do that here
hasChanged = true
for attrName, newVal := range info.Attributes {
c.config.Node.Attributes[attrName] = newVal
node.Attributes[attrName] = newVal
}
} else {
oldVal := c.config.Node.Drivers[name]
oldVal := node.Drivers[name]
// The driver info has already been set, fix it up
if oldVal.Detected != info.Detected {
hasChanged = true
......@@ -247,16 +256,16 @@ func (c *Client) updateNodeFromDriverLocked(name string, info *structs.DriverInf
}
for attrName, newVal := range info.Attributes {
oldVal := c.config.Node.Drivers[name].Attributes[attrName]
oldVal := node.Drivers[name].Attributes[attrName]
if oldVal == newVal {
continue
}
hasChanged = true
if newVal == "" {
delete(c.config.Node.Attributes, attrName)
delete(node.Attributes, attrName)
} else {
c.config.Node.Attributes[attrName] = newVal
node.Attributes[attrName] = newVal
}
}
}
......@@ -266,16 +275,14 @@ func (c *Client) updateNodeFromDriverLocked(name string, info *structs.DriverInf
// their attributes as DriverInfo
driverName := fmt.Sprintf("driver.%s", name)
if info.Detected {
c.config.Node.Attributes[driverName] = "1"
node.Attributes[driverName] = "1"
} else {
delete(c.config.Node.Attributes, driverName)
delete(node.Attributes, driverName)
}
return hasChanged
}
// updateNodeFromFingerprint updates the node with the result of
// fingerprinting the node from the diff that was created
func (c *Client) updateNodeFromDevices(devices []*structs.NodeDeviceResource) {
c.configLock.Lock()
defer c.configLock.Unlock()
......@@ -284,7 +291,7 @@ func (c *Client) updateNodeFromDevices(devices []*structs.NodeDeviceResource) {
// dispatched task resources and not appropriate for expressing
// node available device resources
if c.updateNodeFromDevicesLocked(devices) {
c.updateNodeLocked()
c.updateNode()
}
}
......@@ -294,7 +301,9 @@ func (c *Client) updateNodeFromDevices(devices []*structs.NodeDeviceResource) {
func (c *Client) updateNodeFromDevicesLocked(devices []*structs.NodeDeviceResource) bool {
if !structs.DevicesEquals(c.config.Node.NodeResources.Devices, devices) {
c.logger.Debug("new devices detected", "devices", len(devices))
c.config.Node.NodeResources.Devices = devices
newConfig := c.config.Copy()
newConfig.Node.NodeResources.Devices = devices
c.config = newConfig
return true
}
......
......@@ -47,9 +47,11 @@ func (c *Client) StreamingRpcHandler(method string) (structs.StreamingRpcHandler
// RPC is used to forward an RPC call to a nomad server, or fail if no servers.
func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
conf := c.GetConfig()
// Invoke the RPCHandler if it exists
if c.config.RPCHandler != nil {
return c.config.RPCHandler.RPC(method, args, reply)
if conf.RPCHandler != nil {
return conf.RPCHandler.RPC(method, args, reply)
}
// We will try to automatically retry requests that fail due to things like server unavailability
......@@ -60,7 +62,7 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
// to the leader they may also allow for an RPCHoldTimeout while waiting for leader election.
// That's OK, we won't double up because we are using it here not as a sleep but
// as a hint to give up
deadline = deadline.Add(c.config.RPCHoldTimeout)
deadline = deadline.Add(conf.RPCHoldTimeout)
// If its a blocking query, allow the time specified by the request
if info, ok := args.(structs.RPCInfo); ok {
......@@ -109,7 +111,7 @@ TRY:
}
// Wait to avoid thundering herd
timer, cancel := helper.NewSafeTimer(helper.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction))
timer, cancel := helper.NewSafeTimer(helper.RandomStagger(conf.RPCHoldTimeout / structs.JitterFraction))
defer cancel()
select {
......
......@@ -94,7 +94,6 @@ func TestRPCOnlyClient(t testing.T, srvAddr net.Addr, rpcs map[string]interface{
client := &Client{config: conf, logger: testlog.HCLogger(t)}
client.servers = servers.New(client.logger, client.shutdownCh, client)
client.configCopy = client.config.Copy()
client.rpcServer = rpc.NewServer()
for name, rpc := range rpcs {
......
......@@ -1147,15 +1147,17 @@ func (a *Agent) Reload(newConfig *Config) error {
a.configLock.Lock()
defer a.configLock.Unlock()
updatedLogging := newConfig != nil && (newConfig.LogLevel != a.config.LogLevel)
current := a.config.Copy()
updatedLogging := newConfig != nil && (newConfig.LogLevel != current.LogLevel)
if newConfig == nil || newConfig.TLSConfig == nil && !updatedLogging {
return fmt.Errorf("cannot reload agent with nil configuration")
}
if updatedLogging {
a.config.LogLevel = newConfig.LogLevel
a.logger.SetLevel(log.LevelFromString(newConfig.LogLevel))
current.LogLevel = newConfig.LogLevel
a.logger.SetLevel(log.LevelFromString(current.LogLevel))
}
// Update eventer config
......@@ -1175,10 +1177,10 @@ func (a *Agent) Reload(newConfig *Config) error {
// Completely reload the agent's TLS configuration (moving from non-TLS to
// TLS, or vice versa)
// This does not handle errors in loading the new TLS configuration
a.config.TLSConfig = newConfig.TLSConfig.Copy()
current.TLSConfig = newConfig.TLSConfig.Copy()
}
if !a.config.TLSConfig.IsEmpty() && !newConfig.TLSConfig.IsEmpty() {
if !current.TLSConfig.IsEmpty() && !newConfig.TLSConfig.IsEmpty() {
// This is just a TLS configuration reload, we don't need to refresh
// existing network connections
......@@ -1187,26 +1189,31 @@ func (a *Agent) Reload(newConfig *Config) error {
// as this allows us to dynamically reload configurations not only
// on the Agent but on the Server and Client too (they are
// referencing the same keyloader).
keyloader := a.config.TLSConfig.GetKeyLoader()
keyloader := current.TLSConfig.GetKeyLoader()
_, err := keyloader.LoadKeyPair(newConfig.TLSConfig.CertFile, newConfig.TLSConfig.KeyFile)
if err != nil {
return err
}
a.config.TLSConfig = newConfig.TLSConfig
a.config.TLSConfig.KeyLoader = keyloader
current.TLSConfig = newConfig.TLSConfig
current.TLSConfig.KeyLoader = keyloader
a.config = current
return nil
} else if newConfig.TLSConfig.IsEmpty() && !a.config.TLSConfig.IsEmpty() {
} else if newConfig.TLSConfig.IsEmpty() && !current.TLSConfig.IsEmpty() {
a.logger.Warn("downgrading agent's existing TLS configuration to plaintext")
fullUpdateTLSConfig()
} else if !newConfig.TLSConfig.IsEmpty() && a.config.TLSConfig.IsEmpty() {
} else if !newConfig.TLSConfig.IsEmpty() && current.TLSConfig.IsEmpty() {
a.logger.Info("upgrading from plaintext configuration to TLS")
fullUpdateTLSConfig()
}
// Set agent config to the updated config
a.config = current
return nil
}
// GetConfig creates a locked reference to the agent's config
// GetConfig returns the current agent configuration. The Config should *not*
// be mutated directly. First call Config.Copy.
func (a *Agent) GetConfig() *Config {
a.configLock.Lock()
defer a.configLock.Unlock()
......
......@@ -23,7 +23,6 @@ import (
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/serf/serf"
"github.com/mitchellh/copystructure"
)
type Member struct {
......@@ -81,11 +80,8 @@ func (s *HTTPServer) AgentSelfRequest(resp http.ResponseWriter, req *http.Reques
Member: nomadMember(member),
Stats: s.agent.Stats(),
}
if ac, err := copystructure.Copy(s.agent.config); err != nil {
return nil, CodedError(500, err.Error())
} else {
self.Config = ac.(*Config)
}
self.Config = s.agent.GetConfig().Copy()
if self.Config != nil && self.Config.Vault != nil && self.Config.Vault.Token != "" {
self.Config.Vault.Token = "<redacted>"
......
......@@ -817,9 +817,9 @@ func TestServer_Reload_TLS_Shared_Keyloader(t *testing.T) {
}
assert.Nil(agent.Reload(newConfig))
assert.Equal(agent.Config.TLSConfig.CertFile, newConfig.TLSConfig.CertFile)
assert.Equal(agent.Config.TLSConfig.KeyFile, newConfig.TLSConfig.KeyFile)
assert.Equal(agent.Config.TLSConfig.GetKeyLoader(), originalKeyloader)
assert.Equal(agent.Agent.config.TLSConfig.CertFile, newConfig.TLSConfig.CertFile)
assert.Equal(agent.Agent.config.TLSConfig.KeyFile, newConfig.TLSConfig.KeyFile)
assert.Equal(agent.Agent.config.TLSConfig.GetKeyLoader(), originalKeyloader)
// Assert is passed through on the server correctly
if assert.NotNil(agent.server.GetConfig().TLSConfig) {
......@@ -1055,7 +1055,7 @@ func TestServer_Reload_TLS_DowngradeFromTLS(t *testing.T) {
err := agent.Reload(newConfig)
assert.Nil(err)
assert.True(agentConfig.TLSConfig.IsEmpty())
assert.True(agent.config.TLSConfig.IsEmpty())
}
func TestServer_Reload_VaultConfig(t *testing.T) {
......
......@@ -1008,7 +1008,7 @@ func (c *Command) handleReload() {
}
}
if s := c.agent.Client(); s != nil {
if client := c.agent.Client(); client != nil {
c.agent.logger.Debug("starting reload of client config")
clientConfig, err := convertClientConfig(newConf)
if err != nil {
......@@ -1022,7 +1022,7 @@ func (c *Command) handleReload() {
return
}
if err := c.agent.Client().Reload(clientConfig); err != nil {
if err := client.Reload(clientConfig); err != nil {
c.agent.logger.Error("reloading client config failed", "error", err)
return
}
......
......@@ -21,11 +21,13 @@ import (
"github.com/hashicorp/go-sockaddr/template"
client "github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
"github.com/hashicorp/nomad/version"
"golang.org/x/exp/slices"
)
// Config is the configuration for the Nomad agent.
......@@ -332,6 +334,28 @@ type ClientConfig struct {
ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`
}
func (c *ClientConfig) Copy() *ClientConfig {
if c == nil {
return c
}
nc := *c
nc.Servers = slices.Clone(c.Servers)
nc.Options = helper.CopyMap(c.Options)
nc.Meta = helper.CopyMap(c.Meta)
nc.ChrootEnv = helper.CopyMap(c.ChrootEnv)
nc.Reserved = c.Reserved.Copy()
nc.NoHostUUID = pointer.Copy(c.NoHostUUID)
nc.TemplateConfig = c.TemplateConfig.Copy()
nc.ServerJoin = c.ServerJoin.Copy()
nc.HostVolumes = helper.CopySlice(c.HostVolumes)
nc.HostNetworks = helper.CopySlice(c.HostNetworks)
nc.NomadServiceDiscovery = pointer.Copy(c.NomadServiceDiscovery)
nc.Artifact = c.Artifact.Copy()
nc.ExtraKeysHCL = slices.Clone(c.ExtraKeysHCL)
return &nc
}
// ACLConfig is configuration specific to the ACL system
type ACLConfig struct {
// Enabled controls if we are enforce and manage ACLs
......@@ -360,6 +384,16 @@ type ACLConfig struct {
ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`
}
func (a *ACLConfig) Copy() *ACLConfig {
if a == nil {
return nil
}
na := *a
na.ExtraKeysHCL = slices.Clone(a.ExtraKeysHCL)
return &na
}
// ServerConfig is configuration specific to the server mode
type ServerConfig struct {
// Enabled controls if we are a server
......@@ -542,6 +576,29 @@ type ServerConfig struct {
RaftBoltConfig *RaftBoltConfig `hcl:"raft_boltdb"`
}
func (s *ServerConfig) Copy() *ServerConfig {
if s == nil {
return nil
}
ns := *s
ns.RaftMultiplier = pointer.Copy(s.RaftMultiplier)
ns.NumSchedulers = pointer.Copy(s.NumSchedulers)
ns.EnabledSchedulers = slices.Clone(s.EnabledSchedulers)
ns.StartJoin = slices.Clone(s.StartJoin)
ns.RetryJoin = slices.Clone(s.RetryJoin)
ns.ServerJoin = s.ServerJoin.Copy()
ns.DefaultSchedulerConfig = s.DefaultSchedulerConfig.Copy()
ns.PlanRejectionTracker = s.PlanRejectionTracker.Copy()
ns.EnableEventBroker = pointer.Copy(s.EnableEventBroker)
ns.EventBufferSize = pointer.Copy(s.EventBufferSize)
ns.licenseAdditionalPublicKeys = slices.Clone(s.licenseAdditionalPublicKeys)
ns.ExtraKeysHCL = slices.Clone(s.ExtraKeysHCL)
ns.Search = s.Search.Copy()
ns.RaftBoltConfig = s.RaftBoltConfig.Copy()
return &ns
}
// RaftBoltConfig is used in servers to configure parameters of the boltdb
// used for raft consensus.
type RaftBoltConfig struct {
......@@ -553,6 +610,15 @@ type RaftBoltConfig struct {
NoFreelistSync bool `hcl:"no_freelist_sync"`
}
func (r *RaftBoltConfig) Copy() *RaftBoltConfig {
if r == nil {
return nil
}
nr := *r
return &nr
}
// PlanRejectionTracker is used in servers to configure the plan rejection
// tracker.
type PlanRejectionTracker struct {
......@@ -572,6 +638,17 @@ type PlanRejectionTracker struct {
ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`
}
func (p *PlanRejectionTracker) Copy() *PlanRejectionTracker {
if p == nil {
return nil
}
np := *p
np.Enabled = pointer.Copy(p.Enabled)
np.ExtraKeysHCL = slices.Clone(p.ExtraKeysHCL)
return &np
}
func (p *PlanRejectionTracker) Merge(b *PlanRejectionTracker) *PlanRejectionTracker {
if p == nil {
return b
......@@ -636,6 +713,15 @@ type Search struct {
MinTermLength int `hcl:"min_term_length"`
}
func (s *Search) Copy() *Search {
if s == nil {
return nil
}
ns := *s
return &ns
}
// ServerJoin is used in both clients and servers to bootstrap connections to
// servers
type ServerJoin struct {
......@@ -663,6 +749,18 @@ type ServerJoin struct {
ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`
}
func (s *ServerJoin) Copy() *ServerJoin {
if s == nil {
return nil
}
ns := *s
ns.StartJoin = slices.Clone(s.StartJoin)
ns.RetryJoin = slices.Clone(s.RetryJoin)
ns.ExtraKeysHCL = slices.Clone(s.ExtraKeysHCL)
return &ns
}
func (s *ServerJoin) Merge(b *ServerJoin) *ServerJoin {
if s == nil {
return b
......@@ -797,6 +895,19 @@ type Telemetry struct {
ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`
}
func (t *Telemetry) Copy() *Telemetry {
if t == nil {
return nil
}
nt := *t
nt.DataDogTags = slices.Clone(t.DataDogTags)
nt.PrefixFilter = slices.Clone(t.PrefixFilter)
nt.FilterDefault = pointer.Copy(t.FilterDefault)
nt.ExtraKeysHCL = slices.Clone(t.ExtraKeysHCL)
return &nt
}
// PrefixFilters parses the PrefixFilter field and returns a list of allowed and blocked filters
func (a *Telemetry) PrefixFilters() (allowed, blocked []string, err error) {
for _, rule := range a.PrefixFilter {
......@@ -825,6 +936,16 @@ type Ports struct {
ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`
}
func (p *Ports) Copy() *Ports {
if p == nil {
return nil
}
np := *p
np.ExtraKeysHCL = slices.Clone(p.ExtraKeysHCL)
return &np
}
// Addresses encapsulates all of the addresses we bind to for various
// network services. Everything is optional and defaults to BindAddr.
type Addresses struct {
......@@ -835,6 +956,16 @@ type Addresses struct {
ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`
}
func (a *Addresses) Copy() *Addresses {
if a == nil {
return nil
}
na := *a
na.ExtraKeysHCL = slices.Clone(a.ExtraKeysHCL)
return &na
}
// AdvertiseAddrs is used to control the addresses we advertise out for
// different network services. All are optional and default to BindAddr and
// their default Port.
......@@ -844,6 +975,16 @@ type NormalizedAddrs struct {
Serf string
}
func (n *NormalizedAddrs) Copy() *NormalizedAddrs {
if n == nil {
return nil
}
nn := *n
nn.HTTP = slices.Clone(n.HTTP)
return &nn
}
// AdvertiseAddrs is used to control the addresses we advertise out for
// different network services. All are optional and default to BindAddr and
// their default Port.
......@@ -855,6 +996,16 @@ type AdvertiseAddrs struct {
ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`
}
func (a *AdvertiseAddrs) Copy() *AdvertiseAddrs {
if a == nil {
return nil
}
na := *a
na.ExtraKeysHCL = slices.Clone(a.ExtraKeysHCL)
return &na
}
type Resources struct {
CPU int `hcl:"cpu"`
MemoryMB int `hcl:"memory"`
......@@ -865,6 +1016,16 @@ type Resources struct {
ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`
}
func (r *Resources) Copy() *Resources {
if r == nil {
return nil
}
nr := *r
nr.ExtraKeysHCL = slices.Clone(r.ExtraKeysHCL)
return &nr
}
// devModeConfig holds the config for the -dev and -dev-connect flags
type devModeConfig struct {
// mode flags are set at the command line via -dev and -dev-connect
......@@ -1297,6 +1458,42 @@ func (c *Config) Merge(b *Config) *Config {
return &result
}
// Copy returns a deep copy safe for mutation.
func (c *Config) Copy() *Config {
if c == nil {
return nil
}
nc := *c
nc.Ports = c.Ports.Copy()
nc.Addresses = c.Addresses.Copy()
nc.normalizedAddrs = c.normalizedAddrs.Copy()
nc.AdvertiseAddrs = c.AdvertiseAddrs.Copy()
nc.Client = c.Client.Copy()
nc.Server = c.Server.Copy()
nc.ACL = c.ACL.Copy()
nc.Telemetry = c.Telemetry.Copy()
nc.DisableUpdateCheck = pointer.Copy(c.DisableUpdateCheck)
nc.Consul = c.Consul.Copy()
nc.Vault = c.Vault.Copy()
nc.UI = c.UI.Copy()
nc.NomadConfig = c.NomadConfig.Copy()
nc.ClientConfig = c.ClientConfig.Copy()
nc.Version = c.Version.Copy()
nc.Files = slices.Clone(c.Files)
nc.TLSConfig = c.TLSConfig.Copy()
nc.HTTPAPIResponseHeaders = helper.CopyMap(c.HTTPAPIResponseHeaders)
nc.Sentinel = c.Sentinel.Copy()
nc.Autopilot = c.Autopilot.Copy()
nc.Plugins = helper.CopySlice(c.Plugins)
nc.Limits = c.Limits.Copy()
nc.Audit = c.Audit.Copy()
nc.ExtraKeysHCL = slices.Clone(c.ExtraKeysHCL)
return &nc
}
// normalizeAddrs normalizes Addresses and AdvertiseAddrs to always be
// initialized and have reasonable defaults.
func (c *Config) normalizeAddrs() error {
......
......@@ -935,7 +935,7 @@ func TestHTTP_VerifyHTTPSClient_AfterConfigReload(t *testing.T) {
assert.Nil(err)
resp, err := client.Do(req)
if assert.Nil(err) {
if assert.NoError(err) {
resp.Body.Close()
assert.Equal(resp.StatusCode, 200)
}
......
......@@ -79,7 +79,6 @@ func TestHTTP_PrefixJobsList(t *testing.T) {
"aabbbbbb-e8f7-fd38-c855-ab94ceb89706",
"aabbcccc-e8f7-fd38-c855-ab94ceb89706",
}
ci.Parallel(t)
httpTest(t, nil, func(s *TestAgent) {
for i := 0; i < 3; i++ {
// Create the job
......@@ -3726,7 +3725,7 @@ func TestConversion_apiConnectSidecarServiceProxyToStructs(t *testing.T) {
require.Equal(t, &structs.ConsulProxy{
LocalServiceAddress: "192.168.30.1",
LocalServicePort: 9000,
Config: nil,
Config: map[string]any{},
Upstreams: []structs.ConsulUpstream{{
DestinationName: "upstream",
}},
......
......@@ -33,6 +33,10 @@ var invalidFilenameNonASCII = regexp.MustCompile(`[[:^ascii:]/\\<>:"|?*]`)
// invalidFilenameStrict = invalidFilename plus additional punctuation
var invalidFilenameStrict = regexp.MustCompile(`[/\\<>:"|?*$()+=[\];#@~,&']`)
type Copyable[T any] interface {
Copy() T
}
// IsUUID returns true if the given string is a valid UUID.
func IsUUID(str string) bool {
const uuidLen = 36
......@@ -252,9 +256,9 @@ func CompareMapStringString(a, b map[string]string) bool {
// CopyMap creates a copy of m. Struct values are not deep copies.
//
// If m is nil or contains no elements, the return value is nil.
// If m is nil the return value is nil.
func CopyMap[M ~map[K]V, K comparable, V any](m M) M {
if len(m) == 0 {
if m == nil {
return nil
}
......@@ -265,16 +269,44 @@ func CopyMap[M ~map[K]V, K comparable, V any](m M) M {
return result
}
// DeepCopyMap creates a copy of m by calling Copy() on each value.
//
// If m is nil the return value is nil.
func DeepCopyMap[M ~map[K]V, K comparable, V Copyable[V]](m M) M {
if m == nil {
return nil
}
result := make(M, len(m))
for k, v := range m {
result[k] = v.Copy()
}
return result
}
// CopySlice creates a deep copy of s. For slices with elements that do not
// implement Copy(), use slices.Clone.
func CopySlice[S ~[]E, E Copyable[E]](s S) S {
if s == nil {
return nil
}
result := make(S, len(s))
for i, v := range s {
result[i] = v.Copy()
}
return result
}
// CopyMapStringString creates a copy of m.
//
// Deprecated; use CopyMap instead.
func CopyMapStringString(m map[string]string) map[string]string {
l := len(m)
if l == 0 {
if m == nil {
return nil
}
c := make(map[string]string, l)
c := make(map[string]string, len(m))
for k, v := range m {
c[k] = v
}
......@@ -285,12 +317,11 @@ func CopyMapStringString(m map[string]string) map[string]string {
//
// Deprecated; use CopyMap instead.
func CopyMapStringStruct(m map[string]struct{}) map[string]struct{} {
l := len(m)
if l == 0 {
if m == nil {
return nil
}
c := make(map[string]struct{}, l)
c := make(map[string]struct{}, len(m))
for k := range m {
c[k] = struct{}{}
}
......@@ -301,12 +332,11 @@ func CopyMapStringStruct(m map[string]struct{}) map[string]struct{} {
//
// Deprecated; use CopyMap instead.
func CopyMapStringInterface(m map[string]interface{}) map[string]interface{} {
l := len(m)
if l == 0 {
if m == nil {
return nil
}
c := make(map[string]interface{}, l)
c := make(map[string]interface{}, len(m))
for k, v := range m {
c[k] = v
}
......
......@@ -66,7 +66,7 @@ func Test_CopyMap(t *testing.T) {
t.Run("empty", func(t *testing.T) {
m := make(map[string]int, 10)
result := CopyMap(m)
must.Nil(t, result)
must.MapEq(t, map[string]int{}, result)
})
t.Run("elements", func(t *testing.T) {
......
......@@ -541,8 +541,9 @@ func setupLocal(t *testing.T) (rpc.ClientCodec, func()) {
require.NoError(t, err, "could not setup test client")
}
node1 := c1.Node()
node1.Attributes["nomad.version"] = "0.11.0" // client RPCs not supported on early versions
node1 := c1.UpdateConfig(func(c *config.Config) {
c.Node.Attributes["nomad.version"] = "0.11.0" // client RPCs not supported on early versions
}).Node
req := &structs.NodeRegisterRequest{
Node: node1,
......@@ -568,7 +569,9 @@ func setupLocal(t *testing.T) (rpc.ClientCodec, func()) {
}
// update w/ plugin
node1.CSIControllerPlugins = plugins
node1 = c1.UpdateConfig(func(c *config.Config) {
c.Node.CSIControllerPlugins = plugins
}).Node
s1.fsm.state.UpsertNode(structs.MsgTypeTestSetup, 1000, node1)
cleanup := func() {
......
......@@ -8,9 +8,11 @@ import (
"time"
log "github.com/hashicorp/go-hclog"
"golang.org/x/exp/slices"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/nomad/helper/pluginutils/loader"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/deploymentwatcher"
"github.com/hashicorp/nomad/nomad/structs"
......@@ -362,6 +364,36 @@ type Config struct {
DeploymentQueryRateLimit float64
}
func (c *Config) Copy() *Config {
if c == nil {
return nil
}
nc := *c
// Can't deep copy interfaces
// LogOutput io.Writer
// Logger log.InterceptLogger
// PluginLoader loader.PluginCatalog
// PluginSingletonLoader loader.PluginCatalog
nc.RPCAddr = pointer.Copy(c.RPCAddr)
nc.ClientRPCAdvertise = pointer.Copy(c.ClientRPCAdvertise)
nc.ServerRPCAdvertise = pointer.Copy(c.ServerRPCAdvertise)
nc.RaftConfig = pointer.Copy(c.RaftConfig)
nc.SerfConfig = pointer.Copy(c.SerfConfig)
nc.EnabledSchedulers = slices.Clone(c.EnabledSchedulers)
nc.ConsulConfig = c.ConsulConfig.Copy()
nc.VaultConfig = c.VaultConfig.Copy()
nc.TLSConfig = c.TLSConfig.Copy()
nc.SentinelConfig = c.SentinelConfig.Copy()
nc.AutopilotConfig = c.AutopilotConfig.Copy()
nc.LicenseConfig = c.LicenseConfig.Copy()
nc.SearchConfig = c.SearchConfig.Copy()
return &nc
}
// DefaultConfig returns the default configuration. Only used as the basis for
// merging agent or test parameters.
func DefaultConfig() *Config {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment