Commit 2ee7e26d authored by Marshall Brekka's avatar Marshall Brekka Committed by Jeff Mitchell
Browse files

Add a TTL to the dynamodb lock implementation. (#2141)

parent 7c4e5a77
Showing with 289 additions and 82 deletions
+289 -82
......@@ -23,6 +23,7 @@ import (
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"github.com/hashicorp/errwrap"
"github.com/hashicorp/go-uuid"
)
const (
......@@ -48,6 +49,12 @@ const (
// List operations.
DynamoDBLockPrefix = "_"
// The lock TTL matches the default that Consul API uses, 15 seconds.
DynamoDBLockTTL = 15 * time.Second
// The amount of time to wait between the lock renewals
DynamoDBLockRenewInterval = 5 * time.Second
// DynamoDBLockRetryInterval is the amount of time to wait
// if a lock fails before trying again.
DynamoDBLockRetryInterval = time.Second
......@@ -84,9 +91,22 @@ type DynamoDBRecord struct {
// DynamoDBLock implements a distributed lock on top of DynamoDB using a
// conditionally-written record with an expiry timestamp, so that a lock
// held by a crashed node eventually expires and can be taken over.
type DynamoDBLock struct {
backend *DynamoDBBackend
// value is stored in the lock record; key is the Vault key the lock is for.
value, key string
// identity is a UUID unique to this lock instance; renewals are
// conditioned on it so only the current holder can refresh the lock.
identity string
// held reports whether this instance currently believes it holds the lock.
held bool
lock sync.Mutex
// recovery is copied from the backend's recovery setting at construction.
recovery bool
// Allow modifying the Lock durations for ease of unit testing.
renewInterval time.Duration
ttl time.Duration
watchRetryInterval time.Duration
}
// DynamoDBLockRecord is the shape of the DynamoDB item representing a held
// lock; written by writeItem and read back by watch.
type DynamoDBLockRecord struct {
Path string
Key string
Value []byte
// Identity is the holder's UUID (stored as a binary attribute).
Identity []byte
// Expires is the lock deadline in nanoseconds since the Unix epoch.
Expires int64
}
// newDynamoDBBackend constructs a DynamoDB backend. If the
......@@ -360,11 +380,19 @@ func (d *DynamoDBBackend) List(prefix string) ([]string, error) {
// LockWith is used for mutual exclusion based on the given key.
func (d *DynamoDBBackend) LockWith(key, value string) (Lock, error) {
identity, err := uuid.GenerateUUID()
if err != nil {
return nil, err
}
return &DynamoDBLock{
backend: d,
key: filepath.Join(filepath.Dir(key), DynamoDBLockPrefix+filepath.Base(key)),
value: value,
recovery: d.recovery,
backend: d,
key: filepath.Join(filepath.Dir(key), DynamoDBLockPrefix+filepath.Base(key)),
value: value,
identity: identity,
recovery: d.recovery,
renewInterval: DynamoDBLockRenewInterval,
ttl: DynamoDBLockTTL,
watchRetryInterval: DynamoDBWatchRetryInterval,
}, nil
}
......@@ -426,9 +454,10 @@ func (l *DynamoDBLock) Lock(stopCh <-chan struct{}) (doneCh <-chan struct{}, ret
select {
case <-success:
l.held = true
// after acquiring it successfully, we must watch
// the lock in order to close the leader channel
// after acquiring it successfully, we must renew the lock periodically,
// and watch the lock in order to close the leader channel
// once it is lost.
go l.periodicallyRenewLock(leader)
go l.watch(leader)
case retErr = <-errors:
close(stop)
......@@ -481,57 +510,88 @@ func (l *DynamoDBLock) Value() (bool, string, error) {
func (l *DynamoDBLock) tryToLock(stop, success chan struct{}, errors chan error) {
ticker := time.NewTicker(DynamoDBLockRetryInterval)
record := DynamoDBRecord{
Path: recordPathForVaultKey(l.key),
Key: recordKeyForVaultKey(l.key),
Value: []byte(l.value),
}
item, err := dynamodbattribute.ConvertToMap(record)
if err != nil {
errors <- err
return
}
for {
select {
case <-stop:
ticker.Stop()
case <-ticker.C:
_, err := l.backend.client.PutItem(&dynamodb.PutItemInput{
TableName: aws.String(l.backend.table),
Item: item,
ConditionExpression: aws.String("attribute_not_exists(#p) or attribute_not_exists(#k)"),
ExpressionAttributeNames: map[string]*string{
"#p": aws.String("Path"),
"#k": aws.String("Key"),
},
})
err := l.writeItem()
if err != nil {
if err, ok := err.(awserr.Error); ok && err.Code() != "ConditionalCheckFailedException" {
errors <- err
}
if l.recovery {
_, err := l.backend.client.DeleteItem(&dynamodb.DeleteItemInput{
TableName: aws.String(l.backend.table),
Key: map[string]*dynamodb.AttributeValue{
"Path": {S: aws.String(record.Path)},
"Key": {S: aws.String(record.Key)},
},
})
if err != nil {
errors <- fmt.Errorf("could not delete lock record: %s", err)
} else {
l.recovery = false
if err, ok := err.(awserr.Error); ok {
// Don't report a condition check failure, this means that the lock
// is already being held.
if err.Code() != dynamodb.ErrCodeConditionalCheckFailedException {
errors <- err
}
} else {
// It's not an AWS error, and is probably not transient, bail out.
errors <- err
return
}
} else {
ticker.Stop()
close(success)
return
}
}
}
}
// periodicallyRenewLock refreshes the lock record every renewInterval until
// done is closed. Renewal is best-effort: a failed write is dropped here,
// since the watch goroutine is responsible for detecting a lost lock.
func (l *DynamoDBLock) periodicallyRenewLock(done chan struct{}) {
	ticker := time.NewTicker(l.renewInterval)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			return
		case <-ticker.C:
			// Intentionally ignore the error; see note above.
			_ = l.writeItem()
		}
	}
}
// Attempts to put/update the dynamodb item using condition expressions to
// evaluate the TTL.
func (l *DynamoDBLock) writeItem() error {
now := time.Now()
_, err := l.backend.client.UpdateItem(&dynamodb.UpdateItemInput{
TableName: aws.String(l.backend.table),
Key: map[string]*dynamodb.AttributeValue{
"Path": &dynamodb.AttributeValue{S: aws.String(recordPathForVaultKey(l.key))},
"Key": &dynamodb.AttributeValue{S: aws.String(recordKeyForVaultKey(l.key))},
},
UpdateExpression: aws.String("SET #value=:value, #identity=:identity, #expires=:expires"),
// If both key and path already exist, we can only write if
// A. identity is equal to our identity (or the identity doesn't exist)
// or
// B. The ttl on the item is <= to the current time
ConditionExpression: aws.String(
"attribute_not_exists(#path) or " +
"attribute_not_exists(#key) or " +
// To work when upgrading from older versions that did not include the
// Identity attribute, we first check if the attr doesn't exist, and if
// it does, then we check if the identity is equal to our own.
"(attribute_not_exists(#identity) or #identity = :identity) or " +
"#expires <= :now",
),
ExpressionAttributeNames: map[string]*string{
"#path": aws.String("Path"),
"#key": aws.String("Key"),
"#identity": aws.String("Identity"),
"#expires": aws.String("Expires"),
"#value": aws.String("Value"),
},
ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
":identity": &dynamodb.AttributeValue{B: []byte(l.identity)},
":value": &dynamodb.AttributeValue{B: []byte(l.value)},
":now": &dynamodb.AttributeValue{N: aws.String(strconv.FormatInt(now.UnixNano(), 10))},
":expires": &dynamodb.AttributeValue{N: aws.String(strconv.FormatInt(now.Add(l.ttl).UnixNano(), 10))},
},
})
return err
}
// watch checks whether the lock has changed in the
// DynamoDB table and closes the leader channel if so.
// The interval is set by `DynamoDBWatchRetryInterval`.
......@@ -541,12 +601,19 @@ func (l *DynamoDBLock) tryToLock(stop, success chan struct{}, errors chan error)
func (l *DynamoDBLock) watch(lost chan struct{}) {
retries := DynamoDBWatchRetryMax
ticker := time.NewTicker(DynamoDBWatchRetryInterval)
ticker := time.NewTicker(l.watchRetryInterval)
WatchLoop:
for {
select {
case <-ticker.C:
item, err := l.backend.Get(l.key)
resp, err := l.backend.client.GetItem(&dynamodb.GetItemInput{
TableName: aws.String(l.backend.table),
ConsistentRead: aws.Bool(true),
Key: map[string]*dynamodb.AttributeValue{
"Path": {S: aws.String(recordPathForVaultKey(l.key))},
"Key": {S: aws.String(recordKeyForVaultKey(l.key))},
},
})
if err != nil {
retries--
if retries == 0 {
......@@ -555,7 +622,12 @@ WatchLoop:
continue
}
if item == nil || string(item.Value) != l.value {
if resp == nil {
break WatchLoop
}
record := &DynamoDBLockRecord{}
err = dynamodbattribute.UnmarshalMap(resp.Item, record)
if err != nil || string(record.Identity) != l.identity {
break WatchLoop
}
}
......
......@@ -3,12 +3,14 @@ package physical
import (
"fmt"
"math/rand"
"net/http"
"os"
"testing"
"time"
"github.com/hashicorp/vault/helper/logformat"
log "github.com/mgutz/logxi/v1"
dockertest "gopkg.in/ory-am/dockertest.v3"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
......@@ -17,26 +19,21 @@ import (
)
func TestDynamoDBBackend(t *testing.T) {
if os.Getenv("AWS_ACCESS_KEY_ID") == "" || os.Getenv("AWS_SECRET_ACCESS_KEY") == "" {
t.SkipNow()
}
cleanup, endpoint, credsProvider := prepareDynamoDBTestContainer(t)
defer cleanup()
creds, err := credentials.NewEnvCredentials().Get()
creds, err := credsProvider.Get()
if err != nil {
t.Fatalf("err: %v", err)
}
// If the variable is empty or doesn't exist, the default
// AWS endpoints will be used
endpoint := os.Getenv("AWS_DYNAMODB_ENDPOINT")
region := os.Getenv("AWS_DEFAULT_REGION")
if region == "" {
region = "us-east-1"
}
conn := dynamodb.New(session.New(&aws.Config{
Credentials: credentials.NewEnvCredentials(),
Credentials: credsProvider,
Endpoint: aws.String(endpoint),
Region: aws.String(region),
}))
......@@ -57,6 +54,8 @@ func TestDynamoDBBackend(t *testing.T) {
"secret_key": creds.SecretAccessKey,
"session_token": creds.SessionToken,
"table": table,
"region": region,
"endpoint": endpoint,
})
if err != nil {
t.Fatalf("err: %s", err)
......@@ -67,26 +66,21 @@ func TestDynamoDBBackend(t *testing.T) {
}
func TestDynamoDBHABackend(t *testing.T) {
if os.Getenv("AWS_ACCESS_KEY_ID") == "" || os.Getenv("AWS_SECRET_ACCESS_KEY") == "" {
t.SkipNow()
}
cleanup, endpoint, credsProvider := prepareDynamoDBTestContainer(t)
defer cleanup()
creds, err := credentials.NewEnvCredentials().Get()
creds, err := credsProvider.Get()
if err != nil {
t.Fatalf("err: %v", err)
}
// If the variable is empty or doesn't exist, the default
// AWS endpoints will be used
endpoint := os.Getenv("AWS_DYNAMODB_ENDPOINT")
region := os.Getenv("AWS_DEFAULT_REGION")
if region == "" {
region = "us-east-1"
}
conn := dynamodb.New(session.New(&aws.Config{
Credentials: credentials.NewEnvCredentials(),
Credentials: credsProvider,
Endpoint: aws.String(endpoint),
Region: aws.String(region),
}))
......@@ -106,6 +100,8 @@ func TestDynamoDBHABackend(t *testing.T) {
"secret_key": creds.SecretAccessKey,
"session_token": creds.SessionToken,
"table": table,
"region": region,
"endpoint": endpoint,
})
if err != nil {
t.Fatalf("err: %s", err)
......@@ -116,4 +112,155 @@ func TestDynamoDBHABackend(t *testing.T) {
t.Fatalf("dynamodb does not implement HABackend")
}
testHABackend(t, ha, ha)
testDynamoDBLockTTL(t, ha)
}
// Similar to testHABackend, but using internal implementation details to
// trigger the lock failure scenario by setting the lock renew period for one
// of the locks to a higher value than the lock TTL.
// testDynamoDBLockTTL is similar to testHABackend, but uses internal
// implementation details to trigger the lock failure scenario: the first
// lock's renew period is set higher than the lock TTL, so the lock record
// expires and a second contender can take it over.
func testDynamoDBLockTTL(t *testing.T, ha HABackend) {
	// Set much smaller lock times to speed up the test.
	lockTTL := time.Second * 3
	renewInterval := time.Second * 1
	watchInterval := time.Second * 1

	// Get the lock.
	origLock, err := ha.LockWith("dynamodbttl", "bar")
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	// Set the first lock's renew period to double the TTL, guaranteeing the
	// record expires before it is ever renewed.
	lock := origLock.(*DynamoDBLock)
	lock.renewInterval = lockTTL * 2
	lock.ttl = lockTTL
	lock.watchRetryInterval = watchInterval

	// Attempt to lock.
	leaderCh, err := lock.Lock(nil)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if leaderCh == nil {
		t.Fatalf("failed to get leader ch")
	}

	// Check the value.
	held, val, err := lock.Value()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if !held {
		t.Fatalf("should be held")
	}
	if val != "bar" {
		t.Fatalf("bad value: %v", err)
	}

	// Second acquisition should succeed because the first lock should
	// not renew within the 3 sec TTL.
	origLock2, err := ha.LockWith("dynamodbttl", "baz")
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	lock2 := origLock2.(*DynamoDBLock)
	lock2.renewInterval = renewInterval
	lock2.ttl = lockTTL
	lock2.watchRetryInterval = watchInterval

	// Cancel the attempt after twice the TTL so as not to block unit tests
	// forever if acquisition never succeeds.
	stopCh := make(chan struct{})
	time.AfterFunc(lockTTL*2, func() {
		close(stopCh)
	})

	// Attempt to lock should work once the first lock's record expires.
	leaderCh2, err := lock2.Lock(stopCh)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if leaderCh2 == nil {
		t.Fatalf("should get leader ch")
	}

	// Check the value.
	held, val, err = lock2.Value()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if !held {
		t.Fatalf("should be held")
	}
	if val != "baz" {
		t.Fatalf("bad value: %v", err)
	}

	// The first lock's watcher should notice the takeover and close its
	// leader channel within a few watch intervals. Wait on the channel
	// directly with a timeout; the previous goroutine-plus-shared-flag
	// pattern could double-close its signaling channel (panic) when the
	// leader channel closed at nearly the same moment as the timer fired.
	leaderChClosed := false
	select {
	case <-leaderCh:
		leaderChClosed = true
	case <-time.After(watchInterval * 3):
	}
	if !leaderChClosed {
		t.Fatalf("original lock did not have its leader channel closed.")
	}

	// Cleanup.
	lock2.Unlock()
}
// prepareDynamoDBTestContainer returns a cleanup function, an endpoint
// address, and credentials for a DynamoDB instance to test against. When
// AWS_DYNAMODB_ENDPOINT is set it targets that real endpoint; otherwise it
// starts a local DynamoDB docker container and waits until it is reachable.
func prepareDynamoDBTestContainer(t *testing.T) (cleanup func(), retAddress string, creds *credentials.Credentials) {
	// If environment variable is set, assume caller wants to target a real
	// DynamoDB.
	if os.Getenv("AWS_DYNAMODB_ENDPOINT") != "" {
		return func() {}, os.Getenv("AWS_DYNAMODB_ENDPOINT"), credentials.NewEnvCredentials()
	}

	pool, err := dockertest.NewPool("")
	if err != nil {
		t.Fatalf("Failed to connect to docker: %s", err)
	}

	resource, err := pool.Run("deangiberson/aws-dynamodb-local", "latest", []string{})
	if err != nil {
		t.Fatalf("Could not start local DynamoDB: %s", err)
	}

	retAddress = "http://localhost:" + resource.GetPort("8000/tcp")
	cleanup = func() {
		err := pool.Purge(resource)
		if err != nil {
			t.Fatalf("Failed to cleanup local DynamoDB: %s", err)
		}
	}

	// exponential backoff-retry, because the DynamoDB may not be able to accept
	// connections yet
	if err := pool.Retry(func() error {
		resp, err := http.Get(retAddress)
		if err != nil {
			return err
		}
		// Close the body so connections are reused and file descriptors
		// aren't leaked across retries.
		defer resp.Body.Close()
		// A bare GET against DynamoDB local returns 400; anything else means
		// the service isn't up yet.
		if resp.StatusCode != 400 {
			return fmt.Errorf("Expected DynamoDB to return status code 400, got (%s) instead.", resp.Status)
		}
		return nil
	}); err != nil {
		t.Fatalf("Could not connect to docker: %s", err)
	}

	return cleanup, retAddress, credentials.NewStaticCredentials("fake", "fake", "")
}
......@@ -554,11 +554,10 @@ ACL check.
#### Backend Reference: DynamoDB (Community-Supported)
The DynamoDB optionally supports HA. Because Dynamo does not support session
lifetimes on its locks, a Vault node that has failed, rather than shut down in
an orderly fashion, will require manual cleanup rather than failing over
automatically. See the documentation of `recovery_mode` to better understand
this process. To enable HA, set the `ha_enabled` option.
DynamoDB optionally supports HA. Because Dynamo uses the time on the Vault
node to implement the session lifetimes on its locks, significant clock skew
on the Vault nodes could cause contention issues on the lock.
To enable HA, set the `ha_enabled` option.
The DynamoDB backend has the following options:
......@@ -599,21 +598,10 @@ The DynamoDB backend has the following options:
DynamoDB. Defaults to `"128"`.
* `ha_enabled` (optional) - Setting this to `"1"`, `"t"`, or `"true"` will
enable HA mode. Please ensure you have read the documentation for the
`recovery_mode` option before enabling this. This option can also be
provided via the environment variable `DYNAMODB_HA_ENABLED`. If you are
upgrading from a version of Vault where HA support was enabled by default,
it is _very important_ that you set this parameter _before_ upgrading!
* `recovery_mode` (optional) - When the Vault leader crashes or is killed
without being able to shut down properly, no other node can become the new
leader because the DynamoDB table still holds the old leader's lock record.
To recover from this situation, one can start a single Vault node with this
option set to `"1"`, `"t"`, or `"true"` and the node will remove the old
lock from DynamoDB. It is important that only one node is running in
recovery mode! After this node has become the leader, other nodes can be
started with regular configuration. This option can also be provided via
the environment variable `RECOVERY_MODE`.
enable HA mode. This option can also be provided via the environment
variable `DYNAMODB_HA_ENABLED`. If you are upgrading from a version of
Vault where HA support was enabled by default, it is _very important_
that you set this parameter _before_ upgrading!
For more information about the read/write capacity of DynamoDB tables, see the
[official AWS DynamoDB
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment