Commit b7216425 authored by Tim Gross's avatar Tim Gross
Browse files

CSI: ensure we're using new snapshots after checkpoint

parent 9c3ea135
Showing with 19 additions and 7 deletions
+19 -7
......@@ -693,14 +693,18 @@ RELEASE_CLAIM:
func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
v.logger.Trace("node unpublish", "vol", vol.ID)
store := v.srv.fsm.State()
// We need a new snapshot after each checkpoint
snap, err := v.srv.fsm.State().Snapshot()
if err != nil {
return err
}
// If the node has been GC'd or is down, we can't send it a node
// unpublish. We need to assume the node has unpublished at its
// end. If it hasn't, any controller unpublish will potentially
// hang or error and need to be retried.
if claim.NodeID != "" {
node, err := store.NodeByID(memdb.NewWatchSet(), claim.NodeID)
node, err := snap.NodeByID(memdb.NewWatchSet(), claim.NodeID)
if err != nil {
return err
}
......@@ -723,7 +727,7 @@ func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.C
// The RPC sent from the 'nomad node detach' command or GC won't have an
// allocation ID set so we try to unpublish every terminal or invalid
// alloc on the node, all of which will be in PastClaims after denormalizing
vol, err := store.CSIVolumeDenormalize(memdb.NewWatchSet(), vol)
vol, err = snap.CSIVolumeDenormalize(memdb.NewWatchSet(), vol)
if err != nil {
return err
}
......@@ -793,10 +797,15 @@ func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *str
return nil
}
state := v.srv.fsm.State()
// We need a new snapshot after each checkpoint
snap, err := v.srv.fsm.State().Snapshot()
if err != nil {
return err
}
ws := memdb.NewWatchSet()
plugin, err := state.CSIPluginByID(ws, vol.PluginID)
plugin, err := snap.CSIPluginByID(ws, vol.PluginID)
if err != nil {
return fmt.Errorf("could not query plugin: %v", err)
} else if plugin == nil {
......@@ -808,7 +817,7 @@ func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *str
return nil
}
vol, err = state.CSIVolumeDenormalize(ws, vol)
vol, err = snap.CSIVolumeDenormalize(ws, vol)
if err != nil {
return err
}
......
......@@ -626,7 +626,10 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Unpublish", req,
&structs.CSIVolumeUnpublishResponse{})
vol, volErr := state.CSIVolumeByID(nil, ns, volID)
snap, snapErr := state.Snapshot()
must.NoError(t, snapErr)
vol, volErr := snap.CSIVolumeByID(nil, ns, volID)
must.NoError(t, volErr)
must.NotNil(t, vol)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment