Commit 1d427d75 authored by Angela Nguyen's avatar Angela Nguyen
Browse files

merge in working branch

No related merge requests found
Showing with 189 additions and 8 deletions
+189 -8
......@@ -121,8 +121,9 @@ func (s *svc) UpdateTableCapacity(ctx context.Context, region string, tableName
return stat, nil
}
func (s *svc) UpdateGSICapacity(ctx context.Context, region string, tableName string, indexName string, targetRCU int64, targetWCU int64) error {
return nil
func (s *svc) UpdateGSICapacity(ctx context.Context, region string, tableName string, indexName string, targetRCU int64, targetWCU int64) (dynamodbv1.Status, error) {
stat := dynamodbv1.Status(3)
return stat, nil
}
func (s *svc) Regions() []string {
......
......@@ -3,9 +3,6 @@ package dynamodb
import (
"context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
dynamodbv1 "github.com/lyft/clutch/backend/api/aws/dynamodb/v1"
"github.com/lyft/clutch/backend/service/aws"
)
......@@ -39,5 +36,10 @@ func (a *dynamodbAPI) UpdateTableCapacity(ctx context.Context, req *dynamodbv1.U
}
func (a *dynamodbAPI) UpdateGSICapacity(ctx context.Context, req *dynamodbv1.UpdateGSICapacityRequest) (*dynamodbv1.UpdateGSICapacityResponse, error) {
return nil, status.Error(codes.Unimplemented, "not implemented")
result, err := a.client.UpdateGSICapacity(ctx, req.Region, req.TableName, req.IndexName, req.TargetIndexRcu, req.TargetIndexWcu)
if err != nil {
return nil, err
}
return &dynamodbv1.UpdateGSICapacityResponse{TableName: req.TableName, TableStatus: result}, nil
}
......@@ -116,6 +116,7 @@ type Client interface {
DescribeTable(ctx context.Context, region string, tableName string) (*dynamodbv1.Table, error)
UpdateTableCapacity(ctx context.Context, region string, tableName string, targetTableRcu int64, targetTableWcu int64) (dynamodbv1.Status, error)
UpdateGSICapacity(ctx context.Context, region string, tableName string, indexName string, targetIndexRcu int64, targetIndexWcu int64) (dynamodbv1.Status, error)
Regions() []string
}
......
......@@ -72,6 +72,7 @@ func getTable(ctx context.Context, client *regionalClient, tableName string) (*d
return client.dynamodb.DescribeTable(ctx, input)
}
// takes raw list of GSIs from table description and creates new GlobalSecondaryIndex structs
func getGlobalSecondaryIndexes(indexes []types.GlobalSecondaryIndexDescription) []*dynamodbv1.GlobalSecondaryIndex {
gsis := make([]*dynamodbv1.GlobalSecondaryIndex, len(indexes))
for idx, i := range indexes {
......@@ -80,6 +81,16 @@ func getGlobalSecondaryIndexes(indexes []types.GlobalSecondaryIndexDescription)
return gsis
}
// retrieve one GSI from list
func getGlobalSecondaryIndex(indexes []types.GlobalSecondaryIndexDescription, targetIndexName string) (*types.GlobalSecondaryIndexDescription, error) {
for _, i := range indexes {
if *i.IndexName == targetIndexName {
return &i, nil
}
}
return nil, status.Error(codes.NotFound, "Global secondary index not found.")
}
func newProtoForTableStatus(s types.TableStatus) dynamodbv1.Status {
value, ok := dynamodbv1.Status_value[string(s)]
if !ok {
......@@ -127,6 +138,18 @@ func isValidIncrease(client *regionalClient, current *types.ProvisionedThroughpu
return nil
}
func increaseCapacity(ctx context.Context, cl *regionalClient, input *dynamodb.UpdateTableInput) (dynamodbv1.Status, error) {
result, err := cl.dynamodb.UpdateTable(ctx, input)
if err != nil {
return 0, err
}
tableStatus := newProtoForTableStatus(result.TableDescription.TableStatus)
return tableStatus, nil
}
func (c *client) UpdateTableCapacity(ctx context.Context, region string, tableName string, targetTableRcu int64, targetTableWcu int64) (dynamodbv1.Status, error) {
cl, err := c.getRegionalClient(region)
if err != nil {
......@@ -156,14 +179,67 @@ func (c *client) UpdateTableCapacity(ctx context.Context, region string, tableNa
ProvisionedThroughput: &targetCapacity,
}
result, err := cl.dynamodb.UpdateTable(ctx, input)
tableStatus, err := increaseCapacity(ctx, cl, input)
if err != nil {
c.log.Error("update table failed", zap.Error(err))
return 0, err
}
tableStatus := newProtoForTableStatus(result.TableDescription.TableStatus)
return tableStatus, nil
}
func (c *client) UpdateGSICapacity(ctx context.Context, region string, tableName string, indexName string, targetIndexRcu int64, targetIndexWcu int64) (dynamodbv1.Status, error) {
cl, err := c.getRegionalClient(region)
if err != nil {
c.log.Error("unable to get regional client", zap.Error(err))
return 0, err
}
result, err := getTable(ctx, cl, tableName)
if err != nil {
c.log.Error("unable to find table", zap.Error(err))
return 0, err
}
index, err := getGlobalSecondaryIndex(result.Table.GlobalSecondaryIndexes, indexName)
if err != nil {
c.log.Error("specified GSI not found", zap.Error(err))
return 0, err
}
targetCapacity := types.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(targetIndexRcu),
WriteCapacityUnits: aws.Int64(targetIndexWcu),
}
err = isValidIncrease(cl, index.ProvisionedThroughput, targetCapacity)
if err != nil {
c.log.Error("invalid requested amount for capacity increase", zap.Error(err))
return 0, err
}
input := &dynamodb.UpdateTableInput{
TableName: aws.String(tableName),
GlobalSecondaryIndexUpdates: []types.GlobalSecondaryIndexUpdate{
types.GlobalSecondaryIndexUpdate{
Update: &types.UpdateGlobalSecondaryIndexAction{
IndexName: aws.String(indexName),
ProvisionedThroughput: &types.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(targetIndexRcu),
WriteCapacityUnits: aws.Int64(targetIndexWcu),
},
},
},
},
}
tableStatus, err := increaseCapacity(ctx, cl, input)
if err != nil {
c.log.Error("update table failed", zap.Error(err))
return 0, err
}
return tableStatus, nil
}
......@@ -59,6 +59,13 @@ var testDynamodbTableWithGSI = &types.TableDescription{
WriteCapacityUnits: aws.Int64(20),
},
},
{IndexName: aws.String("test-gsi-two"),
KeySchema: []types.KeySchemaElement{},
ProvisionedThroughput: &types.ProvisionedThroughputDescription{
ReadCapacityUnits: aws.Int64(100),
WriteCapacityUnits: aws.Int64(200),
},
},
},
TableStatus: "ACTIVE",
}
......@@ -78,6 +85,13 @@ var testTableWithGSIOutput = &dynamodbv1.Table{
WriteCapacityUnits: 20,
},
},
{
Name: "test-gsi-two",
ProvisionedThroughput: &dynamodbv1.ProvisionedThroughput{
ReadCapacityUnits: 100,
WriteCapacityUnits: 200,
},
},
},
Status: dynamodbv1.Status(5),
}
......@@ -276,6 +290,93 @@ func TestUpdateTableCapacityWithCustomLimits(t *testing.T) {
}
}
func TestGetGlobalSecondaryIndex(t *testing.T) {
testIndexes := testDynamodbTableWithGSI.GlobalSecondaryIndexes
validIndex := "test-gsi"
index, err := getGlobalSecondaryIndex(testIndexes, validIndex)
assert.NoError(t, err)
assert.NotNil(t, index)
assert.Equal(t, validIndex, *index.IndexName)
invalidIndex := "fake-gsi"
ret, err := getGlobalSecondaryIndex(testIndexes, invalidIndex)
assert.Error(t, err)
assert.Nil(t, ret)
}
func TestUpdateGSICapacitySuccess(t *testing.T) {
m := &mockDynamodb{
table: testDynamodbTableWithGSI,
}
ds := getScalingLimits(cfg)
d := &awsv1.DynamodbConfig{
ScalingLimits: &awsv1.ScalingLimits{
MaxReadCapacityUnits: ds.MaxReadCapacityUnits,
MaxWriteCapacityUnits: ds.MaxWriteCapacityUnits,
MaxScaleFactor: ds.MaxScaleFactor,
EnableOverride: ds.EnableOverride,
},
}
c := &client{
log: zaptest.NewLogger(t),
clients: map[string]*regionalClient{"us-east-1": {region: "us-east-1", dynamodbCfg: d, dynamodb: m}},
}
got, err := c.UpdateGSICapacity(context.Background(), "us-east-1", "test-table", "test-gsi-two", 101, 202)
assert.NotNil(t, got)
assert.Nil(t, err)
}
func TestUpdateGSICapacityErrors(t *testing.T) {
tests := []struct {
name string
inputRCU int64
inputWCU int64
want string
}{
{"rcu above max", 100000, 250, "rpc error: code = FailedPrecondition desc = Target read capacity exceeds maximum allowed limits [40000]"},
{"wcu above max", 100, 100000, "rpc error: code = FailedPrecondition desc = Target write capacity exceeds maximum allowed limits [40000]"},
{"rcu lower than current", 1, 1500, "rpc error: code = FailedPrecondition desc = Target read capacity [1] is lower than current capacity [100]"},
{"wcu lower than current", 1500, 1, "rpc error: code = FailedPrecondition desc = Target write capacity [1] is lower than current capacity [200]"},
{"rcu change scale too high", 400, 200, "rpc error: code = FailedPrecondition desc = Target read capacity exceeds the scale limit of [2.0]x current capacity"},
{"wcu change scale too high", 100, 600, "rpc error: code = FailedPrecondition desc = Target write capacity exceeds the scale limit of [2.0]x current capacity"},
}
m := &mockDynamodb{
table: testDynamodbTableWithGSI,
}
ds := getScalingLimits(cfg)
d := &awsv1.DynamodbConfig{
ScalingLimits: &awsv1.ScalingLimits{
MaxReadCapacityUnits: ds.MaxReadCapacityUnits,
MaxWriteCapacityUnits: ds.MaxWriteCapacityUnits,
MaxScaleFactor: ds.MaxScaleFactor,
EnableOverride: ds.EnableOverride,
},
}
c := &client{
log: zaptest.NewLogger(t),
clients: map[string]*regionalClient{"us-east-1": {region: "us-east-1", dynamodbCfg: d, dynamodb: m}},
}
for _, tt := range tests {
tt := tt // capture range variable
t.Run(tt.name, func(t *testing.T) {
status, err := c.UpdateGSICapacity(context.Background(), "us-east-1", "test-table", "test-gsi-two", tt.inputRCU, tt.inputWCU)
if err.Error() != tt.want {
t.Errorf("\nWant error msg: %s\nGot error msg: %s", tt.want, err)
}
assert.Equal(t, dynamodbv1.Status(0), status)
})
}
}
type mockDynamodb struct {
dynamodbClient
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment