Unverified Commit 45c87280 authored by Nick Ethier's avatar Nick Ethier
Browse files

plugin/drivers: rework eventer and change naming stream -> consumer

parent e4f80872
Branches unavailable
No related merge requests found
Showing with 103 additions and 48 deletions
+103 -48
package utils
import (
"fmt"
"sync"
"time"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/plugins/drivers"
"golang.org/x/net/context"
)
var (
//DefaultSendEventTimeout is the timeout used when publishing events to consumers
// DefaultSendEventTimeout is the timeout used when publishing events to consumers
DefaultSendEventTimeout = 2 * time.Second
)
// Eventer is a utility to control broadcast of TaskEvents to multiple consumers.
// It also implements the TaskStats func in the DriverPlugin interface so that
// It also implements the TaskEvents func in the DriverPlugin interface so that
// it can be embedded in a implementing driver struct.
type Eventer struct {
sync.RWMutex
consumersLock sync.RWMutex
// events is a channel were events to be broadcasted are sent
events chan *drivers.TaskEvent
// streamers is a slice of consumers to broadcast events to
// access is gaurded by RWMutex
streamers []*eventStreamer
// consumers is a slice of eventConsumers to broadcast events to.
// access is gaurded by consumersLock RWMutex
consumers []*eventConsumer
// stop chan to allow control of event loop shutdown
stop chan struct{}
// ctx to allow control of event loop shutdown
ctx context.Context
// done tracks if the event loop has stopped due to the ctx being done
done bool
logger hclog.Logger
}
// NewEventer returns an Eventer with a running event loop that can be stopped
// by closing the given stop channel
func NewEventer(stop chan struct{}) *Eventer {
func NewEventer(ctx context.Context, logger hclog.Logger) *Eventer {
e := &Eventer{
events: make(chan *drivers.TaskEvent),
stop: stop,
ctx: ctx,
logger: logger,
}
go e.eventLoop()
return e
......@@ -46,56 +54,83 @@ func NewEventer(stop chan struct{}) *Eventer {
func (e *Eventer) eventLoop() {
for {
select {
case <-e.stop:
for _, stream := range e.streamers {
close(stream.ch)
}
case <-e.ctx.Done():
e.done = true
close(e.events)
return
case event := <-e.events:
e.RLock()
for _, stream := range e.streamers {
stream.send(event)
e.consumersLock.RLock()
for _, consumer := range e.consumers {
consumer.send(event)
}
e.RUnlock()
e.consumersLock.RUnlock()
}
}
}
type eventStreamer struct {
type eventConsumer struct {
timeout time.Duration
ctx context.Context
ch chan *drivers.TaskEvent
logger hclog.Logger
}
func (s *eventStreamer) send(event *drivers.TaskEvent) {
func (c *eventConsumer) send(event *drivers.TaskEvent) {
select {
case <-time.After(s.timeout):
case <-s.ctx.Done():
case s.ch <- event:
case <-time.After(c.timeout):
c.logger.Warn("timeout sending event", "task_id", event.TaskID, "message", event.Message)
case <-c.ctx.Done():
case c.ch <- event:
}
}
func (e *Eventer) newStream(ctx context.Context) <-chan *drivers.TaskEvent {
e.Lock()
defer e.Unlock()
func (e *Eventer) newConsumer(ctx context.Context) *eventConsumer {
e.consumersLock.Lock()
defer e.consumersLock.Unlock()
stream := &eventStreamer{
consumer := &eventConsumer{
ch: make(chan *drivers.TaskEvent),
ctx: ctx,
timeout: DefaultSendEventTimeout,
logger: e.logger,
}
e.streamers = append(e.streamers, stream)
e.consumers = append(e.consumers, consumer)
return stream.ch
return consumer
}
// TaskEvents is an implementation of the DriverPlugin.TaskEvents function
func (e *Eventer) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) {
stream := e.newStream(ctx)
return stream, nil
consumer := e.newConsumer(ctx)
go e.handleConsumer(consumer)
return consumer.ch, nil
}
func (e *Eventer) handleConsumer(consumer *eventConsumer) {
// wait for consumer or eventer ctx to finish
select {
case <-consumer.ctx.Done():
case <-e.ctx.Done():
}
e.consumersLock.Lock()
defer e.consumersLock.Unlock()
defer close(consumer.ch)
filtered := e.consumers[:0]
for _, c := range e.consumers {
if c != consumer {
filtered = append(filtered, c)
}
}
e.consumers = filtered
}
// EmitEvent can be used to broadcast a new event
func (e *Eventer) EmitEvent(event *drivers.TaskEvent) {
func (e *Eventer) EmitEvent(event *drivers.TaskEvent) error {
if e.done {
return fmt.Errorf("error sending event, context canceled")
}
e.events <- event
return nil
}
......@@ -6,6 +6,7 @@ import (
"testing"
"time"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/stretchr/testify/require"
)
......@@ -14,8 +15,8 @@ func TestEventer(t *testing.T) {
t.Parallel()
require := require.New(t)
stop := make(chan struct{})
e := NewEventer(stop)
ctx, cancel := context.WithCancel(context.Background())
e := NewEventer(ctx, testlog.HCLogger(t))
events := []*drivers.TaskEvent{
{
......@@ -32,9 +33,11 @@ func TestEventer(t *testing.T) {
},
}
consumer1, err := e.TaskEvents(context.Background())
ctx1, cancel1 := context.WithCancel(context.Background())
consumer1, err := e.TaskEvents(ctx1)
require.NoError(err)
consumer2, err := e.TaskEvents(context.Background())
ctx2 := (context.Background())
consumer2, err := e.TaskEvents(ctx2)
require.NoError(err)
var buffer1, buffer2 []*drivers.TaskEvent
......@@ -42,31 +45,48 @@ func TestEventer(t *testing.T) {
wg.Add(2)
go func() {
defer wg.Done()
for {
event, ok := <-consumer1
if !ok {
return
}
var i int
for event := range consumer1 {
i++
buffer1 = append(buffer1, event)
if i == 3 {
break
}
}
}()
go func() {
defer wg.Done()
for {
event, ok := <-consumer2
if !ok {
return
}
var i int
for event := range consumer2 {
i++
buffer2 = append(buffer2, event)
if i == 3 {
break
}
}
}()
for _, event := range events {
e.EmitEvent(event)
require.NoError(e.EmitEvent(event))
}
close(stop)
wg.Wait()
require.Exactly(events, buffer1)
require.Exactly(events, buffer2)
cancel1()
time.Sleep(100 * time.Millisecond)
require.Equal(1, len(e.consumers))
require.NoError(e.EmitEvent(&drivers.TaskEvent{}))
ev, ok := <-consumer1
require.Nil(ev)
require.False(ok)
ev, ok = <-consumer2
require.NotNil(ev)
require.True(ok)
cancel()
time.Sleep(100 * time.Millisecond)
require.Zero(len(e.consumers))
require.Error(e.EmitEvent(&drivers.TaskEvent{}))
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment