The official VictoriaMetrics documentation says relatively little about vmbackup and vmrestore. I recently worked through the relevant source code, and this post shares my notes from that reading.
vmbackup Overview
vmbackup creates backups of VictoriaMetrics data from instant snapshots.
vmbackup supports both full and incremental backups. If the destination path (the -dst flag) already contains data from a previous backup, an incremental backup is created automatically. A backup can also be sped up by pointing -origin at an existing backup on the same remote storage; in that case vmbackup performs server-side copying, saving data-transfer cost and time.
A vmbackup run can be interrupted at any time. When vmbackup is restarted with the same flags, it automatically resumes from the point of interruption.
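For example, following the same pattern as the commands in the official docs (bucket names and paths below are placeholders), an incremental backup that reuses an already uploaded backup via -origin can be started like this:
./vmbackup -storageDataPath=</path/to/victoria-metrics-data> -snapshot.createURL=http://localhost:8428/snapshot/create -origin=gs://<bucket>/<path/to/existing/backup> -dst=gs://<bucket>/<path/to/new/backup>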
Supported storage types
- GCS. Example: gs://<bucket>/<path/to/backup>
- S3. Example: s3://<bucket>/<path/to/backup>
- Azure Blob Storage. Example: azblob://<container>/<path/to/backup>
- Any S3-compatible storage such as MinIO, Ceph or Swift (see the official docs for details).
- Local filesystem. Example: fs://</absolute/path/to/backup>.
Regular backups can be performed with the following command:
./vmbackup -storageDataPath=</path/to/victoria-metrics-data> -snapshot.createURL=http://localhost:8428/snapshot/create -dst=gs://<bucket>/<path/to/new/backup>
vmbackup source code walkthrough
Entry point: VictoriaMetrics/app/vmbackup/main.go
// ...
func main() {
// ...
// Initialize flags, usage, environment variables, logging, etc.
// ...
deleteSnapshot := func() {}
// Check whether the -snapshot.createURL flag was specified
if len(*snapshotCreateURL) > 0 {
// Call the vmstorage snapshot API to create a data snapshot (a minimal sketch of this HTTP call follows the listing)
createURL, err := url.Parse(*snapshotCreateURL)
if err != nil {
logger.Fatalf("cannot parse snapshotCreateURL: %s", err)
}
// -snapshot.createURL and -snapshotName must not be set at the same time
if len(*snapshotName) > 0 {
logger.Fatalf("-snapshotName shouldn't be set if -snapshot.createURL is set, since snapshots are created automatically in this case")
}
logger.Infof("Snapshot create url %s", createURL.Redacted())
if len(*snapshotDeleteURL) <= 0 {
err := flag.Set("snapshot.deleteURL", strings.Replace(*snapshotCreateURL, "/create", "/delete", 1))
if err != nil {
logger.Fatalf("Failed to set snapshot.deleteURL flag: %v", err)
}
}
// Build the URL used to delete the snapshot
deleteURL, err := url.Parse(*snapshotDeleteURL)
if err != nil {
logger.Fatalf("cannot parse snapshotDeleteURL: %s", err)
}
logger.Infof("Snapshot delete url %s", deleteURL.Redacted())
// Create the snapshot
name, err := snapshot.Create(createURL.String())
if err != nil {
logger.Fatalf("cannot create snapshot: %s", err)
}
// Store the created snapshot name in the snapshotName flag for later use
err = flag.Set("snapshotName", name)
if err != nil {
logger.Fatalf("cannot set snapshotName flag: %v", err)
}
deleteSnapshot = func() {
err := snapshot.Delete(deleteURL.String(), name)
if err != nil {
logger.Fatalf("cannot delete snapshot: %s", err)
}
}
}
// Start vmbackup's HTTP listener to expose metrics for Prometheus scraping
listenAddrs := []string{*httpListenAddr}
go httpserver.Serve(listenAddrs, nil, nil)
pushmetrics.Init()
err := makeBackup() // makeBackup contains the core backup logic
deleteSnapshot() // delete the snapshot via the vmstorage snapshot API
// ... remaining cleanup work
}
func makeBackup() error {
// Create the destination backup object, usually backed by remote object storage. The storage type (gcs, s3, azblob, fs, ...), bucket and directory are parsed from the -dst flag
// -customS3Endpoint can be used to connect to an S3-compatible endpoint
// -credsFilePath points to the credentials file (key/secret) used for authentication
dstFS, err := newDstFS()
if err != nil {
return err
}
if *snapshotName == "" {
// Server-side copy on the remote storage, from -origin to -dst
originFS, err := newRemoteOriginFS()
if err != nil {
return err
}
a := &actions.RemoteBackupCopy{
Concurrency: *concurrency,
Src: originFS,
Dst: dstFS,
}
if err := a.Run(); err != nil {
return err
}
originFS.MustStop()
} else {
// Create the src instance
// from the snapshot's data directory
srcFS, err := newSrcFS()
if err != nil {
return err
}
// Create the origin instance; if -origin is empty, a no-op instance is created. origin is only used to speed up copying
originFS, err := newOriginFS()
if err != nil {
return err
}
a := &actions.Backup{
Concurrency: *concurrency,
Src: srcFS,
Dst: dstFS,
Origin: originFS,
}
// The core backup logic lives in the Run method
if err := a.Run(); err != nil {
return err
}
srcFS.MustStop()
originFS.MustStop()
}
dstFS.MustStop()
return nil
}
// ...
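A side note on the snapshot step above: snapshot.Create simply calls the snapshot HTTP API on vmstorage (or single-node VictoriaMetrics) and reads the snapshot name back from the JSON response. Below is a minimal, self-contained sketch of that interaction; it is not the real lib/backup/snapshot code (which does more validation and may differ in HTTP method and error handling), and it assumes the documented {"status":"ok","snapshot":"<name>"} response shape:

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// snapshotResponse mirrors the documented response of /snapshot/create.
type snapshotResponse struct {
	Status   string `json:"status"`
	Snapshot string `json:"snapshot"`
	Msg      string `json:"msg"`
}

// createSnapshot calls the snapshot create URL and returns the snapshot name.
func createSnapshot(createURL string) (string, error) {
	resp, err := http.Post(createURL, "", nil)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	var r snapshotResponse
	if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
		return "", err
	}
	if r.Status != "ok" {
		return "", fmt.Errorf("unexpected status %q: %s", r.Status, r.Msg)
	}
	return r.Snapshot, nil
}

func main() {
	name, err := createSnapshot("http://localhost:8428/snapshot/create")
	if err != nil {
		fmt.Println("cannot create snapshot:", err)
		return
	}
	// The returned name is what later ends up in the -snapshotName flag.
	fmt.Println("created snapshot:", name)
}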
Next, let's look at the Run() method.
VictoriaMetrics/lib/backup/actions/backup.go
// Run runs b with the provided settings.
func (b *Backup) Run() error {
concurrency := b.Concurrency
src := b.Src
dst := b.Dst
origin := b.Origin
if origin != nil && origin.String() == dst.String() {
origin = nil
}
if origin == nil {
origin = &fsnil.FS{}
}
// Delete the backup_complete.ignore file from the remote storage; it was created when the previous backup finished to mark it as successful
if err := dst.DeleteFile(backupnames.BackupCompleteFilename); err != nil {
return fmt.Errorf("cannot delete `backup complete` file at %s: %w", dst, err)
}
// Run the backup
if err := runBackup(src, dst, origin, concurrency); err != nil {
return err
}
// Store the backup metadata, which records when the backup was started and when it completed
if err := storeMetadata(src, dst); err != nil {
return fmt.Errorf("cannot store backup metadata: %w", err)
}
if err := dst.CreateFile(backupnames.BackupCompleteFilename, nil); err != nil {
return fmt.Errorf("cannot create `backup complete` file at %s: %w", dst, err)
}
return nil
}
// ...
func runBackup(src *fslocal.FS, dst common.RemoteFS, origin common.OriginFS, concurrency int) error {
startTime := time.Now()
logger.Infof("starting backup from %s to %s using origin %s", src, dst, origin)
// List all files of the snapshot in src
srcParts, err := src.ListParts()
if err != nil {
return fmt.Errorf("cannot list src parts: %w", err)
}
logger.Infof("obtained %d parts from src %s", len(srcParts), src)
// List all files that already exist in dst
dstParts, err := dst.ListParts()
if err != nil {
return fmt.Errorf("cannot list dst parts: %w", err)
}
logger.Infof("obtained %d parts from dst %s", len(dstParts), dst)
// List all files that exist in origin
originParts, err := origin.ListParts()
if err != nil {
return fmt.Errorf("cannot list origin parts: %w", err)
}
logger.Infof("obtained %d parts from origin %s", len(originParts), origin)
// Compute the total size of all src parts
backupSize := getPartsSize(srcParts)
// Find parts that exist in dst but not in src (the set arithmetic below is illustrated with a standalone sketch after this listing)
partsToDelete := common.PartsDifference(dstParts, srcParts)
deleteSize := getPartsSize(partsToDelete)
// Delete parts that exist in dst but not in src. These are usually small parts that vmstorage has merged into bigger ones and then deleted,
// so they no longer exist locally on vmstorage and are therefore removed from dst as well
if err := deleteDstParts(dst, partsToDelete, concurrency); err != nil {
return fmt.Errorf("cannot delete unneeded parts at dst: %w", err)
}
// Find parts that exist in src but not in dst, i.e. newly added parts that need to be backed up
partsToCopy := common.PartsDifference(srcParts, dstParts)
// Intersect the parts to copy with the parts present in origin; origin usually points to another remote backup and is used to speed up the backup
originPartsToCopy := common.PartsIntersect(originParts, partsToCopy)
copySize := getPartsSize(originPartsToCopy)
// Server-side copy that intersection from origin to dst
if err := copySrcParts(origin, dst, originPartsToCopy, concurrency); err != nil {
return fmt.Errorf("cannot server-side copy origin parts to dst: %w", err)
}
// Parts that need to be backed up but are not present in origin: these must be uploaded from the local snapshot
srcCopyParts := common.PartsDifference(partsToCopy, originParts)
uploadSize := getPartsSize(srcCopyParts)
if len(srcCopyParts) > 0 {
logger.Infof("uploading %d parts from %s to %s", len(srcCopyParts), src, dst)
var bytesUploaded atomic.Uint64
// Run func(p common.Part) error concurrently
err = runParallel(concurrency, srcCopyParts, func(p common.Part) error {
logger.Infof("uploading %s from %s to %s", &p, src, dst)
rc, err := src.NewReadCloser(p)
if err != nil {
return fmt.Errorf("cannot create reader for %s from %s: %w", &p, src, err)
}
sr := &statReader{
r: rc,
bytesRead: &bytesUploaded,
}
// Upload the part
if err := dst.UploadPart(p, sr); err != nil {
return fmt.Errorf("cannot upload %s to %s: %w", &p, dst, err)
}
if err = rc.Close(); err != nil {
return fmt.Errorf("cannot close reader for %s from %s: %w", &p, src, err)
}
return nil
}, func(elapsed time.Duration) {
n := bytesUploaded.Load()
prc := 100 * float64(n) / float64(uploadSize)
logger.Infof("uploaded %d out of %d bytes (%.2f%%) from %s to %s in %s", n, uploadSize, prc, src, dst, elapsed)
})
if err != nil {
return err
}
}
logger.Infof("backup from %s to %s with origin %s is complete; backed up %d bytes in %.3f seconds; server-side deleted %d bytes; "+
"server-side copied %d bytes; uploaded %d bytes",
src, dst, origin, backupSize, time.Since(startTime).Seconds(), deleteSize, copySize, uploadSize)
return nil
}
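To make the set arithmetic above concrete, here is a small self-contained sketch. Plain string keys stand in for common.Part values, and difference/intersect are simplified stand-ins for PartsDifference/PartsIntersect from lib/backup/common:

package main

import "fmt"

// difference returns items present in a but missing from b.
func difference(a, b []string) []string {
	m := make(map[string]bool, len(b))
	for _, k := range b {
		m[k] = true
	}
	var d []string
	for _, k := range a {
		if !m[k] {
			d = append(d, k)
		}
	}
	return d
}

// intersect returns items present in both a and b.
func intersect(a, b []string) []string {
	m := make(map[string]bool, len(a))
	for _, k := range a {
		m[k] = true
	}
	var d []string
	for _, k := range b {
		if m[k] {
			d = append(d, k)
		}
	}
	return d
}

func main() {
	src := []string{"p1", "p2", "p3", "p4"} // parts in the local snapshot
	dst := []string{"p1", "p5"}             // parts already in the remote backup
	origin := []string{"p2", "p3"}          // parts in another remote backup (-origin)

	partsToDelete := difference(dst, src)                // parts to delete from dst
	partsToCopy := difference(src, dst)                  // parts missing at dst
	originPartsToCopy := intersect(origin, partsToCopy)  // server-side copied from origin
	srcCopyParts := difference(partsToCopy, origin)      // uploaded from the local snapshot

	fmt.Println(partsToDelete, partsToCopy, originPartsToCopy, srcCopyParts)
}

Running it prints [p5] [p2 p3 p4] [p2 p3] [p4]: p5 is deleted from dst, p2 and p3 are server-side copied from origin, and only p4 has to be uploaded from the local snapshot.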
Next, the helper functions that operate on parts:
VictoriaMetrics/lib/backup/common/part.go
// PartsDifference returns the parts that exist in a but are missing from b
func PartsDifference(a, b []Part) []Part {
m := make(map[string]bool, len(b))
for _, p := range b {
k := p.key()
m[k] = true
}
var d []Part
for _, p := range a {
k := p.key()
if !m[k] {
d = append(d, p)
}
}
return d
}
// PartsIntersect returns the parts that exist in both a and b
func PartsIntersect(a, b []Part) []Part {
m := make(map[string]bool, len(a))
for _, p := range a {
k := p.key()
m[k] = true
}
var d []Part
for _, p := range b {
k := p.key()
if m[k] {
d = append(d, p)
}
}
return d
}
The backup logic of vmbackup is clear and simple: each run creates a snapshot in vmstorage and then backs up from src to dst; if origin is set, server-side copying through origin is used to speed up the backup.
vmrestore Overview
VictoriaMetrics must be stopped while vmrestore is running.
Run the following command to restore backup from the given -src into the given -storageDataPath:
./vmrestore -src=<storageType>://<path/to/backup> -storageDataPath=<local/path/to/restore>
The supported remote storage types are the same as for vmbackup.
vmrestore source code walkthrough
Entry point: VictoriaMetrics/app/vmrestore/main.go
func main() {
// Write flags and help message to stdout, since it is easier to grep or pipe.
flag.CommandLine.SetOutput(os.Stdout)
flag.Usage = usage
envflag.Parse()
buildinfo.Init()
logger.Init()
// Start vmrestore's HTTP listener, mainly to expose metrics for Prometheus scraping
listenAddrs := []string{*httpListenAddr}
go httpserver.Serve(listenAddrs, nil, nil)
// Create the src instance (usually remote storage)
srcFS, err := newSrcFS()
if err != nil {
logger.Fatalf("%s", err)
}
// Create the dst instance (from the storageDataPath directory)
dstFS, err := newDstFS()
if err != nil {
logger.Fatalf("%s", err)
}
a := &actions.Restore{
Concurrency: *concurrency,
Src: srcFS,
Dst: dstFS,
SkipBackupCompleteCheck: *skipBackupCompleteCheck,
}
// Initialize push metrics
pushmetrics.Init()
// The core restore logic lives in the Run method
if err := a.Run(); err != nil {
logger.Fatalf("cannot restore from backup: %s", err)
}
pushmetrics.Stop()
srcFS.MustStop()
dstFS.MustStop()
startTime := time.Now()
logger.Infof("gracefully shutting down http server for metrics at %q", listenAddrs)
// Stop the HTTP server
if err := httpserver.Stop(listenAddrs); err != nil {
logger.Fatalf("cannot stop http server for metrics: %s", err)
}
logger.Infof("successfully shut down http server for metrics in %.3f seconds", time.Since(startTime).Seconds())
}
Next, the Restore Run() method:
VictoriaMetrics/lib/backup/actions/restore.go
func (r *Restore) Run() error {
startTime := time.Now()
// Make sure VictoriaMetrics doesn't run during the restore process.
// Make sure the storageDataPath directory exists
fs.MustMkdirIfNotExist(r.Dst.Dir)
// Create the flock.lock file to lock the directory
flockF := fs.MustCreateFlockFile(r.Dst.Dir)
defer fs.MustClose(flockF)
// Create the restore-in-progress marker file
if err := createRestoreLock(r.Dst.Dir); err != nil {
return err
}
concurrency := r.Concurrency
src := r.Src
dst := r.Dst
if !r.SkipBackupCompleteCheck {
ok, err := src.HasFile(backupnames.BackupCompleteFilename)
if err != nil {
return err
}
// Check that src contains the backup_complete.ignore file; only then is the backup known to be complete
if !ok {
return fmt.Errorf("cannot find %s file in %s; this means either incomplete backup or old backup; "+
"pass -skipBackupCompleteCheck command-line flag if you still need restoring from this backup", backupnames.BackupCompleteFilename, src)
}
}
logger.Infof("starting restore from %s to %s", src, dst)
logger.Infof("obtaining list of parts at %s", src)
// List the files in src
srcParts, err := src.ListParts()
if err != nil {
return fmt.Errorf("cannot list src parts: %w", err)
}
logger.Infof("obtaining list of parts at %s", dst)
// List the files under storageDataPath
dstParts, err := dst.ListParts()
if err != nil {
return fmt.Errorf("cannot list dst parts: %w", err)
}
// Compute the size of the backup
backupSize := getPartsSize(srcParts)
// Validate srcParts. They must cover the whole files.
// Sort the src parts so that parts of the same file (different offsets) are grouped together
common.SortParts(srcParts)
offset := uint64(0)
var pOld common.Part
var path string
// Validate the offsets of parts that belong to the same file (a standalone sketch of this check follows the listing)
for _, p := range srcParts {
if p.Path != path {
if offset != pOld.FileSize {
return fmt.Errorf("invalid size for %q; got %d; want %d", path, offset, pOld.FileSize)
}
pOld = p
path = p.Path
offset = 0
}
if p.Offset < offset {
return fmt.Errorf("there is an overlap in %d bytes between %s and %s", offset-p.Offset, &pOld, &p)
}
if p.Offset > offset {
if offset == 0 {
return fmt.Errorf("there is a gap in %d bytes from file start to %s", p.Offset, &p)
}
return fmt.Errorf("there is a gap in %d bytes between %s and %s", p.Offset-offset, &pOld, &p)
}
if p.Size != p.ActualSize {
return fmt.Errorf("invalid size for %s; got %d; want %d", &p, p.ActualSize, p.Size)
}
offset += p.Size
}
// Find parts that exist in dst (the local directory) but not in src; the corresponding files are deleted
partsToDelete := common.PartsDifference(dstParts, srcParts)
deleteSize := uint64(0)
if len(partsToDelete) > 0 {
// Remove only files with the missing part at offset 0.
// Assume other files are partially downloaded during the previous Restore.Run call,
// so only the last part in them may be incomplete.
// The last part for partially downloaded files will be re-downloaded later.
// This addresses https://github.com/VictoriaMetrics/VictoriaMetrics/issues/487 .
pathsToDelete := make(map[string]bool)
for _, p := range partsToDelete {
if p.Offset == 0 {
pathsToDelete[p.Path] = true
}
}
logger.Infof("deleting %d files from %s", len(pathsToDelete), dst)
// Delete the files
for path := range pathsToDelete {
logger.Infof("deleting %s from %s", path, dst)
size, err := dst.DeletePath(path)
if err != nil {
return fmt.Errorf("cannot delete %s from %s: %w", path, dst, err)
}
deleteSize += size
}
if err := dst.RemoveEmptyDirs(); err != nil {
return fmt.Errorf("cannot remove empty directories at %s: %w", dst, err)
}
}
// Re-read dstParts, since additional parts may be removed on the previous step.
// Re-read the local dst directory
dstParts, err = dst.ListParts()
if err != nil {
return fmt.Errorf("cannot list dst parts after the deletion: %w", err)
}
// Parts that exist in src (remote) but not in dst (local) are the ones that need to be downloaded
partsToCopy := common.PartsDifference(srcParts, dstParts)
downloadSize := getPartsSize(partsToCopy)
// Group parts by file path, so that parts of the same file (different offsets) are handled together
if len(partsToCopy) > 0 {
perPath := make(map[string][]common.Part)
for _, p := range partsToCopy {
parts := perPath[p.Path]
parts = append(parts, p)
perPath[p.Path] = parts
}
logger.Infof("downloading %d parts from %s to %s", len(partsToCopy), src, dst)
var bytesDownloaded atomic.Uint64
// Download concurrently, one file path per task
// runParallelPerPath uses a WaitGroup to run the tasks concurrently; its logic is straightforward
err = runParallelPerPath(concurrency, perPath, func(parts []common.Part) error {
// Sort partsToCopy in order to properly grow file size during downloading
// and to properly resume downloading of incomplete files on the next Restore.Run call.
// Sort the parts so the file grows correctly during download; together with the deletion step above, this is what allows the next Restore.Run call to resume partially downloaded files
common.SortParts(parts)
for _, p := range parts {
logger.Infof("downloading %s from %s to %s", &p, src, dst)
// Create a writer for the part's path
wc, err := dst.NewWriteCloser(p)
if err != nil {
return fmt.Errorf("cannot create writer for %q to %s: %w", &p, dst, err)
}
sw := &statWriter{
w: wc,
bytesWritten: &bytesDownloaded,
}
// Download the part
if err := src.DownloadPart(p, sw); err != nil {
return fmt.Errorf("cannot download %s to %s: %w", &p, dst, err)
}
if err := wc.Close(); err != nil {
return fmt.Errorf("cannot close reader from %s from %s: %w", &p, src, err)
}
}
return nil
}, func(elapsed time.Duration) {
n := bytesDownloaded.Load()
prc := 100 * float64(n) / float64(downloadSize)
logger.Infof("downloaded %d out of %d bytes (%.2f%%) from %s to %s in %s", n, downloadSize, prc, src, dst, elapsed)
})
if err != nil {
return err
}
}
logger.Infof("restored %d bytes from backup in %.3f seconds; deleted %d bytes; downloaded %d bytes",
backupSize, time.Since(startTime).Seconds(), deleteSize, downloadSize)
// Remove the restore-in-progress marker file
return removeRestoreLock(r.Dst.Dir)
}
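The offset validation referenced in the comment above is easier to follow in isolation. Below is a standalone sketch of the same check; the part type is a simplified stand-in for lib/backup/common.Part, and the grouping and sorting are done explicitly instead of relying on SortParts:

package main

import (
	"fmt"
	"sort"
)

type part struct {
	Path     string
	FileSize uint64
	Offset   uint64
	Size     uint64
}

// validateParts returns an error if the parts of any file do not fully cover it:
// for each path, parts sorted by offset must start at 0, have no gaps or overlaps,
// and end exactly at the file size.
func validateParts(parts []part) error {
	perPath := make(map[string][]part)
	for _, p := range parts {
		perPath[p.Path] = append(perPath[p.Path], p)
	}
	for path, ps := range perPath {
		sort.Slice(ps, func(i, j int) bool { return ps[i].Offset < ps[j].Offset })
		offset := uint64(0)
		for _, p := range ps {
			if p.Offset != offset {
				return fmt.Errorf("gap or overlap in %q: expected offset %d, got %d", path, offset, p.Offset)
			}
			offset += p.Size
		}
		if offset != ps[0].FileSize {
			return fmt.Errorf("parts of %q cover %d bytes; want %d", path, offset, ps[0].FileSize)
		}
	}
	return nil
}

func main() {
	ok := []part{
		{Path: "data/part1", FileSize: 30, Offset: 0, Size: 20},
		{Path: "data/part1", FileSize: 30, Offset: 20, Size: 10},
	}
	bad := []part{
		{Path: "data/part2", FileSize: 30, Offset: 0, Size: 10},
		{Path: "data/part2", FileSize: 30, Offset: 20, Size: 10}, // 10-byte gap
	}
	fmt.Println(validateParts(ok))  // <nil>
	fmt.Println(validateParts(bad)) // error describing the gap
}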
That covers the execution flow of vmbackup/vmrestore; it is fairly straightforward.
Summary
- vmbackup must run on the same instance as vmstorage (so it can share the storageDataPath directory) and should be executed periodically. In a Kubernetes environment, integrating it as a sidecar still requires some extra work, such as scheduling the runs.
- vmbackupmanager is also provided on top of vmbackup, with direct support in the Helm chart deployments.
- vmrestore likewise must run on the same instance as vmstorage, and vmstorage must be stopped (no data writes) while vmrestore is running.