Commit e9f6769f authored by Alex Dadgar's avatar Alex Dadgar Committed by GitHub

Merge branch 'master' into feature/2334

parents 20fd19d6 633ec0c3
Showing with 360 additions and 118 deletions
......@@ -11,6 +11,7 @@ IMPROVEMENTS:
* api/job: Ability to revert job to older versions [GH-2575]
* client: Environment variables for client DC and Region [GH-2507]
* client: Hash host ID so it's stable and well distributed [GH-2541]
* client: GC dead allocs if total allocs > `gc_max_allocs` tunable [GH-2636]
* client: Persist state using bolt-db and more efficient write patterns
[GH-2610]
* client: Fingerprint all routable addresses on an interface including IPv6
......
......@@ -323,6 +323,7 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
upd, ar := testAllocRunnerFromAlloc(alloc, false)
go ar.Run()
defer ar.Destroy()
// Snapshot state
testutil.WaitForResult(func() (bool, error) {
......@@ -390,6 +391,7 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
task.Config["run_for"] = "10s"
go ar.Run()
defer ar.Destroy()
testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
......@@ -436,8 +438,9 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
if err != nil {
t.Fatalf("err: %v", err)
}
ar2.logger.Println("[TESTING] running second alloc runner")
go ar2.Run()
ar2.logger.Println("[TESTING] starting second alloc runner")
defer ar2.Destroy() // Just in case of failure before Destroy below
testutil.WaitForResult(func() (bool, error) {
// Check the state still exists
......@@ -516,6 +519,7 @@ func TestAllocRunner_SaveRestoreState_Upgrade(t *testing.T) {
origConfig := ar.config.Copy()
ar.config.Version = "0.5.6"
go ar.Run()
defer ar.Destroy()
// Snapshot state
testutil.WaitForResult(func() (bool, error) {
......@@ -544,6 +548,7 @@ func TestAllocRunner_SaveRestoreState_Upgrade(t *testing.T) {
t.Fatalf("err: %v", err)
}
go ar2.Run()
defer ar2.Destroy() // Just in case of failure before Destroy below
testutil.WaitForResult(func() (bool, error) {
if len(ar2.tasks) != 1 {
......@@ -736,6 +741,7 @@ func TestAllocRunner_TaskFailed_KillTG(t *testing.T) {
ar.alloc.Job.TaskGroups[0].Tasks = append(ar.alloc.Job.TaskGroups[0].Tasks, task2)
ar.alloc.TaskResources[task2.Name] = task2.Resources
go ar.Run()
defer ar.Destroy()
testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
......@@ -862,6 +868,7 @@ func TestAllocRunner_MoveAllocDir(t *testing.T) {
}
upd, ar := testAllocRunnerFromAlloc(alloc, false)
go ar.Run()
defer ar.Destroy()
testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
......@@ -893,6 +900,7 @@ func TestAllocRunner_MoveAllocDir(t *testing.T) {
upd1, ar1 := testAllocRunnerFromAlloc(alloc1, false)
ar1.SetPreviousAllocDir(ar.allocDir)
go ar1.Run()
defer ar1.Destroy()
testutil.WaitForResult(func() (bool, error) {
if upd1.Count == 0 {
......
......@@ -137,7 +137,7 @@ type Client struct {
// migratingAllocs is the set of allocs whose data migration is in flight
migratingAllocs map[string]*migrateAllocCtrl
migratingAllocsLock sync.Mutex
migratingAllocsLock sync.RWMutex
// allocUpdates stores allocations that need to be synced to the server.
allocUpdates chan *structs.Allocation
......@@ -240,13 +240,15 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
// Add the garbage collector
gcConfig := &GCConfig{
MaxAllocs: cfg.GCMaxAllocs,
DiskUsageThreshold: cfg.GCDiskUsageThreshold,
InodeUsageThreshold: cfg.GCInodeUsageThreshold,
Interval: cfg.GCInterval,
ParallelDestroys: cfg.GCParallelDestroys,
ReservedDiskMB: cfg.Node.Reserved.DiskMB,
}
c.garbageCollector = NewAllocGarbageCollector(logger, statsCollector, gcConfig)
c.garbageCollector = NewAllocGarbageCollector(logger, statsCollector, c, gcConfig)
go c.garbageCollector.Run()
// Setup the node
if err := c.setupNode(); err != nil {
......@@ -482,17 +484,13 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
// Stats is used to return statistics for debugging and insight
// for various sub-systems
func (c *Client) Stats() map[string]map[string]string {
c.allocLock.RLock()
numAllocs := len(c.allocs)
c.allocLock.RUnlock()
c.heartbeatLock.Lock()
defer c.heartbeatLock.Unlock()
stats := map[string]map[string]string{
"client": map[string]string{
"node_id": c.Node().ID,
"known_servers": c.servers.all().String(),
"num_allocations": strconv.Itoa(numAllocs),
"num_allocations": strconv.Itoa(c.NumAllocs()),
"last_heartbeat": fmt.Sprintf("%v", time.Since(c.lastHeartbeat)),
"heartbeat_ttl": fmt.Sprintf("%v", c.heartbeatTTL),
},
......@@ -722,6 +720,24 @@ func (c *Client) getAllocRunners() map[string]*AllocRunner {
return runners
}
// NumAllocs returns the number of allocs this client has. Used to
// fulfill the AllocCounter interface for the GC.
func (c *Client) NumAllocs() int {
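// Each alloc set is counted under its own read lock, so the sum is a
// point-in-time approximation rather than a single atomic snapshot.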
c.allocLock.RLock()
n := len(c.allocs)
c.allocLock.RUnlock()
c.blockedAllocsLock.RLock()
n += len(c.blockedAllocations)
c.blockedAllocsLock.RUnlock()
c.migratingAllocsLock.RLock()
n += len(c.migratingAllocs)
c.migratingAllocsLock.RUnlock()
return n
}
// nodeID restores, or generates if necessary, a unique node ID and SecretID.
// The node ID is, if available, a persistent unique ID. The secret ID is a
// high-entropy random UUID.
......@@ -1228,25 +1244,31 @@ func (c *Client) updateNodeStatus() error {
func (c *Client) updateAllocStatus(alloc *structs.Allocation) {
// If this alloc was blocking another alloc and transitioned to a
// terminal state then start the blocked allocation
c.blockedAllocsLock.Lock()
if blockedAlloc, ok := c.blockedAllocations[alloc.ID]; ok && alloc.Terminated() {
var prevAllocDir *allocdir.AllocDir
if ar, ok := c.getAllocRunners()[alloc.ID]; ok {
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg != nil && tg.EphemeralDisk != nil && tg.EphemeralDisk.Sticky {
prevAllocDir = ar.GetAllocDir()
if alloc.Terminated() {
c.blockedAllocsLock.Lock()
blockedAlloc, ok := c.blockedAllocations[alloc.ID]
if ok {
var prevAllocDir *allocdir.AllocDir
if ar, ok := c.getAllocRunners()[alloc.ID]; ok {
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg != nil && tg.EphemeralDisk != nil && tg.EphemeralDisk.Sticky {
prevAllocDir = ar.GetAllocDir()
}
}
delete(c.blockedAllocations, blockedAlloc.PreviousAllocation)
c.blockedAllocsLock.Unlock()
// Need to call addAlloc without holding the lock
if err := c.addAlloc(blockedAlloc, prevAllocDir); err != nil {
c.logger.Printf("[ERR] client: failed to add alloc which was previously blocked %q: %v",
blockedAlloc.ID, err)
}
} else {
c.blockedAllocsLock.Unlock()
}
if err := c.addAlloc(blockedAlloc, prevAllocDir); err != nil {
c.logger.Printf("[ERR] client: failed to add alloc which was previously blocked %q: %v",
blockedAlloc.ID, err)
}
delete(c.blockedAllocations, blockedAlloc.PreviousAllocation)
}
c.blockedAllocsLock.Unlock()
// Mark the allocation for GC if it is in terminal state
if alloc.Terminated() {
// Mark the allocation for GC if it is in terminal state
if ar, ok := c.getAllocRunners()[alloc.ID]; ok {
if err := c.garbageCollector.MarkForCollection(ar); err != nil {
c.logger.Printf("[DEBUG] client: couldn't add alloc %v for GC: %v", alloc.ID, err)
......@@ -1553,9 +1575,9 @@ func (c *Client) runAllocs(update *allocUpdates) {
}
// See if the updated alloc is getting migrated
c.migratingAllocsLock.Lock()
c.migratingAllocsLock.RLock()
ch, ok := c.migratingAllocs[update.updated.ID]
c.migratingAllocsLock.Unlock()
c.migratingAllocsLock.RUnlock()
if ok {
// Stopping the migration if the allocation doesn't need any
// migration
......@@ -2314,13 +2336,13 @@ func (c *Client) emitClientMetrics() {
nodeID := c.Node().ID
// Emit allocation metrics
c.migratingAllocsLock.Lock()
migrating := len(c.migratingAllocs)
c.migratingAllocsLock.Unlock()
c.blockedAllocsLock.Lock()
c.blockedAllocsLock.RLock()
blocked := len(c.blockedAllocations)
c.blockedAllocsLock.Unlock()
c.blockedAllocsLock.RUnlock()
c.migratingAllocsLock.RLock()
migrating := len(c.migratingAllocs)
c.migratingAllocsLock.RUnlock()
pending, running, terminal := 0, 0, 0
for _, ar := range c.getAllocRunners() {
......@@ -2392,17 +2414,17 @@ func (c *Client) allAllocs() map[string]*structs.Allocation {
a := ar.Alloc()
allocs[a.ID] = a
}
c.blockedAllocsLock.Lock()
c.blockedAllocsLock.RLock()
for _, alloc := range c.blockedAllocations {
allocs[alloc.ID] = alloc
}
c.blockedAllocsLock.Unlock()
c.blockedAllocsLock.RUnlock()
c.migratingAllocsLock.Lock()
c.migratingAllocsLock.RLock()
for _, ctrl := range c.migratingAllocs {
allocs[ctrl.alloc.ID] = ctrl.alloc
}
c.migratingAllocsLock.Unlock()
c.migratingAllocsLock.RUnlock()
return allocs
}
......
......@@ -171,6 +171,10 @@ type Config struct {
// beyond which the Nomad client triggers GC of the terminal allocations
GCInodeUsageThreshold float64
// GCMaxAllocs is the maximum number of allocations a node can have
// before garbage collection is triggered.
GCMaxAllocs int
// LogLevel is the level of the logs to put out
LogLevel string
......@@ -205,6 +209,7 @@ func DefaultConfig() *Config {
GCParallelDestroys: 2,
GCDiskUsageThreshold: 80,
GCInodeUsageThreshold: 70,
GCMaxAllocs: 50,
}
}
......
......@@ -220,6 +220,7 @@ func (f *FileRotator) purgeOldFiles() {
var fIndexes []int
files, err := ioutil.ReadDir(f.path)
if err != nil {
f.logger.Printf("[ERROR] driver.rotator: error getting directory listing: %v", err)
return
}
// Inserting all the rotated files in a slice
......@@ -228,6 +229,7 @@ func (f *FileRotator) purgeOldFiles() {
fileIdx := strings.TrimPrefix(fi.Name(), fmt.Sprintf("%s.", f.baseFileName))
n, err := strconv.Atoi(fileIdx)
if err != nil {
f.logger.Printf("[ERROR] driver.rotator: error extracting file index: %v", err)
continue
}
fIndexes = append(fIndexes, n)
......@@ -246,7 +248,10 @@ func (f *FileRotator) purgeOldFiles() {
toDelete := fIndexes[0 : len(fIndexes)-f.MaxFiles]
for _, fIndex := range toDelete {
fname := filepath.Join(f.path, fmt.Sprintf("%s.%d", f.baseFileName, fIndex))
os.RemoveAll(fname)
err := os.RemoveAll(fname)
if err != nil {
f.logger.Printf("[ERROR] driver.rotator: error removing file: %v", err)
}
}
f.oldestLogFileIdx = fIndexes[0]
case <-f.doneCh:
......
......@@ -18,6 +18,9 @@ const (
// GCConfig allows changing the behaviour of the garbage collector
type GCConfig struct {
// MaxAllocs is the maximum number of allocations to track before a GC
// is triggered.
MaxAllocs int
DiskUsageThreshold float64
InodeUsageThreshold float64
Interval time.Duration
......@@ -25,10 +28,17 @@ type GCConfig struct {
ParallelDestroys int
}
// AllocCounter is used by AllocGarbageCollector to discover how many
// allocations a node has and is generally fulfilled by the Client.
type AllocCounter interface {
NumAllocs() int
}
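Any type with a NumAllocs method satisfies the interface: the client's NumAllocs (shown earlier in this diff) sums running, blocked, and migrating allocs, while tests can stub it with a fixed count, as the MockAllocCounter further down does.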
// AllocGarbageCollector garbage collects terminated allocations on a node
type AllocGarbageCollector struct {
allocRunners *IndexedGCAllocPQ
statsCollector stats.NodeStatsCollector
allocCounter AllocCounter
config *GCConfig
logger *log.Logger
destroyCh chan struct{}
......@@ -36,8 +46,9 @@ type AllocGarbageCollector struct {
}
// NewAllocGarbageCollector returns a garbage collector for terminated
// allocations on a node.
func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStatsCollector, config *GCConfig) *AllocGarbageCollector {
// allocations on a node. Must call Run() in a goroutine to enable periodic
// garbage collection.
func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStatsCollector, ac AllocCounter, config *GCConfig) *AllocGarbageCollector {
// Require at least 1 to make progress
if config.ParallelDestroys <= 0 {
logger.Printf("[WARN] client: garbage collector defaulting parallism to 1 due to invalid input value of %d", config.ParallelDestroys)
......@@ -47,17 +58,18 @@ func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStats
gc := &AllocGarbageCollector{
allocRunners: NewIndexedGCAllocPQ(),
statsCollector: statsCollector,
allocCounter: ac,
config: config,
logger: logger,
destroyCh: make(chan struct{}, config.ParallelDestroys),
shutdownCh: make(chan struct{}),
}
go gc.run()
return gc
}
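Since the constructor no longer launches the collection loop itself, callers own the goroutine. A minimal sketch of the new call pattern, mirroring the wiring in NewClient above (names are illustrative):

gc := NewAllocGarbageCollector(logger, statsCollector, allocCounter, gcConfig)
go gc.Run() // periodic GC only runs if the caller starts it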
func (a *AllocGarbageCollector) run() {
// Run the periodic garbage collector.
func (a *AllocGarbageCollector) Run() {
ticker := time.NewTicker(a.config.Interval)
for {
select {
......@@ -100,23 +112,33 @@ func (a *AllocGarbageCollector) keepUsageBelowThreshold() error {
break
}
if diskStats.UsedPercent <= a.config.DiskUsageThreshold &&
diskStats.InodesUsedPercent <= a.config.InodeUsageThreshold {
reason := ""
switch {
case diskStats.UsedPercent > a.config.DiskUsageThreshold:
reason = fmt.Sprintf("disk usage of %.0f is over gc threshold of %.0f",
diskStats.UsedPercent, a.config.DiskUsageThreshold)
case diskStats.InodesUsedPercent > a.config.InodeUsageThreshold:
reason = fmt.Sprintf("inode usage of %.0f is over gc threshold of %.0f",
diskStats.InodesUsedPercent, a.config.InodeUsageThreshold)
case a.numAllocs() > a.config.MaxAllocs:
reason = fmt.Sprintf("number of allocations is over the limit (%d)", a.config.MaxAllocs)
}
// No reason to gc, exit
if reason == "" {
break
}
// Collect an allocation
gcAlloc := a.allocRunners.Pop()
if gcAlloc == nil {
a.logger.Printf("[WARN] client: garbage collection due to %s skipped because no terminal allocations", reason)
break
}
ar := gcAlloc.allocRunner
alloc := ar.Alloc()
a.logger.Printf("[INFO] client: garbage collecting allocation %v", alloc.ID)
// Destroy the alloc runner and wait until it exits
a.destroyAllocRunner(ar)
a.destroyAllocRunner(gcAlloc.allocRunner, reason)
}
return nil
}
......@@ -124,7 +146,13 @@ func (a *AllocGarbageCollector) keepUsageBelowThreshold() error {
// destroyAllocRunner is used to destroy an allocation runner. It will acquire a
// lock to restrict parallelism and then destroy the alloc runner, returning
// once the allocation has been destroyed.
func (a *AllocGarbageCollector) destroyAllocRunner(ar *AllocRunner) {
func (a *AllocGarbageCollector) destroyAllocRunner(ar *AllocRunner, reason string) {
id := "<nil>"
if alloc := ar.Alloc(); alloc != nil {
id = alloc.ID
}
a.logger.Printf("[INFO] client: garbage collecting allocation %s due to %s", id, reason)
// Acquire the destroy lock
select {
case <-a.shutdownCh:
......@@ -155,11 +183,7 @@ func (a *AllocGarbageCollector) Collect(allocID string) error {
if err != nil {
return fmt.Errorf("unable to collect allocation %q: %v", allocID, err)
}
ar := gcAlloc.allocRunner
a.logger.Printf("[INFO] client: garbage collecting allocation %q", ar.Alloc().ID)
a.destroyAllocRunner(ar)
a.destroyAllocRunner(gcAlloc.allocRunner, "forced collection")
return nil
}
......@@ -177,9 +201,7 @@ func (a *AllocGarbageCollector) CollectAll() error {
break
}
ar := gcAlloc.allocRunner
a.logger.Printf("[INFO] client: garbage collecting alloc runner for alloc %q", ar.Alloc().ID)
go a.destroyAllocRunner(ar)
go a.destroyAllocRunner(gcAlloc.allocRunner, "forced full collection")
}
return nil
}
......@@ -187,6 +209,26 @@ func (a *AllocGarbageCollector) CollectAll() error {
// MakeRoomFor garbage collects enough terminal allocations to make room
// for new allocations
func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) error {
// GC allocs until below the max limit + the new allocations
max := a.config.MaxAllocs - len(allocations)
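// e.g. with MaxAllocs=6 and 2 incoming allocs, terminal runners are
// popped until at most 4 allocs are tracked in total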
for a.numAllocs() > max {
select {
case <-a.shutdownCh:
return nil
default:
}
gcAlloc := a.allocRunners.Pop()
if gcAlloc == nil {
// It's fine if we can't get below the limit here; each
// periodic GC pass will keep trying to drop below it
break
}
// Destroy the alloc runner and wait until it exits
a.destroyAllocRunner(gcAlloc.allocRunner, "new allocations")
}
totalResource := &structs.Resources{}
for _, alloc := range allocations {
if err := totalResource.Add(alloc.Resources); err != nil {
......@@ -244,10 +286,9 @@ func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) e
ar := gcAlloc.allocRunner
alloc := ar.Alloc()
a.logger.Printf("[INFO] client: garbage collecting allocation %v", alloc.ID)
// Destroy the alloc runner and wait until it exits
a.destroyAllocRunner(ar)
a.destroyAllocRunner(ar, fmt.Sprintf("freeing %d MB for new allocations", alloc.Resources.DiskMB))
// Call stats collect again
diskCleared += alloc.Resources.DiskMB
......@@ -261,8 +302,7 @@ func (a *AllocGarbageCollector) MarkForCollection(ar *AllocRunner) error {
return fmt.Errorf("nil allocation runner inserted for garbage collection")
}
if ar.Alloc() == nil {
a.logger.Printf("[INFO] client: alloc is nil, so garbage collecting")
a.destroyAllocRunner(ar)
a.destroyAllocRunner(ar, "alloc is nil")
}
a.logger.Printf("[INFO] client: marking allocation %v for GC", ar.Alloc().ID)
......@@ -281,6 +321,12 @@ func (a *AllocGarbageCollector) Remove(ar *AllocRunner) {
}
}
// numAllocs returns the total number of allocs tracked by the client as well
// as those marked for GC.
func (a *AllocGarbageCollector) numAllocs() int {
return a.allocRunners.Length() + a.allocCounter.NumAllocs()
}
// GCAlloc wraps an allocation runner and an index enabling it to be used within
// a PQ
type GCAlloc struct {
......
package client
import (
"log"
"os"
"testing"
"time"
......@@ -11,11 +9,14 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)
var gcConfig = GCConfig{
DiskUsageThreshold: 80,
InodeUsageThreshold: 70,
Interval: 1 * time.Minute,
ReservedDiskMB: 0,
func gcConfig() *GCConfig {
return &GCConfig{
DiskUsageThreshold: 80,
InodeUsageThreshold: 70,
Interval: 1 * time.Minute,
ReservedDiskMB: 0,
MaxAllocs: 100,
}
}
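Returning a fresh GCConfig per test avoids the cross-test mutation the old shared gcConfig variable allowed; several tests below set ReservedDiskMB on their own copy.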
func TestIndexedGCAllocPQ(t *testing.T) {
......@@ -57,6 +58,15 @@ func TestIndexedGCAllocPQ(t *testing.T) {
}
}
// MockAllocCounter implements AllocCounter interface.
type MockAllocCounter struct {
allocs int
}
func (m *MockAllocCounter) NumAllocs() int {
return m.allocs
}
type MockStatsCollector struct {
availableValues []uint64
usedPercents []float64
......@@ -90,8 +100,8 @@ func (m *MockStatsCollector) Stats() *stats.HostStats {
}
func TestAllocGarbageCollector_MarkForCollection(t *testing.T) {
logger := log.New(os.Stdout, "", 0)
gc := NewAllocGarbageCollector(logger, &MockStatsCollector{}, &gcConfig)
logger := testLogger()
gc := NewAllocGarbageCollector(logger, &MockStatsCollector{}, &MockAllocCounter{}, gcConfig())
_, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false)
if err := gc.MarkForCollection(ar1); err != nil {
......@@ -105,8 +115,8 @@ func TestAllocGarbageCollector_MarkForCollection(t *testing.T) {
}
func TestAllocGarbageCollector_Collect(t *testing.T) {
logger := log.New(os.Stdout, "", 0)
gc := NewAllocGarbageCollector(logger, &MockStatsCollector{}, &gcConfig)
logger := testLogger()
gc := NewAllocGarbageCollector(logger, &MockStatsCollector{}, &MockAllocCounter{}, gcConfig())
_, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false)
_, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false)
......@@ -131,8 +141,8 @@ func TestAllocGarbageCollector_Collect(t *testing.T) {
}
func TestAllocGarbageCollector_CollectAll(t *testing.T) {
logger := log.New(os.Stdout, "", 0)
gc := NewAllocGarbageCollector(logger, &MockStatsCollector{}, &gcConfig)
logger := testLogger()
gc := NewAllocGarbageCollector(logger, &MockStatsCollector{}, &MockAllocCounter{}, gcConfig())
_, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false)
_, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false)
......@@ -153,10 +163,11 @@ func TestAllocGarbageCollector_CollectAll(t *testing.T) {
}
func TestAllocGarbageCollector_MakeRoomForAllocations_EnoughSpace(t *testing.T) {
logger := log.New(os.Stdout, "", 0)
logger := testLogger()
statsCollector := &MockStatsCollector{}
gcConfig.ReservedDiskMB = 20
gc := NewAllocGarbageCollector(logger, statsCollector, &gcConfig)
conf := gcConfig()
conf.ReservedDiskMB = 20
gc := NewAllocGarbageCollector(logger, statsCollector, &MockAllocCounter{}, conf)
_, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false)
close(ar1.waitCh)
......@@ -190,10 +201,11 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_EnoughSpace(t *testing.T)
}
func TestAllocGarbageCollector_MakeRoomForAllocations_GC_Partial(t *testing.T) {
logger := log.New(os.Stdout, "", 0)
logger := testLogger()
statsCollector := &MockStatsCollector{}
gcConfig.ReservedDiskMB = 20
gc := NewAllocGarbageCollector(logger, statsCollector, &gcConfig)
conf := gcConfig()
conf.ReservedDiskMB = 20
gc := NewAllocGarbageCollector(logger, statsCollector, &MockAllocCounter{}, conf)
_, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false)
close(ar1.waitCh)
......@@ -228,10 +240,11 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_GC_Partial(t *testing.T) {
}
func TestAllocGarbageCollector_MakeRoomForAllocations_GC_All(t *testing.T) {
logger := log.New(os.Stdout, "", 0)
logger := testLogger()
statsCollector := &MockStatsCollector{}
gcConfig.ReservedDiskMB = 20
gc := NewAllocGarbageCollector(logger, statsCollector, &gcConfig)
conf := gcConfig()
conf.ReservedDiskMB = 20
gc := NewAllocGarbageCollector(logger, statsCollector, &MockAllocCounter{}, conf)
_, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false)
close(ar1.waitCh)
......@@ -262,10 +275,11 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_GC_All(t *testing.T) {
}
func TestAllocGarbageCollector_MakeRoomForAllocations_GC_Fallback(t *testing.T) {
logger := log.New(os.Stdout, "", 0)
logger := testLogger()
statsCollector := &MockStatsCollector{}
gcConfig.ReservedDiskMB = 20
gc := NewAllocGarbageCollector(logger, statsCollector, &gcConfig)
conf := gcConfig()
conf.ReservedDiskMB = 20
gc := NewAllocGarbageCollector(logger, statsCollector, &MockAllocCounter{}, conf)
_, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false)
close(ar1.waitCh)
......@@ -294,11 +308,49 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_GC_Fallback(t *testing.T)
}
}
func TestAllocGarbageCollector_MakeRoomForAllocations_MaxAllocs(t *testing.T) {
const (
liveAllocs = 3
maxAllocs = 6
gcAllocs = 4
gcAllocsLeft = 1
)
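// MakeRoomFor(2 new allocs) must GC down to maxAllocs-2 = 4 tracked
// allocs; with liveAllocs=3 that collects 3 of the 4 GC-able runners,
// leaving gcAllocsLeft=1.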
logger := testLogger()
statsCollector := &MockStatsCollector{
availableValues: []uint64{10 * 1024 * MB},
usedPercents: []float64{0},
inodePercents: []float64{0},
}
allocCounter := &MockAllocCounter{allocs: liveAllocs}
conf := gcConfig()
conf.MaxAllocs = maxAllocs
gc := NewAllocGarbageCollector(logger, statsCollector, allocCounter, conf)
for i := 0; i < gcAllocs; i++ {
_, ar := testAllocRunnerFromAlloc(mock.Alloc(), false)
close(ar.waitCh)
if err := gc.MarkForCollection(ar); err != nil {
t.Fatalf("error marking alloc for gc: %v", err)
}
}
if err := gc.MakeRoomFor([]*structs.Allocation{mock.Alloc(), mock.Alloc()}); err != nil {
t.Fatalf("error making room for 2 new allocs: %v", err)
}
// There should be gcAllocsLeft alloc runners left to be collected
if n := len(gc.allocRunners.index); n != gcAllocsLeft {
t.Fatalf("expected %d remaining GC-able alloc runners but found %d", gcAllocsLeft, n)
}
}
func TestAllocGarbageCollector_UsageBelowThreshold(t *testing.T) {
logger := log.New(os.Stdout, "", 0)
logger := testLogger()
statsCollector := &MockStatsCollector{}
gcConfig.ReservedDiskMB = 20
gc := NewAllocGarbageCollector(logger, statsCollector, &gcConfig)
conf := gcConfig()
conf.ReservedDiskMB = 20
gc := NewAllocGarbageCollector(logger, statsCollector, &MockAllocCounter{}, conf)
_, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false)
close(ar1.waitCh)
......@@ -329,10 +381,11 @@ func TestAllocGarbageCollector_UsageBelowThreshold(t *testing.T) {
}
func TestAllocGarbageCollector_UsedPercentThreshold(t *testing.T) {
logger := log.New(os.Stdout, "", 0)
logger := testLogger()
statsCollector := &MockStatsCollector{}
gcConfig.ReservedDiskMB = 20
gc := NewAllocGarbageCollector(logger, statsCollector, &gcConfig)
conf := gcConfig()
conf.ReservedDiskMB = 20
gc := NewAllocGarbageCollector(logger, statsCollector, &MockAllocCounter{}, conf)
_, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false)
close(ar1.waitCh)
......@@ -363,3 +416,40 @@ func TestAllocGarbageCollector_UsedPercentThreshold(t *testing.T) {
t.Fatalf("gcAlloc: %v", gcAlloc)
}
}
func TestAllocGarbageCollector_MaxAllocsThreshold(t *testing.T) {
const (
liveAllocs = 3
maxAllocs = 4
gcAllocs = 4
gcAllocsLeft = 1
)
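// keepUsageBelowThreshold GCs until at most maxAllocs=4 allocs are
// tracked; with liveAllocs=3 that collects 3 of the 4 runners, leaving
// gcAllocsLeft=1.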
logger := testLogger()
statsCollector := &MockStatsCollector{
availableValues: []uint64{1000},
usedPercents: []float64{0},
inodePercents: []float64{0},
}
allocCounter := &MockAllocCounter{allocs: liveAllocs}
conf := gcConfig()
conf.MaxAllocs = maxAllocs
gc := NewAllocGarbageCollector(logger, statsCollector, allocCounter, conf)
for i := 0; i < gcAllocs; i++ {
_, ar := testAllocRunnerFromAlloc(mock.Alloc(), false)
close(ar.waitCh)
if err := gc.MarkForCollection(ar); err != nil {
t.Fatalf("error marking alloc for gc: %v", err)
}
}
if err := gc.keepUsageBelowThreshold(); err != nil {
t.Fatalf("error gc'ing: %v", err)
}
// We should have gc'd down to MaxAllocs
if n := len(gc.allocRunners.index); n != gcAllocsLeft {
t.Fatalf("expected remaining gc allocs (%d) to equal %d", n, gcAllocsLeft)
}
}
......@@ -31,7 +31,10 @@ func testLogger() *log.Logger {
}
func prefixedTestLogger(prefix string) *log.Logger {
return log.New(os.Stderr, prefix, log.LstdFlags)
if testing.Verbose() {
return log.New(os.Stderr, prefix, log.LstdFlags)
}
return log.New(ioutil.Discard, "", 0)
}
type MockTaskStateUpdater struct {
......
......@@ -321,6 +321,7 @@ func (a *Agent) clientConfig() (*clientconfig.Config, error) {
conf.GCParallelDestroys = a.config.Client.GCParallelDestroys
conf.GCDiskUsageThreshold = a.config.Client.GCDiskUsageThreshold
conf.GCInodeUsageThreshold = a.config.Client.GCInodeUsageThreshold
conf.GCMaxAllocs = a.config.Client.GCMaxAllocs
conf.NoHostUUID = a.config.Client.NoHostUUID
return conf, nil
......
......@@ -58,6 +58,7 @@ client {
gc_parallel_destroys = 6
gc_disk_usage_threshold = 82
gc_inode_usage_threshold = 91
gc_max_allocs = 50
no_host_uuid = true
}
server {
......
......@@ -212,14 +212,18 @@ type ClientConfig struct {
// collector will allow.
GCParallelDestroys int `mapstructure:"gc_parallel_destroys"`
// GCInodeUsageThreshold is the inode usage threshold beyond which the Nomad
// client triggers GC of the terminal allocations
// GCDiskUsageThreshold is the disk usage threshold given as a percent
// beyond which the Nomad client triggers GC of terminal allocations
GCDiskUsageThreshold float64 `mapstructure:"gc_disk_usage_threshold"`
// GCInodeUsageThreshold is the inode usage threshold beyond which the Nomad
// client triggers GC of the terminal allocations
GCInodeUsageThreshold float64 `mapstructure:"gc_inode_usage_threshold"`
// GCMaxAllocs is the maximum number of allocations a node can have
// before garbage collection is triggered.
GCMaxAllocs int `mapstructure:"gc_max_allocs"`
// NoHostUUID disables using the host's UUID and will force generation of a
// random UUID.
NoHostUUID bool `mapstructure:"no_host_uuid"`
......@@ -506,6 +510,7 @@ func DevConfig() *Config {
conf.Client.GCInterval = 10 * time.Minute
conf.Client.GCDiskUsageThreshold = 99
conf.Client.GCInodeUsageThreshold = 99
conf.Client.GCMaxAllocs = 50
return conf
}
......@@ -535,8 +540,9 @@ func DefaultConfig() *Config {
Reserved: &Resources{},
GCInterval: 1 * time.Minute,
GCParallelDestroys: 2,
GCInodeUsageThreshold: 70,
GCDiskUsageThreshold: 80,
GCInodeUsageThreshold: 70,
GCMaxAllocs: 50,
},
Server: &ServerConfig{
Enabled: false,
......@@ -986,6 +992,9 @@ func (a *ClientConfig) Merge(b *ClientConfig) *ClientConfig {
if b.GCInodeUsageThreshold != 0 {
result.GCInodeUsageThreshold = b.GCInodeUsageThreshold
}
if b.GCMaxAllocs != 0 {
result.GCMaxAllocs = b.GCMaxAllocs
}
if b.NoHostUUID {
result.NoHostUUID = b.NoHostUUID
}
......
......@@ -346,6 +346,7 @@ func parseClient(result **ClientConfig, list *ast.ObjectList) error {
"gc_disk_usage_threshold",
"gc_inode_usage_threshold",
"gc_parallel_destroys",
"gc_max_allocs",
"no_host_uuid",
}
if err := checkHCLKeys(listVal, valid); err != nil {
......
......@@ -3,10 +3,12 @@ package agent
import (
"path/filepath"
"reflect"
"strings"
"testing"
"time"
"github.com/hashicorp/nomad/nomad/structs/config"
"github.com/kr/pretty"
)
func TestConfig_Parse(t *testing.T) {
......@@ -75,6 +77,7 @@ func TestConfig_Parse(t *testing.T) {
GCParallelDestroys: 6,
GCDiskUsageThreshold: 82,
GCInodeUsageThreshold: 91,
GCMaxAllocs: 50,
NoHostUUID: true,
},
Server: &ServerConfig{
......@@ -165,22 +168,20 @@ func TestConfig_Parse(t *testing.T) {
}
for _, tc := range cases {
t.Logf("Testing parse: %s", tc.File)
t.Run(tc.File, func(t *testing.T) {
path, err := filepath.Abs(filepath.Join("./config-test-fixtures", tc.File))
if err != nil {
t.Fatalf("file: %s\n\n%s", tc.File, err)
}
path, err := filepath.Abs(filepath.Join("./config-test-fixtures", tc.File))
if err != nil {
t.Fatalf("file: %s\n\n%s", tc.File, err)
continue
}
actual, err := ParseConfigFile(path)
if (err != nil) != tc.Err {
t.Fatalf("file: %s\n\n%s", tc.File, err)
}
actual, err := ParseConfigFile(path)
if (err != nil) != tc.Err {
t.Fatalf("file: %s\n\n%s", tc.File, err)
continue
}
if !reflect.DeepEqual(actual, tc.Result) {
t.Fatalf("file: %s\n\n%#v\n\n%#v", tc.File, actual, tc.Result)
}
if !reflect.DeepEqual(actual, tc.Result) {
t.Errorf("file: %s diff: (actual vs expected)\n\n%s", tc.File, strings.Join(pretty.Diff(actual, tc.Result), "\n"))
}
})
}
}
package config
import (
"net/http"
"strings"
"time"
......@@ -144,6 +145,8 @@ func (a *ConsulConfig) Merge(b *ConsulConfig) *ConsulConfig {
// ApiConfig returns a usable Consul config that can be passed directly to
// hashicorp/consul/api. NOTE: datacenter is not set
func (c *ConsulConfig) ApiConfig() (*consul.Config, error) {
// Get the default config from consul to reuse things like the default
// http.Transport.
config := consul.DefaultConfig()
if c.Addr != "" {
config.Address = c.Addr
......@@ -152,7 +155,12 @@ func (c *ConsulConfig) ApiConfig() (*consul.Config, error) {
config.Token = c.Token
}
if c.Timeout != 0 {
// Create a custom Client to set the timeout
if config.HttpClient == nil {
config.HttpClient = &http.Client{}
}
config.HttpClient.Timeout = c.Timeout
config.HttpClient.Transport = config.Transport
}
if c.Auth != "" {
var username, password string
......@@ -180,6 +188,11 @@ func (c *ConsulConfig) ApiConfig() (*consul.Config, error) {
if c.VerifySSL != nil {
config.TLSConfig.InsecureSkipVerify = !*c.VerifySSL
}
tlsConfig, err := consul.SetupTLSConfig(&config.TLSConfig)
if err != nil {
return nil, err
}
config.Transport.TLSClientConfig = tlsConfig
}
return config, nil
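A sketch of how the returned config is typically consumed, assuming a populated ConsulConfig c inside a function that returns an error (the surrounding names are illustrative):

apiConf, err := c.ApiConfig()
if err != nil {
return nil, err
}
client, err := consul.NewClient(apiConf)
if err != nil {
return nil, err
}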
......
......@@ -14,6 +14,12 @@ import (
"time"
)
func init() {
// Seed the default rand Source with current time to produce better random
// numbers used with splay
rand.Seed(time.Now().UnixNano())
}
var (
// ErrMissingCommand is the error returned when no command is specified
// to run.
......
......@@ -8,11 +8,14 @@ import (
const (
// DefaultRetryAttempts is the default number of maximum retry attempts.
DefaultRetryAttempts = 5
DefaultRetryAttempts = 12
// DefaultRetryBackoff is the default base for the exponential backoff
// algorithm.
DefaultRetryBackoff = 250 * time.Millisecond
// DefaultRetryMaxBackoff is the default maximum of backoff time
DefaultRetryMaxBackoff = 1 * time.Minute
)
// RetryFunc is the signature of a function that supports retries.
......@@ -23,12 +26,17 @@ type RetryFunc func(int) (bool, time.Duration)
type RetryConfig struct {
// Attempts is the total number of maximum attempts to retry before letting
// the error fall through.
// 0 means unlimited.
Attempts *int
// Backoff is the base of the exponential backoff. This number will be
// multiplied by the next power of 2 on each iteration.
Backoff *time.Duration
// MaxBackoff is an upper limit to the sleep time between retries.
// A MaxBackoff of zero means there is no limit to the exponential growth of the backoff.
MaxBackoff *time.Duration `mapstructure:"max_backoff"`
// Enabled signals if this retry is enabled.
Enabled *bool
}
......@@ -51,6 +59,8 @@ func (c *RetryConfig) Copy() *RetryConfig {
o.Backoff = c.Backoff
o.MaxBackoff = c.MaxBackoff
o.Enabled = c.Enabled
return &o
......@@ -82,6 +92,10 @@ func (c *RetryConfig) Merge(o *RetryConfig) *RetryConfig {
r.Backoff = o.Backoff
}
if o.MaxBackoff != nil {
r.MaxBackoff = o.MaxBackoff
}
if o.Enabled != nil {
r.Enabled = o.Enabled
}
......@@ -103,6 +117,11 @@ func (c *RetryConfig) RetryFunc() RetryFunc {
base := math.Pow(2, float64(retry))
sleep := time.Duration(base) * TimeDurationVal(c.Backoff)
maxSleep := TimeDurationVal(c.MaxBackoff)
if maxSleep > 0 && maxSleep < sleep {
return true, maxSleep
}
return true, sleep
}
}
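As a quick check of the cap with the defaults above: on attempt 8, 2^8 × 250ms = 64s of computed sleep exceeds DefaultRetryMaxBackoff, so the function returns the 1 minute cap instead.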
......@@ -117,6 +136,10 @@ func (c *RetryConfig) Finalize() {
c.Backoff = TimeDuration(DefaultRetryBackoff)
}
if c.MaxBackoff == nil {
c.MaxBackoff = TimeDuration(DefaultRetryMaxBackoff)
}
if c.Enabled == nil {
c.Enabled = Bool(true)
}
......@@ -131,10 +154,12 @@ func (c *RetryConfig) GoString() string {
return fmt.Sprintf("&RetryConfig{"+
"Attempts:%s, "+
"Backoff:%s, "+
"MaxBackoff:%s, "+
"Enabled:%s"+
"}",
IntGoString(c.Attempts),
TimeDurationGoString(c.Backoff),
TimeDurationGoString(c.MaxBackoff),
BoolGoString(c.Enabled),
)
}
......@@ -128,6 +128,7 @@ func (d *CatalogNodeQuery) Fetch(clients *ClientSet, opts *QueryOptions) (interf
ID: node.Node.ID,
Node: node.Node.Node,
Address: node.Node.Address,
Datacenter: node.Node.Datacenter,
TaggedAddresses: node.Node.TaggedAddresses,
Meta: node.Node.Meta,
},
......
......@@ -28,6 +28,7 @@ type Node struct {
ID string
Node string
Address string
Datacenter string
TaggedAddresses map[string]string
Meta map[string]string
}
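With the new field, templates that iterate nodes can render each node's datacenter, e.g. a hypothetical snippet: {{ range nodes }}{{ .Node }} is in {{ .Datacenter }}{{ end }}.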
......@@ -86,6 +87,7 @@ func (d *CatalogNodesQuery) Fetch(clients *ClientSet, opts *QueryOptions) (inter
ID: node.ID,
Node: node.Node,
Address: node.Address,
Datacenter: node.Datacenter,
TaggedAddresses: node.TaggedAddresses,
Meta: node.Meta,
})
......
......@@ -27,6 +27,7 @@ type CatalogService struct {
ID string
Node string
Address string
Datacenter string
TaggedAddresses map[string]string
NodeMeta map[string]string
ServiceID string
......@@ -101,6 +102,7 @@ func (d *CatalogServiceQuery) Fetch(clients *ClientSet, opts *QueryOptions) (int
ID: s.ID,
Node: s.Node,
Address: s.Address,
Datacenter: s.Datacenter,
TaggedAddresses: s.TaggedAddresses,
NodeMeta: s.NodeMeta,
ServiceID: s.ServiceID,
......
......@@ -25,8 +25,8 @@ type ClientSet struct {
// consulClient is a wrapper around a real Consul API client.
type consulClient struct {
client *consulapi.Client
httpClient *http.Client
client *consulapi.Client
transport *http.Transport
}
// vaultClient is a wrapper around a real Vault API client.
......@@ -169,7 +169,7 @@ func (c *ClientSet) CreateConsulClient(i *CreateConsulClientInput) error {
}
// Setup the new transport
consulConfig.HttpClient.Transport = transport
consulConfig.Transport = transport
// Create the API client
client, err := consulapi.NewClient(consulConfig)
......@@ -180,8 +180,8 @@ func (c *ClientSet) CreateConsulClient(i *CreateConsulClientInput) error {
// Save the data on ourselves
c.Lock()
c.consul = &consulClient{
client: client,
httpClient: consulConfig.HttpClient,
client: client,
transport: transport,
}
c.Unlock()
......@@ -323,7 +323,7 @@ func (c *ClientSet) Stop() {
defer c.Unlock()
if c.consul != nil {
c.consul.httpClient.Transport.(*http.Transport).CloseIdleConnections()
c.consul.transport.CloseIdleConnections()
}
if c.vault != nil {
......