Unverified commit 8c881465, authored by Alex Dadgar, committed by GitHub

Merge pull request #4359 from hashicorp/b-tests

test fixes
parents 69a94bae 0e85d203
Showing 68 additions and 26 deletions (+68 -26)
@@ -9,6 +9,7 @@ import (
"testing"
"time"
"github.com/hashicorp/nomad/client/testutil"
tu "github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)
......
@@ -763,6 +763,7 @@ func TestDockerDriver_StartNVersions(t *testing.T) {
task2, _, _ := dockerTask(t)
task2.Config["image"] = "busybox:musl"
task2.Config["load"] = "busybox_musl.tar"
+task2.Config["args"] = []string{"-l", "-p", "0"}
task3, _, _ := dockerTask(t)
task3.Config["image"] = "busybox:glibc"
@@ -773,6 +774,7 @@ func TestDockerDriver_StartNVersions(t *testing.T) {
handles := make([]DriverHandle, len(taskList))
t.Logf("Starting %d tasks", len(taskList))
+client := newTestDockerClient(t)
// Let's spin up a bunch of things
for idx, task := range taskList {
@@ -794,6 +796,7 @@ func TestDockerDriver_StartNVersions(t *testing.T) {
continue
}
handles[idx] = resp.Handle
+waitForExist(t, client, resp.Handle.(*DockerHandle))
}
t.Log("All tasks are started. Terminating...")
......
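The `waitForExist` call added after each successful start keeps the test from moving on to the terminate phase before the Docker daemon actually reports the container. For readers unfamiliar with the helper, here is a rough sketch of that kind of poll; it assumes the `fsouza/go-dockerclient` client and Nomad's `testutil.WaitForResult`, and the `waitForContainer` name and signature are illustrative rather than the suite's real helper.

```go
package sketch

import (
	"testing"

	docker "github.com/fsouza/go-dockerclient"
	tu "github.com/hashicorp/nomad/testutil"
)

// waitForContainer is a hypothetical stand-in for the test suite's
// waitForExist helper: it blocks until the Docker daemon can inspect the
// container that was just started, retrying until the helper's timeout.
func waitForContainer(t *testing.T, client *docker.Client, containerID string) {
	tu.WaitForResult(func() (bool, error) {
		// InspectContainer errors until the daemon has registered the container.
		if _, err := client.InspectContainer(containerID); err != nil {
			return false, err
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("container %q never became visible: %v", containerID, err)
	})
}
```

Waiting here keeps the later terminate phase of the test from racing with containers that are still being created.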
@@ -40,6 +40,11 @@ const (
// tree for finding out the pids that the executor and it's child processes
// have forked
pidScanInterval = 5 * time.Second
+// processOutputCloseTolerance is the length of time we will wait for the
+// launched process to close its stdout/stderr before we force close it. If
+// data is written after this tolerance, we will not capture it.
+processOutputCloseTolerance = 2 * time.Second
)
var (
@@ -285,6 +290,11 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand) (*ProcessState, erro
if err := e.cmd.Start(); err != nil {
return nil, fmt.Errorf("failed to start command path=%q --- args=%q: %v", path, e.cmd.Args, err)
}
+// Close the files. This is copied from the os/exec package.
+e.lro.processOutWriter.Close()
+e.lre.processOutWriter.Close()
go e.collectPids()
go e.wait()
ic := e.resConCtx.getIsolationConfig()
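The two `processOutWriter.Close()` calls added right after `cmd.Start()` follow the same rule the `os/exec` package applies internally: once the child process owns its copy of the pipe's write end, the parent must drop its own copy, or the goroutine reading the other end never sees EOF when the child exits. A minimal standalone illustration of that rule using only the standard library (this is not the executor's code, just the pipe-ownership pattern it relies on):

```go
package main

import (
	"fmt"
	"io"
	"os"
	"os/exec"
)

func main() {
	r, w, err := os.Pipe()
	if err != nil {
		panic(err)
	}

	cmd := exec.Command("echo", "hello")
	cmd.Stdout = w // the child inherits the write end as its stdout

	if err := cmd.Start(); err != nil {
		panic(err)
	}
	// Drop the parent's copy of the write end. The child still owns its own
	// descriptor; without this Close the ReadAll below would block forever,
	// because the pipe would never report EOF.
	w.Close()

	out, _ := io.ReadAll(r) // returns once the child exits and the pipe drains
	cmd.Wait()
	fmt.Printf("child wrote: %s", out)
}
```

Closing the write end at launch time is what lets the log-copying goroutine below terminate on its own once the child's stdout goes away, instead of only when Close() forces it.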
@@ -832,9 +842,10 @@ func (e *UniversalExecutor) collectLogs(we io.Writer, wo io.Writer) {
// log rotator data. The processOutWriter should be attached to the process and
// data will be copied from the reader to the rotator.
type logRotatorWrapper struct {
-processOutWriter *os.File
-processOutReader *os.File
-rotatorWriter *logging.FileRotator
+processOutWriter  *os.File
+processOutReader  *os.File
+rotatorWriter     *logging.FileRotator
+hasFinishedCopied chan struct{}
}
// NewLogRotatorWrapper takes a rotator and returns a wrapper that has the
@@ -846,9 +857,10 @@ func NewLogRotatorWrapper(rotator *logging.FileRotator) (*logRotatorWrapper, err
}
wrap := &logRotatorWrapper{
-processOutWriter: w,
-processOutReader: r,
-rotatorWriter: rotator,
+processOutWriter:  w,
+processOutReader:  r,
+rotatorWriter:     rotator,
+hasFinishedCopied: make(chan struct{}, 1),
}
wrap.start()
return wrap, nil
@@ -860,6 +872,7 @@ func (l *logRotatorWrapper) start() {
go func() {
io.Copy(l.rotatorWriter, l.processOutReader)
l.processOutReader.Close() // in case io.Copy stopped due to write error
+close(l.hasFinishedCopied)
}()
return
}
@@ -867,6 +880,12 @@ func (l *logRotatorWrapper) start() {
// Close closes the rotator and the process writer to ensure that the Wait
// command exits.
func (l *logRotatorWrapper) Close() error {
+// Wait up to the close tolerance before we force close
+select {
+case <-l.hasFinishedCopied:
+case <-time.After(processOutputCloseTolerance):
+}
+err := l.processOutReader.Close()
l.rotatorWriter.Close()
-return l.processOutWriter.Close()
+return err
}
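Together, `hasFinishedCopied` and `processOutputCloseTolerance` turn `Close` into a bounded wait: in the normal case the copy goroutine drains the pipe and signals the channel, while a child that never closes its stdout (for example by leaking it to a grandchild) can delay shutdown by at most the tolerance, after which the reader is closed and any late output is dropped. Below is a condensed sketch of the same pattern outside the executor; `outputWrapper`, `newOutputWrapper`, and `closeTolerance` are illustrative names, not Nomad APIs.

```go
package sketch

import (
	"io"
	"os"
	"time"
)

// closeTolerance plays the role of processOutputCloseTolerance: how long
// Close is willing to wait for the copy goroutine before forcing the pipe shut.
const closeTolerance = 2 * time.Second

// outputWrapper is a simplified stand-in for logRotatorWrapper: the producer
// writes into ProcessOut and everything is streamed into dst.
type outputWrapper struct {
	ProcessOut  *os.File // hand this end to the producing process
	reader      *os.File
	hasFinished chan struct{}
}

func newOutputWrapper(dst io.Writer) (*outputWrapper, error) {
	r, w, err := os.Pipe()
	if err != nil {
		return nil, err
	}
	o := &outputWrapper{ProcessOut: w, reader: r, hasFinished: make(chan struct{})}
	go func() {
		io.Copy(dst, r)
		r.Close()            // in case io.Copy stopped due to a write error
		close(o.hasFinished) // tell Close that buffered output has been drained
	}()
	return o, nil
}

// Close waits up to closeTolerance for the copier and then closes the read end
// regardless, so a process that never closes its stdout cannot block shutdown.
func (o *outputWrapper) Close() error {
	select {
	case <-o.hasFinished:
	case <-time.After(closeTolerance):
	}
	return o.reader.Close()
}
```

If the tolerance expires first, closing the read end makes the lingering io.Copy return with an error, which matches the comment on processOutputCloseTolerance: output written after that point is simply not captured.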
@@ -84,10 +84,13 @@ func TestExecutor_IsolationAndConstraints(t *testing.T) {
if ps.Pid == 0 {
t.Fatalf("expected process to start and have non zero pid")
}
-_, err = executor.Wait()
+state, err := executor.Wait()
if err != nil {
t.Fatalf("error in waiting for command: %v", err)
}
+if state.ExitCode != 0 {
+t.Errorf("exited with non-zero code: %v", state.ExitCode)
+}
// Check if the resource constraints were applied
memLimits := filepath.Join(ps.IsolationConfig.CgroupPaths["memory"], "memory.limit_in_bytes")
@@ -135,7 +138,7 @@ ld.so.conf.d/`
act := strings.TrimSpace(string(output))
if act != expected {
-t.Fatalf("Command output incorrectly: want %v; got %v", expected, act)
+t.Errorf("Command output incorrectly: want %v; got %v", expected, act)
}
}
......
@@ -144,8 +144,8 @@ func convertServerConfig(agentConfig *Config, logOutput io.Writer) (*nomad.Confi
if agentConfig.Server.RaftProtocol != 0 {
conf.RaftConfig.ProtocolVersion = raft.ProtocolVersion(agentConfig.Server.RaftProtocol)
}
-if agentConfig.Server.NumSchedulers != 0 {
-conf.NumSchedulers = agentConfig.Server.NumSchedulers
+if agentConfig.Server.NumSchedulers != nil {
+conf.NumSchedulers = *agentConfig.Server.NumSchedulers
}
if len(agentConfig.Server.EnabledSchedulers) != 0 {
// Convert to a set and require the core scheduler
......
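Changing `NumSchedulers` from `int` to `*int` is the standard Go way to make a config value tri-state: `nil` means "not configured, fall back to the default", while a pointer to `0` is an explicit request to run no schedulers, which the new allocation-snapshot test depends on. A small self-contained sketch of the pattern follows; `serverConfig` and `applyNumSchedulers` are made-up names, and `IntToPtr` mirrors Nomad's `helper.IntToPtr`.

```go
package sketch

// IntToPtr mirrors Nomad's helper.IntToPtr and exists so literals can be
// written inline in the optional *int form.
func IntToPtr(i int) *int { return &i }

// serverConfig is an illustrative stand-in for the agent's ServerConfig.
type serverConfig struct {
	// NumSchedulers is nil when num_schedulers was never configured and points
	// at 0 when scheduling is explicitly disabled.
	NumSchedulers *int
}

// applyNumSchedulers shows why the nil check matters: with a plain int field,
// an explicit 0 is indistinguishable from "unset" and would be silently
// replaced by the default.
func applyNumSchedulers(cfg *serverConfig, defaultSchedulers int) int {
	if cfg.NumSchedulers != nil {
		return *cfg.NumSchedulers
	}
	return defaultSchedulers
}
```

Note that the Merge hunk further down copies the value with helper.IntToPtr(*b.NumSchedulers) rather than aliasing b's pointer, so the merged config cannot be mutated through the original.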
@@ -15,6 +15,7 @@ import (
"github.com/golang/snappy"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/client/allocdir"
+"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
@@ -435,7 +436,10 @@ func TestHTTP_AllocSnapshot_WithMigrateToken(t *testing.T) {
// snapshotting a valid tar is not returned.
func TestHTTP_AllocSnapshot_Atomic(t *testing.T) {
t.Parallel()
-httpTest(t, nil, func(s *TestAgent) {
+httpTest(t, func(c *Config) {
+// Disable the schedulers
+c.Server.NumSchedulers = helper.IntToPtr(0)
+}, func(s *TestAgent) {
// Create an alloc
state := s.server.State()
alloc := mock.Alloc()
......
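`httpTest`'s first argument is a callback that can adjust the agent configuration before the test agent starts (the old code passed `nil` to keep the defaults); here it sets `NumSchedulers` to zero, presumably so no scheduler touches the allocation while the test snapshots it. A rough sketch of that harness shape is below; everything other than the `Config` and `NumSchedulers` names is illustrative.

```go
package sketch

import "testing"

// Config is a pared-down stand-in for the agent configuration.
type Config struct {
	Server struct {
		NumSchedulers *int
	}
}

// httpTestSketch mimics the shape of the httpTest helper: an optional callback
// tweaks the default config, then the test body runs against the built agent.
func httpTestSketch(t *testing.T, cb func(*Config), body func(cfg *Config)) {
	cfg := &Config{} // agent defaults would be filled in here
	if cb != nil {
		cb(cfg)
	}
	// ... a real harness would now start a test agent from cfg ...
	body(cfg)
}

func intToPtr(i int) *int { return &i }

func TestSnapshotWithSchedulersDisabled(t *testing.T) {
	httpTestSketch(t, func(c *Config) {
		// Disable the schedulers so nothing evaluates work during the test.
		c.Server.NumSchedulers = intToPtr(0)
	}, func(cfg *Config) {
		if cfg.Server.NumSchedulers == nil || *cfg.Server.NumSchedulers != 0 {
			t.Fatal("expected schedulers to be explicitly disabled")
		}
	})
}
```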
@@ -268,7 +268,7 @@ type ServerConfig struct {
// NumSchedulers is the number of scheduler thread that are run.
// This can be as many as one per core, or zero to disable this server
// from doing any scheduling work.
-NumSchedulers int `mapstructure:"num_schedulers"`
+NumSchedulers *int `mapstructure:"num_schedulers"`
// EnabledSchedulers controls the set of sub-schedulers that are
// enabled for this server to handle. This will restrict the evaluations
@@ -1009,8 +1009,8 @@ func (a *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
if b.RaftProtocol != 0 {
result.RaftProtocol = b.RaftProtocol
}
-if b.NumSchedulers != 0 {
-result.NumSchedulers = b.NumSchedulers
+if b.NumSchedulers != nil {
+result.NumSchedulers = helper.IntToPtr(*b.NumSchedulers)
}
if b.NodeGCThreshold != "" {
result.NodeGCThreshold = b.NodeGCThreshold
......
@@ -88,7 +88,7 @@ func TestConfig_Parse(t *testing.T) {
DataDir: "/tmp/data",
ProtocolVersion: 3,
RaftProtocol: 3,
-NumSchedulers: 2,
+NumSchedulers: helper.IntToPtr(2),
EnabledSchedulers: []string{"test"},
NodeGCThreshold: "12h",
EvalGCThreshold: "12h",
......
@@ -105,7 +105,7 @@ func TestConfig_Merge(t *testing.T) {
DataDir: "/tmp/data1",
ProtocolVersion: 1,
RaftProtocol: 1,
-NumSchedulers: 1,
+NumSchedulers: helper.IntToPtr(1),
NodeGCThreshold: "1h",
HeartbeatGrace: 30 * time.Second,
MinHeartbeatTTL: 30 * time.Second,
@@ -255,7 +255,7 @@ func TestConfig_Merge(t *testing.T) {
DataDir: "/tmp/data2",
ProtocolVersion: 2,
RaftProtocol: 2,
-NumSchedulers: 2,
+NumSchedulers: helper.IntToPtr(2),
EnabledSchedulers: []string{structs.JobTypeBatch},
NodeGCThreshold: "12h",
HeartbeatGrace: 2 * time.Minute,
......
@@ -222,17 +222,23 @@ func TestNodeDrainCommand_Monitor(t *testing.T) {
out := outBuf.String()
t.Logf("Output:\n%s", out)
-require.Contains(out, "marked all allocations for migration")
-for _, a := range allocs {
-  if *a.Job.Type == "system" {
-    if strings.Contains(out, a.ID) {
-      t.Fatalf("output should not contain system alloc %q", a.ID)
-    }
-    continue
-  }
-  require.Contains(out, fmt.Sprintf("Alloc %q marked for migration", a.ID))
-  require.Contains(out, fmt.Sprintf("Alloc %q draining", a.ID))
-}
+// Unfortunately travis is too slow to reliably see the expected output. The
+// monitor goroutines may start only after some or all the allocs have been
+// migrated.
+if !testutil.IsTravis() {
+  require.Contains(out, "marked all allocations for migration")
+  for _, a := range allocs {
+    if *a.Job.Type == "system" {
+      if strings.Contains(out, a.ID) {
+        t.Fatalf("output should not contain system alloc %q", a.ID)
+      }
+      continue
+    }
+    require.Contains(out, fmt.Sprintf("Alloc %q marked for migration", a.ID))
+    require.Contains(out, fmt.Sprintf("Alloc %q draining", a.ID))
+  }
+}
expected := fmt.Sprintf("All allocations on node %q have stopped.\n", nodeID)
if !strings.HasSuffix(out, expected) {
t.Fatalf("expected output to end with:\n%s", expected)
......
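Wrapping the per-allocation assertions in `!testutil.IsTravis()` trades a little coverage for stability on CI: as the new comment notes, the monitor goroutines on Travis may attach only after some allocations have already migrated, so their individual lines never show up in the captured output. Helpers like this are usually just an environment probe; here is a hedged sketch (the real testutil.IsTravis may differ in detail).

```go
package sketch

import "os"

// isTravis reports whether the tests are running on Travis CI. Travis sets
// TRAVIS=true in the environment of every build job, so an environment probe
// is usually all such a helper needs. This mirrors the idea behind
// testutil.IsTravis, not necessarily its exact implementation.
func isTravis() bool {
	return os.Getenv("TRAVIS") == "true"
}
```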
@@ -2570,6 +2570,12 @@ func TestFSM_SnapshotRestore_Deployments(t *testing.T) {
state := fsm.State()
d1 := mock.Deployment()
d2 := mock.Deployment()
+j := mock.Job()
+d1.JobID = j.ID
+d2.JobID = j.ID
+state.UpsertJob(999, j)
state.UpsertDeployment(1000, d1)
state.UpsertDeployment(1001, d2)
......