Unverified Commit f7d7aa0b authored by Travis Nielsen's avatar Travis Nielsen
Browse files

ceph: flex driver should not allow attach before detach


If a volume is being attached, it should be verified that the volume
is safe to attach. The volume was assumed to be safe to attach if it was
for the same pod. But this was assuming the pod of the same name would be
on the same node. This is true for pods created from deployments, but not
for pods that are part of a stateful set. A stateful set will maintain the
pod name even as the pod is failed over to a new node. Therefore, the fencing
much check if the pod is from the same node before allowing the attach to
continue. Otherwise, we need to wait for the volume to be detached from the
other node.
Signed-off-by: default avatarTravis Nielsen <tnielsen@redhat.com>
parent 5b8388ee
Showing with 23 additions and 10 deletions
+23 -10
......@@ -19,12 +19,13 @@ package flexvolume
import (
"fmt"
"github.com/rook/rook/pkg/util/display"
"os"
"path"
"path/filepath"
"strings"
"github.com/rook/rook/pkg/util/display"
"github.com/coreos/pkg/capnslog"
rookalpha "github.com/rook/rook/pkg/apis/rook.io/v1alpha2"
"github.com/rook/rook/pkg/clusterd"
......@@ -142,15 +143,21 @@ func (c *Controller) Attach(attachOpts AttachOptions, devicePath *string) error
logger.Infof("volume attachment record %s/%s exists for pod: %s/%s", volumeattachObj.Namespace, volumeattachObj.Name, attachment.PodNamespace, attachment.PodName)
// Note this could return the reference of the pod who is requesting the attach if this pod have the same name as the pod in the attachment record.
allowAttach := false
pod, err := c.context.Clientset.CoreV1().Pods(attachment.PodNamespace).Get(attachment.PodName, metav1.GetOptions{})
if err != nil || (attachment.PodNamespace == attachOpts.PodNamespace && attachment.PodName == attachOpts.Pod) {
if err != nil && !errors.IsNotFound(err) {
if err != nil {
if !errors.IsNotFound(err) {
return fmt.Errorf("failed to get pod CRD %s/%s. %+v", attachment.PodNamespace, attachment.PodName, err)
}
allowAttach = true
logger.Infof("volume attachment record %s/%s is orphaned. Updating record with new attachment information for pod %s/%s", volumeattachObj.Namespace, volumeattachObj.Name, attachOpts.PodNamespace, attachOpts.Pod)
// Attachment is orphaned. Update attachment record and proceed with attaching
}
if err == nil && (attachment.PodNamespace == attachOpts.PodNamespace && attachment.PodName == attachOpts.Pod && attachment.Node == node) {
allowAttach = true
logger.Infof("volume attachment record %s/%s is starting on the same node. Updating record with new attachment information for pod %s/%s", volumeattachObj.Namespace, volumeattachObj.Name, attachOpts.PodNamespace, attachOpts.Pod)
}
if allowAttach {
// Update attachment record and proceed with attaching
attachment.Node = node
attachment.MountDir = attachOpts.MountDir
attachment.PodNamespace = attachOpts.PodNamespace
......@@ -162,7 +169,7 @@ func (c *Controller) Attach(attachOpts AttachOptions, devicePath *string) error
return fmt.Errorf("failed to update volume CRD %s. %+v", crdName, err)
}
} else {
// Attachment is not orphaned. Original pod still exists. Dont attach.
// Attachment is not orphaned. Original pod still exists. Don't attach.
return fmt.Errorf("failed to attach volume %s for pod %s/%s. Volume is already attached by pod %s/%s. Status %+v",
crdName, attachOpts.PodNamespace, attachOpts.Pod, attachment.PodNamespace, attachment.PodName, pod.Status.Phase)
}
......
......@@ -503,10 +503,16 @@ func TestOrphanAttachOriginalPodNameSame(t *testing.T) {
volumeManager: &manager.FakeVolumeManager{},
}
// Attach should succeed and the stale volumeattachment record should be updated to reflect the new pod information
// Attach should fail because the pod is on a different node
devicePath := ""
err = controller.Attach(opts, &devicePath)
assert.Nil(t, err)
assert.Error(t, err)
// Attach should succeed and the stale volumeattachment record should be updated to reflect the new pod information
// since the pod is restarting on the same node
os.Setenv(k8sutil.NodeNameEnvVar, "otherNode")
err = controller.Attach(opts, &devicePath)
assert.NoError(t, err)
volAtt, err := context.RookClientset.RookV1alpha2().Volumes("rook-system").Get("pvc-123", metav1.GetOptions{})
assert.Nil(t, err)
......@@ -518,7 +524,7 @@ func TestOrphanAttachOriginalPodNameSame(t *testing.T) {
PodName: opts.Pod,
MountDir: opts.MountDir,
ReadOnly: false,
Node: "node1",
Node: "otherNode",
}, volAtt.Attachments,
), "Volume crd does not contain expected attachment")
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment