Unverified Commit 9778c3ef authored by Danielle Lancashire

volume_manager: Add support for publishing volumes

parent c0442f03
Showing with 161 additions and 26 deletions
......@@ -13,6 +13,8 @@ var (
)

type MountInfo struct {
	Source   string
	IsDevice bool
}

type VolumeMounter interface {
......
......@@ -59,6 +59,10 @@ func (v *volumeManager) stagingDirForVolume(vol *structs.CSIVolume) string {
	return filepath.Join(v.mountRoot, StagingDirName, vol.ID, "todo-provide-usage-options")
}

func (v *volumeManager) allocDirForVolume(vol *structs.CSIVolume, alloc *structs.Allocation) string {
	return filepath.Join(v.mountRoot, AllocSpecificDirName, alloc.ID, vol.ID, "todo-provide-usage-options")
}

// ensureStagingDir attempts to create a directory for use when staging a
// volume and then validates that the path is not already a mount point for
// e.g. an existing volume stage.
......@@ -83,23 +87,31 @@ func (v *volumeManager) ensureStagingDir(vol *structs.CSIVolume) (bool, string,
	return !isNotMount, stagingPath, nil
}
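
// [Editor's sketch, not part of this commit.] The mount-point probe used by
// ensureStagingDir can be exercised on its own; this helper (hypothetical
// name) uses only the mount package already imported by this file:
func exampleIsMountPoint(path string) (bool, error) {
	m := mount.New()
	// IsNotAMountPoint returns true when nothing is currently mounted at path.
	isNotMount, err := m.IsNotAMountPoint(path)
	if err != nil {
		return false, err
	}
	return !isNotMount, nil
}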

// ensureAllocDir attempts to create a directory for use when publishing a
// volume and then validates that the path is not already a mount point
// (e.g. when reattaching to existing allocs).
//
// Returns whether the directory is a pre-existing mountpoint, the publish
// path, and any errors that occurred.
func (v *volumeManager) ensureAllocDir(vol *structs.CSIVolume, alloc *structs.Allocation) (bool, string, error) {
	allocPath := v.allocDirForVolume(vol, alloc)

	// Make the alloc path, owned by the Nomad user
	if err := os.MkdirAll(allocPath, 0700); err != nil && !os.IsExist(err) {
		return false, "", fmt.Errorf("failed to create allocation directory for volume (%s): %v", vol.ID, err)
	}

	// Validate that it is not already a mount point
	m := mount.New()
	isNotMount, err := m.IsNotAMountPoint(allocPath)
	if err != nil {
		return false, "", fmt.Errorf("mount point detection failed for volume (%s): %v", vol.ID, err)
	}

	return !isNotMount, allocPath, nil
}
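
// [Editor's note, not part of this commit.] For illustration: with a mount
// root of "/csi", an alloc ID "alloc-1", and a volume ID "vol-1",
// allocDirForVolume above produces
//
//	/csi/<AllocSpecificDirName>/alloc-1/vol-1/todo-provide-usage-options
//
// where AllocSpecificDirName is a package constant whose value is not shown
// in this diff, and the trailing "todo-provide-usage-options" segment is a
// placeholder for future per-usage-mode paths.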

func capabilitiesFromVolume(vol *structs.CSIVolume) (*csi.VolumeCapability, error) {
	var accessType csi.VolumeAccessType
	switch vol.AttachmentMode {
	case structs.CSIVolumeAttachmentModeBlockDevice:
......@@ -111,7 +123,7 @@ func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume)
		// final check during transformation into the requisite CSI Data type to
		// defend against development bugs and corrupted state - and incompatible
		// nomad versions in the future.
		return nil, fmt.Errorf("Unknown volume attachment mode: %s", vol.AttachmentMode)
	}

	var accessMode csi.VolumeAccessMode
......@@ -131,7 +143,38 @@ func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume)
		// final check during transformation into the requisite CSI Data type to
		// defend against development bugs and corrupted state - and incompatible
		// nomad versions in the future.
		return nil, fmt.Errorf("Unknown volume access mode: %v", vol.AccessMode)
	}

	return &csi.VolumeCapability{
		AccessType: accessType,
		AccessMode: accessMode,
		VolumeMountOptions: &csi.VolumeMountOptions{
			// GH-7007: Currently we have no way to provide these
		},
	}, nil
}
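
// [Editor's sketch, not part of this commit.] capabilitiesFromVolume is
// driven purely by the volume's declared modes; for the block-device,
// multi-writer volume used in the tests below it would be called as
//
//	capability, err := capabilitiesFromVolume(&structs.CSIVolume{
//		ID:             "foo",
//		AttachmentMode: structs.CSIVolumeAttachmentModeBlockDevice,
//		AccessMode:     structs.CSIVolumeAccessModeMultiNodeMultiWriter,
//	})
//
// yielding a csi.VolumeCapability with a block-device access type and a
// multi-node-multi-writer access mode.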

// stageVolume prepares a volume for use by allocations. When a plugin exposes
// the STAGE_UNSTAGE_VOLUME capability it MUST be called once-per-volume for a
// given usage mode before the volume can be NodePublish-ed.
func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume) (string, error) {
	logger := hclog.FromContext(ctx)
	logger.Trace("Preparing volume staging environment")
	existingMount, stagingPath, err := v.ensureStagingDir(vol)
	if err != nil {
		return "", err
	}
	logger.Trace("Volume staging environment", "pre-existing_mount", existingMount, "staging_path", stagingPath)

	if existingMount {
		logger.Debug("re-using existing staging mount for volume", "staging_path", stagingPath)
		return stagingPath, nil
	}

	capability, err := capabilitiesFromVolume(vol)
	if err != nil {
		return "", err
	}
......@@ -139,21 +182,48 @@ func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume)
	// We currently treat all explicit CSI NodeStageVolume errors (aside from
	// timeouts, codes.ResourceExhausted, and codes.Unavailable) as fatal.
	// In the future, we can provide more useful error messages based on
	// different types of error. For error documentation see:
	// https://github.com/container-storage-interface/spec/blob/4731db0e0bc53238b93850f43ab05d9355df0fd9/spec.md#nodestagevolume-errors
	return stagingPath, v.plugin.NodeStageVolume(ctx,
		vol.ID,
		nil, /* TODO: Get publishContext from Server */
		stagingPath,
		capability,
		grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
		grpc_retry.WithMax(3),
		grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)),
	)
}
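
// [Editor's note, not part of this commit.] With the grpc_retry options
// above, NodeStageVolume is attempted at most 3 times, each attempt bounded
// by DefaultMountActionTimeout, with exponential backoff between attempts
// starting from a 100ms base. Per the comment above, only the error classes
// treated as retryable (timeouts, codes.ResourceExhausted,
// codes.Unavailable) trigger another attempt; any other error is returned
// to the caller as fatal.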

func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, stagingPath string) (*MountInfo, error) {
	logger := hclog.FromContext(ctx)

	preexistingMountForAlloc, targetPath, err := v.ensureAllocDir(vol, alloc)
	if err != nil {
		return nil, err
	}

	if preexistingMountForAlloc {
		logger.Debug("Re-using existing published volume for allocation")
		return &MountInfo{Source: targetPath}, nil
	}

	capabilities, err := capabilitiesFromVolume(vol)
	if err != nil {
		return nil, err
	}

	err = v.plugin.NodePublishVolume(ctx, &csi.NodePublishVolumeRequest{
		VolumeID:          vol.ID,
		PublishContext:    nil, // TODO: get publishContext from server
		StagingTargetPath: stagingPath,
		TargetPath:        targetPath,
		VolumeCapability:  capabilities,
	},
		grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
		grpc_retry.WithMax(3),
		grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)),
	)

	return &MountInfo{Source: targetPath}, err
}
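
// [Editor's note, not part of this commit.] publishVolume expects
// stagingPath to be the directory returned by stageVolume when the plugin
// requires staging, and the empty string otherwise; MountVolume below
// threads it through accordingly.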

// MountVolume performs the steps required for using a given volume
......@@ -164,14 +234,17 @@ func (v *volumeManager) MountVolume(ctx context.Context, vol *structs.CSIVolume,
	logger := v.logger.With("volume_id", vol.ID, "alloc_id", alloc.ID)
	ctx = hclog.WithContext(ctx, logger)

	var stagingPath string
	var err error

	if v.requiresStaging {
		stagingPath, err = v.stageVolume(ctx, vol)
		if err != nil {
			return nil, err
		}
	}

	return v.publishVolume(ctx, vol, alloc, stagingPath)
}
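
// [Editor's sketch, not part of this commit.] A hypothetical caller, such
// as a client hook that wires CSI volumes into tasks, would use MountVolume
// roughly as
//
//	mountInfo, err := manager.MountVolume(ctx, vol, alloc)
//	if err != nil {
//		return err
//	}
//	hostPath := mountInfo.Source // path (or device, if IsDevice) to expose to the task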
// unstageVolume is the inverse operation of `stageVolume` and must be called
......
......@@ -167,7 +167,7 @@ func TestVolumeManager_stageVolume(t *testing.T) {
			manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, true)
			ctx := context.Background()

			_, err := manager.stageVolume(ctx, tc.Volume)

			if tc.ExpectedErr != nil {
				require.EqualError(t, err, tc.ExpectedErr.Error())
......@@ -230,3 +230,63 @@ func TestVolumeManager_unstageVolume(t *testing.T) {
		})
	}
}

func TestVolumeManager_publishVolume(t *testing.T) {
	t.Parallel()

	cases := []struct {
		Name                 string
		Allocation           *structs.Allocation
		Volume               *structs.CSIVolume
		PluginErr            error
		ExpectedErr          error
		ExpectedCSICallCount int64
	}{
		{
			Name:       "Returns an error when the plugin returns an error",
			Allocation: structs.MockAlloc(),
			Volume: &structs.CSIVolume{
				ID:             "foo",
				AttachmentMode: structs.CSIVolumeAttachmentModeBlockDevice,
				AccessMode:     structs.CSIVolumeAccessModeMultiNodeMultiWriter,
			},
			PluginErr:            errors.New("Some Unknown Error"),
			ExpectedErr:          errors.New("Some Unknown Error"),
			ExpectedCSICallCount: 1,
		},
		{
			Name:       "Happy Path",
			Allocation: structs.MockAlloc(),
			Volume: &structs.CSIVolume{
				ID:             "foo",
				AttachmentMode: structs.CSIVolumeAttachmentModeBlockDevice,
				AccessMode:     structs.CSIVolumeAccessModeMultiNodeMultiWriter,
			},
			PluginErr:            nil,
			ExpectedErr:          nil,
			ExpectedCSICallCount: 1,
		},
	}

	for _, tc := range cases {
		t.Run(tc.Name, func(t *testing.T) {
			tmpPath := tmpDir(t)
			defer os.RemoveAll(tmpPath)

			csiFake := &csifake.Client{}
			csiFake.NextNodePublishVolumeErr = tc.PluginErr

			manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, true)
			ctx := context.Background()

			_, err := manager.publishVolume(ctx, tc.Volume, tc.Allocation, "")

			if tc.ExpectedErr != nil {
				require.EqualError(t, err, tc.ExpectedErr.Error())
			} else {
				require.NoError(t, err)
			}

			require.Equal(t, tc.ExpectedCSICallCount, csiFake.NodePublishVolumeCallCount)
		})
	}
}