Unverified commit 709abbc6 authored by Danielle Lancashire

scheduler: Add a feasibility checker for Host Vols

parent a216daed
Showing with 169 additions and 14 deletions
@@ -96,6 +96,69 @@ func NewRandomIterator(ctx Context, nodes []*structs.Node) *StaticIterator {
    return NewStaticIterator(ctx, nodes)
}

// HostVolumeChecker is a FeasibilityChecker which returns whether a node has
// the host volumes necessary to schedule a task group.
type HostVolumeChecker struct {
    ctx     Context
    volumes map[string][]*structs.VolumeRequest
}

// NewHostVolumeChecker creates a HostVolumeChecker from a set of volumes
func NewHostVolumeChecker(ctx Context) *HostVolumeChecker {
    return &HostVolumeChecker{
        ctx: ctx,
    }
}
// SetVolumes takes the volumes required by a task group and updates the checker.
func (h *HostVolumeChecker) SetVolumes(volumes map[string]*structs.VolumeRequest) {
    nm := make(map[string][]*structs.VolumeRequest)

    // Convert the map from map[DesiredName]Request to map[Source][]Request to
    // improve lookup performance. Also filter non-host volumes.
    for _, req := range volumes {
        if req.Volume.Type != "host" {
            continue
        }

        cfg, err := structs.ParseHostVolumeConfig(req.Config)
        if err != nil {
            // Could not parse host volume config, skip the volume for now.
            continue
        }

        nm[cfg.Source] = append(nm[cfg.Source], req)
    }
    h.volumes = nm
}
func (h *HostVolumeChecker) Feasible(candidate *structs.Node) bool {
    if h.hasVolumes(candidate) {
        return true
    }

    h.ctx.Metrics().FilterNode(candidate, "missing compatible host volumes")
    return false
}
func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
    hLen := len(h.volumes)
    nLen := len(n.HostVolumes)

    // Fast path: if the group requests more distinct volume sources than the
    // node exposes, the node cannot satisfy them all.
    if hLen > nLen {
        return false
    }

    for source := range h.volumes {
        if _, ok := n.HostVolumes[source]; !ok {
            return false
        }
    }

    return true
}

// DriverChecker is a FeasibilityChecker which returns whether a node has the
// drivers necessary to schedule a task group.
type DriverChecker struct {
......
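The checker is a two-step API: SetVolumes indexes the group's host-volume requests by their source, and Feasible then does a subset check against the candidate node's HostVolumes map (extra volumes on the node are ignored). A minimal sketch of that flow, using the mock helpers that appear in the tests below; the volume and source names here are invented for illustration:

    node := mock.Node()
    node.HostVolumes = map[string]*structs.ClientHostVolumeConfig{
        "shared-data": {Name: "shared-data"},
    }

    checker := NewHostVolumeChecker(ctx) // ctx as returned by testContext(t)
    checker.SetVolumes(map[string]*structs.VolumeRequest{
        // Keyed by the group's desired volume name; re-keyed by "source" internally.
        "data": {
            Volume: &structs.Volume{Type: "host"},
            Config: map[string]interface{}{"source": "shared-data"},
        },
    })

    checker.Feasible(node) // true: every requested source exists on the node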
@@ -81,6 +81,88 @@ func TestRandomIterator(t *testing.T) {
    }
}

func TestHostVolumeChecker(t *testing.T) {
    _, ctx := testContext(t)
    nodes := []*structs.Node{
        mock.Node(),
        mock.Node(),
        mock.Node(),
        mock.Node(),
        mock.Node(),
        mock.Node(),
    }

    nodes[1].HostVolumes = map[string]*structs.ClientHostVolumeConfig{"foo": {Name: "foo"}}
    nodes[2].HostVolumes = map[string]*structs.ClientHostVolumeConfig{
        "foo": {},
        "bar": {},
    }
    nodes[3].HostVolumes = map[string]*structs.ClientHostVolumeConfig{
        "foo": {},
        "bar": {},
    }
    nodes[4].HostVolumes = map[string]*structs.ClientHostVolumeConfig{
        "foo": {},
        "baz": {},
    }

    noVolumes := map[string]*structs.VolumeRequest{}

    volumes := map[string]*structs.VolumeRequest{
        "foo": {
            Volume: &structs.Volume{Type: "host"},
            Config: map[string]interface{}{"source": "foo"},
        },
        "bar": {
            Volume: &structs.Volume{Type: "host"},
            Config: map[string]interface{}{"source": "bar"},
        },
        "baz": {
            Volume: &structs.Volume{Type: "nothost"},
            Config: map[string]interface{}{"source": "baz"},
        },
    }

    checker := NewHostVolumeChecker(ctx)
    cases := []struct {
        Node             *structs.Node
        RequestedVolumes map[string]*structs.VolumeRequest
        Result           bool
    }{
        { // Nil Volumes, some requested
            Node:             nodes[0],
            RequestedVolumes: volumes,
            Result:           false,
        },
        { // Mismatched set of volumes
            Node:             nodes[1],
            RequestedVolumes: volumes,
            Result:           false,
        },
        { // Happy Path
            Node:             nodes[2],
            RequestedVolumes: volumes,
            Result:           true,
        },
        { // No Volumes requested, node has some
            Node:             nodes[3],
            RequestedVolumes: noVolumes,
            Result:           true,
        },
        { // No Volumes requested, some available
            Node:             nodes[4],
            RequestedVolumes: noVolumes,
            Result:           true,
        },
    }

    for i, c := range cases {
        checker.SetVolumes(c.RequestedVolumes)
        if act := checker.Feasible(c.Node); act != c.Result {
            t.Fatalf("case(%d) failed: got %v; want %v", i, act, c.Result)
        }
    }
}

func TestDriverChecker(t *testing.T) {
    _, ctx := testContext(t)
    nodes := []*structs.Node{
......
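The new test is table-driven and can be run in isolation with the standard Go tooling (assuming these files live under the scheduler package directory, as the identifiers suggest):

    go test ./scheduler -run TestHostVolumeChecker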
@@ -44,12 +44,13 @@ type GenericStack struct {
    ctx    Context
    source *StaticIterator

-   wrappedChecks       *FeasibilityWrapper
-   quota               FeasibleIterator
-   jobConstraint       *ConstraintChecker
-   taskGroupDrivers    *DriverChecker
-   taskGroupConstraint *ConstraintChecker
-   taskGroupDevices    *DeviceChecker
+   wrappedChecks        *FeasibilityWrapper
+   quota                FeasibleIterator
+   jobConstraint        *ConstraintChecker
+   taskGroupDrivers     *DriverChecker
+   taskGroupConstraint  *ConstraintChecker
+   taskGroupDevices     *DeviceChecker
+   taskGroupHostVolumes *HostVolumeChecker

    distinctHostsConstraint    *DistinctHostsIterator
    distinctPropertyConstraint *DistinctPropertyIterator
@@ -129,6 +130,7 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *RankedNode {
    s.taskGroupDrivers.SetDrivers(tgConstr.drivers)
    s.taskGroupConstraint.SetConstraints(tgConstr.constraints)
    s.taskGroupDevices.SetTaskGroup(tg)
+   s.taskGroupHostVolumes.SetVolumes(tg.Volumes)
    s.distinctHostsConstraint.SetTaskGroup(tg)
    s.distinctPropertyConstraint.SetTaskGroup(tg)
    s.wrappedChecks.SetTaskGroup(tg.Name)
@@ -165,12 +167,13 @@ type SystemStack struct {
    ctx    Context
    source *StaticIterator

-   wrappedChecks       *FeasibilityWrapper
-   quota               FeasibleIterator
-   jobConstraint       *ConstraintChecker
-   taskGroupDrivers    *DriverChecker
-   taskGroupConstraint *ConstraintChecker
-   taskGroupDevices    *DeviceChecker
+   wrappedChecks        *FeasibilityWrapper
+   quota                FeasibleIterator
+   jobConstraint        *ConstraintChecker
+   taskGroupDrivers     *DriverChecker
+   taskGroupConstraint  *ConstraintChecker
+   taskGroupDevices     *DeviceChecker
+   taskGroupHostVolumes *HostVolumeChecker

    distinctPropertyConstraint *DistinctPropertyIterator
    binPack                    *BinPackIterator
@@ -199,6 +202,9 @@ func NewSystemStack(ctx Context) *SystemStack {
    // Filter on task group constraints second
    s.taskGroupConstraint = NewConstraintChecker(ctx, nil)

+   // Filter on task group host volumes
+   s.taskGroupHostVolumes = NewHostVolumeChecker(ctx)
+
    // Filter on task group devices
    s.taskGroupDevices = NewDeviceChecker(ctx)
@@ -207,7 +213,7 @@ func NewSystemStack(ctx Context) *SystemStack {
    // previously been marked as eligible or ineligible. Generally this will be
    // checks that only need to examine the single node to determine feasibility.
    jobs := []FeasibilityChecker{s.jobConstraint}
-   tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint, s.taskGroupDevices}
+   tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint, s.taskGroupHostVolumes, s.taskGroupDevices}
    s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs)

    // Filter on distinct property constraints.
@@ -260,6 +266,7 @@ func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) *RankedNode {
    s.taskGroupDrivers.SetDrivers(tgConstr.drivers)
    s.taskGroupConstraint.SetConstraints(tgConstr.constraints)
    s.taskGroupDevices.SetTaskGroup(tg)
+   s.taskGroupHostVolumes.SetVolumes(tg.Volumes)
    s.wrappedChecks.SetTaskGroup(tg.Name)
    s.distinctPropertyConstraint.SetTaskGroup(tg)
    s.binPack.SetTaskGroup(tg)
......
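Within the wrapped checks, each task-group checker is consulted per candidate node. Conceptually the chain behaves like the sketch below; this is a simplified illustration only, not the FeasibilityWrapper implementation (which additionally caches results by computed node class so checks can be skipped), and feasibleForGroup is a made-up name:

    // feasibleForGroup runs the task-group checkers in order; the first one
    // to reject the node filters it out and records the reason via metrics.
    func feasibleForGroup(checkers []FeasibilityChecker, candidate *structs.Node) bool {
        for _, c := range checkers {
            if !c.Feasible(candidate) {
                return false
            }
        }
        return true
    }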
@@ -31,12 +31,15 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack {
    // Filter on task group devices
    s.taskGroupDevices = NewDeviceChecker(ctx)

+   // Filter on task group host volumes
+   s.taskGroupHostVolumes = NewHostVolumeChecker(ctx)
+
    // Create the feasibility wrapper which wraps all feasibility checks in
    // which feasibility checking can be skipped if the computed node class has
    // previously been marked as eligible or ineligible. Generally this will be
    // checks that only need to examine the single node to determine feasibility.
    jobs := []FeasibilityChecker{s.jobConstraint}
-   tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint, s.taskGroupDevices}
+   tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint, s.taskGroupHostVolumes, s.taskGroupDevices}
    s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs)

    // Filter on distinct host constraints.
......
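With both stacks wired up, any task group that declares host volumes is filtered automatically during Select. A hypothetical group that would exercise the new checker (names and values invented for illustration; only the volume-related fields are shown):

    tg := &structs.TaskGroup{
        Name: "web",
        Volumes: map[string]*structs.VolumeRequest{
            "data": {
                Volume: &structs.Volume{Type: "host"},
                Config: map[string]interface{}{"source": "shared-data"},
            },
        },
    }

    // Inside GenericStack.Select / SystemStack.Select:
    // s.taskGroupHostVolumes.SetVolumes(tg.Volumes)

Only nodes whose client configuration exposes a host volume with source "shared-data" will pass the new check.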