Commit 1e182e58 authored by Adnan Abdulhussein's avatar Adnan Abdulhussein Committed by KubeKween
Browse files

record restic backup progress in PodVolumeBackup (#1821)


* record restic backup progress in PodVolumeBackup
Signed-off-by: default avatarAdnan Abdulhussein <aadnan@vmware.com>
parent fc39ac6d
Showing with 274 additions and 12 deletions
+274 -12
......@@ -18,9 +18,9 @@ LABEL maintainer="Steve Kriss <krisss@vmware.com>"
RUN apt-get update && \
apt-get install -y --no-install-recommends ca-certificates wget bzip2 && \
wget --quiet https://github.com/restic/restic/releases/download/v0.9.4/restic_0.9.4_linux_amd64.bz2 && \
bunzip2 restic_0.9.4_linux_amd64.bz2 && \
mv restic_0.9.4_linux_amd64 /usr/bin/restic && \
wget --quiet https://github.com/restic/restic/releases/download/v0.9.5/restic_0.9.5_linux_amd64.bz2 && \
bunzip2 restic_0.9.5_linux_amd64.bz2 && \
mv restic_0.9.5_linux_amd64 /usr/bin/restic && \
chmod +x /usr/bin/restic && \
apt-get remove -y wget bzip2 && \
rm -rf /var/lib/apt/lists/*
......
......@@ -18,8 +18,8 @@ LABEL maintainer="Steve Kriss <krisss@vmware.com>"
RUN apt-get update && \
apt-get install -y --no-install-recommends ca-certificates wget && \
wget --quiet https://oplab9.parqtec.unicamp.br/pub/ppc64el/restic/restic-0.9.4 && \
mv restic-0.9.4 /usr/bin/restic && \
wget --quiet https://oplab9.parqtec.unicamp.br/pub/ppc64el/restic/restic-0.9.5 && \
mv restic-0.9.5 /usr/bin/restic && \
chmod +x /usr/bin/restic && \
apt-get remove -y wget && \
rm -rf /var/lib/apt/lists/*
......
report backup progress in PodVolumeBackups and expose progress in the velero backup describe --details command. Also upgrades restic to v0.9.5
......@@ -80,6 +80,11 @@ type PodVolumeBackupStatus struct {
// Completion time is recorded before uploading the backup object.
// The server's time is used for CompletionTimestamps
CompletionTimestamp metav1.Time `json:"completionTimestamp"`
// Progress holds the total number of bytes of the volume and the current
// number of backed up bytes. This can be used to display progress information
// about the backup operation.
Progress PodVolumeOperationProgress `json:"progress"`
}
// +genclient
......
/*
Copyright 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1
// PodVolumeOperationProgress represents the progress of a
// PodVolumeBackup/Restore (restic) operation
type PodVolumeOperationProgress struct {
TotalBytes int64 `json:"totalBytes"`
BytesDone int64 `json:"bytesDone"`
}
......@@ -722,6 +722,7 @@ func (in *PodVolumeBackupStatus) DeepCopyInto(out *PodVolumeBackupStatus) {
*out = *in
in.StartTimestamp.DeepCopyInto(&out.StartTimestamp)
in.CompletionTimestamp.DeepCopyInto(&out.CompletionTimestamp)
out.Progress = in.Progress
return
}
......@@ -735,6 +736,22 @@ func (in *PodVolumeBackupStatus) DeepCopy() *PodVolumeBackupStatus {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *PodVolumeOperationProgress) DeepCopyInto(out *PodVolumeOperationProgress) {
*out = *in
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PodVolumeOperationProgress.
func (in *PodVolumeOperationProgress) DeepCopy() *PodVolumeOperationProgress {
if in == nil {
return nil
}
out := new(PodVolumeOperationProgress)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *PodVolumeRestore) DeepCopyInto(out *PodVolumeRestore) {
*out = *in
......
......@@ -377,7 +377,7 @@ func DescribePodVolumeBackups(d *Describer, backups []velerov1api.PodVolumeBacku
backupsByPod := new(volumesByPod)
for _, backup := range backupsByPhase[phase] {
backupsByPod.Add(backup.Spec.Pod.Namespace, backup.Spec.Pod.Name, backup.Spec.Volume)
backupsByPod.Add(backup.Spec.Pod.Namespace, backup.Spec.Pod.Name, backup.Spec.Volume, phase, backup.Status.Progress)
}
d.Printf("\t%s:\n", phase)
......@@ -423,13 +423,18 @@ type volumesByPod struct {
// Add adds a pod volume with the specified pod namespace, name
// and volume to the appropriate group.
func (v *volumesByPod) Add(namespace, name, volume string) {
func (v *volumesByPod) Add(namespace, name, volume, phase string, progress velerov1api.PodVolumeOperationProgress) {
if v.volumesByPodMap == nil {
v.volumesByPodMap = make(map[string]*podVolumeGroup)
}
key := fmt.Sprintf("%s/%s", namespace, name)
// append backup progress percentage if backup is in progress
if phase == "In Progress" && progress != (velerov1api.PodVolumeOperationProgress{}) {
volume = fmt.Sprintf("%s (%.2f%%)", volume, float64(progress.BytesDone)/float64(progress.TotalBytes)*100)
}
if group, ok := v.volumesByPodMap[key]; !ok {
group := &podVolumeGroup{
label: key,
......
......@@ -188,7 +188,8 @@ func describePodVolumeRestores(d *Describer, restores []v1.PodVolumeRestore, det
restoresByPod := new(volumesByPod)
for _, restore := range restoresByPhase[phase] {
restoresByPod.Add(restore.Spec.Pod.Namespace, restore.Spec.Pod.Name, restore.Spec.Volume)
// TODO(adnan): replace last parameter with progress from status (#1749)
restoresByPod.Add(restore.Spec.Pod.Namespace, restore.Spec.Pod.Name, restore.Spec.Volume, phase, v1.PodVolumeOperationProgress{})
}
d.Printf("\t%s:\n", phase)
......
......@@ -39,7 +39,6 @@ import (
informers "github.com/heptio/velero/pkg/generated/informers/externalversions/velero/v1"
listers "github.com/heptio/velero/pkg/generated/listers/velero/v1"
"github.com/heptio/velero/pkg/restic"
veleroexec "github.com/heptio/velero/pkg/util/exec"
"github.com/heptio/velero/pkg/util/filesystem"
"github.com/heptio/velero/pkg/util/kube"
)
......@@ -254,7 +253,7 @@ func (c *podVolumeBackupController) processBackup(req *velerov1api.PodVolumeBack
var stdout, stderr string
var emptySnapshot bool
if stdout, stderr, err = veleroexec.RunCommand(resticCmd.Cmd()); err != nil {
if stdout, stderr, err = restic.RunBackup(resticCmd, log, c.updateBackupProgressFunc(req, log)); err != nil {
if strings.Contains(stderr, "snapshot is empty") {
emptySnapshot = true
} else {
......@@ -361,6 +360,18 @@ func (c *podVolumeBackupController) patchPodVolumeBackup(req *velerov1api.PodVol
return req, nil
}
// updateBackupProgressFunc returns a func that takes progress info and patches
// the PVB with the new progress
func (c *podVolumeBackupController) updateBackupProgressFunc(req *velerov1api.PodVolumeBackup, log logrus.FieldLogger) func(velerov1api.PodVolumeOperationProgress) {
return func(progress velerov1api.PodVolumeOperationProgress) {
if _, err := c.patchPodVolumeBackup(req, func(r *velerov1api.PodVolumeBackup) {
r.Status.Progress = progress
}); err != nil {
log.WithError(err).Error("error updating PodVolumeBackup progress")
}
}
}
func (c *podVolumeBackupController) fail(req *velerov1api.PodVolumeBackup, msg string, log logrus.FieldLogger) error {
if _, err := c.patchPodVolumeBackup(req, func(r *velerov1api.PodVolumeBackup) {
r.Status.Phase = velerov1api.PodVolumeBackupPhaseFailed
......
......@@ -34,7 +34,7 @@ func BackupCommand(repoIdentifier, passwordFile, path string, tags map[string]st
PasswordFile: passwordFile,
Dir: path,
Args: []string{"."},
ExtraFlags: append(backupTagFlags(tags), "--host=velero"),
ExtraFlags: append(backupTagFlags(tags), "--host=velero", "--json"),
}
}
......
......@@ -32,7 +32,7 @@ func TestBackupCommand(t *testing.T) {
assert.Equal(t, "path", c.Dir)
assert.Equal(t, []string{"."}, c.Args)
expected := []string{"--tag=foo=bar", "--tag=c=d", "--host=velero"}
expected := []string{"--tag=foo=bar", "--tag=c=d", "--host=velero", "--json"}
sort.Strings(expected)
sort.Strings(c.ExtraFlags)
assert.Equal(t, expected, c.ExtraFlags)
......
......@@ -17,13 +17,29 @@ limitations under the License.
package restic
import (
"bytes"
"encoding/json"
"fmt"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
velerov1api "github.com/heptio/velero/pkg/apis/velero/v1"
"github.com/heptio/velero/pkg/util/exec"
)
const backupProgressCheckInterval = 10 * time.Second
type backupStatusLine struct {
MessageType string `json:"message_type"`
// seen in status lines
TotalBytes int64 `json:"total_bytes"`
BytesDone int64 `json:"bytes_done"`
// seen in summary line at the end
TotalBytesProcessed int64 `json:"total_bytes_processed"`
}
// GetSnapshotID runs a 'restic snapshots' command to get the ID of the snapshot
// in the specified repo matching the set of provided tags, or an error if a
// unique snapshot cannot be identified.
......@@ -53,3 +69,105 @@ func GetSnapshotID(repoIdentifier, passwordFile string, tags map[string]string,
return snapshots[0].ShortID, nil
}
// RunBackup runs a `restic backup` command and watches the output to provide
// progress updates to the caller.
func RunBackup(backupCmd *Command, log logrus.FieldLogger, updateFunc func(velerov1api.PodVolumeOperationProgress)) (string, string, error) {
// buffers for copying command stdout/err output into
stdoutBuf := new(bytes.Buffer)
stderrBuf := new(bytes.Buffer)
// create a channel to signal when to end the goroutine scanning for progress
// updates
quit := make(chan struct{})
cmd := backupCmd.Cmd()
cmd.Stdout = stdoutBuf
cmd.Stderr = stderrBuf
cmd.Start()
go func() {
ticker := time.NewTicker(backupProgressCheckInterval)
for {
select {
case <-ticker.C:
lastLine := getLastLine(stdoutBuf.Bytes())
stat, err := decodeBackupStatusLine(lastLine)
if err != nil {
log.WithError(err).Errorf("error getting restic backup progress")
}
// if the line contains a non-empty bytes_done field, we can update the
// caller with the progress
if stat.BytesDone != 0 {
updateFunc(velerov1api.PodVolumeOperationProgress{
TotalBytes: stat.TotalBytes,
BytesDone: stat.BytesDone,
})
}
case <-quit:
ticker.Stop()
return
}
}
}()
cmd.Wait()
quit <- struct{}{}
summary, err := getSummaryLine(stdoutBuf.Bytes())
if err != nil {
return stdoutBuf.String(), stderrBuf.String(), err
}
stat, err := decodeBackupStatusLine(summary)
if err != nil {
return stdoutBuf.String(), stderrBuf.String(), err
}
if stat.MessageType != "summary" {
return stdoutBuf.String(), stderrBuf.String(), errors.WithStack(fmt.Errorf("error getting restic backup summary: %s", string(summary)))
}
// update progress to 100%
updateFunc(velerov1api.PodVolumeOperationProgress{
TotalBytes: stat.TotalBytesProcessed,
BytesDone: stat.TotalBytesProcessed,
})
return string(summary), stderrBuf.String(), nil
}
func decodeBackupStatusLine(lastLine []byte) (backupStatusLine, error) {
var stat backupStatusLine
if err := json.Unmarshal(lastLine, &stat); err != nil {
return stat, errors.Wrapf(err, "unable to decode backup JSON line: %s", string(lastLine))
}
return stat, nil
}
// getLastLine returns the last line of a byte array. The string is assumed to
// have a newline at the end of it, so this returns the substring between the
// last two newlines.
func getLastLine(b []byte) []byte {
// subslice the byte array to ignore the newline at the end of the string
lastNewLineIdx := bytes.LastIndex(b[:len(b)-1], []byte("\n"))
return b[lastNewLineIdx+1 : len(b)-1]
}
// getSummaryLine looks for the summary JSON line
// (`{"message_type:"summary",...`) in the restic backup command output. Due to
// an issue in Restic, this might not always be the last line
// (https://github.com/restic/restic/issues/2389). It returns an error if it
// can't be found.
func getSummaryLine(b []byte) ([]byte, error) {
summaryLineIdx := bytes.LastIndex(b, []byte(`{"message_type":"summary"`))
if summaryLineIdx < 0 {
return nil, errors.New("unable to find summary in restic backup command output")
}
// find the end of the summary line
newLineIdx := bytes.Index(b[summaryLineIdx:], []byte("\n"))
if newLineIdx < 0 {
return nil, errors.New("unable to get summary line from restic backup command output")
}
return b[summaryLineIdx : summaryLineIdx+newLineIdx], nil
}
/*
Copyright 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package restic
import (
"testing"
"github.com/stretchr/testify/assert"
)
func Test_getSummaryLine(t *testing.T) {
summaryLine := `{"message_type":"summary","files_new":0,"files_changed":0,"files_unmodified":3,"dirs_new":0,"dirs_changed":0,"dirs_unmodified":0,"data_blobs":0,"tree_blobs":0,"data_added":0,"total_files_processed":3,"total_bytes_processed":13238272000,"total_duration":0.319265105,"snapshot_id":"38515bb5"}`
tests := []struct {
name string
output string
wantErr bool
}{
{"no summary", `{"message_type":"status","percent_done":0,"total_files":1,"total_bytes":10485760000}
{"message_type":"status","percent_done":0,"total_files":3,"files_done":1,"total_bytes":13238272000}
`, true},
{"no newline after summary", `{"message_type":"status","percent_done":0,"total_files":1,"total_bytes":10485760000}
{"message_type":"status","percent_done":0,"total_files":3,"files_done":1,"total_bytes":13238272000}
{"message_type":"summary","files_new":0,"files_changed":0,"files_unmodified":3,"dirs_new":0`, true},
{"summary at end", `{"message_type":"status","percent_done":0,"total_files":1,"total_bytes":10485760000}
{"message_type":"status","percent_done":0,"total_files":3,"files_done":1,"total_bytes":13238272000}
{"message_type":"status","percent_done":1,"total_files":3,"files_done":3,"total_bytes":13238272000,"bytes_done":13238272000}
{"message_type":"summary","files_new":0,"files_changed":0,"files_unmodified":3,"dirs_new":0,"dirs_changed":0,"dirs_unmodified":0,"data_blobs":0,"tree_blobs":0,"data_added":0,"total_files_processed":3,"total_bytes_processed":13238272000,"total_duration":0.319265105,"snapshot_id":"38515bb5"}
`, false},
{"summary before status", `{"message_type":"status","percent_done":0,"total_files":1,"total_bytes":10485760000}
{"message_type":"status","percent_done":0,"total_files":3,"files_done":1,"total_bytes":13238272000}
{"message_type":"summary","files_new":0,"files_changed":0,"files_unmodified":3,"dirs_new":0,"dirs_changed":0,"dirs_unmodified":0,"data_blobs":0,"tree_blobs":0,"data_added":0,"total_files_processed":3,"total_bytes_processed":13238272000,"total_duration":0.319265105,"snapshot_id":"38515bb5"}
{"message_type":"status","percent_done":1,"total_files":3,"files_done":3,"total_bytes":13238272000,"bytes_done":13238272000}
`, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
summary, err := getSummaryLine([]byte(tt.output))
if tt.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, summaryLine, string(summary))
}
})
}
}
func Test_getLastLine(t *testing.T) {
tests := []struct {
output string
want string
}{
{`last line
`, "last line"},
{`first line
second line
third line
`, "third line"},
}
for _, tt := range tests {
t.Run(tt.want, func(t *testing.T) {
assert.Equal(t, []byte(tt.want), getLastLine([]byte(tt.output)))
})
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment