Commit 0ad63293 authored by Seth Hoenig's avatar Seth Hoenig
Browse files

ci: purge consul/sdk in favor of nomad/sdk

This PR substitutes consul/sdk for nomad/sdk.

Major difference in the implementation of freeport, where the
new version is based on ephemoral ports instead of pre-allocated
port blocks.
parent 334c2583
Showing with 49 additions and 826 deletions
+49 -826
......@@ -9,7 +9,6 @@ import (
docker "github.com/fsouza/go-dockerclient"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/testutil"
"github.com/hashicorp/nomad/helper/freeport"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/stretchr/testify/require"
)
......@@ -63,8 +62,7 @@ func TestDanglingContainerRemoval(t *testing.T) {
testutil.DockerCompatible(t)
// start two containers: one tracked nomad container, and one unrelated container
task, cfg, ports := dockerTask(t)
defer freeport.Return(ports)
task, cfg, _ := dockerTask(t)
require.NoError(t, task.EncodeConcreteDriverConfig(cfg))
client, d, handle, cleanup := dockerSetup(t, task, nil)
......@@ -166,8 +164,7 @@ func TestDanglingContainerRemoval_Stopped(t *testing.T) {
ci.Parallel(t)
testutil.DockerCompatible(t)
_, cfg, ports := dockerTask(t)
defer freeport.Return(ports)
_, cfg, _ := dockerTask(t)
client := newTestDockerClient(t)
container, err := client.CreateContainer(docker.CreateContainerOptions{
......
......@@ -10,8 +10,11 @@ replace (
github.com/hashicorp/hcl => github.com/hashicorp/hcl v1.0.1-0.20201016140508-a07e7d50bbee
)
// Nomad is built using the current source of the API module
replace github.com/hashicorp/nomad/api => ./api
// Nomad is built using the current source of the API, SDK modules
replace (
github.com/hashicorp/nomad/api => ./api
github.com/hashicorp/nomad/sdk => ./sdk
)
require (
github.com/LK4D4/joincontext v0.0.0-20171026170139-1724345da6d5
......@@ -45,7 +48,6 @@ require (
github.com/hashicorp/consul v1.7.8
github.com/hashicorp/consul-template v0.28.1-0.20220406001259-e710909d8054
github.com/hashicorp/consul/api v1.12.0
github.com/hashicorp/consul/sdk v0.8.0
github.com/hashicorp/cronexpr v1.1.1
github.com/hashicorp/go-bexpr v0.1.11
github.com/hashicorp/go-checkpoint v0.0.0-20171009173528-1545e56e46de
......@@ -77,6 +79,7 @@ require (
github.com/hashicorp/memberlist v0.3.1
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69
github.com/hashicorp/nomad/api v0.0.0-20220401211553-29eff9ab2a92
github.com/hashicorp/nomad/sdk v0.0.0-00010101000000-000000000000
github.com/hashicorp/raft v1.3.5
github.com/hashicorp/raft-boltdb/v2 v2.2.0
github.com/hashicorp/serf v0.9.7
......@@ -125,7 +128,7 @@ require (
google.golang.org/protobuf v1.27.1
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7
gopkg.in/tomb.v2 v2.0.0-20140626144623-14b3d72120e8
oss.indeed.com/go/libtime v1.5.0
oss.indeed.com/go/libtime v1.6.0
)
require (
......
......@@ -756,11 +756,11 @@ github.com/hashicorp/go-secure-stdlib/reloadutil v0.1.1 h1:SMGUnbpAcat8rIKHkBPjf
github.com/hashicorp/go-secure-stdlib/reloadutil v0.1.1/go.mod h1:Ch/bf00Qnx77MZd49JRgHYqHQjtEmTgGU2faufpVZb0=
github.com/hashicorp/go-secure-stdlib/strutil v0.1.1/go.mod h1:gKOamz3EwoIoJq7mlMIRBpVTAUn8qPCrEclOKKWhD3U=
github.com/hashicorp/go-secure-stdlib/strutil v0.1.2 h1:kes8mmyCpxJsI7FTwtzRqEy9CdjCtrXrXGuOpxEA7Ts=
github.com/hashicorp/go-secure-stdlib/strutil v0.1.2 h1:kes8mmyCpxJsI7FTwtzRqEy9CdjCtrXrXGuOpxEA7Ts=
github.com/hashicorp/go-secure-stdlib/strutil v0.1.2/go.mod h1:Gou2R9+il93BqX25LAKCLuM+y9U2T4hlwvT1yprcna4=
github.com/hashicorp/go-secure-stdlib/strutil v0.1.2/go.mod h1:Gou2R9+il93BqX25LAKCLuM+y9U2T4hlwvT1yprcna4=
github.com/hashicorp/go-secure-stdlib/tlsutil v0.1.1 h1:Yc026VyMyIpq1UWRnakHRG01U8fJm+nEfEmjoAb00n8=
github.com/hashicorp/go-secure-stdlib/tlsutil v0.1.1/go.mod h1:l8slYwnJA26yBz+ErHpp2IRCLr0vuOMGBORIz4rRiAs=
github.com/hashicorp/go-secure-stdlib/strutil v0.1.2 h1:kes8mmyCpxJsI7FTwtzRqEy9CdjCtrXrXGuOpxEA7Ts=
github.com/hashicorp/go-secure-stdlib/strutil v0.1.2/go.mod h1:Gou2R9+il93BqX25LAKCLuM+y9U2T4hlwvT1yprcna4=
github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
github.com/hashicorp/go-sockaddr v1.0.2 h1:ztczhD1jLxIRjVejw8gFomI1BQZOe2WoVOu0SyteCQc=
github.com/hashicorp/go-sockaddr v1.0.2/go.mod h1:rB4wwRAUzs07qva3c5SdrY/NEtAUjGlgmH/UkBUC97A=
......@@ -1938,8 +1938,8 @@ k8s.io/kube-openapi v0.0.0-20201113171705-d219536bb9fd/go.mod h1:WOJ3KddDSol4tAG
k8s.io/kubernetes v1.13.0/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk=
k8s.io/utils v0.0.0-20200324210504-a9aa75ae1b89/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
oss.indeed.com/go/libtime v1.5.0 h1:wulKS+oHhb3P2wFi1fcA+g8CXiC8+ygFECUQea5ZqLU=
oss.indeed.com/go/libtime v1.5.0/go.mod h1:B2sdEcuzB0zhTKkAuHy4JInKRc7Al3tME4qWam6R7mA=
oss.indeed.com/go/libtime v1.6.0 h1:XQyczJihse/wQGo59OfPF3f4f+Sywv4R8vdGB3S9BfU=
oss.indeed.com/go/libtime v1.6.0/go.mod h1:B2sdEcuzB0zhTKkAuHy4JInKRc7Al3tME4qWam6R7mA=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
......
//go:build darwin
// +build darwin
package freeport
import (
"fmt"
"os/exec"
"regexp"
"strconv"
)
/*
$ sysctl net.inet.ip.portrange.first net.inet.ip.portrange.last
net.inet.ip.portrange.first: 49152
net.inet.ip.portrange.last: 65535
*/
const (
ephPortFirst = "net.inet.ip.portrange.first"
ephPortLast = "net.inet.ip.portrange.last"
command = "sysctl"
)
var ephPortRe = regexp.MustCompile(`^\s*(\d+)\s+(\d+)\s*$`)
func getEphemeralPortRange() (int, int, error) {
cmd := exec.Command(command, "-n", ephPortFirst, ephPortLast)
out, err := cmd.Output()
if err != nil {
return 0, 0, err
}
val := string(out)
m := ephPortRe.FindStringSubmatch(val)
if m != nil {
min, err1 := strconv.Atoi(m[1])
max, err2 := strconv.Atoi(m[2])
if err1 == nil && err2 == nil {
return min, max, nil
}
}
return 0, 0, fmt.Errorf("unexpected sysctl value %q for keys %q %q", val, ephPortFirst, ephPortLast)
}
//go:build darwin
// +build darwin
package freeport
import (
"testing"
)
func TestGetEphemeralPortRange(t *testing.T) {
min, max, err := getEphemeralPortRange()
if err != nil {
t.Fatalf("err: %v", err)
}
if min <= 0 || max <= 0 || min > max {
t.Fatalf("unexpected values: min=%d, max=%d", min, max)
}
t.Logf("min=%d, max=%d", min, max)
}
//go:build freebsd
// +build freebsd
package freeport
import (
"fmt"
"os/exec"
"regexp"
"strconv"
)
/*
$ sysctl net.inet.ip.portrange.first net.inet.ip.portrange.last
net.inet.ip.portrange.first: 49152
net.inet.ip.portrange.last: 65535
*/
const (
ephPortFirst = "net.inet.ip.portrange.first"
ephPortLast = "net.inet.ip.portrange.last"
command = "sysctl"
)
var ephPortRe = regexp.MustCompile(`^\s*(\d+)\s+(\d+)\s*$`)
func getEphemeralPortRange() (int, int, error) {
cmd := exec.Command(command, "-n", ephPortFirst, ephPortLast)
out, err := cmd.Output()
if err != nil {
return 0, 0, err
}
val := string(out)
m := ephPortRe.FindStringSubmatch(val)
if m != nil {
min, err1 := strconv.Atoi(m[1])
max, err2 := strconv.Atoi(m[2])
if err1 == nil && err2 == nil {
return min, max, nil
}
}
return 0, 0, fmt.Errorf("unexpected sysctl value %q for keys %q %q", val, ephPortFirst, ephPortLast)
}
//go:build linux
// +build linux
package freeport
import (
"fmt"
"os/exec"
"regexp"
"strconv"
)
/*
$ sysctl -n net.ipv4.ip_local_port_range
32768 60999
*/
const ephemeralPortRangeSysctlKey = "net.ipv4.ip_local_port_range"
var ephemeralPortRangePatt = regexp.MustCompile(`^\s*(\d+)\s+(\d+)\s*$`)
func getEphemeralPortRange() (int, int, error) {
cmd := exec.Command("sysctl", "-n", ephemeralPortRangeSysctlKey)
out, err := cmd.Output()
if err != nil {
return 0, 0, err
}
val := string(out)
m := ephemeralPortRangePatt.FindStringSubmatch(val)
if m != nil {
min, err1 := strconv.Atoi(m[1])
max, err2 := strconv.Atoi(m[2])
if err1 == nil && err2 == nil {
return min, max, nil
}
}
return 0, 0, fmt.Errorf("unexpected sysctl value %q for key %q", val, ephemeralPortRangeSysctlKey)
}
//go:build linux
// +build linux
package freeport
import (
"testing"
)
func TestGetEphemeralPortRange(t *testing.T) {
min, max, err := getEphemeralPortRange()
if err != nil {
t.Fatalf("err: %v", err)
}
if min <= 0 || max <= 0 || min > max {
t.Fatalf("unexpected values: min=%d, max=%d", min, max)
}
t.Logf("min=%d, max=%d", min, max)
}
//go:build windows
// +build windows
package freeport
// For now we hard-code the Windows ephemeral port range, which is documented by
// Microsoft to be in this range for Vista / Server 2008 and newer.
//
// https://support.microsoft.com/en-us/help/832017/service-overview-and-network-port-requirements-for-windows
func getEphemeralPortRange() (int, int, error) {
return 49152, 65535, nil
}
// Copied from github.com/hashicorp/consul/sdk/freeport
//
// and tweaked for use by Nomad.
package freeport
import (
"container/list"
"fmt"
"math/rand"
"net"
"os"
"runtime"
"sync"
"time"
)
// todo(shoenig)
// There is a conflict between this copy of the updated sdk/freeport package
// and the lib/freeport package that is vendored as of nomad v0.10.x, which
// means we need to be careful to avoid the ports that transitive dependency
// is going to use (i.e. 10,000+). For now, we use the 9XXX port range with
// small blocks which means some tests will have to wait, and we need to be
// very careful not to leak ports.
const (
// blockSize is the size of the allocated port block. ports are given out
// consecutively from that block and after that point in a LRU fashion.
// blockSize = 1500
blockSize = 100 // todo(shoenig) revert once consul dependency is updated
// maxBlocks is the number of available port blocks before exclusions.
// maxBlocks = 30
maxBlocks = 10 // todo(shoenig) revert once consul dependency is updated
// lowPort is the lowest port number that should be used.
// lowPort = 10000
lowPort = 9000 // todo(shoenig) revert once consul dependency is updated
// attempts is how often we try to allocate a port block
// before giving up.
attempts = 10
)
var (
// effectiveMaxBlocks is the number of available port blocks.
// lowPort + effectiveMaxBlocks * blockSize must be less than 65535.
effectiveMaxBlocks int
// firstPort is the first port of the allocated block.
firstPort int
// lockLn is the system-wide mutex for the port block.
lockLn net.Listener
// mu guards:
// - pendingPorts
// - freePorts
// - total
mu sync.Mutex
// once is used to do the initialization on the first call to retrieve free
// ports
once sync.Once
// condNotEmpty is a condition variable to wait for freePorts to be not
// empty. Linked to 'mu'
condNotEmpty *sync.Cond
// freePorts is a FIFO of all currently free ports. Take from the front,
// and return to the back.
freePorts *list.List
// pendingPorts is a FIFO of recently freed ports that have not yet passed
// the not-in-use check.
pendingPorts *list.List
// total is the total number of available ports in the block for use.
total int
)
// initialize is used to initialize freeport.
func initialize() {
var err error
effectiveMaxBlocks, err = adjustMaxBlocks()
if err != nil {
panic("freeport: ephemeral port range detection failed: " + err.Error())
}
if effectiveMaxBlocks < 0 {
panic("freeport: no blocks of ports available outside of ephemeral range")
}
if lowPort+effectiveMaxBlocks*blockSize > 65535 {
panic("freeport: block size too big or too many blocks requested")
}
rand.Seed(time.Now().UnixNano())
firstPort, lockLn = alloc()
condNotEmpty = sync.NewCond(&mu)
freePorts = list.New()
pendingPorts = list.New()
// fill with all available free ports
for port := firstPort + 1; port < firstPort+blockSize; port++ {
if used := isPortInUse(port); !used {
freePorts.PushBack(port)
}
}
total = freePorts.Len()
go checkFreedPorts()
}
func checkFreedPorts() {
ticker := time.NewTicker(250 * time.Millisecond)
for {
<-ticker.C
checkFreedPortsOnce()
}
}
func checkFreedPortsOnce() {
mu.Lock()
defer mu.Unlock()
pending := pendingPorts.Len()
remove := make([]*list.Element, 0, pending)
for elem := pendingPorts.Front(); elem != nil; elem = elem.Next() {
port := elem.Value.(int)
if used := isPortInUse(port); !used {
freePorts.PushBack(port)
remove = append(remove, elem)
}
}
retained := pending - len(remove)
if retained > 0 {
logf("WARN", "%d out of %d pending ports are still in use; something probably didn't wait around for the port to be closed!", retained, pending)
}
if len(remove) == 0 {
return
}
for _, elem := range remove {
pendingPorts.Remove(elem)
}
condNotEmpty.Broadcast()
}
// adjustMaxBlocks avoids having the allocation ranges overlap the ephemeral
// port range.
func adjustMaxBlocks() (int, error) {
ephemeralPortMin, ephemeralPortMax, err := getEphemeralPortRange()
if err != nil {
return 0, err
}
if ephemeralPortMin <= 0 || ephemeralPortMax <= 0 {
logf("INFO", "ephemeral port range detection not configured for GOOS=%q", runtime.GOOS)
return maxBlocks, nil
}
logf("INFO", "detected ephemeral port range of [%d, %d]", ephemeralPortMin, ephemeralPortMax)
for block := 0; block < maxBlocks; block++ {
min := lowPort + block*blockSize
max := min + blockSize
overlap := intervalOverlap(min, max-1, ephemeralPortMin, ephemeralPortMax)
if overlap {
logf("INFO", "reducing max blocks from %d to %d to avoid the ephemeral port range", maxBlocks, block)
return block, nil
}
}
return maxBlocks, nil
}
// alloc reserves a port block for exclusive use for the lifetime of the
// application. lockLn serves as a system-wide mutex for the port block and is
// implemented as a TCP listener which is bound to the firstPort and which will
// be automatically released when the application terminates.
func alloc() (int, net.Listener) {
for i := 0; i < attempts; i++ {
block := int(rand.Int31n(int32(effectiveMaxBlocks)))
firstPort := lowPort + block*blockSize
ln, err := net.ListenTCP("tcp", tcpAddr("127.0.0.1", firstPort))
if err != nil {
continue
}
// logf("DEBUG", "allocated port block %d (%d-%d)", block, firstPort, firstPort+blockSize-1)
return firstPort, ln
}
panic("freeport: cannot allocate port block")
}
// MustTake is the same as Take except it panics on error.
func MustTake(n int) (ports []int) {
ports, err := Take(n)
if err != nil {
panic(err)
}
return ports
}
// Take returns a list of free ports from the allocated port block. It is safe
// to call this method concurrently. Ports have been tested to be available on
// 127.0.0.1 TCP but there is no guarantee that they will remain free in the
// future.
func Take(n int) (ports []int, err error) {
if n <= 0 {
return nil, fmt.Errorf("freeport: cannot take %d ports", n)
}
mu.Lock()
defer mu.Unlock()
// Reserve a port block
once.Do(initialize)
if n > total {
return nil, fmt.Errorf("freeport: block size too small")
}
for len(ports) < n {
for freePorts.Len() == 0 {
if total == 0 {
return nil, fmt.Errorf("freeport: impossible to satisfy request; there are no actual free ports in the block anymore")
}
condNotEmpty.Wait()
}
elem := freePorts.Front()
freePorts.Remove(elem)
port := elem.Value.(int)
if used := isPortInUse(port); used {
// Something outside of the test suite has stolen this port, possibly
// due to assignment to an ephemeral port, remove it completely.
logf("WARN", "leaked port %d due to theft; removing from circulation", port)
total--
continue
}
ports = append(ports, port)
}
// logf("DEBUG", "free ports: %v", ports)
return ports, nil
}
// Return returns a block of ports back to the general pool. These ports should
// have been returned from a call to Take().
func Return(ports []int) {
if len(ports) == 0 {
return // convenience short circuit for test ergonomics
}
mu.Lock()
defer mu.Unlock()
for _, port := range ports {
if port > firstPort && port < firstPort+blockSize {
pendingPorts.PushBack(port)
}
}
}
func isPortInUse(port int) bool {
ln, err := net.ListenTCP("tcp", tcpAddr("127.0.0.1", port))
if err != nil {
return true
}
_ = ln.Close()
return false
}
func tcpAddr(ip string, port int) *net.TCPAddr {
return &net.TCPAddr{IP: net.ParseIP(ip), Port: port}
}
// intervalOverlap returns true if the doubly-inclusive integer intervals
// represented by [min1, max1] and [min2, max2] overlap.
func intervalOverlap(min1, max1, min2, max2 int) bool {
if min1 > max1 {
logf("WARN", "interval1 is not ordered [%d, %d]", min1, max1)
return false
}
if min2 > max2 {
logf("WARN", "interval2 is not ordered [%d, %d]", min2, max2)
return false
}
return min1 <= max2 && min2 <= max1
}
func logf(severity string, format string, a ...interface{}) {
_, _ = fmt.Fprintf(os.Stderr, "["+severity+"] freeport: "+format+"\n", a...)
}
package freeport
import (
"fmt"
"io"
"net"
"sync"
"testing"
"github.com/hashicorp/consul/sdk/testutil/retry"
)
// reset will reverse the setup from initialize() and then redo it (for tests)
func reset() {
mu.Lock()
defer mu.Unlock()
logf("INFO", "resetting the freeport package state")
effectiveMaxBlocks = 0
firstPort = 0
if lockLn != nil {
lockLn.Close()
lockLn = nil
}
once = sync.Once{}
freePorts = nil
pendingPorts = nil
total = 0
}
// peekFree returns the next port that will be returned by Take to aid in testing.
func peekFree() int {
mu.Lock()
defer mu.Unlock()
return freePorts.Front().Value.(int)
}
// peekAllFree returns all free ports that could be returned by Take to aid in testing.
func peekAllFree() []int {
mu.Lock()
defer mu.Unlock()
var out []int
for elem := freePorts.Front(); elem != nil; elem = elem.Next() {
port := elem.Value.(int)
out = append(out, port)
}
return out
}
// stats returns diagnostic data to aid in testing
func stats() (numTotal, numPending, numFree int) {
mu.Lock()
defer mu.Unlock()
return total, pendingPorts.Len(), freePorts.Len()
}
func TestTakeReturn(t *testing.T) {
// NOTE: for global var reasons this cannot execute in parallel
// ci.Parallel(t)
// Since this test is destructive (i.e. it leaks all ports) it means that
// any other test cases in this package will not function after it runs. To
// help out we reset the global state after we run this test.
defer reset()
// OK: do a simple take/return cycle to trigger the package initialization
func() {
ports, err := Take(1)
if err != nil {
t.Fatalf("err: %v", err)
}
defer Return(ports)
if len(ports) != 1 {
t.Fatalf("expected %d but got %d ports", 1, len(ports))
}
}()
waitForStatsReset := func() (numTotal int) {
t.Helper()
numTotal, numPending, numFree := stats()
if numTotal != numFree+numPending {
t.Fatalf("expected total (%d) and free+pending (%d) ports to match", numTotal, numFree+numPending)
}
retry.Run(t, func(r *retry.R) {
numTotal, numPending, numFree = stats()
if numPending != 0 {
r.Fatalf("pending is still non zero: %d", numPending)
}
if numTotal != numFree {
r.Fatalf("total (%d) does not equal free (%d)", numTotal, numFree)
}
})
return numTotal
}
// Reset
numTotal := waitForStatsReset()
// --------------------
// OK: take the max
func() {
ports, err := Take(numTotal)
if err != nil {
t.Fatalf("err: %v", err)
}
defer Return(ports)
if len(ports) != numTotal {
t.Fatalf("expected %d but got %d ports", numTotal, len(ports))
}
}()
// Reset
numTotal = waitForStatsReset()
expectError := func(expected string, got error) {
t.Helper()
if got == nil {
t.Fatalf("expected error but was nil")
}
if got.Error() != expected {
t.Fatalf("expected error %q but got %q", expected, got.Error())
}
}
// --------------------
// ERROR: take too many ports
func() {
ports, err := Take(numTotal + 1)
defer Return(ports)
expectError("freeport: block size too small", err)
}()
// --------------------
// ERROR: invalid ports request (negative)
func() {
_, err := Take(-1)
expectError("freeport: cannot take -1 ports", err)
}()
// --------------------
// ERROR: invalid ports request (zero)
func() {
_, err := Take(0)
expectError("freeport: cannot take 0 ports", err)
}()
// --------------------
// OK: Steal a port under the covers and let freeport detect the theft and compensate
leakedPort := peekFree()
func() {
leakyListener, err := net.ListenTCP("tcp", tcpAddr("127.0.0.1", leakedPort))
if err != nil {
t.Fatalf("err: %v", err)
}
defer leakyListener.Close()
func() {
ports, err := Take(3)
if err != nil {
t.Fatalf("err: %v", err)
}
defer Return(ports)
if len(ports) != 3 {
t.Fatalf("expected %d but got %d ports", 3, len(ports))
}
for _, port := range ports {
if port == leakedPort {
t.Fatalf("did not expect for Take to return the leaked port")
}
}
}()
newNumTotal := waitForStatsReset()
if newNumTotal != numTotal-1 {
t.Fatalf("expected total to drop to %d but got %d", numTotal-1, newNumTotal)
}
numTotal = newNumTotal // update outer variable for later tests
}()
// --------------------
// OK: sequence it so that one Take must wait on another Take to Return.
func() {
mostPorts, err := Take(numTotal - 5)
if err != nil {
t.Fatalf("err: %v", err)
}
type reply struct {
ports []int
err error
}
ch := make(chan reply, 1)
go func() {
ports, err := Take(10)
ch <- reply{ports: ports, err: err}
}()
Return(mostPorts)
r := <-ch
if r.err != nil {
t.Fatalf("err: %v", r.err)
}
defer Return(r.ports)
if len(r.ports) != 10 {
t.Fatalf("expected %d ports but got %d", 10, len(r.ports))
}
}()
// Reset
numTotal = waitForStatsReset()
// --------------------
// ERROR: Now we end on the crazy "Ocean's 11" level port theft where we
// orchestrate a situation where all ports are stolen and we don't find out
// until Take.
func() {
// 1. Grab all of the ports.
allPorts := peekAllFree()
// 2. Leak all of the ports
leaked := make([]io.Closer, 0, len(allPorts))
defer func() {
for _, c := range leaked {
c.Close()
}
}()
for _, port := range allPorts {
ln, err := net.ListenTCP("tcp", tcpAddr("127.0.0.1", port))
if err != nil {
t.Fatalf("err: %v", err)
}
leaked = append(leaked, ln)
}
// 3. Request 1 port which will detect the leaked ports and fail.
_, err := Take(1)
expectError("freeport: impossible to satisfy request; there are no actual free ports in the block anymore", err)
// 4. Wait for the block to zero out.
newNumTotal := waitForStatsReset()
if newNumTotal != 0 {
t.Fatalf("expected total to drop to %d but got %d", 0, newNumTotal)
}
}()
}
func TestIntervalOverlap(t *testing.T) {
cases := []struct {
min1, max1, min2, max2 int
overlap bool
}{
{0, 0, 0, 0, true},
{1, 1, 1, 1, true},
{1, 3, 1, 3, true}, // same
{1, 3, 4, 6, false}, // serial
{1, 4, 3, 6, true}, // inner overlap
{1, 6, 3, 4, true}, // nest
}
for _, tc := range cases {
t.Run(fmt.Sprintf("%d:%d vs %d:%d", tc.min1, tc.max1, tc.min2, tc.max2), func(t *testing.T) {
if tc.overlap != intervalOverlap(tc.min1, tc.max1, tc.min2, tc.max2) { // 1 vs 2
t.Fatalf("expected %v but got %v", tc.overlap, !tc.overlap)
}
if tc.overlap != intervalOverlap(tc.min2, tc.max2, tc.min1, tc.max1) { // 2 vs 1
t.Fatalf("expected %v but got %v", tc.overlap, !tc.overlap)
}
})
}
}
......@@ -6,8 +6,8 @@ import (
"testing"
"time"
"github.com/hashicorp/nomad/helper/freeport"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/sdk/portfree"
"github.com/stretchr/testify/require"
)
......@@ -18,20 +18,17 @@ func newTestPool(t *testing.T) *ConnPool {
}
func TestConnPool_ConnListener(t *testing.T) {
require := require.New(t)
port := portfree.New(t).GetOne()
ports := freeport.MustTake(1)
defer freeport.Return(ports)
addrStr := fmt.Sprintf("127.0.0.1:%d", ports[0])
addrStr := fmt.Sprintf("127.0.0.1:%d", port)
addr, err := net.ResolveTCPAddr("tcp", addrStr)
require.Nil(err)
require.Nil(t, err)
exitCh := make(chan struct{})
defer close(exitCh)
go func() {
ln, err := net.Listen("tcp", addrStr)
require.Nil(err)
require.Nil(t, err)
defer ln.Close()
conn, _ := ln.Accept()
defer conn.Close()
......@@ -50,7 +47,7 @@ func TestConnPool_ConnListener(t *testing.T) {
// Make an RPC
_, err = pool.acquire("test", addr)
require.Nil(err)
require.Nil(t, err)
// Assert we get a connection.
select {
......@@ -60,7 +57,7 @@ func TestConnPool_ConnListener(t *testing.T) {
}
// Test that the channel is closed when the pool shuts down.
require.Nil(pool.Shutdown())
require.Nil(t, pool.Shutdown())
_, ok := <-c
require.False(ok)
require.False(t, ok)
}
......@@ -5,15 +5,14 @@ import (
"crypto/rand"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/raft"
"github.com/stretchr/testify/require"
......@@ -125,8 +124,7 @@ func makeRaft(t *testing.T, dir string) (*raft.Raft, *MockFSM) {
}
func TestSnapshot(t *testing.T) {
dir := testutil.TempDir(t, "snapshot")
defer os.RemoveAll(dir)
dir := t.TempDir()
// Make a Raft and populate it with some data. We tee everything we
// apply off to a buffer for checking post-snapshot.
......@@ -149,7 +147,7 @@ func TestSnapshot(t *testing.T) {
}
// Take a snapshot.
logger := testutil.Logger(t)
logger := testlog.HCLogger(t)
snap, err := New(logger, before)
if err != nil {
t.Fatalf("err: %v", err)
......@@ -234,8 +232,7 @@ func TestSnapshot_BadVerify(t *testing.T) {
}
func TestSnapshot_TruncatedVerify(t *testing.T) {
dir := testutil.TempDir(t, "snapshot")
defer os.RemoveAll(dir)
dir := t.TempDir()
// Make a Raft and populate it with some data. We tee everything we
// apply off to a buffer for checking post-snapshot.
......@@ -257,10 +254,12 @@ func TestSnapshot_TruncatedVerify(t *testing.T) {
}
// Take a snapshot.
logger := testutil.Logger(t)
logger := testlog.HCLogger(t)
snap, err := New(logger, before)
require.NoError(t, err)
defer snap.Close()
t.Cleanup(func() {
_ = snap.Close()
})
var data []byte
{
......@@ -282,8 +281,7 @@ func TestSnapshot_TruncatedVerify(t *testing.T) {
}
func TestSnapshot_BadRestore(t *testing.T) {
dir := testutil.TempDir(t, "snapshot")
defer os.RemoveAll(dir)
dir := t.TempDir()
// Make a Raft and populate it with some data.
before, _ := makeRaft(t, filepath.Join(dir, "before"))
......@@ -300,7 +298,7 @@ func TestSnapshot_BadRestore(t *testing.T) {
}
// Take a snapshot.
logger := testutil.Logger(t)
logger := testlog.HCLogger(t)
snap, err := New(logger, before)
if err != nil {
t.Fatalf("err: %v", err)
......
......@@ -4,9 +4,9 @@ import (
"fmt"
"testing"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/sdk/retry"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)
......
......@@ -3,9 +3,9 @@ package apitests
import (
"testing"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/sdk/retry"
"github.com/stretchr/testify/require"
)
......
......@@ -6,8 +6,8 @@ import (
"time"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/sdk/retry"
"github.com/hashicorp/nomad/testutil"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
......
......@@ -8,7 +8,6 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-version"
......@@ -17,6 +16,7 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/sdk/retry"
"github.com/hashicorp/nomad/testutil"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
......
......@@ -19,11 +19,11 @@ import (
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/ci"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/freeport"
"github.com/hashicorp/nomad/helper/snapshot"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/sdk/portfree"
"github.com/hashicorp/nomad/testutil"
"github.com/hashicorp/raft"
"github.com/stretchr/testify/assert"
......@@ -148,12 +148,11 @@ func TestOperator_RaftRemovePeerByAddress(t *testing.T) {
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
ports := freeport.MustTake(1)
defer freeport.Return(ports)
port := portfree.New(t).GetOne()
// Try to remove a peer that's not there.
arg := structs.RaftPeerByAddressRequest{
Address: raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", ports[0])),
Address: raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", port)),
}
arg.Region = s1.config.Region
var reply struct{}
......@@ -216,11 +215,9 @@ func TestOperator_RaftRemovePeerByAddress_ACL(t *testing.T) {
// Create ACL token
invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid", mock.NodePolicy(acl.PolicyWrite))
ports := freeport.MustTake(1)
defer freeport.Return(ports)
port := portfree.New(t).GetOne()
arg := structs.RaftPeerByAddressRequest{
Address: raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", ports[0])),
Address: raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", port)),
}
arg.Region = s1.config.Region
......@@ -276,12 +273,10 @@ func TestOperator_RaftRemovePeerByID(t *testing.T) {
t.Fatalf("err: %v", err)
}
ports := freeport.MustTake(1)
defer freeport.Return(ports)
// Add it manually to Raft.
{
future := s1.raft.AddVoter(arg.ID, raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", ports[0])), 0, 0)
port := portfree.New(t).GetOne()
future := s1.raft.AddVoter(arg.ID, raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", port)), 0, 0)
if err := future.Error(); err != nil {
t.Fatalf("err: %v", err)
}
......@@ -337,12 +332,10 @@ func TestOperator_RaftRemovePeerByID_ACL(t *testing.T) {
}
arg.Region = s1.config.Region
ports := freeport.MustTake(1)
defer freeport.Return(ports)
// Add peer manually to Raft.
{
future := s1.raft.AddVoter(arg.ID, raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", ports[0])), 0, 0)
port := portfree.New(t).GetOne()
future := s1.raft.AddVoter(arg.ID, raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", port)), 0, 0)
assert.Nil(future.Error())
}
......
......@@ -9,12 +9,12 @@ import (
"time"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/freeport"
"github.com/hashicorp/nomad/helper/pluginutils/catalog"
"github.com/hashicorp/nomad/helper/pluginutils/singleton"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/sdk/portfree"
"github.com/hashicorp/nomad/version"
"github.com/stretchr/testify/require"
)
......@@ -110,7 +110,7 @@ func TestServerErr(t *testing.T, cb func(*Config)) (*Server, func(), error) {
for i := 10; i >= 0; i-- {
// Get random ports, need to cleanup later
ports := freeport.MustTake(2)
ports := portfree.New(t).Get(2)
config.RPCAddr = &net.TCPAddr{
IP: []byte{127, 0, 0, 1},
......@@ -131,8 +131,6 @@ func TestServerErr(t *testing.T, cb func(*Config)) (*Server, func(), error) {
if err != nil {
ch <- fmt.Errorf("failed to shutdown server: %w", err)
}
freeport.Return(ports)
}()
select {
......@@ -145,12 +143,10 @@ func TestServerErr(t *testing.T, cb func(*Config)) (*Server, func(), error) {
}
}, nil
} else if i == 0 {
freeport.Return(ports)
return nil, nil, err
} else {
if server != nil {
_ = server.Shutdown()
freeport.Return(ports)
}
wait := time.Duration(rand.Int31n(2000)) * time.Millisecond
time.Sleep(wait)
......
# sdk
Module `sdk` provides helpers for Nomad development.
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment