Commit 0d1a8bf7 authored by Michael Schurter's avatar Michael Schurter
Browse files

rawexec: fix fingerprint results and tests

In tests:
* always cleanup (Kill) the harness
* don't call SetConfig more than once (a race)
* avoid cgroups to allow running without being root
Showing with 119 additions and 76 deletions
+119 -76
......@@ -46,6 +46,7 @@ var (
}
)
// PluginLoader maps pre-0.9 client driver options to post-0.9 plugin options.
func PluginLoader(opts map[string]string) (map[string]interface{}, error) {
conf := map[string]interface{}{}
if v, err := strconv.ParseBool(opts["driver.raw_exec.enable"]); err == nil {
......@@ -94,10 +95,10 @@ var (
}
)
// RawExecDriver is a privileged version of the exec driver. It provides no
// Driver is a privileged version of the exec driver. It provides no
// resource isolation and just fork/execs. The Exec driver should be preferred
// and this should only be used when explicitly needed.
type RawExecDriver struct {
type Driver struct {
// eventer is used to handle multiplexing of TaskEvents calls such that an
// event can be broadcast to all callers
eventer *eventer.Eventer
......@@ -119,8 +120,10 @@ type RawExecDriver struct {
// ctx passed to any subsystems
signalShutdown context.CancelFunc
// logger will log to the plugin output which is usually an 'executor.out'
// file located in the root of the TaskDir
// fingerprintPeriod allows overriding the global fingerprint period
fingerprintPeriod time.Duration
// logger will log to the Nomad agent
logger hclog.Logger
}
......@@ -140,10 +143,10 @@ type TaskConfig struct {
Args []string `codec:"args"`
}
// RawExecTaskState is the state which is encoded in the handle returned in
// TaskState is the state which is encoded in the handle returned in
// StartTask. This information is needed to rebuild the task state and handler
// during recovery.
type RawExecTaskState struct {
type TaskState struct {
ReattachConfig *utils.ReattachConfig
TaskConfig *drivers.TaskConfig
Pid int
......@@ -154,25 +157,26 @@ type RawExecTaskState struct {
func NewRawExecDriver(logger hclog.Logger) drivers.DriverPlugin {
ctx, cancel := context.WithCancel(context.Background())
logger = logger.Named(pluginName)
return &RawExecDriver{
eventer: eventer.NewEventer(ctx, logger),
config: &Config{},
tasks: newTaskStore(),
ctx: ctx,
signalShutdown: cancel,
logger: logger,
return &Driver{
eventer: eventer.NewEventer(ctx, logger),
config: &Config{},
tasks: newTaskStore(),
ctx: ctx,
signalShutdown: cancel,
fingerprintPeriod: fingerprintPeriod,
logger: logger,
}
}
func (r *RawExecDriver) PluginInfo() (*base.PluginInfoResponse, error) {
func (r *Driver) PluginInfo() (*base.PluginInfoResponse, error) {
return pluginInfo, nil
}
func (r *RawExecDriver) ConfigSchema() (*hclspec.Spec, error) {
func (r *Driver) ConfigSchema() (*hclspec.Spec, error) {
return configSpec, nil
}
func (r *RawExecDriver) SetConfig(data []byte, cfg *base.ClientAgentConfig) error {
func (r *Driver) SetConfig(data []byte, cfg *base.ClientAgentConfig) error {
var config Config
if err := base.MsgPackDecode(data, &config); err != nil {
return err
......@@ -185,26 +189,26 @@ func (r *RawExecDriver) SetConfig(data []byte, cfg *base.ClientAgentConfig) erro
return nil
}
func (r *RawExecDriver) Shutdown(ctx context.Context) error {
func (r *Driver) Shutdown(ctx context.Context) error {
r.signalShutdown()
return nil
}
func (r *RawExecDriver) TaskConfigSchema() (*hclspec.Spec, error) {
func (r *Driver) TaskConfigSchema() (*hclspec.Spec, error) {
return taskConfigSpec, nil
}
func (r *RawExecDriver) Capabilities() (*drivers.Capabilities, error) {
func (r *Driver) Capabilities() (*drivers.Capabilities, error) {
return capabilities, nil
}
func (r *RawExecDriver) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint, error) {
func (r *Driver) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint, error) {
ch := make(chan *drivers.Fingerprint)
go r.handleFingerprint(ctx, ch)
return ch, nil
}
func (r *RawExecDriver) handleFingerprint(ctx context.Context, ch chan *drivers.Fingerprint) {
func (r *Driver) handleFingerprint(ctx context.Context, ch chan<- *drivers.Fingerprint) {
defer close(ch)
ticker := time.NewTimer(0)
for {
......@@ -214,38 +218,38 @@ func (r *RawExecDriver) handleFingerprint(ctx context.Context, ch chan *drivers.
case <-r.ctx.Done():
return
case <-ticker.C:
ticker.Reset(fingerprintPeriod)
ticker.Reset(r.fingerprintPeriod)
ch <- r.buildFingerprint()
}
}
}
func (r *RawExecDriver) buildFingerprint() *drivers.Fingerprint {
func (r *Driver) buildFingerprint() *drivers.Fingerprint {
var health drivers.HealthState
var desc string
attrs := map[string]string{}
if r.config.Enabled {
health = drivers.HealthStateHealthy
desc = "raw_exec enabled"
desc = "ready"
attrs["driver.raw_exec"] = "1"
} else {
health = drivers.HealthStateUndetected
desc = "raw_exec disabled"
desc = "disabled"
}
return &drivers.Fingerprint{
Attributes: map[string]string{},
Attributes: attrs,
Health: health,
HealthDescription: desc,
}
}
func (r *RawExecDriver) RecoverTask(handle *drivers.TaskHandle) error {
func (r *Driver) RecoverTask(handle *drivers.TaskHandle) error {
if handle == nil {
return fmt.Errorf("error: handle cannot be nil")
}
var taskState RawExecTaskState
var taskState TaskState
if err := handle.GetDriverState(&taskState); err != nil {
r.logger.Error("failed to decode task state from handle", "error", err, "task_id", handle.Config.ID)
return fmt.Errorf("failed to decode task state from handle: %v", err)
......@@ -283,7 +287,7 @@ func (r *RawExecDriver) RecoverTask(handle *drivers.TaskHandle) error {
return nil
}
func (r *RawExecDriver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstructs.DriverNetwork, error) {
func (r *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstructs.DriverNetwork, error) {
if _, ok := r.tasks.Get(cfg.ID); ok {
return nil, nil, fmt.Errorf("task with ID %q already started", cfg.ID)
}
......@@ -335,7 +339,7 @@ func (r *RawExecDriver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle,
logger: r.logger,
}
driverState := RawExecTaskState{
driverState := TaskState{
ReattachConfig: utils.ReattachConfigFromGoPlugin(pluginClient.ReattachConfig()),
Pid: ps.Pid,
TaskConfig: cfg,
......@@ -354,7 +358,7 @@ func (r *RawExecDriver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle,
return handle, nil, nil
}
func (r *RawExecDriver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) {
func (r *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) {
handle, ok := r.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
......@@ -366,7 +370,7 @@ func (r *RawExecDriver) WaitTask(ctx context.Context, taskID string) (<-chan *dr
return ch, nil
}
func (r *RawExecDriver) handleWait(ctx context.Context, handle *rawExecTaskHandle, ch chan *drivers.ExitResult) {
func (r *Driver) handleWait(ctx context.Context, handle *rawExecTaskHandle, ch chan *drivers.ExitResult) {
defer close(ch)
var result *drivers.ExitResult
ps, err := handle.exec.Wait()
......@@ -390,7 +394,7 @@ func (r *RawExecDriver) handleWait(ctx context.Context, handle *rawExecTaskHandl
}
}
func (r *RawExecDriver) StopTask(taskID string, timeout time.Duration, signal string) error {
func (r *Driver) StopTask(taskID string, timeout time.Duration, signal string) error {
handle, ok := r.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
......@@ -406,7 +410,7 @@ func (r *RawExecDriver) StopTask(taskID string, timeout time.Duration, signal st
return nil
}
func (r *RawExecDriver) DestroyTask(taskID string, force bool) error {
func (r *Driver) DestroyTask(taskID string, force bool) error {
handle, ok := r.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
......@@ -430,7 +434,7 @@ func (r *RawExecDriver) DestroyTask(taskID string, force bool) error {
return nil
}
func (r *RawExecDriver) InspectTask(taskID string) (*drivers.TaskStatus, error) {
func (r *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) {
handle, ok := r.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
......@@ -454,7 +458,7 @@ func (r *RawExecDriver) InspectTask(taskID string) (*drivers.TaskStatus, error)
return status, nil
}
func (r *RawExecDriver) TaskStats(taskID string) (*cstructs.TaskResourceUsage, error) {
func (r *Driver) TaskStats(taskID string) (*cstructs.TaskResourceUsage, error) {
handle, ok := r.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
......@@ -463,11 +467,11 @@ func (r *RawExecDriver) TaskStats(taskID string) (*cstructs.TaskResourceUsage, e
return handle.exec.Stats()
}
func (r *RawExecDriver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) {
func (r *Driver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) {
return r.eventer.TaskEvents(ctx)
}
func (r *RawExecDriver) SignalTask(taskID string, signal string) error {
func (r *Driver) SignalTask(taskID string, signal string) error {
handle, ok := r.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
......@@ -481,7 +485,7 @@ func (r *RawExecDriver) SignalTask(taskID string, signal string) error {
return handle.exec.Signal(sig)
}
func (r *RawExecDriver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) {
func (r *Driver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) {
if len(cmd) == 0 {
return nil, fmt.Errorf("error cmd must have atleast one value")
}
......
......@@ -37,6 +37,7 @@ func TestRawExecDriver_SetConfig(t *testing.T) {
d := NewRawExecDriver(testlog.HCLogger(t))
harness := drivers.NewDriverHarness(t, d)
defer harness.Kill()
// Disable raw exec.
config := &Config{}
......@@ -44,63 +45,78 @@ func TestRawExecDriver_SetConfig(t *testing.T) {
var data []byte
require.NoError(basePlug.MsgPackEncode(&data, config))
require.NoError(harness.SetConfig(data, nil))
require.Exactly(config, d.(*RawExecDriver).config)
require.Exactly(config, d.(*Driver).config)
config.Enabled = true
config.NoCgroups = true
data = []byte{}
require.NoError(basePlug.MsgPackEncode(&data, config))
require.NoError(harness.SetConfig(data, nil))
require.Exactly(config, d.(*RawExecDriver).config)
require.Exactly(config, d.(*Driver).config)
config.NoCgroups = false
data = []byte{}
require.NoError(basePlug.MsgPackEncode(&data, config))
require.NoError(harness.SetConfig(data, nil))
require.Exactly(config, d.(*RawExecDriver).config)
require.Exactly(config, d.(*Driver).config)
}
func TestRawExecDriver_Fingerprint(t *testing.T) {
t.Parallel()
require := require.New(t)
d := NewRawExecDriver(testlog.HCLogger(t))
harness := drivers.NewDriverHarness(t, d)
// Disable raw exec.
config := &Config{}
fingerprintTest := func(config *Config, expected *drivers.Fingerprint) func(t *testing.T) {
return func(t *testing.T) {
require := require.New(t)
d := NewRawExecDriver(testlog.HCLogger(t)).(*Driver)
harness := drivers.NewDriverHarness(t, d)
defer harness.Kill()
var data []byte
require.NoError(basePlug.MsgPackEncode(&data, config))
require.NoError(harness.SetConfig(data, nil))
var data []byte
require.NoError(basePlug.MsgPackEncode(&data, config))
require.NoError(harness.SetConfig(data, nil))
fingerCh, err := harness.Fingerprint(context.Background())
require.NoError(err)
select {
case finger := <-fingerCh:
require.Equal(drivers.HealthStateUndetected, finger.Health)
require.Empty(finger.Attributes["driver.raw_exec"])
case <-time.After(time.Duration(testutil.TestMultiplier()*5) * time.Second):
require.Fail("timeout receiving fingerprint")
fingerCh, err := harness.Fingerprint(context.Background())
require.NoError(err)
select {
case result := <-fingerCh:
require.Equal(expected, result)
case <-time.After(time.Duration(testutil.TestMultiplier()) * time.Second):
require.Fail("timeout receiving fingerprint")
}
}
}
// Enable raw exec
config.Enabled = true
data = []byte{}
require.NoError(basePlug.MsgPackEncode(&data, config))
require.NoError(harness.SetConfig(data, nil))
cases := []struct {
Name string
Conf Config
Expected drivers.Fingerprint
}{
{
Name: "Disabled",
Conf: Config{
Enabled: false,
},
Expected: drivers.Fingerprint{
Attributes: nil,
Health: drivers.HealthStateUndetected,
HealthDescription: "disabled",
},
},
{
Name: "Enabled",
Conf: Config{
Enabled: true,
},
Expected: drivers.Fingerprint{
Attributes: map[string]string{"driver.raw_exec": "1"},
Health: drivers.HealthStateHealthy,
HealthDescription: "ready",
},
},
}
FINGER_LOOP:
for {
select {
case finger := <-fingerCh:
if finger.Health == drivers.HealthStateHealthy {
break FINGER_LOOP
}
case <-time.After(time.Duration(testutil.TestMultiplier()*5) * time.Second):
require.Fail("timeout receiving fingerprint")
break FINGER_LOOP
}
for _, tc := range cases {
t.Run(tc.Name, fingerprintTest(&tc.Conf, &tc.Expected))
}
}
......@@ -110,6 +126,7 @@ func TestRawExecDriver_StartWait(t *testing.T) {
d := NewRawExecDriver(testlog.HCLogger(t))
harness := drivers.NewDriverHarness(t, d)
defer harness.Kill()
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Name: "test",
......@@ -130,6 +147,9 @@ func TestRawExecDriver_StartWait(t *testing.T) {
require.NoError(err)
result := <-ch
require.Zero(result.ExitCode)
require.Zero(result.Signal)
require.False(result.OOMKilled)
require.NoError(result.Err)
require.NoError(harness.DestroyTask(task.ID, true))
}
......@@ -139,6 +159,14 @@ func TestRawExecDriver_StartWaitStop(t *testing.T) {
d := NewRawExecDriver(testlog.HCLogger(t))
harness := drivers.NewDriverHarness(t, d)
defer harness.Kill()
// Disable cgroups so test works without root
config := &Config{NoCgroups: true}
var data []byte
require.NoError(basePlug.MsgPackEncode(&data, config))
require.NoError(harness.SetConfig(data, nil))
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Name: "test",
......@@ -200,6 +228,14 @@ func TestRawExecDriver_StartWaitRecoverWaitStop(t *testing.T) {
d := NewRawExecDriver(testlog.HCLogger(t))
harness := drivers.NewDriverHarness(t, d)
defer harness.Kill()
// Disable cgroups so test works without root
config := &Config{NoCgroups: true}
var data []byte
require.NoError(basePlug.MsgPackEncode(&data, config))
require.NoError(harness.SetConfig(data, nil))
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Name: "sleep",
......@@ -233,7 +269,7 @@ func TestRawExecDriver_StartWaitRecoverWaitStop(t *testing.T) {
originalStatus, err := d.InspectTask(task.ID)
require.NoError(err)
d.(*RawExecDriver).tasks.Delete(task.ID)
d.(*Driver).tasks.Delete(task.ID)
wg.Wait()
require.True(waitDone)
......@@ -275,6 +311,7 @@ func TestRawExecDriver_Start_Wait_AllocDir(t *testing.T) {
d := NewRawExecDriver(testlog.HCLogger(t))
harness := drivers.NewDriverHarness(t, d)
defer harness.Kill()
task := &drivers.TaskConfig{
ID: uuid.Generate(),
......@@ -328,6 +365,7 @@ func TestRawExecDriver_Start_Kill_Wait_Cgroup(t *testing.T) {
d := NewRawExecDriver(testlog.HCLogger(t))
harness := drivers.NewDriverHarness(t, d)
defer harness.Kill()
task := &drivers.TaskConfig{
ID: uuid.Generate(),
......@@ -416,6 +454,7 @@ func TestRawExecDriver_Exec(t *testing.T) {
d := NewRawExecDriver(testlog.HCLogger(t))
harness := drivers.NewDriverHarness(t, d)
defer harness.Kill()
task := &drivers.TaskConfig{
ID: uuid.Generate(),
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment