Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
小 白蛋
Nomad
Commits
1345679b
Commit
1345679b
authored
5 years ago
by
Mahmood Ali
Browse files
Options
Download
Email Patches
Plain Diff
remove unused dropButLastChannel
parent
2ccade33
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
nomad/util.go
+0
-93
nomad/util.go
nomad/util_test.go
+0
-174
nomad/util_test.go
with
0 additions
and
267 deletions
+0
-267
nomad/util.go
+
0
-
93
View file @
1345679b
...
...
@@ -301,96 +301,3 @@ func getAlloc(state AllocGetter, allocID string) (*structs.Allocation, error) {
return
alloc
,
nil
}
// dropButLastChannel returns a channel that drops all but last value from sourceCh.
//
// Useful for aggressively consuming sourceCh when intermediate values aren't relevant.
//
// This function propagates values to result quickly and drops intermediate messages
// in best effort basis. Golang scheduler may delay delivery or result in extra
// deliveries.
//
// Consider this function for example:
//
// ```
// src := make(chan bool)
// dst := dropButLastChannel(src, nil)
//
// go func() {
// src <- true
// src <- false
// }()
//
// // v can be `true` here but is very unlikely
// v := <-dst
// ```
//
func
dropButLastChannel
(
sourceCh
<-
chan
bool
,
shutdownCh
<-
chan
struct
{})
chan
bool
{
// buffer the most recent result
dst
:=
make
(
chan
bool
)
go
func
()
{
// last value received
lv
:=
false
// ok source was closed
ok
:=
false
// received message since last delivery to destination
messageReceived
:=
false
DEQUE_SOURCE
:
// wait for first message
select
{
case
lv
,
ok
=
<-
sourceCh
:
if
!
ok
{
goto
SOURCE_CLOSED
}
messageReceived
=
true
goto
ENQUEUE_DST
case
<-
shutdownCh
:
return
}
ENQUEUE_DST
:
// prioritize draining source first dequeue without blocking
for
{
select
{
case
lv
,
ok
=
<-
sourceCh
:
if
!
ok
{
goto
SOURCE_CLOSED
}
messageReceived
=
true
default
:
break
ENQUEUE_DST
}
}
// attempt to enqueue but keep monitoring source channel
select
{
case
lv
,
ok
=
<-
sourceCh
:
if
!
ok
{
goto
SOURCE_CLOSED
}
messageReceived
=
true
goto
ENQUEUE_DST
case
dst
<-
lv
:
messageReceived
=
false
// enqueued value; back to dequeing from source
goto
DEQUE_SOURCE
case
<-
shutdownCh
:
return
}
SOURCE_CLOSED
:
if
messageReceived
{
select
{
case
dst
<-
lv
:
case
<-
shutdownCh
:
return
}
}
close
(
dst
)
}()
return
dst
}
This diff is collapsed.
Click to expand it.
nomad/util_test.go
+
0
-
174
View file @
1345679b
...
...
@@ -4,7 +4,6 @@ import (
"net"
"reflect"
"testing"
"time"
version
"github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/helper/uuid"
...
...
@@ -259,176 +258,3 @@ func TestMaxUint64(t *testing.T) {
t
.
Fatalf
(
"bad"
)
}
}
func
TestDropButLastChannelDropsValues
(
t
*
testing
.
T
)
{
sourceCh
:=
make
(
chan
bool
)
shutdownCh
:=
make
(
chan
struct
{})
defer
close
(
shutdownCh
)
dstCh
:=
dropButLastChannel
(
sourceCh
,
shutdownCh
)
// timeout duration for any channel propagation delay
timeoutDuration
:=
5
*
time
.
Millisecond
// test that dstCh doesn't emit anything initially
select
{
case
<-
dstCh
:
require
.
Fail
(
t
,
"received a message unexpectedly"
)
case
<-
time
.
After
(
timeoutDuration
)
:
// yay no message - it could have been a default: but
// checking for goroutine effect
}
sourceCh
<-
false
select
{
case
v
:=
<-
dstCh
:
require
.
False
(
t
,
v
,
"unexpected value from dstCh Ch"
)
case
<-
time
.
After
(
timeoutDuration
)
:
require
.
Fail
(
t
,
"timed out waiting for source->dstCh propagation"
)
}
// channel is drained now
select
{
case
v
:=
<-
dstCh
:
require
.
Failf
(
t
,
"received a message unexpectedly"
,
"value: %v"
,
v
)
case
<-
time
.
After
(
timeoutDuration
)
:
// yay no message - it could have been a default: but
// checking for goroutine effect
}
// now enqueue many messages and ensure only last one is received
// enqueueing should be fast!
sourceCh
<-
false
sourceCh
<-
false
sourceCh
<-
false
sourceCh
<-
false
sourceCh
<-
true
// I suspect that dstCh may contain a stale (i.e. `false`) value if golang executes
// this select before the implementation goroutine dequeues last value.
//
// However, never got it to fail in test - so leaving it now to see if it ever fails;
// and if/when test fails, we can learn of how much of an issue it is and adjust
select
{
case
v
:=
<-
dstCh
:
require
.
True
(
t
,
v
,
"unexpected value from dstCh Ch"
)
case
<-
time
.
After
(
timeoutDuration
)
:
require
.
Fail
(
t
,
"timed out waiting for source->dstCh propagation"
)
}
sourceCh
<-
true
sourceCh
<-
true
sourceCh
<-
true
sourceCh
<-
true
sourceCh
<-
true
sourceCh
<-
false
select
{
case
v
:=
<-
dstCh
:
require
.
False
(
t
,
v
,
"unexpected value from dstCh Ch"
)
case
<-
time
.
After
(
timeoutDuration
)
:
require
.
Fail
(
t
,
"timed out waiting for source->dstCh propagation"
)
}
}
// TestDropButLastChannel_DeliversMessages asserts that last
// message is always delivered, some messages are dropped but never
// introduce new messages.
// On tight loop, receivers may get some intermediary messages.
func
TestDropButLastChannel_DeliversMessages
(
t
*
testing
.
T
)
{
sourceCh
:=
make
(
chan
bool
)
shutdownCh
:=
make
(
chan
struct
{})
defer
close
(
shutdownCh
)
dstCh
:=
dropButLastChannel
(
sourceCh
,
shutdownCh
)
// timeout duration for any channel propagation delay
timeoutDuration
:=
5
*
time
.
Millisecond
sentMessages
:=
100
go
func
()
{
for
i
:=
0
;
i
<
sentMessages
-
1
;
i
++
{
sourceCh
<-
true
}
sourceCh
<-
false
}()
receivedTrue
,
receivedFalse
:=
0
,
0
var
lastReceived
*
bool
RECEIVE_LOOP
:
for
{
select
{
case
v
:=
<-
dstCh
:
lastReceived
=
&
v
if
v
{
receivedTrue
++
}
else
{
receivedFalse
++
}
case
<-
time
.
After
(
timeoutDuration
)
:
break
RECEIVE_LOOP
}
}
t
.
Logf
(
"receiver got %v out %v true messages, and %v out of %v false messages"
,
receivedTrue
,
sentMessages
-
1
,
receivedFalse
,
1
)
require
.
NotNil
(
t
,
lastReceived
)
require
.
False
(
t
,
*
lastReceived
)
require
.
Equal
(
t
,
1
,
receivedFalse
)
require
.
LessOrEqual
(
t
,
receivedTrue
,
sentMessages
-
1
)
}
// TestDropButLastChannel_DeliversMessages_Close asserts that last
// message is always delivered, some messages are dropped but never
// introduce new messages, even with a closed signal.
func
TestDropButLastChannel_DeliversMessages_Close
(
t
*
testing
.
T
)
{
sourceCh
:=
make
(
chan
bool
)
shutdownCh
:=
make
(
chan
struct
{})
defer
close
(
shutdownCh
)
dstCh
:=
dropButLastChannel
(
sourceCh
,
shutdownCh
)
// timeout duration for any channel propagation delay
timeoutDuration
:=
5
*
time
.
Millisecond
sentMessages
:=
100
go
func
()
{
for
i
:=
0
;
i
<
sentMessages
-
1
;
i
++
{
sourceCh
<-
true
}
sourceCh
<-
false
close
(
sourceCh
)
}()
receivedTrue
,
receivedFalse
:=
0
,
0
var
lastReceived
*
bool
RECEIVE_LOOP
:
for
{
select
{
case
v
,
ok
:=
<-
dstCh
:
if
!
ok
{
break
RECEIVE_LOOP
}
lastReceived
=
&
v
if
v
{
receivedTrue
++
}
else
{
receivedFalse
++
}
case
<-
time
.
After
(
timeoutDuration
)
:
require
.
Fail
(
t
,
"timed out while waiting for messages"
)
}
}
t
.
Logf
(
"receiver got %v out %v true messages, and %v out of %v false messages"
,
receivedTrue
,
sentMessages
-
1
,
receivedFalse
,
1
)
require
.
NotNil
(
t
,
lastReceived
)
require
.
False
(
t
,
*
lastReceived
)
require
.
Equal
(
t
,
1
,
receivedFalse
)
require
.
LessOrEqual
(
t
,
receivedTrue
,
sentMessages
-
1
)
}
This diff is collapsed.
Click to expand it.
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment