Commit ea23462a authored by Michael Schurter's avatar Michael Schurter
Browse files

wip wrap boltdb to get path information

finished but doesn't handle deleting deeply nested buckets
parent 4c19733a
No related merge requests found
Showing with 392 additions and 98 deletions
+392 -98
......@@ -12,38 +12,36 @@ import (
)
type kvStore interface {
Path() []byte
Bucket(name []byte) kvStore
CreateBucket(key []byte) (kvStore, error)
CreateBucketIfNotExists(key []byte) (kvStore, error)
DeleteBucket(key []byte) error
Get(key []byte) (val []byte)
Put(key, val []byte) error
Writable() bool
}
// keyValueCodec handles encoding and decoding values from a key/value store
// such as boltdb.
type keyValueCodec struct {
// hashes maps keys to the hash of the last content written
hashes map[string][]byte
// hashes maps buckets to keys to the hash of the last content written:
// bucket -> key -> hash for example:
// allocations/1234 -> alloc -> abcd
// allocations/1234/redis -> task_state -> efff
hashes map[string]map[string][]byte
hashesLock sync.Mutex
}
func newKeyValueCodec() *keyValueCodec {
return &keyValueCodec{
hashes: make(map[string][]byte),
hashes: make(map[string]map[string][]byte),
}
}
// hashKey returns a unique key for each hashed boltdb value
func (c *keyValueCodec) hashKey(path string, key []byte) string {
return path + "-" + string(key)
}
// Put into kv store iff it has changed since the last write. A globally
// unique key is constructed for each value by concatinating the path and key
// passed in.
func (c *keyValueCodec) Put(bkt kvStore, path string, key []byte, val interface{}) error {
if !bkt.Writable() {
return fmt.Errorf("bucket must be writable")
}
func (c *keyValueCodec) Put(bkt kvStore, key []byte, val interface{}) error {
// buffer for writing serialized state to
var buf bytes.Buffer
......@@ -63,24 +61,34 @@ func (c *keyValueCodec) Put(bkt kvStore, path string, key []byte, val interface{
}
// If the hashes are equal, skip the write
hashPath := string(bkt.Path())
hashKey := string(key)
hashVal := h.Sum(nil)
hashKey := c.hashKey(path, key)
// lastHash value or nil if it hasn't been hashed yet
var lastHash []byte
c.hashesLock.Lock()
persistedHash := c.hashes[hashKey]
if hashBkt, ok := c.hashes[hashPath]; ok {
lastHash = hashBkt[hashKey]
} else {
// Create hash bucket
c.hashes[hashPath] = make(map[string][]byte, 2)
}
c.hashesLock.Unlock()
if bytes.Equal(hashVal, persistedHash) {
if bytes.Equal(hashVal, lastHash) {
return nil
}
// New value: write it to the underlying store
if err := bkt.Put(key, buf.Bytes()); err != nil {
return fmt.Errorf("failed to write data at key %s: %v", key, err)
}
// New value written, store hash
// New value written, store hash (bucket path map was created above)
c.hashesLock.Lock()
c.hashes[hashKey] = hashVal
c.hashes[hashPath][hashKey] = hashVal
c.hashesLock.Unlock()
return nil
......@@ -102,3 +110,23 @@ func (c *keyValueCodec) Get(bkt kvStore, key []byte, obj interface{}) error {
return nil
}
// DeleteBucket or do nothing if bucket doesn't exist.
func (c *keyValueCodec) DeleteBucket(parent kvStore, bktName []byte) error {
// Get the path of the bucket being deleted
bkt := parent.Bucket(bktName)
if bkt == nil {
// Doesn't exist! Nothing to delete
return nil
}
// Delete the bucket
err := parent.DeleteBucket(bktName)
// Always purge all corresponding hashes to prevent memory leaks
c.hashesLock.Lock()
delete(c.hashes, string(bkt.Path()))
c.hashesLock.Unlock()
return err
}
......@@ -12,6 +12,26 @@ type mockKVStore struct {
puts int
}
func (mockKVStore) Path() []byte {
return []byte{}
}
func (m *mockKVStore) Bucket(name []byte) kvStore {
return m
}
func (m *mockKVStore) CreateBucket(key []byte) (kvStore, error) {
return m, nil
}
func (m *mockKVStore) CreateBucketIfNotExists(key []byte) (kvStore, error) {
return m, nil
}
func (m *mockKVStore) DeleteBucket(key []byte) error {
return nil
}
func (mockKVStore) Get(key []byte) (val []byte) {
return nil
}
......@@ -21,10 +41,6 @@ func (m *mockKVStore) Put(key, val []byte) error {
return nil
}
func (mockKVStore) Writable() bool {
return true
}
// TestKVCodec_PutHash asserts that Puts on the underlying kvstore only occur
// when the data actually changes.
func TestKVCodec_PutHash(t *testing.T) {
......@@ -33,7 +49,6 @@ func TestKVCodec_PutHash(t *testing.T) {
// Create arguments for Put
kv := new(mockKVStore)
path := "path-path"
key := []byte("key1")
val := &struct {
Val int
......@@ -42,29 +57,24 @@ func TestKVCodec_PutHash(t *testing.T) {
}
// Initial Put should be written
require.NoError(codec.Put(kv, path, key, val))
require.NoError(codec.Put(kv, key, val))
require.Equal(1, kv.puts)
// Writing the same values again should be a noop
require.NoError(codec.Put(kv, path, key, val))
require.NoError(codec.Put(kv, key, val))
require.Equal(1, kv.puts)
// Changing the value should write again
val.Val++
require.NoError(codec.Put(kv, path, key, val))
require.NoError(codec.Put(kv, key, val))
require.Equal(2, kv.puts)
// Changing the key should write again
key = []byte("key2")
require.NoError(codec.Put(kv, path, key, val))
require.NoError(codec.Put(kv, key, val))
require.Equal(3, kv.puts)
// Changing the path should write again
path = "new-path"
require.NoError(codec.Put(kv, path, key, val))
require.Equal(4, kv.puts)
// Writing the same values again should be a noop
require.NoError(codec.Put(kv, path, key, val))
require.Equal(4, kv.puts)
require.NoError(codec.Put(kv, key, val))
require.Equal(3, kv.puts)
}
package state
import "github.com/boltdb/bolt"
// namedBucket is a wrapper around bolt.Bucket's to preserve their path
// information and expose it via the Path() method.
//
// Knowing the full bucket path to a key is necessary for tracking accesses in
// another datastructure such as the hashing writer keyValueCodec.
type namedBucket struct {
path []byte
name []byte
bkt *bolt.Bucket
}
// newNamedBucket from a bolt transaction.
func newNamedBucket(tx *bolt.Tx, root []byte) *namedBucket {
b := tx.Bucket(root)
if b == nil {
return nil
}
return &namedBucket{
path: root,
name: root,
bkt: b,
}
}
// createNamedBucketIfNotExists from a bolt transaction.
func createNamedBucketIfNotExists(tx *bolt.Tx, root []byte) (*namedBucket, error) {
b, err := tx.CreateBucketIfNotExists(root)
if err != nil {
return nil, err
}
return &namedBucket{
path: root,
name: root,
bkt: b,
}, nil
}
// Path to this bucket (including this bucket).
func (n *namedBucket) Path() []byte {
return n.path
}
// Name of this bucket.
func (n *namedBucket) Name() []byte {
return n.name
}
// Bucket returns a bucket inside the current one or nil if the bucket does not
// exist.
func (n *namedBucket) Bucket(name []byte) kvStore {
b := n.bkt.Bucket(name)
if b == nil {
return nil
}
return &namedBucket{
path: n.chBkt(name),
name: name,
bkt: b,
}
}
// CreateBucketIfNotExists creates a bucket if it doesn't exist and returns it
// or an error.
func (n *namedBucket) CreateBucketIfNotExists(name []byte) (kvStore, error) {
b, err := n.bkt.CreateBucketIfNotExists(name)
if err != nil {
return nil, err
}
return &namedBucket{
path: n.chBkt(name),
name: name,
bkt: b,
}, nil
}
// CreateBucket creates a bucket and returns it.
func (n *namedBucket) CreateBucket(name []byte) (kvStore, error) {
b, err := n.bkt.CreateBucket(name)
if err != nil {
return nil, err
}
return &namedBucket{
path: n.chBkt(name),
name: name,
bkt: b,
}, nil
}
// DeleteBucket calls DeleteBucket on the underlying bolt.Bucket.
func (n *namedBucket) DeleteBucket(name []byte) error {
return n.bkt.DeleteBucket(name)
}
// Get calls Get on the underlying bolt.Bucket.
func (n *namedBucket) Get(key []byte) []byte {
return n.bkt.Get(key)
}
// Put calls Put on the underlying bolt.Bucket.
func (n *namedBucket) Put(key, value []byte) error {
return n.bkt.Put(key, value)
}
// chBkt is like chdir but for buckets: it appends the new name to the end of
// a copy of the path and returns it.
func (n *namedBucket) chBkt(name []byte) []byte {
// existing path + new path element + path separator
path := make([]byte, len(n.path)+len(name)+1)
copy(path[0:len(n.path)], n.path)
path[len(n.path)] = '/'
copy(path[len(n.path)+1:], name)
return path
}
package state
import (
"io/ioutil"
"os"
"path/filepath"
"testing"
"github.com/boltdb/bolt"
"github.com/stretchr/testify/require"
)
func setupBoltDB(t *testing.T) (*bolt.DB, func()) {
dir, err := ioutil.TempDir("", "nomadtest_")
require.NoError(t, err)
cleanup := func() {
if err := os.RemoveAll(dir); err != nil {
t.Logf("error removing test dir: %v", err)
}
}
dbFilename := filepath.Join(dir, "nomadtest.db")
db, err := bolt.Open(dbFilename, 0600, nil)
if err != nil {
cleanup()
t.Fatalf("error creating boltdb: %v", err)
}
return db, func() {
db.Close()
cleanup()
}
}
// TestNamedBucket_Path asserts that creating and changing buckets are tracked
// properly by the namedBucket wrapper.
func TestNamedBucket_Path(t *testing.T) {
t.Parallel()
require := require.New(t)
db, cleanup := setupBoltDB(t)
defer cleanup()
parentBktName, childBktName := []byte("root"), []byte("child")
parentKey, parentVal := []byte("pkey"), []byte("pval")
childKey, childVal := []byte("ckey"), []byte("cval")
require.NoError(db.Update(func(tx *bolt.Tx) error {
// Trying to open a named bucket from a nonexistent bucket
// should return nil.
require.Nil(newNamedBucket(tx, []byte("nonexistent")))
// Creating a named bucket from a bolt tx should work and set
// the path and name properly.
b, err := createNamedBucketIfNotExists(tx, parentBktName)
require.NoError(err)
require.Equal(parentBktName, b.Name())
require.Equal(parentBktName, b.Path())
// Trying to descend into a nonexistent bucket should return
// nil.
require.Nil(b.Bucket([]byte("nonexistent")))
// Descending into a new bucket should update the path.
childBkt, err := b.CreateBucket(childBktName)
require.NoError(err)
require.Equal(childBktName, childBkt.(*namedBucket).Name())
require.Equal([]byte("root/child"), childBkt.Path())
// Assert the parent bucket did not get changed.
require.Equal(parentBktName, b.Name())
require.Equal(parentBktName, b.Path())
// Add entries to both buckets
require.NoError(b.Put(parentKey, parentVal))
require.NoError(childBkt.Put(childKey, childVal))
return nil
}))
// Read buckets and values back out
require.NoError(db.View(func(tx *bolt.Tx) error {
b := newNamedBucket(tx, parentBktName)
require.NotNil(b)
require.Equal(parentVal, b.Get(parentKey))
require.Nil(b.Get(childKey))
childBkt := b.Bucket(childBktName)
require.NotNil(childBkt)
require.Nil(childBkt.Get(parentKey))
require.Equal(childVal, childBkt.Get(childKey))
return nil
}))
}
// TestNamedBucket_DeleteBucket asserts that deleting a bucket properly purges
// all related keys from the internal hashes map.
func TestNamedBucket_DeleteBucket(t *testing.T) {
t.Parallel()
require := require.New(t)
db, cleanup := setupBoltDB(t)
defer cleanup()
// Create some nested buckets and keys (key values will just be their names)
b1Name, c1Name, c2Name, c1c1Name := []byte("b1"), []byte("c1"), []byte("c2"), []byte("c1c1")
b1k1, c1k1, c2k1, c1c1k1 := []byte("b1k1"), []byte("c1k1"), []byte("c2k1"), []byte("c1c1k1")
codec := newKeyValueCodec()
// Create initial db state
require.NoError(db.Update(func(tx *bolt.Tx) error {
// Create bucket 1 and key
b1, err := createNamedBucketIfNotExists(tx, b1Name)
require.NoError(err)
require.NoError(codec.Put(b1, b1k1, b1k1))
// Create child bucket 1 and key
c1, err := b1.CreateBucketIfNotExists(c1Name)
require.NoError(err)
require.NoError(codec.Put(c1, c1k1, c1k1))
// Create child-child bucket 1 and key
c1c1, err := c1.CreateBucketIfNotExists(c1c1Name)
require.NoError(err)
require.NoError(codec.Put(c1c1, c1c1k1, c1c1k1))
// Create child bucket 2 and key
c2, err := b1.CreateBucketIfNotExists(c2Name)
require.NoError(err)
require.NoError(codec.Put(c2, c2k1, c2k1))
return nil
}))
// codec should be tracking 4 hash buckets (b1, c1, c2, c1c1)
require.Len(codec.hashes, 4)
// Delete c1
require.NoError(db.Update(func(tx *bolt.Tx) error {
b1 := newNamedBucket(tx, b1Name)
return codec.DeleteBucket(b1, c1Name)
}))
START HERE // We don't appear to be properly deleting the sub-bucket
// codec should be tracking 2 hash buckets (b1, c2)
require.Len(codec.hashes, 2)
// Assert all of c1 is gone
require.NoError(db.View(func(tx *bolt.Tx) error {
return nil
}))
}
......@@ -3,7 +3,6 @@ package state
import (
"fmt"
"path/filepath"
"strings"
"github.com/boltdb/bolt"
trstate "github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner/state"
......@@ -39,56 +38,6 @@ var (
taskStateKey = []byte("task_state")
)
//TODO delete from kvcodec
// DeleteAllocationBucket is used to delete an allocation bucket if it exists.
func DeleteAllocationBucket(tx *bolt.Tx, allocID string) error {
if !tx.Writable() {
return fmt.Errorf("transaction must be writable")
}
// Retrieve the root allocations bucket
allocations := tx.Bucket(allocationsBucket)
if allocations == nil {
return nil
}
// Check if the bucket exists
key := []byte(allocID)
if allocBkt := allocations.Bucket(key); allocBkt == nil {
return nil
}
return allocations.DeleteBucket(key)
}
//TODO delete from kvcodec
// DeleteTaskBucket is used to delete a task bucket if it exists.
func DeleteTaskBucket(tx *bolt.Tx, allocID, taskName string) error {
if !tx.Writable() {
return fmt.Errorf("transaction must be writable")
}
// Retrieve the root allocations bucket
allocations := tx.Bucket(allocationsBucket)
if allocations == nil {
return nil
}
// Retrieve the specific allocations bucket
alloc := allocations.Bucket([]byte(allocID))
if alloc == nil {
return nil
}
// Check if the bucket exists
key := []byte(taskName)
if taskBkt := alloc.Bucket(key); taskBkt == nil {
return nil
}
return alloc.DeleteBucket(key)
}
// NewStateDBFunc creates a StateDB given a state directory.
type NewStateDBFunc func(stateDir string) (StateDB, error)
......@@ -153,7 +102,7 @@ type allocEntry struct {
}
func (s *BoltStateDB) getAllAllocations(tx *bolt.Tx) ([]*structs.Allocation, map[string]error) {
allocationsBkt := tx.Bucket(allocationsBucket)
allocationsBkt := newNamedBucket(tx, allocationsBucket)
if allocationsBkt == nil {
// No allocs
return nil, nil
......@@ -163,7 +112,7 @@ func (s *BoltStateDB) getAllAllocations(tx *bolt.Tx) ([]*structs.Allocation, map
errs := map[string]error{}
// Create a cursor for iteration.
c := allocationsBkt.Cursor()
c := allocationsBkt.bkt.Cursor()
// Iterate over all the allocation buckets
for k, _ := c.First(); k != nil; k, _ = c.Next() {
......@@ -190,7 +139,7 @@ func (s *BoltStateDB) getAllAllocations(tx *bolt.Tx) ([]*structs.Allocation, map
func (s *BoltStateDB) PutAllocation(alloc *structs.Allocation) error {
return s.db.Update(func(tx *bolt.Tx) error {
// Retrieve the root allocations bucket
allocsBkt, err := tx.CreateBucketIfNotExists(allocationsBucket)
allocsBkt, err := createNamedBucketIfNotExists(tx, allocationsBucket)
if err != nil {
return err
}
......@@ -205,7 +154,7 @@ func (s *BoltStateDB) PutAllocation(alloc *structs.Allocation) error {
allocState := allocEntry{
Alloc: alloc,
}
return s.codec.Put(allocBkt, alloc.ID, allocKey, &allocState)
return s.codec.Put(allocBkt, allocKey, &allocState)
})
}
......@@ -256,8 +205,7 @@ func (s *BoltStateDB) PutTaskRunnerLocalState(allocID, taskName string, val inte
return fmt.Errorf("failed to retrieve allocation bucket: %v", err)
}
path := strings.Join([]string{allocID, taskName, string(taskLocalStateKey)}, "-")
if err := s.codec.Put(taskBkt, path, taskLocalStateKey, val); err != nil {
if err := s.codec.Put(taskBkt, taskLocalStateKey, val); err != nil {
return fmt.Errorf("failed to write task_runner state: %v", err)
}
......@@ -273,8 +221,42 @@ func (s *BoltStateDB) PutTaskState(allocID, taskName string, state *structs.Task
return fmt.Errorf("failed to retrieve allocation bucket: %v", err)
}
path := strings.Join([]string{allocID, taskName, string(taskStateKey)}, "-")
return s.codec.Put(taskBkt, path, taskStateKey, state)
return s.codec.Put(taskBkt, taskStateKey, state)
})
}
// DeleteTaskBucket is used to delete a task bucket if it exists.
func (s *BoltStateDB) DeleteTaskBucket(allocID, taskName string) error {
return s.db.Update(func(tx *bolt.Tx) error {
// Retrieve the root allocations bucket
allocations := newNamedBucket(tx, allocationsBucket)
if allocations == nil {
return nil
}
// Retrieve the specific allocations bucket
alloc := allocations.Bucket([]byte(allocID))
if alloc == nil {
return nil
}
// Check if the bucket exists
key := []byte(taskName)
return s.codec.DeleteBucket(alloc, key)
})
}
// DeleteAllocationBucket is used to delete an allocation bucket if it exists.
func (s *BoltStateDB) DeleteAllocationBucket(allocID string) error {
return s.db.Update(func(tx *bolt.Tx) error {
// Retrieve the root allocations bucket
allocations := newNamedBucket(tx, allocationsBucket)
if allocations == nil {
return nil
}
key := []byte(allocID)
return s.codec.DeleteBucket(allocations, key)
})
}
......@@ -288,18 +270,18 @@ func (s *BoltStateDB) Close() error {
// particular allocation. If the root allocation bucket or the specific
// allocation bucket doesn't exist, it will be created as long as the
// transaction is writable.
func getAllocationBucket(tx *bolt.Tx, allocID string) (*bolt.Bucket, error) {
func getAllocationBucket(tx *bolt.Tx, allocID string) (kvStore, error) {
var err error
w := tx.Writable()
// Retrieve the root allocations bucket
allocations := tx.Bucket(allocationsBucket)
allocations := newNamedBucket(tx, allocationsBucket)
if allocations == nil {
if !w {
return nil, fmt.Errorf("Allocations bucket doesn't exist and transaction is not writable")
}
allocations, err = tx.CreateBucket(allocationsBucket)
allocations, err = createNamedBucketIfNotExists(tx, allocationsBucket)
if err != nil {
return nil, err
}
......@@ -326,7 +308,7 @@ func getAllocationBucket(tx *bolt.Tx, allocID string) (*bolt.Bucket, error) {
// particular task. If the root allocation bucket, the specific
// allocation or task bucket doesn't exist, they will be created as long as the
// transaction is writable.
func getTaskBucket(tx *bolt.Tx, allocID, taskName string) (*bolt.Bucket, error) {
func getTaskBucket(tx *bolt.Tx, allocID, taskName string) (kvStore, error) {
alloc, err := getAllocationBucket(tx, allocID)
if err != nil {
return nil, err
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment