Commit 964bde23 authored by Jeff Mitchell's avatar Jeff Mitchell
Browse files

Update zookeeper dep

Fixes #3896
parent 1f3db196
No related merge requests found
Showing with 43 additions and 8 deletions
+43 -8
......@@ -101,6 +101,9 @@ type Conn struct {
reconnectLatch chan struct{}
setWatchLimit int
setWatchCallback func([]*setWatchesRequest)
// Debug (for recurring re-auth hang)
debugCloseRecvLoop bool
debugReauthDone chan struct{}
logger Logger
logInfo bool // true if information messages are logged; false if only errors are logged
......@@ -301,9 +304,9 @@ func WithMaxBufferSize(maxBufferSize int) connOption {
// to a limit of 1mb. This option should be used for non-standard server setup
// where znode is bigger than default 1mb.
func WithMaxConnBufferSize(maxBufferSize int) connOption {
return func(c *Conn) {
c.buf = make([]byte, maxBufferSize)
}
return func(c *Conn) {
c.buf = make([]byte, maxBufferSize)
}
}
func (c *Conn) Close() {
......@@ -389,6 +392,17 @@ func (c *Conn) connect() error {
}
func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) {
shouldCancel := func() bool {
select {
case <-c.shouldQuit:
return true
case <-c.closeChan:
return true
default:
return false
}
}
c.credsMu.Lock()
defer c.credsMu.Unlock()
......@@ -400,6 +414,10 @@ func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) {
}
for _, cred := range c.creds {
if shouldCancel() {
c.logger.Printf("Cancel rer-submitting credentials")
return
}
resChan, err := c.sendRequest(
opSetAuth,
&setAuthRequest{Type: 0,
......@@ -415,7 +433,16 @@ func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) {
continue
}
res := <-resChan
var res response
select {
case res = <-resChan:
case <-c.closeChan:
c.logger.Printf("Recv closed, cancel re-submitting credentials")
return
case <-c.shouldQuit:
c.logger.Printf("Should quit, cancel re-submitting credentials")
return
}
if res.err != nil {
c.logger.Printf("Credential re-submit failed: %s", res.err)
// FIXME(prozlach): lets ignore errors for now
......@@ -476,6 +503,9 @@ func (c *Conn) loop() {
wg.Add(1)
go func() {
<-reauthChan
if c.debugCloseRecvLoop {
close(c.debugReauthDone)
}
err := c.sendLoop()
if err != nil || c.logInfo {
c.logger.Printf("Send loop terminated: err=%v", err)
......@@ -486,7 +516,12 @@ func (c *Conn) loop() {
wg.Add(1)
go func() {
err := c.recvLoop(c.conn)
var err error
if c.debugCloseRecvLoop {
err = errors.New("DEBUG: close recv loop")
} else {
err = c.recvLoop(c.conn)
}
if err != io.EOF || c.logInfo {
c.logger.Printf("Recv loop terminated: err=%v", err)
}
......
......@@ -1495,10 +1495,10 @@
"revisionTime": "2017-01-28T01:21:29Z"
},
{
"checksumSHA1": "y33yd1uDZmV3VY4K/5FCGTeRtB8=",
"checksumSHA1": "ze1R6Lrk0kW0cX24ZdZD6ZpIDCo=",
"path": "github.com/samuel/go-zookeeper/zk",
"revision": "471cd4e61d7a78ece1791fa5faa0345dc8c7d5a5",
"revisionTime": "2017-11-17T18:40:27Z"
"revision": "c4fab1ac1bec58281ad0667dc3f0907a9476ac47",
"revisionTime": "2018-01-30T19:37:22Z"
},
{
"checksumSHA1": "eDQ6f1EsNf+frcRO/9XukSEchm8=",
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment