Unverified Commit b1fce594 authored by 凡羊羊's avatar 凡羊羊 Committed by GitHub
Browse files

Merge branch 'V5.1' into issue/650

Showing with 31 additions and 5 deletions
+31 -5
......@@ -46,6 +46,11 @@ func (w *WebsocketMessage) Encode() []byte {
return reb
}
type sendMessage struct {
messageType int
data []byte
}
//PubContext websocket context
type PubContext struct {
ID string
......@@ -57,6 +62,7 @@ type PubContext struct {
chans map[string]*Chan
lock sync.Mutex
close chan struct{}
sendQueue chan sendMessage
}
//Chan handle
......@@ -82,6 +88,7 @@ func NewPubContext(upgrader websocket.Upgrader,
httpRequest: httpRequest,
server: s,
chans: make(map[string]*Chan, 2),
sendQueue: make(chan sendMessage, 1024),
close: make(chan struct{}),
}
}
......@@ -221,7 +228,7 @@ func (p *PubContext) readMessage(closed chan struct{}) {
continue
}
if messageType == websocket.PingMessage {
p.conn.WriteMessage(websocket.PongMessage, []byte{})
p.SendWebsocketMessage(websocket.PongMessage)
continue
}
if messageType == websocket.BinaryMessage {
......@@ -230,16 +237,34 @@ func (p *PubContext) readMessage(closed chan struct{}) {
}
}
func (p *PubContext) send() {
for {
select {
case m, ok := <-p.sendQueue:
if !ok {
return
}
p.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err := p.conn.WriteMessage(m.messageType, m.data); err != nil {
p.server.log.Debugf("write websocket message failure %s", err.Error())
}
case <-p.close:
p.server.log.Debugf("pub context send chan closed")
return
}
}
}
//SendMessage send websocket message
func (p *PubContext) SendMessage(message WebsocketMessage) error {
p.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
return p.conn.WriteMessage(websocket.TextMessage, message.Encode())
p.sendQueue <- sendMessage{messageType: websocket.TextMessage, data: message.Encode()}
return nil
}
//SendWebsocketMessage send websocket message
func (p *PubContext) SendWebsocketMessage(message int) error {
p.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
return p.conn.WriteMessage(message, []byte{})
p.sendQueue <- sendMessage{messageType: message, data: []byte{}}
return nil
}
func (p *PubContext) sendPing(closed chan struct{}) {
......@@ -263,6 +288,7 @@ func (p *PubContext) Start() {
p.server.log.Error("Create web socket conn error.", err.Error())
return
}
go p.send()
pingclosed := make(chan struct{})
readclosed := make(chan struct{})
go p.sendPing(pingclosed)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment