Commit debffe24 authored by Tim Gross's avatar Tim Gross
Browse files

csi: update leader's ACL in volumewatcher (#11891)

The volumewatcher that runs on the leader needs to make RPC calls
rather than writing to raft (as we do in the deploymentwatcher)
because the unpublish workflow needs to make RPC calls to the
clients. This requires that the volumewatcher has access to the
leader's ACL token.

But when leadership transitions, the new leader creates a new leader
ACL token. This ACL token needs to be passed into the volumewatcher
when we enable it, otherwise the volumewatcher can find itself with a
stale token.
parent 143fb90e
Showing with 16 additions and 12 deletions
+16 -12
```release-note:bug
csi: Fixed a bug where releasing volume claims would fail with ACL errors after leadership transitions.
```
......@@ -263,7 +263,7 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
s.nodeDrainer.SetEnabled(true, s.State())
// Enable the volume watcher, since we are now the leader
s.volumeWatcher.SetEnabled(true, s.State())
s.volumeWatcher.SetEnabled(true, s.State(), s.getLeaderAcl())
// Restore the eval broker state
if err := s.restoreEvals(); err != nil {
......@@ -1074,7 +1074,7 @@ func (s *Server) revokeLeadership() error {
s.nodeDrainer.SetEnabled(false, nil)
// Disable the volume watcher
s.volumeWatcher.SetEnabled(false, nil)
s.volumeWatcher.SetEnabled(false, nil, "")
// Disable any enterprise systems required.
if err := s.revokeEnterpriseLeadership(); err != nil {
......
......@@ -57,14 +57,15 @@ func NewVolumesWatcher(logger log.Logger, rpc CSIVolumeRPC, leaderAcl string) *W
// SetEnabled is used to control if the watcher is enabled. The
// watcher should only be enabled on the active leader. When being
// enabled the state is passed in as it is no longer valid once a
// leader election has taken place.
func (w *Watcher) SetEnabled(enabled bool, state *state.StateStore) {
// enabled the state and leader's ACL is passed in as it is no longer
// valid once a leader election has taken place.
func (w *Watcher) SetEnabled(enabled bool, state *state.StateStore, leaderAcl string) {
w.wlock.Lock()
defer w.wlock.Unlock()
wasEnabled := w.enabled
w.enabled = enabled
w.leaderAcl = leaderAcl
if state != nil {
w.state = state
......
......@@ -23,7 +23,7 @@ func TestVolumeWatch_EnableDisable(t *testing.T) {
index := uint64(100)
watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "")
watcher.SetEnabled(true, srv.State())
watcher.SetEnabled(true, srv.State(), "")
plugin := mock.CSIPlugin()
node := testNode(plugin, srv.State())
......@@ -48,7 +48,7 @@ func TestVolumeWatch_EnableDisable(t *testing.T) {
return 1 == len(watcher.watchers)
}, time.Second, 10*time.Millisecond)
watcher.SetEnabled(false, nil)
watcher.SetEnabled(false, nil, "")
require.Equal(0, len(watcher.watchers))
}
......@@ -70,7 +70,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) {
alloc.ClientStatus = structs.AllocClientStatusComplete
vol := testVolume(plugin, alloc, node.ID)
watcher.SetEnabled(true, srv.State())
watcher.SetEnabled(true, srv.State(), "")
index++
err := srv.State().CSIVolumeRegister(index, []*structs.CSIVolume{vol})
......@@ -94,7 +94,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) {
// watches for that change will fire on the new watcher
// step-down (this is sync)
watcher.SetEnabled(false, nil)
watcher.SetEnabled(false, nil, "")
require.Equal(0, len(watcher.watchers))
// allocation is now invalid
......@@ -116,7 +116,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) {
// create a new watcher and enable it to simulate the leadership
// transition
watcher = NewVolumesWatcher(testlog.HCLogger(t), srv, "")
watcher.SetEnabled(true, srv.State())
watcher.SetEnabled(true, srv.State(), "")
require.Eventually(func() bool {
watcher.wlock.RLock()
......@@ -142,7 +142,7 @@ func TestVolumeWatch_StartStop(t *testing.T) {
index := uint64(100)
watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "")
watcher.SetEnabled(true, srv.State())
watcher.SetEnabled(true, srv.State(), "")
require.Equal(0, len(watcher.watchers))
plugin := mock.CSIPlugin()
......@@ -237,7 +237,7 @@ func TestVolumeWatch_RegisterDeregister(t *testing.T) {
watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "")
watcher.SetEnabled(true, srv.State())
watcher.SetEnabled(true, srv.State(), "")
require.Equal(0, len(watcher.watchers))
plugin := mock.CSIPlugin()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment