VictoriaMetrics vmbackup/vmrestore 源码解析

VictoriaMetrics 官方文档对vmbackup和vmrestore的介绍比较少,近期整理了相关的文档,现在将源码阅读的内容分享给大家。

vmbackup 简介

vmbackup 从即时快照创建 VictoriaMetrics 的数据备份。
vmbackup支持增量备份和完整备份,如果目标路径(-dst参数)包含先前备份的数据,则会自动创建增量备份,可以通过-origin指向同一远端存储上的已有的备份来加速备份,在这种情况下,vmbackup会执行服务器端复制,来节省数据传输的成本和时间。
vmbackup备份进程可以随时中断。当使用相同参数重新启动 vmbackup 时,它会自动从中断点恢复。
支持的存储类型

  • GCS. Example: gs://<bucket>/<path/to/backup>
  • S3. Example: s3://<bucket>/<path/to/backup>
  • Azure Blob Storage. Example: azblob://<container>/<path/to/backup>
  • 任何兼容S3的存储,如: MinIO, Ceph or Swift. See these docs for details.
  • 本地文件系统. Example: fs://</absolute/path/to/backup>.

可以使用以下命令进行定期备份:

./vmbackup -storageDataPath=</path/to/victoria-metrics-data> -snapshot.createURL=http://localhost:8428/snapshot/create -dst=gs://<bucket>/<path/to/new/backup>

vmbackup源码解析

入口文件:VictoriaMetrics/app/vmbackup/main.go

// ...
func main() {
    // ...
    // 完成flag、usage、环境变量、日志等的初始化
    // ...

    deleteSnapshot := func() {}
    // 判断是否指定了snapshotCreateURL参数
    if len(*snapshotCreateURL) > 0 {
        // 调用vmstorage的api,创建文件快照
        createURL, err := url.Parse(*snapshotCreateURL)
        if err != nil {
            logger.Fatalf("cannot parse snapshotCreateURL: %s", err)
        }
        // 不要同时指定snapshotCreateURL 和 snapshotName参数
        if len(*snapshotName) > 0 {
            logger.Fatalf("-snapshotName shouldn't be set if -snapshot.createURL is set, since snapshots are created automatically in this case")
        }
        logger.Infof("Snapshot create url %s", createURL.Redacted())
        if len(*snapshotDeleteURL) <= 0 {
            err := flag.Set("snapshot.deleteURL", strings.Replace(*snapshotCreateURL, "/create", "/delete", 1))
            if err != nil {
                logger.Fatalf("Failed to set snapshot.deleteURL flag: %v", err)
            }
        }
        // 构造要删除快照的url
        deleteURL, err := url.Parse(*snapshotDeleteURL)
        if err != nil {
            logger.Fatalf("cannot parse snapshotDeleteURL: %s", err)
        }
        logger.Infof("Snapshot delete url %s", deleteURL.Redacted())

        // 创建快照
        name, err := snapshot.Create(createURL.String())
        if err != nil {
            logger.Fatalf("cannot create snapshot: %s", err)
        }
        // 将创建的快照名称赋值给flag的snapshotName变量,供后续使用
        err = flag.Set("snapshotName", name)
        if err != nil {
            logger.Fatalf("cannot set snapshotName flag: %v", err)
        }

        deleteSnapshot = func() {
            err := snapshot.Delete(deleteURL.String(), name)
            if err != nil {
                logger.Fatalf("cannot delete snapshot: %s", err)
            }
        }
    }
    // 开启vmbackup的http端口,用于指标暴露,供prometheus采集
    listenAddrs := []string{*httpListenAddr}
    go httpserver.Serve(listenAddrs, nil, nil)

    pushmetrics.Init()
    err := makeBackup() // makeBackup是备份的核心逻辑
    deleteSnapshot()    // 调用vmbackup api删除快照
    // ... 后续收尾工作
}

func makeBackup() error {
    // 创建目标备份对象,一般是远端存储的对象存储实例,从-dst参数中解析出存储类型(gcs、s3、azblob、fs等等),解析出对象存储桶和目录
    // 使用-customS3Endpoint参数连接远端对象存储
    // 使用-credsFilePath文件路径做校验(key、secrets)
    dstFS, err := newDstFS()
    if err != nil {
        return err
    }
    if *snapshotName == "" {
        // 做远端存储的服务端的复制 from -origin to -dst
        originFS, err := newRemoteOriginFS()
        if err != nil {
            return err
        }
        a := &actions.RemoteBackupCopy{
            Concurrency: *concurrency,
            Src:         originFS,
            Dst:         dstFS,
        }
        if err := a.Run(); err != nil {
            return err
        }
        originFS.MustStop()
    } else {
        // 创建src实例
        // 从快照的数据目录创建实例
        srcFS, err := newSrcFS()
        if err != nil {
            return err
        }
        // 创建origin实例,如果-origin为空,创建的实例为空,主要是用于加速复制
        originFS, err := newOriginFS()
        if err != nil {
            return err
        }
        a := &actions.Backup{
            Concurrency: *concurrency,
            Src:         srcFS,
            Dst:         dstFS,
            Origin:      originFS,
        }
        // 备份的核心逻辑在run方法中
        if err := a.Run(); err != nil {
            return err
        }
        srcFS.MustStop()
        originFS.MustStop()
    }
    dstFS.MustStop()
    return nil
}

// ...

下面对Run()方法进行解析

VictoriaMetrics/lib/backup//actions/backup.go

// Run runs b with the provided settings.
func (b *Backup) Run() error {
    concurrency := b.Concurrency
    src := b.Src
    dst := b.Dst
    origin := b.Origin

    if origin != nil && origin.String() == dst.String() {
        origin = nil
    }
    if origin == nil {
        origin = &fsnil.FS{}
    }

    // 删除远端存储的 backup_complete.ignore 文件,该文件是在上一次备份完成后创建的,表示备份成功
    if err := dst.DeleteFile(backupnames.BackupCompleteFilename); err != nil {
        return fmt.Errorf("cannot delete `backup complete` file at %s: %w", dst, err)
    }
    // 运行备份程序
    if err := runBackup(src, dst, origin, concurrency); err != nil {
        return err
    }
    // 创建备份的元数据,里面保存了备份的创建时间和完成备份的时间
    if err := storeMetadata(src, dst); err != nil {
        return fmt.Errorf("cannot store backup metadata: %w", err)
    }
    if err := dst.CreateFile(backupnames.BackupCompleteFilename, nil); err != nil {
        return fmt.Errorf("cannot create `backup complete` file at %s: %w", dst, err)
    }

    return nil
}

// ...

func runBackup(src *fslocal.FS, dst common.RemoteFS, origin common.OriginFS, concurrency int) error {
    startTime := time.Now()

    logger.Infof("starting backup from %s to %s using origin %s", src, dst, origin)

    // 从src中读取所有快照的文件
    srcParts, err := src.ListParts()
    if err != nil {
        return fmt.Errorf("cannot list src parts: %w", err)
    }
    logger.Infof("obtained %d parts from src %s", len(srcParts), src)
    // 从dst中读取存在的所有文件
    dstParts, err := dst.ListParts()
    if err != nil {
        return fmt.Errorf("cannot list dst parts: %w", err)
    }
    logger.Infof("obtained %d parts from dst %s", len(dstParts), dst)
    // 从origin中读取所有存在的文件
    originParts, err := origin.ListParts()
    if err != nil {
        return fmt.Errorf("cannot list origin parts: %w", err)
    }
    logger.Infof("obtained %d parts from origin %s", len(originParts), origin)
    // 计算src所有parts的数据容量
    backupSize := getPartsSize(srcParts)
    // 判断在src中不存在,在dst中存在的文件
    partsToDelete := common.PartsDifference(dstParts, srcParts)
    deleteSize := getPartsSize(partsToDelete)
    // 删除在src中不存在,在dst中存在的文件,通常是一些小文件,vmstore将小文件合并成了大文件然后删除了小文件
    // 在vmstore本地已经不存在了,所有在dst中也进行删除
    if err := deleteDstParts(dst, partsToDelete, concurrency); err != nil {
        return fmt.Errorf("cannot delete unneeded parts at dst: %w", err)
    }
    // 统计在src中存在,在dst中不存在的文件,就是新增并需要备份的文件
    partsToCopy := common.PartsDifference(srcParts, dstParts)
    // 计算需要备份的文件和origin中文件的交集,origin通常也是指向远端的对象存储,用于加速备份
    originPartsToCopy := common.PartsIntersect(originParts, partsToCopy)
    copySize := getPartsSize(originPartsToCopy)
    // 将需要备份的文件和origin中文件的交集,进行远端复制
    if err := copySrcParts(origin, dst, originPartsToCopy, concurrency); err != nil {
        return fmt.Errorf("cannot server-side copy origin parts to dst: %w", err)
    }
    // 计算需要备份的文件且在origin中不存在的文件,这是最终需要备份的文件
    srcCopyParts := common.PartsDifference(partsToCopy, originParts)
    uploadSize := getPartsSize(srcCopyParts)
    if len(srcCopyParts) > 0 {
        logger.Infof("uploading %d parts from %s to %s", len(srcCopyParts), src, dst)
        var bytesUploaded atomic.Uint64
        // 并发执行 func(p common.Part) error
        err = runParallel(concurrency, srcCopyParts, func(p common.Part) error {
            logger.Infof("uploading %s from %s to %s", &p, src, dst)
            rc, err := src.NewReadCloser(p)
            if err != nil {
                return fmt.Errorf("cannot create reader for %s from %s: %w", &p, src, err)
            }
            sr := &statReader{
                r:         rc,
                bytesRead: &bytesUploaded,
            }
            // 上传文件
            if err := dst.UploadPart(p, sr); err != nil {
                return fmt.Errorf("cannot upload %s to %s: %w", &p, dst, err)
            }
            if err = rc.Close(); err != nil {
                return fmt.Errorf("cannot close reader for %s from %s: %w", &p, src, err)
            }
            return nil
        }, func(elapsed time.Duration) {
            n := bytesUploaded.Load()
            prc := 100 * float64(n) / float64(uploadSize)
            logger.Infof("uploaded %d out of %d bytes (%.2f%%) from %s to %s in %s", n, uploadSize, prc, src, dst, elapsed)
        })
        if err != nil {
            return err
        }
    }

    logger.Infof("backup from %s to %s with origin %s is complete; backed up %d bytes in %.3f seconds; server-side deleted %d bytes; "+
        "server-side copied %d bytes; uploaded %d bytes",
        src, dst, origin, backupSize, time.Since(startTime).Seconds(), deleteSize, copySize, uploadSize)

    return nil
}

下面是对part的操作方法解析

VictoriaMetrics/lib/backup/common/part.go

// 计算在b中存在的part,在a中不存在的part
func PartsDifference(a, b []Part) []Part {
   m := make(map[string]bool, len(b))
   for _, p := range b {
       k := p.key()
       m[k] = true
   }
   var d []Part
   for _, p := range a {
       k := p.key()
       if !m[k] {
           d = append(d, p)
       }
   }
   return d
}

// 计算a、b的交集
func PartsIntersect(a, b []Part) []Part {
   m := make(map[string]bool, len(a))
   for _, p := range a {
       k := p.key()
       m[k] = true
   }
   var d []Part
   for _, p := range b {
       k := p.key()
       if m[k] {
           d = append(d, p)
       }
   }
   return d
}

vmbackup的备份逻辑非常清晰简单,每次备份在vmstorage中创建快照,然后从src向dst中备份,如果存在origin,通过origin做服务端复制,来加速备份。

vmrestore 介绍

在执行vmrestore时,必须停止VictoriaMetrics
Run the following command to restore backup from the given -src into the given -storageDataPath:

./vmrestore -src=<storageType>://<path/to/backup> -storageDataPath=<local/path/to/restore>

支持的远端存储引擎与vmbackup一致

vmrestore源码解析

入口文件:VictoriaMetrics/app/vmrestore/main.go

func main() {
    // Write flags and help message to stdout, since it is easier to grep or pipe.
    flag.CommandLine.SetOutput(os.Stdout)
    flag.Usage = usage
    envflag.Parse()
    buildinfo.Init()
    logger.Init()
    // 启动vmrestore http端口,主要用于指标暴露,用于prometheus定期抓取
    listenAddrs := []string{*httpListenAddr}
    go httpserver.Serve(listenAddrs, nil, nil)

    // 从src创建实例(通常是远端存储)
    srcFS, err := newSrcFS()
    if err != nil {
        logger.Fatalf("%s", err)
    }
    // 从dst创建实例(从storageDataPath目录)
    dstFS, err := newDstFS()
    if err != nil {
        logger.Fatalf("%s", err)
    }
    a := &actions.Restore{
        Concurrency:             *concurrency,
        Src:                     srcFS,
        Dst:                     dstFS,
        SkipBackupCompleteCheck: *skipBackupCompleteCheck,
    }
    // metric 初始化
    pushmetrics.Init()
    // 核心的恢复数据逻辑在run方法中
    if err := a.Run(); err != nil {
        logger.Fatalf("cannot restore from backup: %s", err)
    }
    pushmetrics.Stop()
    srcFS.MustStop()
    dstFS.MustStop()

    startTime := time.Now()
    logger.Infof("gracefully shutting down http server for metrics at %q", listenAddrs)
    // 停止server
    if err := httpserver.Stop(listenAddrs); err != nil {
        logger.Fatalf("cannot stop http server for metrics: %s", err)
    }
    logger.Infof("successfully shut down http server for metrics in %.3f seconds", time.Since(startTime).Seconds())
}

下面对restore的Run()方法进行解析

VictoriaMetrics/lib/backup/actions/restore.go

func (r *Restore) Run() error {
    startTime := time.Now()

    // Make sure VictoriaMetrics doesn't run during the restore process.
    // 保证storageDataPath目录存储
    fs.MustMkdirIfNotExist(r.Dst.Dir)
    // 创建flock.lock文件加锁
    flockF := fs.MustCreateFlockFile(r.Dst.Dir)
    defer fs.MustClose(flockF)
    // 创建restore-in-progress 进度文件
    if err := createRestoreLock(r.Dst.Dir); err != nil {
        return err
    }
    concurrency := r.Concurrency
    src := r.Src
    dst := r.Dst

    if !r.SkipBackupCompleteCheck {
        ok, err := src.HasFile(backupnames.BackupCompleteFilename)
        if err != nil {
            return err
        }
        // 检查src中是否有backup_complete.ignore文件,有才代表备份成功
        if !ok {
            return fmt.Errorf("cannot find %s file in %s; this means either incomplete backup or old backup; "+
                "pass -skipBackupCompleteCheck command-line flag if you still need restoring from this backup", backupnames.BackupCompleteFilename, src)
        }
    }

    logger.Infof("starting restore from %s to %s", src, dst)

    logger.Infof("obtaining list of parts at %s", src)
    // 从src读取文件列表
    srcParts, err := src.ListParts()
    if err != nil {
        return fmt.Errorf("cannot list src parts: %w", err)
    }
    logger.Infof("obtaining list of parts at %s", dst)
    // 从storageDataPath读取文件列表
    dstParts, err := dst.ListParts()
    if err != nil {
        return fmt.Errorf("cannot list dst parts: %w", err)
    }
    // 计算备份的容量
    backupSize := getPartsSize(srcParts)

    // Validate srcParts. They must cover the whole files.
    // 对src part进行排序,将相同文件名的不同offset的文件排列在一起
    common.SortParts(srcParts)
    offset := uint64(0)
    var pOld common.Part
    var path string
    // 对具有相同文件名的不同part的offset进行校验
    for _, p := range srcParts {
        if p.Path != path {
            if offset != pOld.FileSize {
                return fmt.Errorf("invalid size for %q; got %d; want %d", path, offset, pOld.FileSize)
            }
            pOld = p
            path = p.Path
            offset = 0
        }
        if p.Offset < offset {
            return fmt.Errorf("there is an overlap in %d bytes between %s and %s", offset-p.Offset, &pOld, &p)
        }
        if p.Offset > offset {
            if offset == 0 {
                return fmt.Errorf("there is a gap in %d bytes from file start to %s", p.Offset, &p)
            }
            return fmt.Errorf("there is a gap in %d bytes between %s and %s", p.Offset-offset, &pOld, &p)
        }
        if p.Size != p.ActualSize {
            return fmt.Errorf("invalid size for %s; got %d; want %d", &p, p.ActualSize, p.Size)
        }
        offset += p.Size
    }
    // 取在src中存在,但在dst(本地目录)中不存在的文件,进行删除
    partsToDelete := common.PartsDifference(dstParts, srcParts)
    deleteSize := uint64(0)
    if len(partsToDelete) > 0 {
        // Remove only files with the missing part at offset 0.
        // Assume other files are partially downloaded during the previous Restore.Run call,
        // so only the last part in them may be incomplete.
        // The last part for partially downloaded files will be re-downloaded later.
        // This addresses https://github.com/VictoriaMetrics/VictoriaMetrics/issues/487 .
        pathsToDelete := make(map[string]bool)
        for _, p := range partsToDelete {
            if p.Offset == 0 {
                pathsToDelete[p.Path] = true
            }
        }
        logger.Infof("deleting %d files from %s", len(pathsToDelete), dst)
        // 删除文件
        for path := range pathsToDelete {
            logger.Infof("deleting %s from %s", path, dst)
            size, err := dst.DeletePath(path)
            if err != nil {
                return fmt.Errorf("cannot delete %s from %s: %w", path, dst, err)
            }
            deleteSize += size
        }
        if err := dst.RemoveEmptyDirs(); err != nil {
            return fmt.Errorf("cannot remove empty directories at %s: %w", dst, err)
        }
    }

    // Re-read dstParts, since additional parts may be removed on the previous step.
    // 重新读取本地的dst 文件目录
    dstParts, err = dst.ListParts()
    if err != nil {
        return fmt.Errorf("cannot list dst parts after the deletion: %w", err)
    }
    // 计算在src(远端)中存在,在dst(本地)不存在的文件列表,即为需要下载的文件列表
    partsToCopy := common.PartsDifference(srcParts, dstParts)
    downloadSize := getPartsSize(partsToCopy)
    // 按照文件路径进行整理,相同文件路径、不同offset的文件放到一起
    if len(partsToCopy) > 0 {
        perPath := make(map[string][]common.Part)
        for _, p := range partsToCopy {
            parts := perPath[p.Path]
            parts = append(parts, p)
            perPath[p.Path] = parts
        }
        logger.Infof("downloading %d parts from %s to %s", len(partsToCopy), src, dst)
        var bytesDownloaded atomic.Uint64
        // 以文件路径为粒度,并发进行下载
        // runParallelPerPath 使用waitgroup进行并发执行,逻辑比较简单
        err = runParallelPerPath(concurrency, perPath, func(parts []common.Part) error {
            // Sort partsToCopy in order to properly grow file size during downloading
            // and to properly resume downloading of incomplete files on the next Restore.Run call.
            // 对parts进行排序,并在下一次 Restore.Run 调用时正确恢复不完整文件的下载(不确定是如何恢复不完整文件的下载的)
            common.SortParts(parts)
            for _, p := range parts {
                logger.Infof("downloading %s from %s to %s", &p, src, dst)
                // 使用p的path,创建writer
                wc, err := dst.NewWriteCloser(p)
                if err != nil {
                    return fmt.Errorf("cannot create writer for %q to %s: %w", &p, dst, err)
                }
                sw := &statWriter{
                    w:            wc,
                    bytesWritten: &bytesDownloaded,
                }
                // 下载part
                if err := src.DownloadPart(p, sw); err != nil {
                    return fmt.Errorf("cannot download %s to %s: %w", &p, dst, err)
                }
                if err := wc.Close(); err != nil {
                    return fmt.Errorf("cannot close reader from %s from %s: %w", &p, src, err)
                }
            }
            return nil
        }, func(elapsed time.Duration) {
            n := bytesDownloaded.Load()
            prc := 100 * float64(n) / float64(downloadSize)
            logger.Infof("downloaded %d out of %d bytes (%.2f%%) from %s to %s in %s", n, downloadSize, prc, src, dst, elapsed)
        })
        if err != nil {
            return err
        }
    }

    logger.Infof("restored %d bytes from backup in %.3f seconds; deleted %d bytes; downloaded %d bytes",
        backupSize, time.Since(startTime).Seconds(), deleteSize, downloadSize)
    // 删除restore-in-progress文件
    return removeRestoreLock(r.Dst.Dir)
}

以上就是vmbackup/vmrestore的源码执行流程,比较简单。

总结

  • vmbackup需要与vmstore在同一个实例上部署(能共享storageDataPath目录),并定期执行vmbackup进行备份,在k8s环境中,如果通过sidecar集成,还需要做一些工作,比如定时执行。
  • 官方提供了vmbackupmanager对vmbackup的支持,并且在helm chart部署中提供了直接支持
  • vmrestore 同样需要与vmstore在同一个实例上部署,并且vmrestore执行过程中,必须停止vmstorage写数据。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,839评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,543评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,116评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,371评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,384评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,111评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,416评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,053评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,558评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,007评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,117评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,756评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,324评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,315评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,539评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,578评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,877评论 2 345

推荐阅读更多精彩内容