Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
小 白蛋
Nomad
Commits
86359d2b
Commit
86359d2b
authored
7 years ago
by
Alex Dadgar
Browse files
Options
Download
Email Patches
Plain Diff
test stream framer
parent
0a282f25
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
client/lib/streamframer/framer_test.go
+38
-182
client/lib/streamframer/framer_test.go
with
38 additions
and
182 deletions
+38
-182
client/lib/streamframer/framer_test.go
+
38
-
182
View file @
86359d2b
package
framer
import
(
"io"
)
type
WriteCloseChecker
struct
{
io
.
WriteCloser
Closed
bool
}
"bytes"
"reflect"
"strconv"
"testing"
"time"
func
(
w
*
WriteCloseChecker
)
Close
()
error
{
w
.
Closed
=
true
return
w
.
WriteCloser
.
Close
()
}
"github.com/hashicorp/nomad/testutil"
)
/*
// This test checks, that even if the frame size has not been hit, a flush will
// periodically occur.
func
TestStreamFramer_Flush
(
t
*
testing
.
T
)
{
// Create the stream framer
r, w := io.Pipe()
wrappedW := &WriteCloseChecker{WriteCloser: w}
frames
:=
make
(
chan
*
StreamFrame
,
10
)
hRate
,
bWindow
:=
100
*
time
.
Millisecond
,
100
*
time
.
Millisecond
sf := NewStreamFramer(
w
ra
ppedW, false
, hRate, bWindow, 100)
sf
:=
NewStreamFramer
(
f
ra
mes
,
hRate
,
bWindow
,
100
)
sf
.
Run
()
// Create a decoder
dec := codec.NewDecoder(r, structs.JsonHandle)
f
:=
"foo"
fe
:=
"bar"
d
:=
[]
byte
{
0xa
}
...
...
@@ -37,10 +28,7 @@ func TestStreamFramer_Flush(t *testing.T) {
resultCh
:=
make
(
chan
struct
{})
go
func
()
{
for
{
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
t.Fatalf("failed to decode")
}
frame
:=
<-
frames
if
frame
.
IsHeartbeat
()
{
continue
...
...
@@ -65,10 +53,8 @@ func TestStreamFramer_Flush(t *testing.T) {
t
.
Fatalf
(
"failed to flush"
)
}
// Close the reader and wait. This should cause the runner to exit
if err := r.Close(); err != nil {
t.Fatalf("failed to close reader")
}
// Shutdown
sf
.
Destroy
()
select
{
case
<-
sf
.
ExitCh
()
:
...
...
@@ -76,25 +62,21 @@ func TestStreamFramer_Flush(t *testing.T) {
t
.
Fatalf
(
"exit channel should close"
)
}
sf.Destroy()
if !wrappedW.Closed {
t.Fatalf("writer not closed")
if
_
,
ok
:=
<-
frames
;
ok
{
t
.
Fatal
(
"out channel should be closed"
)
}
}
// This test checks that frames will be batched till the frame size is hit (in
// the case that is before the flush).
func
TestStreamFramer_Batch
(
t
*
testing
.
T
)
{
// Create the stream framer
r, w := io.Pipe()
wrappedW := &WriteCloseChecker{WriteCloser: w}
// Ensure the batch window doesn't get hit
hRate
,
bWindow
:=
100
*
time
.
Millisecond
,
500
*
time
.
Millisecond
sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 3)
sf.Run()
// Create a decoder
dec := codec.NewDecoder(r, structs.JsonHandle)
// Create the stream framer
frames
:=
make
(
chan
*
StreamFrame
,
10
)
sf
:=
NewStreamFramer
(
frames
,
hRate
,
bWindow
,
3
)
sf
.
Run
()
f
:=
"foo"
fe
:=
"bar"
...
...
@@ -105,11 +87,7 @@ func TestStreamFramer_Batch(t *testing.T) {
resultCh
:=
make
(
chan
struct
{})
go
func
()
{
for
{
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
t.Fatalf("failed to decode")
}
frame
:=
<-
frames
if
frame
.
IsHeartbeat
()
{
continue
}
...
...
@@ -145,10 +123,8 @@ func TestStreamFramer_Batch(t *testing.T) {
t
.
Fatalf
(
"Did not receive data after batch size reached"
)
}
// Close the reader and wait. This should cause the runner to exit
if err := r.Close(); err != nil {
t.Fatalf("failed to close reader")
}
// Shutdown
sf
.
Destroy
()
select
{
case
<-
sf
.
ExitCh
()
:
...
...
@@ -156,32 +132,23 @@ func TestStreamFramer_Batch(t *testing.T) {
t
.
Fatalf
(
"exit channel should close"
)
}
sf.Destroy()
if !wrappedW.Closed {
t.Fatalf("writer not closed")
if
_
,
ok
:=
<-
frames
;
ok
{
t
.
Fatal
(
"out channel should be closed"
)
}
}
func
TestStreamFramer_Heartbeat
(
t
*
testing
.
T
)
{
// Create the stream framer
r, w := io.Pipe()
wrappedW := &WriteCloseChecker{WriteCloser: w}
frames
:=
make
(
chan
*
StreamFrame
,
10
)
hRate
,
bWindow
:=
100
*
time
.
Millisecond
,
100
*
time
.
Millisecond
sf := NewStreamFramer(
w
ra
ppedW, false
, hRate, bWindow, 100)
sf
:=
NewStreamFramer
(
f
ra
mes
,
hRate
,
bWindow
,
100
)
sf
.
Run
()
// Create a decoder
dec := codec.NewDecoder(r, structs.JsonHandle)
// Start the reader
resultCh
:=
make
(
chan
struct
{})
go
func
()
{
for
{
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
t.Fatalf("failed to decode")
}
frame
:=
<-
frames
if
frame
.
IsHeartbeat
()
{
resultCh
<-
struct
{}{}
return
...
...
@@ -195,10 +162,8 @@ func TestStreamFramer_Heartbeat(t *testing.T) {
t
.
Fatalf
(
"failed to heartbeat"
)
}
// Close the reader and wait. This should cause the runner to exit
if err := r.Close(); err != nil {
t.Fatalf("failed to close reader")
}
// Shutdown
sf
.
Destroy
()
select
{
case
<-
sf
.
ExitCh
()
:
...
...
@@ -206,25 +171,20 @@ func TestStreamFramer_Heartbeat(t *testing.T) {
t
.
Fatalf
(
"exit channel should close"
)
}
sf.Destroy()
if !wrappedW.Closed {
t.Fatalf("writer not closed")
if
_
,
ok
:=
<-
frames
;
ok
{
t
.
Fatal
(
"out channel should be closed"
)
}
}
// This test checks that frames are received in order
func
TestStreamFramer_Order
(
t
*
testing
.
T
)
{
// Create the stream framer
r, w := io.Pipe()
wrappedW := &WriteCloseChecker{WriteCloser: w}
// Ensure the batch window doesn't get hit
hRate
,
bWindow
:=
100
*
time
.
Millisecond
,
10
*
time
.
Millisecond
sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 10)
// Create the stream framer
frames
:=
make
(
chan
*
StreamFrame
,
10
)
sf
:=
NewStreamFramer
(
frames
,
hRate
,
bWindow
,
10
)
sf
.
Run
()
// Create a decoder
dec := codec.NewDecoder(r, structs.JsonHandle)
files
:=
[]
string
{
"1"
,
"2"
,
"3"
,
"4"
,
"5"
}
input
:=
bytes
.
NewBuffer
(
make
([]
byte
,
0
,
100000
))
for
i
:=
0
;
i
<=
1000
;
i
++
{
...
...
@@ -242,11 +202,7 @@ func TestStreamFramer_Order(t *testing.T) {
resultCh
:=
make
(
chan
struct
{})
go
func
()
{
for
{
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
t.Fatalf("failed to decode")
}
frame
:=
<-
frames
if
frame
.
IsHeartbeat
()
{
continue
}
...
...
@@ -288,10 +244,8 @@ func TestStreamFramer_Order(t *testing.T) {
}
}
// Close the reader and wait. This should cause the runner to exit
if err := r.Close(); err != nil {
t.Fatalf("failed to close reader")
}
// Shutdown
sf
.
Destroy
()
select
{
case
<-
sf
.
ExitCh
()
:
...
...
@@ -299,105 +253,7 @@ func TestStreamFramer_Order(t *testing.T) {
t
.
Fatalf
(
"exit channel should close"
)
}
sf.Destroy()
if !wrappedW.Closed {
t.Fatalf("writer not closed")
}
}
// This test checks that frames are received in order
func TestStreamFramer_Order_PlainText(t *testing.T) {
// Create the stream framer
r, w := io.Pipe()
wrappedW := &WriteCloseChecker{WriteCloser: w}
// Ensure the batch window doesn't get hit
hRate, bWindow := 100*time.Millisecond, 10*time.Millisecond
sf := NewStreamFramer(wrappedW, true, hRate, bWindow, 10)
sf.Run()
files := []string{"1", "2", "3", "4", "5"}
input := bytes.NewBuffer(make([]byte, 0, 100000))
for i := 0; i <= 1000; i++ {
str := strconv.Itoa(i) + ","
input.WriteString(str)
}
expected := bytes.NewBuffer(make([]byte, 0, 100000))
for range files {
expected.Write(input.Bytes())
}
receivedBuf := bytes.NewBuffer(make([]byte, 0, 100000))
// Start the reader
resultCh := make(chan struct{})
go func() {
OUTER:
for {
if _, err := receivedBuf.ReadFrom(r); err != nil {
if strings.Contains(err.Error(), "closed pipe") {
resultCh <- struct{}{}
return
}
t.Fatalf("bad read: %v", err)
}
if expected.Len() != receivedBuf.Len() {
continue
}
expectedBytes := expected.Bytes()
actualBytes := receivedBuf.Bytes()
for i, e := range expectedBytes {
if a := actualBytes[i]; a != e {
continue OUTER
}
}
resultCh <- struct{}{}
return
}
}()
// Send the data
b := input.Bytes()
shards := 10
each := len(b) / shards
for _, f := range files {
for i := 0; i < shards; i++ {
l, r := each*i, each*(i+1)
if i == shards-1 {
r = len(b)
}
if err := sf.Send(f, "", b[l:r], 0); err != nil {
t.Fatalf("Send() failed %v", err)
}
}
}
// Ensure we get data
select {
case <-resultCh:
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * bWindow):
if expected.Len() != receivedBuf.Len() {
t.Fatalf("Got %v; want %v", expected.Len(), receivedBuf.Len())
}
expectedBytes := expected.Bytes()
actualBytes := receivedBuf.Bytes()
for i, e := range expectedBytes {
if a := actualBytes[i]; a != e {
t.Fatalf("Index %d; Got %q; want %q", i, a, e)
}
}
}
// Close the reader and wait. This should cause the runner to exit
if err := r.Close(); err != nil {
t.Fatalf("failed to close reader")
}
sf.Destroy()
if !wrappedW.Closed {
t.Fatalf("writer not closed")
if
_
,
ok
:=
<-
frames
;
ok
{
t
.
Fatal
(
"out channel should be closed"
)
}
}
*/
This diff is collapsed.
Click to expand it.
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment
Menu
Projects
Groups
Snippets
Help