Unverified Commit ed4ed16b authored by Preetha Appan's avatar Preetha Appan
Browse files

Parsing and API layer for spread stanza

parent 72570e06
Showing with 350 additions and 495 deletions
+350 -495
......@@ -29,9 +29,13 @@ func TestCompose(t *testing.T) {
})
// Compose a task group
st1 := NewSpreadTarget("dc1", 80)
st2 := NewSpreadTarget("dc2", 20)
grp := NewTaskGroup("grp1", 2).
Constrain(NewConstraint("kernel.name", "=", "linux")).
AddAffinity(NewAffinity("${node.class}", "=", "large", 50)).
AddSpread(NewSpread("${node.datacenter}", 30, []*SpreadTarget{st1, st2})).
SetMeta("foo", "bar").
AddTask(task)
......@@ -81,6 +85,22 @@ func TestCompose(t *testing.T) {
Weight: 50,
},
},
Spreads: []*Spread{
{
Attribute: "${node.datacenter}",
Weight: 30,
SpreadTarget: []*SpreadTarget{
{
Value: "dc1",
Percent: 80,
},
{
Value: "dc2",
Percent: 20,
},
},
},
},
Tasks: []*Task{
{
Name: "task1",
......
......@@ -613,6 +613,7 @@ type Job struct {
Affinities []*Affinity
TaskGroups []*TaskGroup
Update *UpdateStrategy
Spreads []*Spread
Periodic *PeriodicConfig
ParameterizedJob *ParameterizedJobConfig
Dispatched bool
......@@ -855,6 +856,11 @@ func (j *Job) AddPeriodicConfig(cfg *PeriodicConfig) *Job {
return j
}
func (j *Job) AddSpread(s *Spread) *Job {
j.Spreads = append(j.Spreads, s)
return j
}
type WriteRequest struct {
// The target region for this write
Region string
......
......@@ -1387,6 +1387,57 @@ func TestJobs_Sort(t *testing.T) {
}
}
func TestJobs_AddSpread(t *testing.T) {
t.Parallel()
job := &Job{Spreads: nil}
// Create and add a Spread
spreadTarget := NewSpreadTarget("r1", 50)
spread := NewSpread("${meta.rack}", 100, []*SpreadTarget{spreadTarget})
out := job.AddSpread(spread)
if n := len(job.Spreads); n != 1 {
t.Fatalf("expected 1 spread, got: %d", n)
}
// Check that the job was returned
if job != out {
t.Fatalf("expect: %#v, got: %#v", job, out)
}
// Adding another spread preserves the original
spreadTarget2 := NewSpreadTarget("dc1", 100)
spread2 := NewSpread("${node.datacenter}", 100, []*SpreadTarget{spreadTarget2})
job.AddSpread(spread2)
expect := []*Spread{
{
Attribute: "${meta.rack}",
Weight: 100,
SpreadTarget: []*SpreadTarget{
{
Value: "r1",
Percent: 50,
},
},
},
{
Attribute: "${node.datacenter}",
Weight: 100,
SpreadTarget: []*SpreadTarget{
{
Value: "dc1",
Percent: 100,
},
},
},
}
if !reflect.DeepEqual(job.Spreads, expect) {
t.Fatalf("expect: %#v, got: %#v", expect, job.Spreads)
}
}
func TestJobs_Summary_WithACL(t *testing.T) {
t.Parallel()
assert := assert.New(t)
......
......@@ -219,6 +219,34 @@ func (p *ReschedulePolicy) String() string {
return fmt.Sprintf("%v in %v with %v delay, max_delay = %v", *p.Attempts, *p.Interval, *p.DelayFunction, *p.MaxDelay)
}
// Spread is used to serialize task group allocation spread preferences
type Spread struct {
Attribute string
Weight int
SpreadTarget []*SpreadTarget
}
// SpreadTarget is used to serialize target allocation spread percentages
type SpreadTarget struct {
Value string
Percent uint32
}
func NewSpreadTarget(value string, percent uint32) *SpreadTarget {
return &SpreadTarget{
Value: value,
Percent: percent,
}
}
func NewSpread(attribute string, weight int, spreadTargets []*SpreadTarget) *Spread {
return &Spread{
Attribute: attribute,
Weight: weight,
SpreadTarget: spreadTargets,
}
}
// CheckRestart describes if and when a task should be restarted based on
// failing health checks.
type CheckRestart struct {
......@@ -432,6 +460,7 @@ type TaskGroup struct {
Constraints []*Constraint
Affinities []*Affinity
Tasks []*Task
Spreads []*Spread
RestartPolicy *RestartPolicy
ReschedulePolicy *ReschedulePolicy
EphemeralDisk *EphemeralDisk
......@@ -573,6 +602,12 @@ func (g *TaskGroup) RequireDisk(disk *EphemeralDisk) *TaskGroup {
return g
}
// AddSpread is used to add a new spread preference to a task group.
func (g *TaskGroup) AddSpread(s *Spread) *TaskGroup {
g.Spreads = append(g.Spreads, s)
return g
}
// LogConfig provides configuration for log rotation
type LogConfig struct {
MaxFiles *int `mapstructure:"max_files"`
......
......@@ -116,6 +116,57 @@ func TestTaskGroup_SetMeta(t *testing.T) {
}
}
func TestTaskGroup_AddSpread(t *testing.T) {
t.Parallel()
grp := NewTaskGroup("grp1", 1)
// Create and add spread
spreadTarget := NewSpreadTarget("r1", 50)
spread := NewSpread("${meta.rack}", 100, []*SpreadTarget{spreadTarget})
out := grp.AddSpread(spread)
if n := len(grp.Spreads); n != 1 {
t.Fatalf("expected 1 spread, got: %d", n)
}
// Check that the group was returned
if out != grp {
t.Fatalf("expected: %#v, got: %#v", grp, out)
}
// Add a second spread
spreadTarget2 := NewSpreadTarget("dc1", 100)
spread2 := NewSpread("${node.datacenter}", 100, []*SpreadTarget{spreadTarget2})
grp.AddSpread(spread2)
expect := []*Spread{
{
Attribute: "${meta.rack}",
Weight: 100,
SpreadTarget: []*SpreadTarget{
{
Value: "r1",
Percent: 50,
},
},
},
{
Attribute: "${node.datacenter}",
Weight: 100,
SpreadTarget: []*SpreadTarget{
{
Value: "dc1",
Percent: 100,
},
},
},
}
if !reflect.DeepEqual(grp.Spreads, expect) {
t.Fatalf("expect: %#v, got: %#v", expect, grp.Spreads)
}
}
func TestTaskGroup_AddTask(t *testing.T) {
t.Parallel()
grp := NewTaskGroup("grp1", 1)
......
This diff is collapsed.
......@@ -635,6 +635,13 @@ func ApiJobToStructJob(job *api.Job) *structs.Job {
}
}
if l := len(job.Spreads); l != 0 {
j.Spreads = make([]*structs.Spread, l)
for i, apiSpread := range job.Spreads {
j.Spreads[i] = ApiSpreadToStructs(apiSpread)
}
}
if job.Periodic != nil {
j.Periodic = &structs.PeriodicConfig{
Enabled: *job.Periodic.Enabled,
......@@ -722,6 +729,13 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) {
Migrate: *taskGroup.EphemeralDisk.Migrate,
}
if l := len(taskGroup.Spreads); l != 0 {
tg.Spreads = make([]*structs.Spread, l)
for k, spread := range taskGroup.Spreads {
tg.Spreads[k] = ApiSpreadToStructs(spread)
}
}
if taskGroup.Update != nil {
tg.Update = &structs.UpdateStrategy{
Stagger: *taskGroup.Update.Stagger,
......@@ -922,3 +936,19 @@ func ApiAffinityToStructs(a1 *api.Affinity) *structs.Affinity {
Weight: a1.Weight,
}
}
func ApiSpreadToStructs(a1 *api.Spread) *structs.Spread {
ret := &structs.Spread{}
ret.Attribute = a1.Attribute
ret.Weight = a1.Weight
if a1.SpreadTarget != nil {
ret.SpreadTarget = make([]*structs.SpreadTarget, len(a1.SpreadTarget))
for i, st := range a1.SpreadTarget {
ret.SpreadTarget[i] = &structs.SpreadTarget{
Value: st.Value,
Percent: st.Percent,
}
}
}
return ret
}
......@@ -1229,6 +1229,18 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
AutoRevert: helper.BoolToPtr(false),
Canary: helper.IntToPtr(1),
},
Spreads: []*api.Spread{
{
Attribute: "${meta.rack}",
Weight: 100,
SpreadTarget: []*api.SpreadTarget{
{
Value: "r1",
Percent: 50,
},
},
},
},
Periodic: &api.PeriodicConfig{
Enabled: helper.BoolToPtr(true),
Spec: helper.StringToPtr("spec"),
......@@ -1284,6 +1296,18 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
MinHealthyTime: helper.TimeToPtr(12 * time.Hour),
HealthyDeadline: helper.TimeToPtr(12 * time.Hour),
},
Spreads: []*api.Spread{
{
Attribute: "${node.datacenter}",
Weight: 100,
SpreadTarget: []*api.SpreadTarget{
{
Value: "dc1",
Percent: 100,
},
},
},
},
EphemeralDisk: &api.EphemeralDisk{
SizeMB: helper.IntToPtr(100),
Sticky: helper.BoolToPtr(true),
......@@ -1475,6 +1499,18 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Weight: 50,
},
},
Spreads: []*structs.Spread{
{
Attribute: "${meta.rack}",
Weight: 100,
SpreadTarget: []*structs.SpreadTarget{
{
Value: "r1",
Percent: 50,
},
},
},
},
Update: structs.UpdateStrategy{
Stagger: 1 * time.Second,
MaxParallel: 5,
......@@ -1520,6 +1556,18 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Delay: 10 * time.Second,
Mode: "delay",
},
Spreads: []*structs.Spread{
{
Attribute: "${node.datacenter}",
Weight: 100,
SpreadTarget: []*structs.SpreadTarget{
{
Value: "dc1",
Percent: 100,
},
},
},
},
ReschedulePolicy: &structs.ReschedulePolicy{
Interval: 12 * time.Hour,
Attempts: 5,
......
......@@ -111,6 +111,7 @@ func parseJob(result *api.Job, list *ast.ObjectList) error {
delete(m, "reschedule")
delete(m, "update")
delete(m, "vault")
delete(m, "spread")
// Set the ID and name to the object key
result.ID = helper.StringToPtr(obj.Keys[0].Token.Value().(string))
......@@ -134,6 +135,7 @@ func parseJob(result *api.Job, list *ast.ObjectList) error {
"all_at_once",
"constraint",
"affinity",
"spread",
"datacenters",
"group",
"id",
......@@ -184,6 +186,13 @@ func parseJob(result *api.Job, list *ast.ObjectList) error {
}
}
// Parse spread
if o := listVal.Filter("spread"); len(o.Items) > 0 {
if err := parseSpread(&result.Spreads, o); err != nil {
return multierror.Prefix(err, "spread ->")
}
}
// If we have a parameterized definition, then parse that
if o := listVal.Filter("parameterized"); len(o.Items) > 0 {
if err := parseParameterizedJob(&result.ParameterizedJob, o); err != nil {
......@@ -305,6 +314,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
"reschedule",
"vault",
"migrate",
"spread",
}
if err := helper.CheckHCLKeys(listVal, valid); err != nil {
return multierror.Prefix(err, fmt.Sprintf("'%s' ->", n))
......@@ -323,6 +333,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
delete(m, "update")
delete(m, "vault")
delete(m, "migrate")
delete(m, "spread")
// Build the group with the basic decode
var g api.TaskGroup
......@@ -352,6 +363,13 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
}
}
// Parse spread
if o := listVal.Filter("spread"); len(o.Items) > 0 {
if err := parseSpread(&g.Spreads, o); err != nil {
return multierror.Prefix(err, "spread ->")
}
}
// Parse reschedule policy
if o := listVal.Filter("reschedule"); len(o.Items) > 0 {
if err := parseReschedulePolicy(&g.ReschedulePolicy, o); err != nil {
......@@ -703,6 +721,95 @@ func parseEphemeralDisk(result **api.EphemeralDisk, list *ast.ObjectList) error
return nil
}
func parseSpread(result *[]*api.Spread, list *ast.ObjectList) error {
for _, o := range list.Elem().Items {
// Check for invalid keys
valid := []string{
"attribute",
"weight",
"target",
}
if err := helper.CheckHCLKeys(o.Val, valid); err != nil {
return err
}
// We need this later
var listVal *ast.ObjectList
if ot, ok := o.Val.(*ast.ObjectType); ok {
listVal = ot.List
} else {
return fmt.Errorf("spread should be an object")
}
var m map[string]interface{}
if err := hcl.DecodeObject(&m, o.Val); err != nil {
return err
}
delete(m, "target")
// Build spread
var s api.Spread
if err := mapstructure.WeakDecode(m, &s); err != nil {
return err
}
// Parse spread target
if o := listVal.Filter("target"); len(o.Items) > 0 {
if err := parseSpreadTarget(&s.SpreadTarget, o); err != nil {
return multierror.Prefix(err, fmt.Sprintf("error parsing spread target"))
}
}
*result = append(*result, &s)
}
return nil
}
func parseSpreadTarget(result *[]*api.SpreadTarget, list *ast.ObjectList) error {
seen := make(map[string]struct{})
for _, item := range list.Items {
n := item.Keys[0].Token.Value().(string)
// Make sure we haven't already found this
if _, ok := seen[n]; ok {
return fmt.Errorf("target '%s' defined more than once", n)
}
seen[n] = struct{}{}
// We need this later
var listVal *ast.ObjectList
if ot, ok := item.Val.(*ast.ObjectType); ok {
listVal = ot.List
} else {
return fmt.Errorf("target should be an object")
}
// Check for invalid keys
valid := []string{
"percent",
"value",
}
if err := helper.CheckHCLKeys(listVal, valid); err != nil {
return multierror.Prefix(err, fmt.Sprintf("'%s' ->", n))
}
var m map[string]interface{}
if err := hcl.DecodeObject(&m, item.Val); err != nil {
return err
}
// Decode spread target
var g api.SpreadTarget
g.Value = n
if err := mapstructure.WeakDecode(m, &g); err != nil {
return err
}
*result = append(*result, &g)
}
return nil
}
// parseBool takes an interface value and tries to convert it to a boolean and
// returns an error if the type can't be converted.
func parseBool(value interface{}) (bool, error) {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment