"git@git.gitsec.cn:baidan/OpenSpace.git" did not exist on "0125fb7a3be4d6e6529781522f5d9e9a826241fb"
Unverified Commit af5d42c0 authored by Danielle Lancashire's avatar Danielle Lancashire
Browse files

structs: Unify Volume and VolumeRequest

parent 0f5cf5fa
Showing with 67 additions and 90 deletions
+67 -90
......@@ -30,20 +30,21 @@ func (*volumeHook) Name() string {
return "volumes"
}
func validateHostVolumes(requested map[string]*structs.VolumeRequest, client map[string]*structs.ClientHostVolumeConfig) error {
func validateHostVolumes(requestedByAlias map[string]*structs.VolumeRequest, clientVolumesByName map[string]*structs.ClientHostVolumeConfig) error {
var result error
for n, req := range requested {
if req.Volume.Type != "host" {
for n, req := range requestedByAlias {
if req.Type != structs.VolumeTypeHost {
continue
}
cfg, err := structs.ParseHostVolumeConfig(req.Config)
if err != nil {
result = multierror.Append(result, fmt.Errorf("failed to parse config for %s: %v", n, err))
continue
}
_, ok := client[cfg.Source]
_, ok := clientVolumesByName[cfg.Source]
if !ok {
result = multierror.Append(result, fmt.Errorf("missing %s", cfg.Source))
}
......@@ -52,10 +53,13 @@ func validateHostVolumes(requested map[string]*structs.VolumeRequest, client map
return result
}
func (h *volumeHook) hostVolumeMountConfigurations(vmounts []*structs.VolumeMount, volumes map[string]*structs.VolumeRequest, client map[string]*structs.ClientHostVolumeConfig) ([]*drivers.MountConfig, error) {
// hostVolumeMountConfigurations takes the users requested volume mounts,
// volumes, and the client host volume configuration and converts them into a
// format that can be used by drivers.
func (h *volumeHook) hostVolumeMountConfigurations(taskMounts []*structs.VolumeMount, taskVolumesByAlias map[string]*structs.VolumeRequest, clientVolumesByName map[string]*structs.ClientHostVolumeConfig) ([]*drivers.MountConfig, error) {
var mounts []*drivers.MountConfig
for _, m := range vmounts {
req, ok := volumes[m.Volume]
for _, m := range taskMounts {
req, ok := taskVolumesByAlias[m.Volume]
if !ok {
// Should never happen unless we misvalidated on job submission
return nil, fmt.Errorf("No group volume declaration found named: %s", m.Volume)
......@@ -66,7 +70,7 @@ func (h *volumeHook) hostVolumeMountConfigurations(vmounts []*structs.VolumeMoun
return nil, fmt.Errorf("failed to parse config for %s: %v", m.Volume, err)
}
hostVolume, ok := client[cfg.Source]
hostVolume, ok := clientVolumesByName[cfg.Source]
if !ok {
// Should never happen, but unless the client volumes were mutated during
// the execution of this hook.
......@@ -76,7 +80,7 @@ func (h *volumeHook) hostVolumeMountConfigurations(vmounts []*structs.VolumeMoun
mcfg := &drivers.MountConfig{
HostPath: hostVolume.Source,
TaskPath: m.Destination,
Readonly: hostVolume.ReadOnly || req.Volume.ReadOnly || m.ReadOnly,
Readonly: hostVolume.ReadOnly || req.ReadOnly || m.ReadOnly,
}
mounts = append(mounts, mcfg)
}
......
......@@ -1327,11 +1327,7 @@ func (a *ClientConfig) Merge(b *ClientConfig) *ClientConfig {
}
if len(a.HostVolumes) == 0 && len(b.HostVolumes) != 0 {
cc := make([]*structs.ClientHostVolumeConfig, len(b.HostVolumes))
for k, v := range b.HostVolumes {
cc[k] = v.Copy()
}
result.HostVolumes = cc
result.HostVolumes = structs.CopySliceClientHostVolumeConfig(b.HostVolumes)
} else if len(b.HostVolumes) != 0 {
result.HostVolumes = structs.HostVolumeSliceMerge(a.HostVolumes, b.HostVolumes)
}
......
......@@ -749,7 +749,7 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) {
continue
}
vol := &structs.Volume{
vol := &structs.VolumeRequest{
Name: v.Name,
Type: v.Type,
ReadOnly: v.ReadOnly,
......@@ -757,10 +757,7 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) {
Config: v.Config,
}
tg.Volumes[k] = &structs.VolumeRequest{
Volume: vol,
Config: v.Config,
}
tg.Volumes[k] = vol
}
}
......
......@@ -4885,9 +4885,9 @@ func (tg *TaskGroup) Validate(j *Job) error {
// Validate the Host Volumes
for name, decl := range tg.Volumes {
if decl.Volume.Type != VolumeTypeHost {
if decl.Type != VolumeTypeHost {
// TODO: Remove this error when adding new volume types
mErr.Errors = append(mErr.Errors, fmt.Errorf("Volume %s has unrecognised type %s", name, decl.Volume.Type))
mErr.Errors = append(mErr.Errors, fmt.Errorf("Volume %s has unrecognised type %s", name, decl.Type))
continue
}
......
......@@ -872,9 +872,7 @@ func TestTaskGroup_Validate(t *testing.T) {
tg = &TaskGroup{
Volumes: map[string]*VolumeRequest{
"foo": {
Volume: &Volume{
Type: "nothost",
},
Type: "nothost",
Config: map[string]interface{}{
"sOuRcE": "foo",
},
......@@ -893,10 +891,7 @@ func TestTaskGroup_Validate(t *testing.T) {
tg = &TaskGroup{
Volumes: map[string]*VolumeRequest{
"foo": {
Volume: &Volume{
Type: "host",
},
Config: nil,
Type: "host",
},
},
Tasks: []*Task{
......@@ -912,10 +907,7 @@ func TestTaskGroup_Validate(t *testing.T) {
tg = &TaskGroup{
Volumes: map[string]*VolumeRequest{
"foo": {
Volume: &Volume{
Type: "host",
},
Config: nil,
Type: "host",
},
},
Tasks: []*Task{
......
......@@ -40,23 +40,36 @@ func CopyMapStringClientHostVolumeConfig(m map[string]*ClientHostVolumeConfig) m
return nm
}
func CopySliceClientHostVolumeConfig(s []*ClientHostVolumeConfig) []*ClientHostVolumeConfig {
l := len(s)
if l == 0 {
return nil
}
ns := make([]*ClientHostVolumeConfig, l)
for idx, cfg := range s {
ns[idx] = cfg.Copy()
}
return ns
}
func HostVolumeSliceMerge(a, b []*ClientHostVolumeConfig) []*ClientHostVolumeConfig {
n := make([]*ClientHostVolumeConfig, len(a))
seenKeys := make(map[string]struct{}, len(a))
seenKeys := make(map[string]int, len(a))
for k, v := range a {
if _, ok := seenKeys[v.Name]; ok {
continue
}
n[k] = v.Copy()
seenKeys[v.Name] = struct{}{}
for i, config := range a {
n[i] = config.Copy()
seenKeys[config.Name] = i
}
for k, v := range b {
if _, ok := seenKeys[v.Name]; ok {
for _, config := range b {
if fIndex, ok := seenKeys[config.Name]; ok {
n[fIndex] = config.Copy()
continue
}
n[k] = v.Copy()
seenKeys[v.Name] = struct{}{}
n = append(n, config.Copy())
}
return n
......@@ -78,8 +91,8 @@ func (h *HostVolumeConfig) Copy() *HostVolumeConfig {
return nh
}
// Volume is a representation of a storage volume that a TaskGroup wishes to use.
type Volume struct {
// VolumeRequest is a representation of a storage volume that a TaskGroup wishes to use.
type VolumeRequest struct {
Name string
Type string
ReadOnly bool
......@@ -88,11 +101,11 @@ type Volume struct {
Config map[string]interface{}
}
func (v *Volume) Copy() *Volume {
func (v *VolumeRequest) Copy() *VolumeRequest {
if v == nil {
return nil
}
nv := new(Volume)
nv := new(VolumeRequest)
*nv = *v
if i, err := copystructure.Copy(nv.Config); err != nil {
......@@ -104,13 +117,13 @@ func (v *Volume) Copy() *Volume {
return nv
}
func CopyMapVolumes(s map[string]*Volume) map[string]*Volume {
func CopyMapVolumeRequest(s map[string]*VolumeRequest) map[string]*VolumeRequest {
if s == nil {
return nil
}
l := len(s)
c := make(map[string]*Volume, l)
c := make(map[string]*VolumeRequest, l)
for k, v := range s {
c[k] = v.Copy()
}
......@@ -147,39 +160,6 @@ func CopySliceVolumeMount(s []*VolumeMount) []*VolumeMount {
return c
}
type VolumeRequest struct {
Volume *Volume
Config map[string]interface{}
}
func (h *VolumeRequest) Copy() *VolumeRequest {
if h == nil {
return nil
}
c := new(VolumeRequest)
c.Volume = h.Volume.Copy()
if i, err := copystructure.Copy(h.Config); err != nil {
panic(err.Error())
} else {
c.Config = i.(map[string]interface{})
}
return c
}
func CopyMapVolumeRequest(m map[string]*VolumeRequest) map[string]*VolumeRequest {
if m == nil {
return nil
}
l := len(m)
c := make(map[string]*VolumeRequest, l)
for k, v := range m {
c[k] = v.Copy()
}
return c
}
func ParseHostVolumeConfig(m map[string]interface{}) (*HostVolumeConfig, error) {
var c HostVolumeConfig
err := mapstructure.Decode(m, &c)
......
......@@ -99,7 +99,10 @@ func NewRandomIterator(ctx Context, nodes []*structs.Node) *StaticIterator {
// HostVolumeChecker is a FeasibilityChecker which returns whether a node has
// the host volumes necessary to schedule a task group.
type HostVolumeChecker struct {
ctx Context
ctx Context
// volumes is a map[HostVolumeName][]RequestedVolume. The requested volumes are
// a slice because a single task group may request the same volume multiple times.
volumes map[string][]*structs.VolumeRequest
}
......@@ -117,7 +120,7 @@ func (h *HostVolumeChecker) SetVolumes(volumes map[string]*structs.VolumeRequest
// Convert the map from map[DesiredName]Request to map[Source][]Request to improve
// lookup performance. Also filter non-host volumes.
for _, req := range volumes {
if req.Volume.Type != structs.VolumeTypeHost {
if req.Type != structs.VolumeTypeHost {
continue
}
......@@ -142,11 +145,16 @@ func (h *HostVolumeChecker) Feasible(candidate *structs.Node) bool {
}
func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
hLen := len(h.volumes)
nLen := len(n.HostVolumes)
rLen := len(h.volumes)
hLen := len(n.HostVolumes)
// Fast path: Requested no volumes. No need to check further.
if rLen == 0 {
return true
}
// Fast path: Requesting more volumes than the node has, can't meet the criteria.
if hLen > nLen {
if rLen > hLen {
return false
}
......
......@@ -109,15 +109,15 @@ func TestHostVolumeChecker(t *testing.T) {
volumes := map[string]*structs.VolumeRequest{
"foo": {
Volume: &structs.Volume{Type: "host"},
Type: "host",
Config: map[string]interface{}{"source": "foo"},
},
"bar": {
Volume: &structs.Volume{Type: "host"},
Type: "host",
Config: map[string]interface{}{"source": "bar"},
},
"baz": {
Volume: &structs.Volume{Type: "nothost"},
Type: "nothost",
Config: map[string]interface{}{"source": "baz"},
},
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment