Unverified Commit 687e1cf0 authored by Chris Baker's avatar Chris Baker Committed by GitHub
Browse files

Merge pull request #5146 from hashicorp/b-1173-log-spam

updated to latest hashicorp/raft and hashicorp/memberlist to pull
parents 5805c64a 8c10992e
Showing with 2107 additions and 128 deletions
+2107 -128
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
# BTree implementation for Go
![Travis CI Build Status](https://api.travis-ci.org/google/btree.svg?branch=master)
This package provides an in-memory B-Tree implementation for Go, useful as
an ordered, mutable data structure.
The API is based off of the wonderful
http://godoc.org/github.com/petar/GoLLRB/llrb, and is meant to allow btree to
act as a drop-in replacement for gollrb trees.
See http://godoc.org/github.com/google/btree for documentation.
This diff is collapsed.
......@@ -29,6 +29,11 @@ func (b *memberlistBroadcast) Invalidates(other Broadcast) bool {
return b.node == mb.node
}
// memberlist.NamedBroadcast optional interface
func (b *memberlistBroadcast) Name() string {
return b.node
}
func (b *memberlistBroadcast) Message() []byte {
return b.msg
}
......
module github.com/hashicorp/memberlist
require (
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c
github.com/hashicorp/go-immutable-radix v1.0.0 // indirect
github.com/hashicorp/go-msgpack v0.0.0-20150518234257-fa3f63826f7c
github.com/hashicorp/go-multierror v1.0.0
github.com/hashicorp/go-sockaddr v0.0.0-20190103214136-e92cdb5343bb
github.com/kr/pretty v0.1.0 // indirect
github.com/miekg/dns v1.0.14
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529
github.com/stretchr/testify v1.2.2
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3 // indirect
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519 // indirect
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 // indirect
golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5 // indirect
google.golang.org/appengine v1.4.0 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/vmihailenco/msgpack.v2 v2.9.1 // indirect
)
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZFFEjBj46YV4rDjvGrNxb0KMWYkL2I=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c h1:964Od4U6p2jUkFxvCydnIczKteheJEzHRToSGK3Bnlw=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-immutable-radix v1.0.0 h1:AKDB1HM5PWEA7i4nhcpwOrO2byshxBjXVn/J/3+z5/0=
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-msgpack v0.0.0-20150518234257-fa3f63826f7c h1:BTAbnbegUIMB6xmQCwWE8yRzbA4XSpnZY5hvRJC188I=
github.com/hashicorp/go-msgpack v0.0.0-20150518234257-fa3f63826f7c/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-sockaddr v0.0.0-20190103214136-e92cdb5343bb h1:YrwA8w5SBkUIH5BzN2pMYhno+txUCOD5+PVXwLS6ddI=
github.com/hashicorp/go-sockaddr v0.0.0-20190103214136-e92cdb5343bb/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCSnM=
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/miekg/dns v1.0.14 h1:9jZdLNd/P4+SfEJ0TNyxYpsK8N4GtfylBLqtbYN1sbA=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c h1:Lgl0gzECD8GnQ5QCWA8o6BtfL6mDH5rQgM4/fX3avOs=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3 h1:KYQXGkl6vs02hK7pK4eIbw0NpNPedieTSTEiJ//bwGs=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519 h1:x6rhz8Y9CjbgQkccRGmELH6K+LJj7tOoh3XWeC1yaQM=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5 h1:x6r4Jo0KNzOOzYd8lbcRsqjuqEASK6ob3auvWYM4/8U=
golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
google.golang.org/appengine v1.4.0 h1:/wp5JvzpHIxhs/dumFmF7BXTf3Z+dd4uXta4kVyO508=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/vmihailenco/msgpack.v2 v2.9.1 h1:kb0VV7NuIojvRfzwslQeP3yArBqJHW9tOl4t38VS1jM=
gopkg.in/vmihailenco/msgpack.v2 v2.9.1/go.mod h1:/3Dn1Npt9+MYyLpYYXjInO/5jvMLamn+AEGwNEOatn8=
......@@ -221,6 +221,16 @@ func (t *NetTransport) Shutdown() error {
// and hands them off to the stream channel.
func (t *NetTransport) tcpListen(tcpLn *net.TCPListener) {
defer t.wg.Done()
// baseDelay is the initial delay after an AcceptTCP() error before attempting again
const baseDelay = 5 * time.Millisecond
// maxDelay is the maximum delay after an AcceptTCP() error before attempting again.
// In the case that tcpListen() is error-looping, it will delay the shutdown check.
// Therefore, changes to maxDelay may have an effect on the latency of shutdown.
const maxDelay = 1 * time.Second
var loopDelay time.Duration
for {
conn, err := tcpLn.AcceptTCP()
if err != nil {
......@@ -228,9 +238,22 @@ func (t *NetTransport) tcpListen(tcpLn *net.TCPListener) {
break
}
if loopDelay == 0 {
loopDelay = baseDelay
} else {
loopDelay *= 2
}
if loopDelay > maxDelay {
loopDelay = maxDelay
}
t.logger.Printf("[ERR] memberlist: Error accepting TCP connection: %v", err)
time.Sleep(loopDelay)
continue
}
// No error, reset loop delay
loopDelay = 0
t.streamCh <- conn
}
......
package memberlist
import (
"sort"
"math"
"sync"
"github.com/google/btree"
)
// TransmitLimitedQueue is used to queue messages to broadcast to
......@@ -19,35 +21,93 @@ type TransmitLimitedQueue struct {
// number of retransmissions attempted.
RetransmitMult int
sync.Mutex
bcQueue limitedBroadcasts
mu sync.Mutex
tq *btree.BTree // stores *limitedBroadcast as btree.Item
tm map[string]*limitedBroadcast
idGen int64
}
type limitedBroadcast struct {
transmits int // Number of transmissions attempted.
transmits int // btree-key[0]: Number of transmissions attempted.
msgLen int64 // btree-key[1]: copied from len(b.Message())
id int64 // btree-key[2]: unique incrementing id stamped at submission time
b Broadcast
name string // set if Broadcast is a NamedBroadcast
}
// Less tests whether the current item is less than the given argument.
//
// This must provide a strict weak ordering.
// If !a.Less(b) && !b.Less(a), we treat this to mean a == b (i.e. we can only
// hold one of either a or b in the tree).
//
// default ordering is
// - [transmits=0, ..., transmits=inf]
// - [transmits=0:len=999, ..., transmits=0:len=2, ...]
// - [transmits=0:len=999,id=999, ..., transmits=0:len=999:id=1, ...]
func (b *limitedBroadcast) Less(than btree.Item) bool {
o := than.(*limitedBroadcast)
if b.transmits < o.transmits {
return true
} else if b.transmits > o.transmits {
return false
}
if b.msgLen > o.msgLen {
return true
} else if b.msgLen < o.msgLen {
return false
}
return b.id > o.id
}
// for testing; emits in transmit order if reverse=false
func (q *TransmitLimitedQueue) orderedView(reverse bool) []*limitedBroadcast {
q.Lock()
defer q.Unlock()
q.mu.Lock()
defer q.mu.Unlock()
out := make([]*limitedBroadcast, 0, len(q.bcQueue))
if reverse {
for i := 0; i < len(q.bcQueue); i++ {
out = append(out, q.bcQueue[i])
}
} else {
for i := len(q.bcQueue) - 1; i >= 0; i-- {
out = append(out, q.bcQueue[i])
}
}
out := make([]*limitedBroadcast, 0, q.lenLocked())
q.walkReadOnlyLocked(reverse, func(cur *limitedBroadcast) bool {
out = append(out, cur)
return true
})
return out
}
type limitedBroadcasts []*limitedBroadcast
// walkReadOnlyLocked calls f for each item in the queue traversing it in
// natural order (by Less) when reverse=false and the opposite when true. You
// must hold the mutex.
//
// This method panics if you attempt to mutate the item during traversal. The
// underlying btree should also not be mutated during traversal.
func (q *TransmitLimitedQueue) walkReadOnlyLocked(reverse bool, f func(*limitedBroadcast) bool) {
if q.lenLocked() == 0 {
return
}
iter := func(item btree.Item) bool {
cur := item.(*limitedBroadcast)
prevTransmits := cur.transmits
prevMsgLen := cur.msgLen
prevID := cur.id
keepGoing := f(cur)
if prevTransmits != cur.transmits || prevMsgLen != cur.msgLen || prevID != cur.id {
panic("edited queue while walking read only")
}
return keepGoing
}
if reverse {
q.tq.Descend(iter) // end with transmit 0
} else {
q.tq.Ascend(iter) // start with transmit 0
}
}
// Broadcast is something that can be broadcasted via gossip to
// the memberlist cluster.
......@@ -65,123 +125,298 @@ type Broadcast interface {
Finished()
}
// NamedBroadcast is an optional extension of the Broadcast interface that
// gives each message a unique string name, and that is used to optimize
//
// You shoud ensure that Invalidates() checks the same uniqueness as the
// example below:
//
// func (b *foo) Invalidates(other Broadcast) bool {
// nb, ok := other.(NamedBroadcast)
// if !ok {
// return false
// }
// return b.Name() == nb.Name()
// }
//
// Invalidates() isn't currently used for NamedBroadcasts, but that may change
// in the future.
type NamedBroadcast interface {
Broadcast
// The unique identity of this broadcast message.
Name() string
}
// UniqueBroadcast is an optional interface that indicates that each message is
// intrinsically unique and there is no need to scan the broadcast queue for
// duplicates.
//
// You should ensure that Invalidates() always returns false if implementing
// this interface. Invalidates() isn't currently used for UniqueBroadcasts, but
// that may change in the future.
type UniqueBroadcast interface {
Broadcast
// UniqueBroadcast is just a marker method for this interface.
UniqueBroadcast()
}
// QueueBroadcast is used to enqueue a broadcast
func (q *TransmitLimitedQueue) QueueBroadcast(b Broadcast) {
q.Lock()
defer q.Unlock()
// Check if this message invalidates another
n := len(q.bcQueue)
for i := 0; i < n; i++ {
if b.Invalidates(q.bcQueue[i].b) {
q.bcQueue[i].b.Finished()
copy(q.bcQueue[i:], q.bcQueue[i+1:])
q.bcQueue[n-1] = nil
q.bcQueue = q.bcQueue[:n-1]
n--
q.queueBroadcast(b, 0)
}
// lazyInit initializes internal data structures the first time they are
// needed. You must already hold the mutex.
func (q *TransmitLimitedQueue) lazyInit() {
if q.tq == nil {
q.tq = btree.New(32)
}
if q.tm == nil {
q.tm = make(map[string]*limitedBroadcast)
}
}
// queueBroadcast is like QueueBroadcast but you can use a nonzero value for
// the initial transmit tier assigned to the message. This is meant to be used
// for unit testing.
func (q *TransmitLimitedQueue) queueBroadcast(b Broadcast, initialTransmits int) {
q.mu.Lock()
defer q.mu.Unlock()
q.lazyInit()
if q.idGen == math.MaxInt64 {
// it's super duper unlikely to wrap around within the retransmit limit
q.idGen = 1
} else {
q.idGen++
}
id := q.idGen
lb := &limitedBroadcast{
transmits: initialTransmits,
msgLen: int64(len(b.Message())),
id: id,
b: b,
}
unique := false
if nb, ok := b.(NamedBroadcast); ok {
lb.name = nb.Name()
} else if _, ok := b.(UniqueBroadcast); ok {
unique = true
}
// Check if this message invalidates another.
if lb.name != "" {
if old, ok := q.tm[lb.name]; ok {
old.b.Finished()
q.deleteItem(old)
}
} else if !unique {
// Slow path, hopefully nothing hot hits this.
var remove []*limitedBroadcast
q.tq.Ascend(func(item btree.Item) bool {
cur := item.(*limitedBroadcast)
// Special Broadcasts can only invalidate each other.
switch cur.b.(type) {
case NamedBroadcast:
// noop
case UniqueBroadcast:
// noop
default:
if b.Invalidates(cur.b) {
cur.b.Finished()
remove = append(remove, cur)
}
}
return true
})
for _, cur := range remove {
q.deleteItem(cur)
}
}
// Append to the queue
q.bcQueue = append(q.bcQueue, &limitedBroadcast{0, b})
// Append to the relevant queue.
q.addItem(lb)
}
// deleteItem removes the given item from the overall datastructure. You
// must already hold the mutex.
func (q *TransmitLimitedQueue) deleteItem(cur *limitedBroadcast) {
_ = q.tq.Delete(cur)
if cur.name != "" {
delete(q.tm, cur.name)
}
if q.tq.Len() == 0 {
// At idle there's no reason to let the id generator keep going
// indefinitely.
q.idGen = 0
}
}
// addItem adds the given item into the overall datastructure. You must already
// hold the mutex.
func (q *TransmitLimitedQueue) addItem(cur *limitedBroadcast) {
_ = q.tq.ReplaceOrInsert(cur)
if cur.name != "" {
q.tm[cur.name] = cur
}
}
// getTransmitRange returns a pair of min/max values for transmit values
// represented by the current queue contents. Both values represent actual
// transmit values on the interval [0, len). You must already hold the mutex.
func (q *TransmitLimitedQueue) getTransmitRange() (minTransmit, maxTransmit int) {
if q.lenLocked() == 0 {
return 0, 0
}
minItem, maxItem := q.tq.Min(), q.tq.Max()
if minItem == nil || maxItem == nil {
return 0, 0
}
min := minItem.(*limitedBroadcast).transmits
max := maxItem.(*limitedBroadcast).transmits
return min, max
}
// GetBroadcasts is used to get a number of broadcasts, up to a byte limit
// and applying a per-message overhead as provided.
func (q *TransmitLimitedQueue) GetBroadcasts(overhead, limit int) [][]byte {
q.Lock()
defer q.Unlock()
q.mu.Lock()
defer q.mu.Unlock()
// Fast path the default case
if len(q.bcQueue) == 0 {
if q.lenLocked() == 0 {
return nil
}
transmitLimit := retransmitLimit(q.RetransmitMult, q.NumNodes())
bytesUsed := 0
var toSend [][]byte
for i := len(q.bcQueue) - 1; i >= 0; i-- {
// Check if this is within our limits
b := q.bcQueue[i]
msg := b.b.Message()
if bytesUsed+overhead+len(msg) > limit {
var (
bytesUsed int
toSend [][]byte
reinsert []*limitedBroadcast
)
// Visit fresher items first, but only look at stuff that will fit.
// We'll go tier by tier, grabbing the largest items first.
minTr, maxTr := q.getTransmitRange()
for transmits := minTr; transmits <= maxTr; /*do not advance automatically*/ {
free := int64(limit - bytesUsed - overhead)
if free <= 0 {
break // bail out early
}
// Search for the least element on a given tier (by transmit count) as
// defined in the limitedBroadcast.Less function that will fit into our
// remaining space.
greaterOrEqual := &limitedBroadcast{
transmits: transmits,
msgLen: free,
id: math.MaxInt64,
}
lessThan := &limitedBroadcast{
transmits: transmits + 1,
msgLen: math.MaxInt64,
id: math.MaxInt64,
}
var keep *limitedBroadcast
q.tq.AscendRange(greaterOrEqual, lessThan, func(item btree.Item) bool {
cur := item.(*limitedBroadcast)
// Check if this is within our limits
if int64(len(cur.b.Message())) > free {
// If this happens it's a bug in the datastructure or
// surrounding use doing something like having len(Message())
// change over time. There's enough going on here that it's
// probably sane to just skip it and move on for now.
return true
}
keep = cur
return false
})
if keep == nil {
// No more items of an appropriate size in the tier.
transmits++
continue
}
msg := keep.b.Message()
// Add to slice to send
bytesUsed += overhead + len(msg)
toSend = append(toSend, msg)
// Check if we should stop transmission
b.transmits++
if b.transmits >= transmitLimit {
b.b.Finished()
n := len(q.bcQueue)
q.bcQueue[i], q.bcQueue[n-1] = q.bcQueue[n-1], nil
q.bcQueue = q.bcQueue[:n-1]
q.deleteItem(keep)
if keep.transmits+1 >= transmitLimit {
keep.b.Finished()
} else {
// We need to bump this item down to another transmit tier, but
// because it would be in the same direction that we're walking the
// tiers, we will have to delay the reinsertion until we are
// finished our search. Otherwise we'll possibly re-add the message
// when we ascend to the next tier.
keep.transmits++
reinsert = append(reinsert, keep)
}
}
// If we are sending anything, we need to re-sort to deal
// with adjusted transmit counts
if len(toSend) > 0 {
q.bcQueue.Sort()
for _, cur := range reinsert {
q.addItem(cur)
}
return toSend
}
// NumQueued returns the number of queued messages
func (q *TransmitLimitedQueue) NumQueued() int {
q.Lock()
defer q.Unlock()
return len(q.bcQueue)
q.mu.Lock()
defer q.mu.Unlock()
return q.lenLocked()
}
// Reset clears all the queued messages
func (q *TransmitLimitedQueue) Reset() {
q.Lock()
defer q.Unlock()
for _, b := range q.bcQueue {
b.b.Finished()
// lenLocked returns the length of the overall queue datastructure. You must
// hold the mutex.
func (q *TransmitLimitedQueue) lenLocked() int {
if q.tq == nil {
return 0
}
q.bcQueue = nil
return q.tq.Len()
}
// Reset clears all the queued messages. Should only be used for tests.
func (q *TransmitLimitedQueue) Reset() {
q.mu.Lock()
defer q.mu.Unlock()
q.walkReadOnlyLocked(false, func(cur *limitedBroadcast) bool {
cur.b.Finished()
return true
})
q.tq = nil
q.tm = nil
q.idGen = 0
}
// Prune will retain the maxRetain latest messages, and the rest
// will be discarded. This can be used to prevent unbounded queue sizes
func (q *TransmitLimitedQueue) Prune(maxRetain int) {
q.Lock()
defer q.Unlock()
q.mu.Lock()
defer q.mu.Unlock()
// Do nothing if queue size is less than the limit
n := len(q.bcQueue)
if n < maxRetain {
return
}
// Invalidate the messages we will be removing
for i := 0; i < n-maxRetain; i++ {
q.bcQueue[i].b.Finished()
for q.tq.Len() > maxRetain {
item := q.tq.Max()
if item == nil {
break
}
cur := item.(*limitedBroadcast)
cur.b.Finished()
q.deleteItem(cur)
}
// Move the messages, and retain only the last maxRetain
copy(q.bcQueue[0:], q.bcQueue[n-maxRetain:])
q.bcQueue = q.bcQueue[:maxRetain]
}
func (b limitedBroadcasts) Len() int {
return len(b)
}
func (b limitedBroadcasts) Less(i, j int) bool {
return b[i].transmits < b[j].transmits
}
func (b limitedBroadcasts) Swap(i, j int) {
b[i], b[j] = b[j], b[i]
}
func (b limitedBroadcasts) Sort() {
sort.Sort(sort.Reverse(b))
}
......@@ -34,7 +34,7 @@ and `StableStore`.
## Tagged Releases
As of September 2017, Hashicorp will start using tags for this library to clearly indicate
As of September 2017, HashiCorp will start using tags for this library to clearly indicate
major version updates. We recommend you vendor your application's dependency on this library.
* v0.1.0 is the original stable version of the library that was in master and has been maintained
......
......@@ -164,7 +164,7 @@ type Raft struct {
// configuration on all the Voter servers. There is no need to bootstrap
// Nonvoter and Staging servers.
//
// One sane approach is to boostrap a single server with a configuration
// One sane approach is to bootstrap a single server with a configuration
// listing just itself as a Voter, then invoke AddVoter() on it to add other
// servers to the cluster.
func BootstrapCluster(conf *Config, logs LogStore, stable StableStore,
......@@ -717,12 +717,12 @@ func (r *Raft) RemovePeer(peer ServerAddress) Future {
}
// AddVoter will add the given server to the cluster as a staging server. If the
// server is already in the cluster as a voter, this does nothing. This must be
// run on the leader or it will fail. The leader will promote the staging server
// to a voter once that server is ready. If nonzero, prevIndex is the index of
// the only configuration upon which this change may be applied; if another
// configuration entry has been added in the meantime, this request will fail.
// If nonzero, timeout is how long this server should wait before the
// server is already in the cluster as a voter, this updates the server's address.
// This must be run on the leader or it will fail. The leader will promote the
// staging server to a voter once that server is ready. If nonzero, prevIndex is
// the index of the only configuration upon which this change may be applied; if
// another configuration entry has been added in the meantime, this request will
// fail. If nonzero, timeout is how long this server should wait before the
// configuration change log entry is appended.
func (r *Raft) AddVoter(id ServerID, address ServerAddress, prevIndex uint64, timeout time.Duration) IndexFuture {
if r.protocolVersion < 2 {
......@@ -739,9 +739,9 @@ func (r *Raft) AddVoter(id ServerID, address ServerAddress, prevIndex uint64, ti
// AddNonvoter will add the given server to the cluster but won't assign it a
// vote. The server will receive log entries, but it won't participate in
// elections or log entry commitment. If the server is already in the cluster as
// a staging server or voter, this does nothing. This must be run on the leader
// or it will fail. For prevIndex and timeout, see AddVoter.
// elections or log entry commitment. If the server is already in the cluster,
// this updates the server's address. This must be run on the leader or it will
// fail. For prevIndex and timeout, see AddVoter.
func (r *Raft) AddNonvoter(id ServerID, address ServerAddress, prevIndex uint64, timeout time.Duration) IndexFuture {
if r.protocolVersion < 3 {
return errorFuture{ErrUnsupportedProtocol}
......
......@@ -9,7 +9,7 @@ import (
// replication goroutines report in newly written entries with Match(), and
// this notifies on commitCh when the commit index has advanced.
type commitment struct {
// protectes matchIndexes and commitIndex
// protects matchIndexes and commitIndex
sync.Mutex
// notified when commitIndex increases
commitCh chan struct{}
......
......@@ -115,7 +115,7 @@ type configurationChangeRequest struct {
// prior one has been committed).
//
// One downside to storing just two configurations is that if you try to take a
// snahpsot when your state machine hasn't yet applied the committedIndex, we
// snapshot when your state machine hasn't yet applied the committedIndex, we
// have no record of the configuration that would logically fit into that
// snapshot. We disallow snapshots in that case now. An alternative approach,
// which LogCabin uses, is to track every configuration change in the
......@@ -198,7 +198,7 @@ func nextConfiguration(current Configuration, currentIndex uint64, change config
// TODO: barf on new address?
newServer := Server{
// TODO: This should add the server as Staging, to be automatically
// promoted to Voter later. However, the promoton to Voter is not yet
// promoted to Voter later. However, the promotion to Voter is not yet
// implemented, and doing so is not trivial with the way the leader loop
// coordinates with the replication goroutines today. So, for now, the
// server will have a vote right away, and the Promote case below is
......
package raft
import (
"errors"
"sync"
)
......@@ -106,7 +107,11 @@ func (i *InmemStore) Set(key []byte, val []byte) error {
func (i *InmemStore) Get(key []byte) ([]byte, error) {
i.l.RLock()
defer i.l.RUnlock()
return i.kv[string(key)], nil
val := i.kv[string(key)]
if val == nil {
return nil, errors.New("not found")
}
return val, nil
}
// SetUint64 implements the StableStore interface.
......
......@@ -43,9 +43,11 @@ type InmemTransport struct {
timeout time.Duration
}
// NewInmemTransport is used to initialize a new transport
// and generates a random local address if none is specified
func NewInmemTransport(addr ServerAddress) (ServerAddress, *InmemTransport) {
// NewInmemTransportWithTimeout is used to initialize a new transport and
// generates a random local address if none is specified. The given timeout
// will be used to decide how long to wait for a connected peer to process the
// RPCs that we're sending it. See also Connect() and Consumer().
func NewInmemTransportWithTimeout(addr ServerAddress, timeout time.Duration) (ServerAddress, *InmemTransport) {
if string(addr) == "" {
addr = NewInmemAddr()
}
......@@ -53,11 +55,17 @@ func NewInmemTransport(addr ServerAddress) (ServerAddress, *InmemTransport) {
consumerCh: make(chan RPC, 16),
localAddr: addr,
peers: make(map[ServerAddress]*InmemTransport),
timeout: 50 * time.Millisecond,
timeout: timeout,
}
return addr, trans
}
// NewInmemTransport is used to initialize a new transport
// and generates a random local address if none is specified
func NewInmemTransport(addr ServerAddress) (ServerAddress, *InmemTransport) {
return NewInmemTransportWithTimeout(addr, 50*time.Millisecond)
}
// SetHeartbeatHandler is used to set optional fast-path for
// heartbeats, not supported for this transport.
func (i *InmemTransport) SetHeartbeatHandler(cb func(RPC)) {
......@@ -76,16 +84,15 @@ func (i *InmemTransport) LocalAddr() ServerAddress {
// AppendEntriesPipeline returns an interface that can be used to pipeline
// AppendEntries requests.
func (i *InmemTransport) AppendEntriesPipeline(id ServerID, target ServerAddress) (AppendPipeline, error) {
i.RLock()
i.Lock()
defer i.Unlock()
peer, ok := i.peers[target]
i.RUnlock()
if !ok {
return nil, fmt.Errorf("failed to connect to peer: %v", target)
}
pipeline := newInmemPipeline(i, peer, target)
i.Lock()
i.pipelines = append(i.pipelines, pipeline)
i.Unlock()
return pipeline, nil
}
......
......@@ -309,7 +309,7 @@ func (n *NetworkTransport) getProviderAddressOrFallback(id ServerID, target Serv
if n.serverAddressProvider != nil {
serverAddressOverride, err := n.serverAddressProvider.ServerAddr(id)
if err != nil {
n.logger.Printf("[WARN] Unable to get address for server id %v, using fallback address %v: %v", id, target, err)
n.logger.Printf("[WARN] raft: Unable to get address for server id %v, using fallback address %v: %v", id, target, err)
} else {
return serverAddressOverride
}
......@@ -461,16 +461,38 @@ func (n *NetworkTransport) DecodePeer(buf []byte) ServerAddress {
// listen is used to handling incoming connections.
func (n *NetworkTransport) listen() {
const baseDelay = 5 * time.Millisecond
const maxDelay = 1 * time.Second
var loopDelay time.Duration
for {
// Accept incoming connections
conn, err := n.stream.Accept()
if err != nil {
if n.IsShutdown() {
if loopDelay == 0 {
loopDelay = baseDelay
} else {
loopDelay *= 2
}
if loopDelay > maxDelay {
loopDelay = maxDelay
}
if !n.IsShutdown() {
n.logger.Printf("[ERR] raft-net: Failed to accept connection: %v", err)
}
select {
case <-n.shutdownCh:
return
case <-time.After(loopDelay):
continue
}
n.logger.Printf("[ERR] raft-net: Failed to accept connection: %v", err)
continue
}
// No error, reset loop delay
loopDelay = 0
n.logger.Printf("[DEBUG] raft-net: %v accepted connection from: %v", n.LocalAddr(), conn.RemoteAddr())
// Handle the connection in dedicated routine
......
......@@ -444,6 +444,7 @@ func (r *Raft) startStopReplication() {
currentTerm: r.getCurrentTerm(),
nextIndex: lastIdx + 1,
lastContact: time.Now(),
notify: make(map[*verifyFuture]struct{}),
notifyCh: make(chan struct{}, 1),
stepDown: r.leaderState.stepDown,
}
......@@ -555,11 +556,17 @@ func (r *Raft) leaderLoop() {
r.logger.Printf("[WARN] raft: New leader elected, stepping down")
r.setState(Follower)
delete(r.leaderState.notify, v)
for _, repl := range r.leaderState.replState {
repl.cleanNotify(v)
}
v.respond(ErrNotLeader)
} else {
// Quorum of members agree, we are still leader
delete(r.leaderState.notify, v)
for _, repl := range r.leaderState.replState {
repl.cleanNotify(v)
}
v.respond(nil)
}
......@@ -639,7 +646,7 @@ func (r *Raft) verifyLeader(v *verifyFuture) {
// Trigger immediate heartbeats
for _, repl := range r.leaderState.replState {
repl.notifyLock.Lock()
repl.notify = append(repl.notify, v)
repl.notify[v] = struct{}{}
repl.notifyLock.Unlock()
asyncNotifyCh(repl.notifyCh)
}
......
......@@ -31,7 +31,7 @@ type followerReplication struct {
peer Server
// commitment tracks the entries acknowledged by followers so that the
// leader's commit index can advance. It is updated on successsful
// leader's commit index can advance. It is updated on successful
// AppendEntries responses.
commitment *commitment
......@@ -64,9 +64,9 @@ type followerReplication struct {
// notifyCh is notified to send out a heartbeat, which is used to check that
// this server is still leader.
notifyCh chan struct{}
// notify is a list of futures to be resolved upon receipt of an
// acknowledgement, then cleared from this list.
notify []*verifyFuture
// notify is a map of futures to be resolved upon receipt of an
// acknowledgement, then cleared from this map.
notify map[*verifyFuture]struct{}
// notifyLock protects 'notify'.
notifyLock sync.Mutex
......@@ -85,15 +85,22 @@ func (s *followerReplication) notifyAll(leader bool) {
// Clear the waiting notifies minimizing lock time
s.notifyLock.Lock()
n := s.notify
s.notify = nil
s.notify = make(map[*verifyFuture]struct{})
s.notifyLock.Unlock()
// Submit our votes
for _, v := range n {
for v, _ := range n {
v.vote(leader)
}
}
// cleanNotify is used to delete notify, .
func (s *followerReplication) cleanNotify(v *verifyFuture) {
s.notifyLock.Lock()
delete(s.notify, v)
s.notifyLock.Unlock()
}
// LastContact returns the time of last contact.
func (s *followerReplication) LastContact() time.Time {
s.lastContactLock.RLock()
......
......@@ -47,8 +47,8 @@ func NewTCPTransportWithLogger(
})
}
// NewTCPTransportWithLogger returns a NetworkTransport that is built on top of
// a TCP streaming transport layer, using a default logger and the address provider
// NewTCPTransportWithConfig returns a NetworkTransport that is built on top of
// a TCP streaming transport layer, using the given config struct.
func NewTCPTransportWithConfig(
bindAddr string,
advertise net.Addr,
......
## 0.8.2 (UNRELEASED)
FEATURES:
IMPROVEMENTS:
* agent: Fixed a missing case where gossip would stop flowing to dead nodes for a short while. [GH-451]
* agent: Uses the go-sockaddr library to look for private IP addresses, which prefers non-loopback private addresses over loopback ones when trying to automatically determine the advertise address. [GH-451]
* agent: Properly seeds Go's random number generator using the seed library. [GH-451]
* agent: Serf is now built with Go 1.10.x. [GH-520]
* agent: Improved address comparison during conflict resolution. [GH-433]
* agent: Updated memberlist to latest to pull several cleanups and fixes. [GH-491]
* agent: Improved handling of leave intent messages to make sure they propagate and are processed correctly. [GH-510]
* agent: Added CLI option to disable compression for debugging messages. [GH-529]
* library: Moved close of shutdown channel until after network resorces are released. [GH-453]
* library: Fixed several race conditions with QueryResponse [GH-460]
* library: Made snapshot writing asyncronous and will less aggressive compaction on large clusters to avoid blocking message handler on disk IO [GH-524][GH-525]
BUG FIXES:
* agent: Added defenses against invalid network coordinates with NaN and Inf values. [GH-468]
* agent: Fixed an issue on Windows where "wsarecv" errors were logged when clients accessed the RPC interface. [GH-479]
* agent: Fixed an issue where calling the serf Stats function could result in a deadlock. [[Consul Issue 4011](https://github.com/hashicorp/consul/issues/4011)]
## 0.8.1 (February 6, 2017)
IMPROVEMENTS:
* agent: Added support for relaying query responses through N other nodes for redundancy. [GH-439]
* agent: Added the ability to tune the broadcast timeout, which might be necessary in very large clusters that experience very large, simultaneous changes to the cluster. [GH-412]
* agent: Added a checksum to UDP gossip messages to guard against packet corruption. [GH-432]
* agent: Added a short window where gossip will still flow to dead nodes so that they can more quickly refute. [GH-440]
* build: Serf now builds with Go 1.7.5. [GH-443]
## 0.8 (September 14, 2016)
FEATURES:
* **Lifeguard Updates:** Implemented a new set of feedback controls for the gossip layer that help prevent degraded nodes that can't meet the soft real-time requirements from erroneously causing flapping in other, healthy nodes. This feature tunes itself automatically and requires no configuration. [GH-394]
IMRPOVEMENTS:
* Modified management of intents to be per-node to avoid intent queue overflow errors in large clusters. [GH-402]
* Joins based on a DNS lookup will use TCP and attempt to join with the full list of returned addresses. [GH-387]
* Serf's Go dependencies are now vendored using govendor. [GH-383]
* Updated all of Serf's dependencies. [GH-387] [GH-401]
* Moved dist build into a Docker container. [GH-409]
BUG FIXES:
* Updated memberlist to pull in a fix for leaking goroutines when performing TCP fallback pings. This affected users with frequent UDP connectivity problems. [GH-381]
## 0.7 (December 21, 2015)
FEATURES:
* Added new network tomography subsystem that computes network coordinates for
nodes in the cluster which can be used to estimate network round trip times
between any two nodes; exposes new `GetCoordinate` API as as well as a
a new `serf rtt` command to query RTT interactively
IMPROVEMENTS:
* Added support for configuring query request size and query response size [GH-346]
* Syslog messages are now filtered by the configured log-level
* New `statsd_addr` for sending metrics via UDP to statsd
* Added support for sending telemetry to statsite
* `serf info` command now displays event handlers [GH-312]
* Added a `MemberLeave` message to the `EventCh` for a force-leave so higher-
level applications can handle the leave event
* Lots of documentation updates
BUG FIXES:
* Fixed updating cached protocol version of a node when an update event
fires [GH-335]
* Fixed a bug where an empty remote state message would cause a crash in
`MergeRemoteState`
## 0.6.4 (Febuary 12, 2015)
IMPROVEMENTS:
* Added merge delegate to Serf library to support application
specific logic in cluster merging.
* `SERF_RPC_AUTH` environment variable can be used in place of CLI flags.
* Display if encryption is enabled in Serf stats
* Improved `join` behavior when using DNS resolution
BUG FIXES:
* Fixed snapshot file compaction on Windows
* Fixed device binding on Windows
* Fixed bug with empty keyring
* Fixed parsing of ports in some cases
* Fixing stability issues under high churn
MISC:
* Increased user event size limit to 512 bytes (previously 256)
## 0.6.3 (July 10, 2014)
IMPROVEMENTS:
* Added `statsite_addr` configuration to stream to statsite
BUG FIXES:
* Fixed issue with mDNS flooding when using IPv4 and IPv6
* Fixed issue with reloading event handlers
MISC:
* Improved failure detection reliability under load
* Reduced fsync() use in snapshot file
* Improved snapshot file performance
* Additional logging to help debug flapping
## 0.6.2 (June 16, 2014)
IMPROVEMENTS:
* Added `syslog_facility` configuration to set facility
BUG FIXES:
* Fixed memory leak in in-memory stats system
* Fixed issue that would cause syslog to deadlock
MISC:
* Fixed missing prefixes on some log messages
* Docs fixes
## 0.6.1 (May 29, 2014)
BUG FIXES:
* On Windows, a "failed to decode request header" error will no
longer be shown on every RPC request.
* Avoiding holding a lock which can cause monitor/stream commands to
fail when an event handler is blocking
* Fixing conflict response decoding errors
IMPROVEMENTS:
* Improved agent CLI usage documentation
* Warn if an event handler is slow, potentially blocking other events
## 0.6.0 (May 8, 2014)
FEATURES:
* Support for key rotation when using encryption. This adds a new
`serf keys` command, and a `-keyring-file` configuration. Thanks
to @ryanuber.
* New `-tags-file` can be specified to persist changes to tags made
via the RPC interface. Thanks to @ryanuber.
* New `serf info` command to provide operator debugging information,
and to get info about the local node.
* Adding `-retry-join` flag to agent which enables retrying the join
until success or `-retry-max` attempts have been made.
IMPROVEMENTS:
* New `-rejoin` flag can be used along with a snapshot file to
automatically rejoin a cluster.
* Agent uses circular buffer to invoke handlers, guards against unbounded
output lengths.
* Adding support for logging to syslog
* The SERF_RPC_ADDR environment variable can be used instead of the
`-rpc-addr` flags. Thanks to @lalyos [GH-209].
* `serf query` can now output the results in a JSON format.
* Unknown configuration directives generate an error [GH-186].
Thanks to @vincentbernat.
BUG FIXES:
* Fixing environmental variables with invalid characters. [GH-200].
Thanks to @arschles.
* Fixing issue with tag changes with hard restart before
failure detection.
* Fixing issue with reconnect when using dynamic ports.
MISC:
* Improved logging of various error messages
* Improved debian packaging. Thanks to @vincentbernat.
## 0.5.0 (March 12, 2014)
FEATURES:
* New `query` command provides a request/response mechanism to do realtime
queries across the cluster. [GH-139]
* Automatic conflict resolution. Serf will detect name conflicts, and use an
internal query to determine which node is in the minority and perform a shutdown.
[GH-167] [GH-119]
* New `reachability` command can be used to help diagnose network and configuration
issues.
* Added `member-reap` event to get notified of when Serf removes a failed or left
node from the cluster. The reap interval is controlled by `reconnect_timeout` and
`tombstone_timeout` respectively. [GH-172]
IMPROVEMENTS:
* New Recipes section on the site to share Serf tips. Thanks to @ryanuber. [GH-177]
* `members` command has new `-name` filter flag. Thanks to @ryanuber [GH-164]
* New RPC command "members-filtered" to move filtering logic to the agent.
Thanks to @ryanuber. [GH-149]
* `reconnect_interval` and `reconnect_timeout` can be provided to configure
agent behavior for attempting to reconnect to failed nodes. [GH-155]
* `tombstone_interval` can be provided to configure the reap time for nodes
that have gracefully left. [GH_172]
* Agent can be provided `rpc_auth` config to require that RPC is authenticated.
All commands can take a `-rpc-auth` flag now. [GH-148]
BUG FIXES:
* Fixed config folder in Upstart script. Thanks to @llchen223. [GH-174]
* Event handlers are correctly invoked when BusyBox is the shell. [GH-156]
* Event handlers were not being invoked with the correct SERF_TAG_* values
if tags were changed using the `tags` command. [GH-169]
MISC:
* Support for protocol version 1 (Serf 0.2) has been removed. Serf 0.5 cannot
join a cluster that has members running version 0.2.
## 0.4.5 (February 25, 2014)
FEATURES:
* New `tags` command is available to dynamically update tags without
reloading the agent. Thanks to @ryanuber. [GH-126]
IMPROVEMENTS:
* Upstart receipe logs output thanks to @breerly [GH-128]
* `members` can filter on any tag thanks to @hmrm [GH-124]
* Added vagrant demo to make a simple cluster
* `members` now columnizes the output thanks to @ryanuber [GH-138]
* Agent passes its own environment variables through thanks to @mcroydon [GH-142]
* `-iface` flag can be used to bind to interfaces [GH-145]
BUG FIXES:
* -config-dir would cause protocol to be set to 0 if there are no
configuration files in the directory [GH-129]
* Event handlers can filter on 'member-update'
* User event handler appends new line, this was being omitted
## 0.4.1 (February 3, 2014)
IMPROVEMENTS:
* mDNS service uses the advertise address instead of bind address
## 0.4.0 (January 31, 2014)
FEATURES:
* Static `role` has been replaced with dynamic tags. Each agent can have
multiple key/value tags associated using `-tag`. Tags can be updated using
a SIGHUP and are advertised to the cluster, causing the `member-update` event
to be triggered. [GH-111] [GH-98]
* Serf can automatically discover peers uing mDNS when provided the `-discover`
flag. In network environments supporting multicast, no explicit join is needed
to find peers. [GH-53]
* Serf collects telemetry information and simple runtime profiling. Stats can
be dumped to stderr by sending a `USR1` signal to Serf. Windows users must use
the `BREAK` signal instead. [GH-103]
* `advertise` flag can be used to set an advertise address different
from the bind address. Used for NAT traversal. Thanks to @benagricola [GH-93]
* `members` command now takes `-format` flag to specify either text or JSON
output. Fixed by @ryanuber [GH-97]
IMPROVEMENTS:
* User payload always appends a newline when invoking a shell script
* Severity of "Potential blocking operation" reduced to debug to prevent
spurious messages on slow or busy machines.
BUG FIXES:
* If an agent is restarted with the same bind address but new name, it
will not respond to the old name, causing the old name to enter the
`failed` state, instead of having duplicate entries in the `alive` state.
* `leave_on_interrupt` set to false when not specified, if
any config file is provided. This flag is deprecated for
`skip_leave_on_interrupt` instead. [GH-94]
MISC:
* `-role` configuration has been deprecated in favor of `-tag role=foo`.
The flag is still supported but will generate warnings.
* Support for protocol version 0 (Serf 0.1) has been removed. Serf 0.4 cannot
join a cluster that has members running version 0.1.
## 0.3.0 (December 5, 2013)
FEATURES:
* Dynamic port support, cluster wide consistent config not necessary
* Snapshots to automaticaly rejoin cluster after failure and prevent replays [GH-84] [GH-71]
* Adding `profile` config to agent, to support WAN, LAN, and Local modes
* MsgPack over TCP RPC protocol which can be used to control Serf, send events, and
receive events with low latency.
* New `leave` CLI command and RPC endpoint to control graceful leaves
* Signal handling is controlable, graceful leave behavior on SIGINT/SIGTERM
can be specified
* SIGHUP can be used to reload configuration
IMPROVEMENTS:
* Event handler provides lamport time of user events via SERF_USER_LTIME [GH-68]
* Memberlist encryption overhead has been reduced
* Filter output of `members` using regular expressions on role and status
* `replay_on_join` parameter to control replay with `start_join`
* `monitor` works even if the client is behind a NAT
* Serf generates warning if binding to public IP without encryption
BUG FIXES:
* Prevent unbounded transmit queues [GH-78]
* IPv6 addresses can be bound to [GH-72]
* Serf join won't hang on a slow/dead node [GH-70]
* Serf Leave won't block Shutdown [GH-1]
## 0.2.1 (November 6, 2013)
BUG FIXES:
* Member role and address not updated on re-join [GH-58]
## 0.2.0 (November 1, 2013)
FEATURES:
* Protocol versioning features so that upgrades can be done safely.
See the website on upgrading Serf for more info.
* Can now configure Serf with files or directories of files by specifying
the `-config-file` and/or `-config-dir` flags to the agent.
* New command `serf force-leave` can be used to force a "failed" node
to the "left" state.
* Serf now supports message encryption and verification so that it can
be used on untrusted networks [GH-25]
* The `-join` flag on `serf agent` can be used to join a cluster when
starting an agent. [GH-42]
IMPROVEMENTS:
* Random staggering of periodic routines to avoid cluster-wide
synchronization
* Push/Pull timer automatically slows down as cluster grows to avoid
congestion
* Messages are compressed to reduce bandwidth utilization
* `serf members` now provides node roles in output
* Joining a cluster will no longer replay all the old events by default,
but it can using the `-replay` flag.
* User events are coalesced by default, meaning duplicate events (by name)
within a short period of time are merged. [GH-8]
BUG FIXES:
* Event handlers work on Windows now by executing commands through
`cmd /C` [GH-37]
* Nodes that previously left and rejoin won't get stuck in 'leaving' state.
[GH-18]
* Fixing alignment issues on i386 for atomic operations [GH-20]
* "trace" log level works [GH-31]
## 0.1.1 (October 23, 2013)
BUG FIXES:
* Default node name is outputted when "serf agent" is called with no args.
* Remove node from reap list after join so a fast re-join doesn't lose the
member.
## 0.1.0 (October 23, 2013)
* Initial release
GOTOOLS = github.com/mitchellh/gox github.com/kardianos/govendor
VERSION = $(shell awk -F\" '/^const Version/ { print $$2; exit }' cmd/serf/version.go)
GITSHA:=$(shell git rev-parse HEAD)
GITBRANCH:=$(shell git symbolic-ref --short HEAD 2>/dev/null)
default:: test
# bin generates the releasable binaries
bin:: tools
@sh -c "'$(CURDIR)/scripts/build.sh'"
# cov generates the coverage output
cov:: tools
gocov test ./... | gocov-html > /tmp/coverage.html
open /tmp/coverage.html
# dev creates binaries for testing locally - these are put into ./bin and
# $GOPATH
dev::
@SERF_DEV=1 sh -c "'$(CURDIR)/scripts/build.sh'"
# dist creates the binaries for distibution
dist::
@sh -c "'$(CURDIR)/scripts/dist.sh' $(VERSION)"
get-tools::
go get -u -v $(GOTOOLS)
# subnet sets up the require subnet for testing on darwin (osx) - you must run
# this before running other tests if you are on osx.
subnet::
@sh -c "'$(CURDIR)/scripts/setup_test_subnet.sh'"
# test runs the test suite
test:: subnet tools
@go list ./... | grep -v -E '^github.com/hashicorp/serf/(vendor|cmd/serf/vendor)' | xargs -n1 go test $(TESTARGS)
# testrace runs the race checker
testrace:: subnet
go test -race `govendor list -no-status +local` $(TESTARGS)
tools::
@which gox 2>/dev/null ; if [ $$? -eq 1 ]; then \
$(MAKE) get-tools; \
fi
# updatedeps installs all the dependencies needed to test, build, and run
updatedeps:: tools
govendor list -no-status +vendor | xargs -n1 go get -u
govendor update +vendor
vet:: tools
@echo "--> Running go tool vet $(VETARGS) ."
@govendor list -no-status +local \
| cut -d '/' -f 4- \
| xargs -n1 \
go tool vet $(VETARGS) ;\
if [ $$? -ne 0 ]; then \
echo ""; \
echo "Vet found suspicious constructs. Please check the reported constructs"; \
echo "and fix them if necessary before submitting the code for reviewal."; \
fi
.PHONY: default bin cov dev dist get-tools subnet test testrace tools updatedeps vet
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment