Unverified Commit d2aab5d5 authored by Tim Gross's avatar Tim Gross Committed by GitHub
Browse files

fix data race in dynamic plugin registry tests (#12554)

These tests have a data race where the test assertion is reading a
value that's being set in the `listenFunc` goroutines that are
subscribing to registry update events. Move the assertion into the
subscribing goroutine to remove the race. This bug was discovered
in #12098 but does not impact production Nomad code.
parent 6e0e423b
Showing with 67 additions and 61 deletions
+67 -61
......@@ -16,26 +16,36 @@ func TestPluginEventBroadcaster_SendsMessagesToAllClients(t *testing.T) {
b := newPluginEventBroadcaster()
defer close(b.stopCh)
var rcv1, rcv2 bool
var rcv1, rcv2 bool
ch1 := b.subscribe()
ch2 := b.subscribe()
listenFunc := func(ch chan *PluginUpdateEvent, updateBool *bool) {
select {
case <-ch:
*updateBool = true
}
}
var wg sync.WaitGroup
wg.Add(1)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
go listenFunc(ch1, &rcv1)
go listenFunc(ch2, &rcv2)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
t.Errorf("did not receive event on both subscriptions before timeout")
return
case <-ch1:
rcv1 = true
case <-ch2:
rcv2 = true
}
if rcv1 && rcv2 {
return
}
}
}()
b.broadcast(&PluginUpdateEvent{})
require.Eventually(t, func() bool {
return rcv1 == true && rcv2 == true
}, 1*time.Second, 200*time.Millisecond)
wg.Wait()
}
func TestPluginEventBroadcaster_UnsubscribeWorks(t *testing.T) {
......@@ -43,28 +53,30 @@ func TestPluginEventBroadcaster_UnsubscribeWorks(t *testing.T) {
b := newPluginEventBroadcaster()
defer close(b.stopCh)
var rcv1 bool
ch1 := b.subscribe()
listenFunc := func(ch chan *PluginUpdateEvent, updateBool *bool) {
select {
case e := <-ch:
if e == nil {
*updateBool = true
var wg sync.WaitGroup
wg.Add(1)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
t.Errorf("did not receive unsubscribe event on subscription before timeout")
return
case <-ch1:
return // done!
}
}
}
go listenFunc(ch1, &rcv1)
}()
b.unsubscribe(ch1)
b.broadcast(&PluginUpdateEvent{})
require.Eventually(t, func() bool {
return rcv1 == true
}, 1*time.Second, 200*time.Millisecond)
wg.Wait()
}
func TestDynamicRegistry_RegisterPlugin_SendsUpdateEvents(t *testing.T) {
......@@ -72,26 +84,27 @@ func TestDynamicRegistry_RegisterPlugin_SendsUpdateEvents(t *testing.T) {
r := NewRegistry(nil, nil)
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
var wg sync.WaitGroup
wg.Add(1)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
ch := r.PluginsUpdatedCh(ctx, "csi")
receivedRegistrationEvent := false
listenFunc := func(ch <-chan *PluginUpdateEvent, updateBool *bool) {
select {
case e := <-ch:
if e == nil {
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
t.Errorf("did not receive registration event on subscription before timeout")
return
}
if e.EventType == EventTypeRegistered {
*updateBool = true
case e := <-ch:
if e != nil && e.EventType == EventTypeRegistered {
return
}
}
}
}
go listenFunc(ch, &receivedRegistrationEvent)
}()
err := r.RegisterPlugin(&PluginInfo{
Type: "csi",
......@@ -100,10 +113,7 @@ func TestDynamicRegistry_RegisterPlugin_SendsUpdateEvents(t *testing.T) {
})
require.NoError(t, err)
require.Eventually(t, func() bool {
return receivedRegistrationEvent == true
}, 1*time.Second, 200*time.Millisecond)
wg.Wait()
}
func TestDynamicRegistry_DeregisterPlugin_SendsUpdateEvents(t *testing.T) {
......@@ -111,28 +121,27 @@ func TestDynamicRegistry_DeregisterPlugin_SendsUpdateEvents(t *testing.T) {
r := NewRegistry(nil, nil)
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
var wg sync.WaitGroup
wg.Add(1)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
ch := r.PluginsUpdatedCh(ctx, "csi")
receivedDeregistrationEvent := false
listenFunc := func(ch <-chan *PluginUpdateEvent, updateBool *bool) {
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
t.Errorf("did not receive deregistration event on subscription before timeout")
return
case e := <-ch:
if e == nil {
if e != nil && e.EventType == EventTypeDeregistered {
return
}
if e.EventType == EventTypeDeregistered {
*updateBool = true
}
}
}
}
go listenFunc(ch, &receivedDeregistrationEvent)
}()
err := r.RegisterPlugin(&PluginInfo{
Type: "csi",
......@@ -144,10 +153,7 @@ func TestDynamicRegistry_DeregisterPlugin_SendsUpdateEvents(t *testing.T) {
err = r.DeregisterPlugin("csi", "my-plugin", "alloc-0")
require.NoError(t, err)
require.Eventually(t, func() bool {
return receivedDeregistrationEvent == true
}, 1*time.Second, 200*time.Millisecond)
wg.Wait()
}
func TestDynamicRegistry_DispensePlugin_Works(t *testing.T) {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment