Commit e6cd3fca authored by Rémi Lapeyre's avatar Rémi Lapeyre
Browse files

backport of commit 5f444a68

parent d76c5e6f
No related merge requests found
Showing with 72 additions and 52 deletions
+72 -52
...@@ -198,72 +198,92 @@ func (c *RemoteClient) Put(data []byte) error { ...@@ -198,72 +198,92 @@ func (c *RemoteClient) Put(data []byte) error {
verb = consulapi.KVSet verb = consulapi.KVSet
} }
// If the payload is too large we first write the chunks and replace it // The payload may be too large to store in a single KV entry in Consul. We
// 524288 is the default value, we just hope the user did not set a smaller // could try to determine whether it will fit or not before sending the
// one but there is really no reason for them to do so, if they changed it // request but since we are using the Transaction API and not the KV API,
// it is certainly to set a larger value. // it grows by about a 1/3 when it is base64 encoded plus the overhead of
limit := 524288 // the fields specific to the Transaction API.
if len(payload) > limit { // Rather than trying to calculate the overhead (which could change from
md5 := md5.Sum(data) // one version of Consul to another, and between Consul Community Edition
chunks := split(payload, limit) // and Consul Enterprise), we try to send the whole state in one request, if
chunkPaths := make([]string, 0) // it fails because it is too big we then split it in chunks and send each
// chunk separately.
// First we write the new chunks // When splitting in chunks, we make each chunk 524288 bits, which is the
for i, p := range chunks { // default max size for raft. If the user changed it, we still may send
path := strings.TrimRight(c.Path, "/") + fmt.Sprintf("/tfstate.%x/%d", md5, i) // chunks too big and fail but this is not a setting that should be fiddled
chunkPaths = append(chunkPaths, path) // with anyway.
_, err := kv.Put(&consulapi.KVPair{
Key: path, store := func(payload []byte) error {
Value: p, // KV.Put doesn't return the new index, so we use a single operation
}, nil) // transaction to get the new index with a single request.
txOps := consulapi.KVTxnOps{
if err != nil { &consulapi.KVTxnOp{
return err Verb: verb,
} Key: c.Path,
Value: payload,
Index: c.modifyIndex,
},
} }
// We update the link to point to the new chunks ok, resp, _, err := kv.Txn(txOps, nil)
payload, err = json.Marshal(map[string]interface{}{
"current-hash": fmt.Sprintf("%x", md5),
"chunks": chunkPaths,
})
if err != nil { if err != nil {
return err return err
} }
} // transaction was rolled back
if !ok {
return fmt.Errorf("consul CAS failed with transaction errors: %v", resp.Errors)
}
var txOps consulapi.KVTxnOps if len(resp.Results) != 1 {
// KV.Put doesn't return the new index, so we use a single operation // this probably shouldn't happen
// transaction to get the new index with a single request. return fmt.Errorf("expected on 1 response value, got: %d", len(resp.Results))
txOps = consulapi.KVTxnOps{ }
&consulapi.KVTxnOp{
Verb: verb, c.modifyIndex = resp.Results[0].ModifyIndex
Key: c.Path,
Value: payload, // We remove all the old chunks
Index: c.modifyIndex, cleanupOldChunks()
},
return nil
} }
ok, resp, _, err := kv.Txn(txOps, nil) if err = store(payload); err == nil {
if err != nil { // The payload was small enough to be stored
return nil
} else if !strings.Contains(err.Error(), "too large") {
// We failed for some other reason, report this to the user
return err return err
} }
// transaction was rolled back
if !ok {
return fmt.Errorf("consul CAS failed with transaction errors: %v", resp.Errors)
}
if len(resp.Results) != 1 { // The payload was too large so we split it in multiple chunks
// this probably shouldn't happen
return fmt.Errorf("expected on 1 response value, got: %d", len(resp.Results))
}
c.modifyIndex = resp.Results[0].ModifyIndex md5 := md5.Sum(data)
chunks := split(payload, 524288)
chunkPaths := make([]string, 0)
// We remove all the old chunks // First we write the new chunks
cleanupOldChunks() for i, p := range chunks {
path := strings.TrimRight(c.Path, "/") + fmt.Sprintf("/tfstate.%x/%d", md5, i)
chunkPaths = append(chunkPaths, path)
_, err := kv.Put(&consulapi.KVPair{
Key: path,
Value: p,
}, nil)
return nil if err != nil {
return err
}
}
// Then we update the link to point to the new chunks
payload, err = json.Marshal(map[string]interface{}{
"current-hash": fmt.Sprintf("%x", md5),
"chunks": chunkPaths,
})
if err != nil {
return err
}
return store(payload)
} }
func (c *RemoteClient) Delete() error { func (c *RemoteClient) Delete() error {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment