Unverified Commit 68a9c4c6 authored by Hchen, committed by GitHub

Merge pull request #84 from cloudwego/release/v0.1.0

chore: release v0.1.0
parents 3304c59c 350d1ece
Showing with 479 additions and 154 deletions
---
name: Bug report
about: Create a report to help us improve
title: ''
labels: ''
assignees: ''
---
**Describe the bug**
A clear and concise description of what the bug is.
**To Reproduce**
Steps to reproduce the behavior:
1. Go to '...'
2. Click on '....'
3. Scroll down to '....'
4. See error
**Expected behavior**
A clear and concise description of what you expected to happen.
**Screenshots**
If applicable, add screenshots to help explain your problem.
**Desktop (please complete the following information):**
- OS: [e.g. iOS]
- Browser [e.g. chrome, safari]
- Version [e.g. 22]
**Smartphone (please complete the following information):**
- Device: [e.g. iPhone6]
- OS: [e.g. iOS8.1]
- Browser [e.g. stock browser, safari]
- Version [e.g. 22]
**Additional context**
Add any other context about the problem here.
---
name: Feature request
about: Suggest an idea for this project
title: ''
labels: ''
assignees: ''
---
**Is your feature request related to a problem? Please describe.**
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
**Describe the solution you'd like**
A clear and concise description of what you want to happen.
**Describe alternatives you've considered**
A clear and concise description of any alternative solutions or features you've considered.
**Additional context**
Add any other context or screenshots about the feature request here.
......@@ -26,7 +26,7 @@ We developed the RPC framework [KiteX][KiteX] and HTTP
framework [Hertz][Hertz] (to be open sourced) based
on [Netpoll][Netpoll], both with industry-leading performance.
[Examples][Netpoll-Benchmark] show how to build RPC client and server
[Examples][netpoll-benchmark] show how to build RPC client and server
using [Netpoll][Netpoll].
For more information, please refer to [Document](#document).
......@@ -56,49 +56,19 @@ For more information, please refer to [Document](#document).
# Performance
Benchmarking is not a numbers game; it should first meet the requirements of industrial use. In the RPC scenario,
concurrent calls and wait timeouts are necessary support items.
Benchmark should meet the requirements of industrial use.
In the RPC scenario, concurrency and timeout are necessary support items.
Therefore, we require that the benchmark meet the following conditions:
We provide the [netpoll-benchmark][netpoll-benchmark] project to track and compare
the performance of [Netpoll][Netpoll] and other frameworks under different conditions for reference.
1. Support concurrent calls, with a 1s timeout
2. Protocol: a 4-byte header indicates the total length of the payload (see the sketch below)
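As a hedged illustration of condition 2, the framing is a plain length-prefixed protocol; a minimal sketch in Go (a big-endian header is an assumption, the benchmark repository defines the authoritative format):

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// encodeFrame prepends a 4-byte header holding the payload length.
func encodeFrame(payload []byte) []byte {
	frame := make([]byte, 4+len(payload))
	binary.BigEndian.PutUint32(frame[:4], uint32(len(payload)))
	copy(frame[4:], payload)
	return frame
}

// decodeFrame parses the header and returns the payload it describes.
func decodeFrame(frame []byte) ([]byte, error) {
	if len(frame) < 4 {
		return nil, errors.New("short frame")
	}
	size := int(binary.BigEndian.Uint32(frame[:4]))
	if len(frame) < 4+size {
		return nil, errors.New("incomplete payload")
	}
	return frame[4 : 4+size], nil
}

func main() {
	payload, _ := decodeFrame(encodeFrame([]byte("ping")))
	fmt.Printf("%s\n", payload) // ping
}
```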
For more benchmarks, refer to [kitex-benchmark][kitex-benchmark] and [hertz-benchmark][hertz-benchmark] (to be open sourced).
We compared the performance of similar repositories, such as [net][net], [evio][evio], and [gnet][gnet],
through the [Benchmarks][Benchmarks], as shown below.
For more benchmarks, refer to [Netpoll-Benchmark][Netpoll-Benchmark], [KiteX-Benchmark][KiteX-Benchmark], and [Hertz-Benchmark][Hertz-Benchmark].
### Environment
* CPU: Intel(R) Xeon(R) Gold 5118 CPU @ 2.30GHz, 4 cores
* Memory: 8GB
* OS: Debian 5.4.56.bsk.1-amd64 x86_64 GNU/Linux
* Go: 1.15.4
### Concurrent Performance (Echo 1KB)
![image](docs/images/c_tp99.png)
![image](docs/images/c_qps.png)
### Transport Performance (concurrent=100)
![image](docs/images/s_tp99.png)
![image](docs/images/s_qps.png)
### Benchmark Conclusion
Compared with [net][net], [Netpoll][Netpoll]'s latency is about 34% and QPS about 110% of net's
(under continued load, net's latency becomes too high to serve as a reference).
# Document
# Reference
* [Official Website](https://www.cloudwego.io)
* [Getting Started](docs/guide/guide_en.md)
* [Design](docs/reference/design_en.md)
* [Change Log](docs/reference/change_log.md)
* [Why DATA RACE](docs/reference/explain.md)
[Netpoll]: https://github.com/cloudwego/netpoll
......@@ -110,10 +80,9 @@ Compared with [net][net]
[KiteX]: https://github.com/cloudwego/kitex
[Hertz]: https://github.com/cloudwego/hertz
[Benchmarks]: https://github.com/cloudwego/netpoll-benchmark
[Netpoll-Benchmark]: https://github.com/cloudwego/netpoll-benchmark
[KiteX-Benchmark]: https://github.com/cloudwego/kitex
[Hertz-Benchmark]: https://github.com/cloudwego/hertz
[netpoll-benchmark]: https://github.com/cloudwego/netpoll-benchmark
[kitex-benchmark]: https://github.com/cloudwego/kitex
[hertz-benchmark]: https://github.com/cloudwego/hertz
[ByteDance]: https://www.bytedance.com
[Redis]: https://redis.io
......
......@@ -21,7 +21,7 @@ goroutines, which greatly increases scheduling overhead. In addition, [net.Conn][net.Conn] does not provide
The RPC framework [KiteX][KiteX] and the HTTP framework [Hertz][Hertz] (to be open sourced), both developed
on [Netpoll][Netpoll], deliver industry-leading performance.
[Examples][Netpoll-Benchmark] show how to use [Netpoll][Netpoll]
[Examples][netpoll-benchmark] show how to use [Netpoll][Netpoll]
to build an RPC client and server.
For more information, please refer to the [documentation](#文档).
......@@ -51,46 +51,17 @@ goroutines, which greatly increases scheduling overhead. In addition, [net.Conn][net.Conn] does not provide
# Performance
Benchmarking is not a numbers game; it should first meet the requirements of industrial use. In the RPC scenario, concurrent calls and wait timeouts are necessary support items.
Benchmarks should meet the requirements of industrial use; in the RPC scenario, concurrent calls and wait timeouts are necessary support items.
Therefore, we require that the benchmark meet the following conditions:
We provide the [netpoll-benchmark][netpoll-benchmark] project to continuously track and compare the performance of [Netpoll][Netpoll] and other frameworks under different conditions for reference.
1. Support concurrent calls, with a 1s timeout
2. Protocol: a 4-byte header indicates the total length of the payload
For more benchmarks, refer to [kitex-benchmark][kitex-benchmark] and [hertz-benchmark][hertz-benchmark] (to be open sourced).
We compared the performance of [net][net], [evio][evio], and [gnet][gnet] through the [benchmark code][Benchmarks].
For more benchmarks, refer to [Netpoll-Benchmark][Netpoll-Benchmark], [KiteX-Benchmark][KiteX-Benchmark], and [Hertz-Benchmark][Hertz-Benchmark].
### Test Environment
* CPU: Intel(R) Xeon(R) Gold 5118 CPU @ 2.30GHz, 4 cores
* Memory: 8GB
* OS: Debian 5.4.56.bsk.1-amd64 x86_64 GNU/Linux
* Go: 1.15.4
### Concurrent Performance (echo 1KB)
![image](docs/images/c_tp99.png)
![image](docs/images/c_qps.png)
### Transport Performance (concurrency = 100)
![image](docs/images/s_tp99.png)
![image](docs/images/s_qps.png)
### Benchmark Conclusion
Compared with [net][net], [Netpoll][Netpoll]'s latency is about 34% and QPS about 110% of net's (under continued load, net's latency becomes too high and the data is distorted).
# Documentation
# Reference
* [Official Website](https://www.cloudwego.io)
* [Getting Started](docs/guide/guide_cn.md)
* [Design](docs/reference/design_cn.md)
* [Change Log](docs/reference/change_log.md)
* [Why DATA RACE](docs/reference/explain.md)
[Netpoll]: https://github.com/cloudwego/netpoll
......@@ -102,10 +73,9 @@ goroutines, which greatly increases scheduling overhead. In addition, [net.Conn][net.Conn] does not provide
[KiteX]: https://github.com/cloudwego/kitex
[Hertz]: https://github.com/cloudwego/hertz
[Benchmarks]: https://github.com/cloudwego/netpoll-benchmark
[Netpoll-Benchmark]: https://github.com/cloudwego/netpoll-benchmark
[KiteX-Benchmark]: https://github.com/cloudwego/kitex
[Hertz-Benchmark]: https://github.com/cloudwego/hertz
[netpoll-benchmark]: https://github.com/cloudwego/netpoll-benchmark
[kitex-benchmark]: https://github.com/cloudwego/kitex
[hertz-benchmark]: https://github.com/cloudwego/hertz
[ByteDance]: https://www.bytedance.com
[Redis]: https://redis.io
......
......@@ -41,6 +41,8 @@ type connection struct {
inputBarrier *barrier
outputBarrier *barrier
supportZeroCopy bool
maxSize int // The maximum size of data between two Release().
bookSize int // The size of data that can be read at once.
}
var _ Connection = &connection{}
......@@ -106,6 +108,18 @@ func (c *connection) Skip(n int) (err error) {
// Release implements Connection.
func (c *connection) Release() (err error) {
// Check inputBuffer length first to reduce contention in mux situation.
if c.inputBuffer.Len() == 0 && c.lock(reading) {
// Double check length to calculate the maxSize
if c.inputBuffer.Len() == 0 {
maxSize := c.inputBuffer.calcMaxSize()
if maxSize > c.maxSize {
c.maxSize = maxSize
}
c.inputBuffer.resetTail(c.maxSize)
}
c.unlock(reading)
}
return c.inputBuffer.Release()
}
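// Illustrative sketch (not part of this diff): Release pairs with the nocopy
// read path. A typical consumer reads one whole frame, then calls Release so
// resetTail can recycle the buffer using the tracked maxSize. The 4-byte
// length header and process() below are assumptions for illustration.
//
//	func serve(conn Connection) error {
//		reader := conn.Reader()
//		header, err := reader.Next(4) // blocks via waitRead until 4 bytes arrive
//		if err != nil {
//			return err
//		}
//		size := int(binary.BigEndian.Uint32(header))
//		payload, err := reader.Next(size)
//		if err != nil {
//			return err
//		}
//		process(payload)        // hypothetical business handler
//		return reader.Release() // hand the consumed bytes back for reuse
//	}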
......@@ -165,12 +179,12 @@ func (c *connection) MallocLen() (length int) {
// If empty, it will call syscall.Write to send data directly,
// otherwise the buffer will be sent asynchronously by the epoll trigger.
func (c *connection) Flush() error {
if c.IsActive() && c.lock(outputBuffer) {
c.outputBuffer.Flush()
c.unlock(outputBuffer)
return c.flush()
if !c.lock(flushing) {
return Exception(ErrConnClosed, "when flush")
}
return Exception(ErrConnClosed, "when flush")
defer c.unlock(flushing)
c.outputBuffer.Flush()
return c.flush()
}
// MallocAck implements Connection.
......@@ -179,7 +193,7 @@ func (c *connection) MallocAck(n int) (err error) {
}
// Append implements Connection.
func (c *connection) Append(w Writer) (n int, err error) {
func (c *connection) Append(w Writer) (err error) {
return c.outputBuffer.Append(w)
}
......@@ -260,6 +274,7 @@ func (c *connection) init(conn Conn, prepare OnPrepare) (err error) {
// init buffer, barrier, finalizer
c.readTrigger = make(chan struct{}, 1)
c.writeTrigger = make(chan error, 1)
c.bookSize, c.maxSize = block1k/2, pagesize
c.inputBuffer, c.outputBuffer = NewLinkBuffer(pagesize), NewLinkBuffer()
c.inputBarrier, c.outputBarrier = barrierPool.Get().(*barrier), barrierPool.Get().(*barrier)
c.setFinalizer()
......@@ -304,6 +319,7 @@ func (c *connection) initFDOperator() {
func (c *connection) setFinalizer() {
c.AddCloseCallback(func(connection Connection) error {
c.stop(flushing)
c.netFD.Close()
c.closeBuffer()
freeop(c.operator)
......@@ -327,11 +343,10 @@ func (c *connection) triggerWrite(err error) {
// waitRead waits until the input buffer holds the full n bytes.
func (c *connection) waitRead(n int) (err error) {
leftover := n - c.inputBuffer.Len()
if leftover <= 0 {
if n <= c.inputBuffer.Len() {
return nil
}
atomic.StoreInt32(&c.waitReadSize, int32(leftover))
atomic.StoreInt32(&c.waitReadSize, int32(n))
defer atomic.StoreInt32(&c.waitReadSize, 0)
if c.readTimeout > 0 {
return c.waitReadWithTimeout(n)
......@@ -359,24 +374,31 @@ func (c *connection) waitReadWithTimeout(n int) (err error) {
} else {
c.readTimer.Reset(c.readTimeout)
}
for c.inputBuffer.Len() < n {
if c.IsActive() {
select {
case <-c.readTimer.C:
return Exception(ErrReadTimeout, c.readTimeout.String())
case <-c.readTrigger:
continue
if !c.IsActive() {
// cannot return directly; stop the timer before returning!
// confirm that the fd is still valid.
if atomic.LoadUint32(&c.netFD.closed) == 0 {
err = c.fill(n)
} else {
err = Exception(ErrConnClosed, "wait read")
}
break
}
// cannot return directly; stop the timer before returning!
// confirm that the fd is still valid.
if atomic.LoadUint32(&c.netFD.closed) == 0 {
err = c.fill(n)
} else {
err = Exception(ErrConnClosed, "wait read")
select {
case <-c.readTimer.C:
// double check if there is enough data to be read
if c.inputBuffer.Len() >= n {
return nil
}
return Exception(ErrReadTimeout, c.remoteAddr.String())
case <-c.readTrigger:
continue
}
break
}
// clean timer.C
if !c.readTimer.Stop() {
<-c.readTimer.C
......
......@@ -29,12 +29,25 @@ const (
type key int32
/* State Diagram
   +--------------+         +--------------+
   |  processing  |-------->|   flushing   |
   +-------+------+         +-------+------+
           |
           |                +--------------+
           +--------------->|   closing    |
                            +--------------+

   - "processing" locks the onRequest handler, and doesn't exist in dialers.
   - "flushing" locks the outputBuffer.
   - "closing" should wait for flushing to finish and call the closeCallback after that.
*/
const (
closing key = iota
processing
writing
inputBuffer
outputBuffer
flushing
reading
// total must be at the bottom.
total
)
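// Illustrative sketch (not part of this diff): each key above indexes a
// per-connection lock slot, typically implemented as a CAS try-lock over an
// int32 array. Field and method names here are assumptions for illustration.
//
//	type locker struct {
//		keychain [total]int32 // one slot per key above
//	}
//
//	// lock returns false if the slot is already held or stopped.
//	func (l *locker) lock(k key) bool {
//		return atomic.CompareAndSwapInt32(&l.keychain[k], 0, 1)
//	}
//
//	func (l *locker) unlock(k key) {
//		atomic.StoreInt32(&l.keychain[k], 0)
//	}
//
//	// stop poisons the slot so future lock() calls fail.
//	func (l *locker) stop(k key) {
//		atomic.StoreInt32(&l.keychain[k], -1)
//	}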
......
......@@ -90,14 +90,14 @@ func (c *connection) onPrepare(prepare OnPrepare) (err error) {
}
// onRequest is also responsible for executing the callbacks after the connection has been closed.
func (c *connection) onRequest() (err error) {
func (c *connection) onRequest() (needTrigger bool) {
var process = c.process.Load()
if process == nil {
return nil
return true
}
// Buffer has been fully processed, or task already exists
if !c.lock(processing) {
return nil
return true
}
// add new task
var task = func() {
......@@ -116,8 +116,8 @@ func (c *connection) onRequest() (err error) {
c.closeCallback(false)
return
}
// Double check when exiting.
c.unlock(processing)
// Double check when exiting.
if c.Reader().Len() > 0 {
if !c.lock(processing) {
return
......@@ -126,7 +126,7 @@ func (c *connection) onRequest() (err error) {
}
}
runTask(c.ctx, task)
return nil
return false
}
// closeCallback .
......
......@@ -58,29 +58,20 @@ func (c *connection) onClose() error {
// closeBuffer recycles the input & output LinkBuffer.
func (c *connection) closeBuffer() {
c.stop(writing)
if c.lock(inputBuffer) {
c.inputBuffer.Close()
barrierPool.Put(c.inputBarrier)
}
if c.lock(outputBuffer) {
c.outputBuffer.Close()
barrierPool.Put(c.outputBarrier)
}
c.inputBuffer.Close()
barrierPool.Put(c.inputBarrier)
c.outputBuffer.Close()
barrierPool.Put(c.outputBarrier)
}
// inputs implements FDOperator.
func (c *connection) inputs(vs [][]byte) (rs [][]byte) {
n := int(atomic.LoadInt32(&c.waitReadSize))
if n <= pagesize {
return c.inputBuffer.Book(pagesize, vs)
}
n -= c.inputBuffer.Len()
if n < pagesize {
n = pagesize
if !c.lock(reading) {
return rs
}
return c.inputBuffer.Book(n, vs)
vs[0] = c.inputBuffer.book(c.bookSize, c.maxSize)
return vs[:1]
}
// inputAck implements FDOperator.
......@@ -88,20 +79,30 @@ func (c *connection) inputAck(n int) (err error) {
if n < 0 {
n = 0
}
leftover := atomic.AddInt32(&c.waitReadSize, int32(-n))
err = c.inputBuffer.BookAck(n, leftover <= 0)
c.triggerRead()
c.onRequest()
const maxBookSize = 16 * pagesize
// Auto size bookSize.
if n == c.bookSize && c.bookSize < maxBookSize {
c.bookSize <<= 1
}
length, _ := c.inputBuffer.bookAck(n)
if c.maxSize < length {
c.maxSize = length
}
c.unlock(reading)
var needTrigger = true
if length == n {
needTrigger = c.onRequest()
}
if needTrigger && length >= int(atomic.LoadInt32(&c.waitReadSize)) {
c.triggerRead()
}
return err
}
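// Toy illustration (not part of this diff; assumes pagesize = 8 KB): with the
// auto-sizing above, a connection whose reads keep filling the book doubles
// the next read size geometrically,
//
//	512 B -> 1 KB -> 2 KB -> 4 KB -> ... -> 128 KB (the 16*pagesize cap)
//
// so hot connections quickly reach large batched reads while idle connections
// keep small buffers.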
// outputs implements FDOperator.
func (c *connection) outputs(vs [][]byte) (rs [][]byte, supportZeroCopy bool) {
if !c.lock(writing) {
return rs, c.supportZeroCopy
}
if c.outputBuffer.IsEmpty() {
c.unlock(writing)
c.rw2r()
return rs, c.supportZeroCopy
}
......@@ -115,8 +116,6 @@ func (c *connection) outputAck(n int) (err error) {
c.outputBuffer.Skip(n)
c.outputBuffer.Release()
}
// must unlock before check empty
c.unlock(writing)
if c.outputBuffer.IsEmpty() {
c.rw2r()
}
......@@ -131,15 +130,6 @@ func (c *connection) rw2r() {
// flush write data directly.
func (c *connection) flush() error {
if !c.lock(writing) {
return nil
}
locked := true
defer func() {
if locked {
c.unlock(writing)
}
}()
if c.outputBuffer.IsEmpty() {
return nil
}
......@@ -165,8 +155,6 @@ func (c *connection) flush() error {
return Exception(err, "when flush")
}
locked = false
c.unlock(writing)
err = <-c.writeTrigger
return err
}
......@@ -141,7 +141,7 @@ func TestConnectionWaitReadHalfPacket(t *testing.T) {
for atomic.LoadInt32(&rconn.waitReadSize) <= 0 {
runtime.Gosched()
}
Equal(t, atomic.LoadInt32(&rconn.waitReadSize), int32(size/2))
Equal(t, atomic.LoadInt32(&rconn.waitReadSize), int32(size))
syscall.Write(w, msg[size/2:])
wg.Wait()
}
......
......@@ -60,9 +60,11 @@ import (
"github.com/cloudwego/netpoll"
)
var eventLoop netpoll.EventLoop
func main() {
...
eventLoop, _ := netpoll.NewEventLoop(
eventLoop, _ = netpoll.NewEventLoop(
handle,
netpoll.WithOnPrepare(prepare),
netpoll.WithReadTimeout(time.Second),
......@@ -78,6 +80,12 @@ func main() {
```go
package main
import (
"github.com/cloudwego/netpoll"
)
var eventLoop netpoll.EventLoop
func main() {
...
// start listen loop ...
......@@ -95,8 +103,11 @@ package main
import (
"context"
"time"
"github.com/cloudwego/netpoll"
)
var eventLoop netpoll.EventLoop
func main() {
// stop server ...
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
......
......@@ -64,6 +64,8 @@ import (
"github.com/cloudwego/netpoll"
)
var eventLoop netpoll.EventLoop
func main() {
...
eventLoop, _ := netpoll.NewEventLoop(
......@@ -83,6 +85,12 @@ func main() {
```go
package main
import (
"github.com/cloudwego/netpoll"
)
var eventLoop netpoll.EventLoop
func main() {
...
// start listen loop ...
......@@ -100,8 +108,11 @@ package main
import (
"context"
"time"
"github.com/cloudwego/netpoll"
)
var eventLoop netpoll.EventLoop
func main() {
// stop server ...
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
......
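Assembled from the fragments above, a complete server might look like the following sketch. The echo handler and the :8080 address are illustrative assumptions, not part of the guide:

```go
package main

import (
	"context"
	"time"

	"github.com/cloudwego/netpoll"
)

var eventLoop netpoll.EventLoop

// handle echoes every received message back to the client.
func handle(ctx context.Context, connection netpoll.Connection) error {
	reader, writer := connection.Reader(), connection.Writer()
	defer reader.Release()
	msg, err := reader.Next(reader.Len())
	if err != nil {
		return err
	}
	buf, err := writer.Malloc(len(msg))
	if err != nil {
		return err
	}
	copy(buf, msg)
	return writer.Flush()
}

func main() {
	listener, err := netpoll.CreateListener("tcp", ":8080")
	if err != nil {
		panic(err)
	}
	eventLoop, _ = netpoll.NewEventLoop(
		handle,
		netpoll.WithReadTimeout(time.Second),
	)
	go func() {
		// Serve blocks until Shutdown is called.
		_ = eventLoop.Serve(listener)
	}()

	// Graceful shutdown with a 5s deadline, as in the stop fragment above.
	time.Sleep(10 * time.Second)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	_ = eventLoop.Shutdown(ctx)
}
```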
docs/images/c_qps.png (47.1 KB)
docs/images/c_tp99.png (51.1 KB)
docs/images/s_qps.png (65.3 KB)
docs/images/s_tp99.png (41 KB)
* **v0.0.1**
* open sourced [Netpoll](https://github.com/cloudwego/netpoll), which is used by [ByteDance](https://www.bytedance.com)
// Copyright 2021 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mux
import (
"testing"
)
func MustNil(t *testing.T, val interface{}) {
t.Helper()
Assert(t, val == nil, val)
if val != nil {
t.Fatal("assertion nil failed, val=", val)
}
}
func MustTrue(t *testing.T, cond bool) {
t.Helper()
if !cond {
t.Fatal("assertion true failed.")
}
}
func Equal(t *testing.T, got, expect interface{}) {
t.Helper()
if got != expect {
t.Fatalf("assertion equal failed, got=[%v], expect=[%v]", got, expect)
}
}
func Assert(t *testing.T, cond bool, val ...interface{}) {
t.Helper()
if !cond {
if len(val) > 0 {
val = append([]interface{}{"assertion failed:"}, val...)
t.Fatal(val...)
} else {
t.Fatal("assertion failed")
}
}
}
// Copyright 2021 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mux
import (
"runtime"
"sync/atomic"
"github.com/cloudwego/netpoll"
)
/* DOC:
 * ShardQueue uses netpoll's nocopy API to merge and send data.
 * Flushing is triggered passively by ShardQueue.Add and requires no user operation.
 * If an error occurs during transmission, the connection will be closed.
 *
 * ShardQueue.Add: adds the data to be sent.
 * NewShardQueue: creates a queue bound to a netpoll.Connection.
 * ShardSize: the recommended number of shards is 32.
 */
const ShardSize = 32
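// Usage sketch (illustrative; assumes an established netpoll.Connection conn
// and an example payload):
//
//	queue := NewShardQueue(ShardSize, conn)
//	msg := []byte("hello")
//	queue.Add(func() (buf netpoll.Writer, isNil bool) {
//		w := netpoll.NewLinkBuffer(len(msg))
//		data, err := w.Malloc(len(msg))
//		if err != nil {
//			return nil, true // isNil=true skips this getter
//		}
//		copy(data, msg)
//		return w, false
//	})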
// NewShardQueue creates a ShardQueue that merges writes onto the given connection.
func NewShardQueue(size int32, conn netpoll.Connection) (queue *ShardQueue) {
queue = &ShardQueue{
conn: conn,
size: size,
getters: make([][]WriterGetter, size),
swap: make([]WriterGetter, 0, 64),
locks: make([]int32, size),
}
for i := range queue.getters {
queue.getters[i] = make([]WriterGetter, 0, 64)
}
return queue
}
// WriterGetter is used to get a netpoll.Writer.
type WriterGetter func() (buf netpoll.Writer, isNil bool)
// ShardQueue uses netpoll's nocopy API to merge and send data.
// Flushing is triggered passively by ShardQueue.Add and requires no user operation.
// If an error occurs during transmission, the connection will be closed.
// ShardQueue.Add: adds the data to be sent.
type ShardQueue struct {
conn netpoll.Connection
idx, size int32
getters [][]WriterGetter // len(getters) = size
swap []WriterGetter // use for swap
locks []int32 // len(locks) = size
trigger, runNum int32
}
// Add appends the getters to a round-robin shard and triggers a flush if the shard was empty.
func (q *ShardQueue) Add(gts ...WriterGetter) {
shard := atomic.AddInt32(&q.idx, 1) % q.size
q.lock(shard)
trigger := len(q.getters[shard]) == 0
q.getters[shard] = append(q.getters[shard], gts...)
q.unlock(shard)
if trigger {
q.triggering(shard)
}
}
// triggering records a pending shard and starts the consume loop if it is not already running.
func (q *ShardQueue) triggering(shard int32) {
if atomic.AddInt32(&q.trigger, 1) > 1 {
return
}
q.foreach(shard)
}
// foreach drains the shards in turn, swapping each shard's getters into q.swap. It is not concurrency safe; runNum ensures a single runner.
func (q *ShardQueue) foreach(shard int32) {
if atomic.AddInt32(&q.runNum, 1) > 1 {
return
}
go func() {
var tmp []WriterGetter
for ; atomic.LoadInt32(&q.trigger) > 0; shard = (shard + 1) % q.size {
// lock & swap
q.lock(shard)
if len(q.getters[shard]) == 0 {
q.unlock(shard)
continue
}
// swap
tmp = q.getters[shard]
q.getters[shard] = q.swap[:0]
q.swap = tmp
q.unlock(shard)
atomic.AddInt32(&q.trigger, -1)
// deal
q.deal(q.swap)
}
q.flush()
// quit & check again
atomic.StoreInt32(&q.runNum, 0)
if atomic.LoadInt32(&q.trigger) > 0 {
q.foreach(shard)
}
}()
}
// deal appends each getter's buffer to the connection's netpoll.Writer.
func (q *ShardQueue) deal(gts []WriterGetter) {
writer := q.conn.Writer()
for _, gt := range gts {
buf, isNil := gt()
if !isNil {
err := writer.Append(buf)
if err != nil {
q.conn.Close()
return
}
}
}
}
// flush flushes the connection's netpoll.Writer.
func (q *ShardQueue) flush() {
err := q.conn.Writer().Flush()
if err != nil {
q.conn.Close()
return
}
}
// lock shard.
func (q *ShardQueue) lock(shard int32) {
for !atomic.CompareAndSwapInt32(&q.locks[shard], 0, 1) {
runtime.Gosched()
}
}
// unlock shard.
func (q *ShardQueue) unlock(shard int32) {
atomic.StoreInt32(&q.locks[shard], 0)
}
// Copyright 2021 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mux
import (
"net"
"runtime"
"testing"
"time"
"github.com/cloudwego/netpoll"
)
func TestShardQueue(t *testing.T) {
var svrConn net.Conn
network, address := "tcp", ":1234"
ln, err := net.Listen(network, address)
MustNil(t, err)
stop := make(chan int, 1)
defer close(stop)
go func() {
var err error
for {
select {
case <-stop:
err = ln.Close()
MustNil(t, err)
return
default:
}
svrConn, err = ln.Accept()
MustNil(t, err)
}
}()
conn, err := netpoll.DialConnection(network, address, time.Second)
MustNil(t, err)
for svrConn == nil {
runtime.Gosched()
}
// test
queue := NewShardQueue(4, conn)
count, pkgsize := 16, 11
for i := 0; i < count; i++ {
var getter WriterGetter = func() (buf netpoll.Writer, isNil bool) {
buf = netpoll.NewLinkBuffer(pkgsize)
buf.Malloc(pkgsize)
return buf, false
}
queue.Add(getter)
}
total := count * pkgsize
recv := make([]byte, total)
rn, err := svrConn.Read(recv)
MustNil(t, err)
Equal(t, rn, total)
}
// TODO: need mock flush
func BenchmarkShardQueue(b *testing.B) {
b.Skip()
}
......@@ -123,6 +123,7 @@ func TestDialerFdAlloc(t *testing.T) {
fd := conn.(*TCPConnection).fd
conn.Write([]byte("hello world"))
for conn.IsActive() {
runtime.Gosched()
}
time.Sleep(time.Millisecond)
syscall.SetNonblock(fd, true)
......