fsnotify源码分析

简述

如果golang程序想监听文件系统的变化, 那么最普遍的做法是使用fsnotify库. 起初是由Chris Howey(github account: howeyc)开发的, 后来受到广大开发者的喜爱, 遂单独建立仓库. 至今为止, 其仓库已收到了5.9k star, 这足以证明其受欢迎程度. 想了解更多关于fsnotify的历史, 可以查看官网.

源码分析

以下源码分析基于的git commit版本为: 7f4cf4dd2b522a984eaca51d1ccee54101d3414a

1. 代码统计

使用cloc工具进行源码统计, cloc --by-file-by-lang --exclude-dir=.github --exclude-lang=YAML,Markdown [project-dir], 结果如下(省略yaml等标记型语言相关统计):

File blank comment code
./integration_test.go 188 126 923
./inotify_test.go 69 28 358
./inotify_poller_test.go 29 10 190
./integration_darwin_test.go 31 31 105
./fsnotify_test.go 11 8 51
./windows.go 42 31 488
./kqueue.go 73 77 371
./inotify.go 45 66 226
./inotify_poller.go 16 33 138
./fsnotify.go 10 12 46
./fen.go 8 9 20
./open_mode_bsd.go 4 4 3
./open_mode_darwin.go 4 5 3
SUM: 530 440 2922

fsnotify的go代码总行数为2922行, 其中测试类代码占1627(=923+358+190+105+51)行, 实际有效代码只有1295行. 如此少的代码还支持了windows/linux/mac平台, 由此可见, 算是一个比较精简的库了.

2. 使用示例

为了先对代码有一个感性的认识, 我们以官方的示例作为开头, 代码如下:

package main

import (
    "log"

    "github.com/fsnotify/fsnotify"
)

func main() {
    watcher, err := fsnotify.NewWatcher() // 初始化一个空的watcher
    if err != nil {
        log.Fatal(err)
    }
    defer watcher.Close() // 最后结束程序时关闭watcher

    done := make(chan bool)
    go func() { // 启动一个协程来单独处理watcher发来的事件
        for {
            select {
            case event, ok := <-watcher.Events: // 正常的事件的处理逻辑
                if !ok {
                    return
                }
                log.Println("event:", event)
                if event.Op&fsnotify.Write == fsnotify.Write {
                    log.Println("modified file:", event.Name)
                }
            case err, ok := <-watcher.Errors: // 发生错误时的处理逻辑
                if !ok {
                    return
                }
                log.Println("error:", err)
            }
        }
    }()

    err = watcher.Add("/tmp/foo") // 使watcher监控/tmp/foo
    if err != nil {
        log.Fatal(err)
    }
    <-done // 使主协程不退出
}

用法非常的简单:

  1. 初始化一个空的fsnotify watcher
  2. 写一个协程用来处理watcher推送的事件
  3. 告诉watcher需要监听的文件或目录

3. 构建约束

fsnotify是一个跨平台的库, 源码中既包含了linux平台的实现逻辑, 也包含了mac平台和windows平台的实现逻辑, 此时问题就来了:

开发者在引用了此库后, 如何才能保证编译出来的可执行文件, 只包含对应的目标平台的实现, 而不包含无关平台的实现呢? 比如开发者的编译目标平台是linux, 编译时如何去除mac和windows等无关平台的实现代码呢?

好在golang为我们提供了构建约束(Build Constraints), 大概使用方法如下:

// +build linux,386 darwin,!cgo

上面这条注释不是普通的注释, 而是构建约束, 把它写在代码文件的顶部(package声明的上面), 会被编译器在编译时按照目标平台来判断是否编译进可执行文件中. 上面这行构建约束的意思是(linux AND 386) OR (darwin AND (NOT cgo)).

好了, 了解了构建约束的用法, 我们看fsnotify的源码时就可以根据自己所关心的平台来详细阅读其实现.

4. 详细解读--linux部分

用的最多的当属linux实现部分了, 其底层是基于linux的inotify机制, 相关逻辑就在库中的inotify.go文件中.

a. 总体思路

按照前面使用示例的步骤, 第一步是watcher, err := fsnotify.NewWatcher(), 那么我们就来看看这里new的watcher都包含什么, 代码如下:

// NewWatcher establishes a new watcher with the underlying OS and begins waiting for events.
func NewWatcher() (*Watcher, error) {
    // Create inotify fd
    fd, errno := unix.InotifyInit1(unix.IN_CLOEXEC)
    if fd == -1 {
        return nil, errno
    }
    // Create epoll
    poller, err := newFdPoller(fd)
    if err != nil {
        unix.Close(fd)
        return nil, err
    }
    w := &Watcher{
        fd:       fd,
        poller:   poller,
        watches:  make(map[string]*watch),
        paths:    make(map[int]string),
        Events:   make(chan Event),
        Errors:   make(chan error),
        done:     make(chan struct{}),
        doneResp: make(chan struct{}),
    }

    go w.readEvents()
    return w, nil
}

上面代码的总体思路:

  1. 建立一个inotify实例

    inotify实例会以一个文件描述符(fd)的形式返回给调用者, 一旦有我们watch的文件发生变化, 就能从这个fd里读到相应的事件. 但是问题是这个文件描述符需要我们自己去读取, 所以我们就需要有某种轮训机制, 就引出下面的epoll注册的用处.

  2. 使用epoll监听实例上的事件

    把这个fd注册到epoll上, 在fd上有数据到达时, epoll就能立刻收到并返回给我们.

  3. 初始化各种的状态上下文, 如: watches用来存放watch对象, event用来推送事件

  4. 启动监听协程

b. 事件监听协程

上面的代码最后启动了一个监听协程go w.readEvents(), 我们就来看看这里发生了什么, 代码如下:

为使篇幅简练, 省略冗余代码

func (w *Watcher) readEvents() {
    var (...) // 各种变量
    defer close(...) // 关闭上下文的各种资源

    for {
        if w.isClosed() { return }

        ok, errno = w.poller.wait() // 程序阻塞在这行, 直到epoll监听到相关事件为止
        if ... { continue } // 各种error处理逻辑
    
        n, errno = unix.Read(w.fd, buf[:]) // 走到这里的话就是有事件发生, 所以这里读出事件到buffer里, 放到下面处理
        if ... { continue } // 各种error处理逻辑
        if n < unix.SizeofInotifyEvent { // 当读到的事件小于16字节(一个事件结构体的单位大小), 异常处理逻辑
            ...
            continue
        }

        var offset uint32
        // 此时我们也不知道读了几个事件到buffer里
        // 所以我们就用offset记录下当前所读到的位置偏移量, 直到读完为止
        // 这个for循环结束条件是: offset累加到了某个值, 以至于剩余字节数不够读取出一整个inotify event结构体
        for offset <= uint32(n-unix.SizeofInotifyEvent) {
            // 强制把地址值转换成inotify结构体
            raw := (*unix.InotifyEvent)(unsafe.Pointer(&buf[offset]))

            mask := uint32(raw.Mask) // 所发生的事件以掩码形式表示
            nameLen := uint32(raw.Len) // 当监听的是个目录时, 目录中发生事件的文件名会包含在结构体中, 这里的len就是文件名的长度

            if mask&unix.IN_Q_OVERFLOW != 0 { ... } // mask格式错误, 向Errors chan发送事件

            w.mu.Lock() // 由于可能会对上下文进行删除操作, 所以锁住
            // Wd是我们所watch的, 并且此次发生事件了的文件描述符
            // 我们可以从构建好的上下文中取出这个文件描述符所对应的文件名
            name, ok := w.paths[int(raw.Wd)] 
            // 如果发生删除事件, 也一并在上下文中删掉这个文件名
            if ok && mask&unix.IN_DELETE_SELF == unix.IN_DELETE_SELF {
                delete(w.paths, int(raw.Wd))
                delete(w.watches, name)
            }
            w.mu.Unlock() // 解锁

            if nameLen > 0 { // 当我们watch是一个目录的时候, 其下面的文件发生事件时, 就会导致这个nameLen大于0
                // 此时读取文件名字(文件名就在inotify event结构体的后面), 强制把地址值转换成长度4096的byte数组
                bytes := (*[unix.PathMax]byte)(unsafe.Pointer(&buf[offset+unix.SizeofInotifyEvent]))[:nameLen:nameLen]
                // 拼接路径(文件名会以\000为结尾表示, 所以要去掉)
                name += "/" + strings.TrimRight(string(bytes[0:nameLen]), "\000")
            }

            event := newEvent(name, mask) // 生成一个event
            if !event.ignoreLinux(mask) { // 如果这个事件没有被忽略, 那么发送到Events chan
                select {
                case w.Events <- event:
                case <-w.done:
                    return
                }
            }

            // 移动offset偏移量到下个inotify event结构体
            offset += unix.SizeofInotifyEvent + nameLen
        }
    }
}

c. 添加watch路径

我们通过err = watcher.Add("/tmp/foo")来让watcher去watch路径/tmp/foo, Add方法就是在inotify里注册路径, 代码如下:

// Add starts watching the named file or directory (non-recursively).
func (w *Watcher) Add(name string) error {
    name = filepath.Clean(name) // 获取标准路径, 如/tmp//////too经过Clean后就成了/tmp/too
    if w.isClosed() {
        return errors.New("inotify instance already closed")
    }

    const agnosticEvents = unix.IN_MOVED_TO | unix.IN_MOVED_FROM |
        unix.IN_CREATE | unix.IN_ATTRIB | unix.IN_MODIFY |
        unix.IN_MOVE_SELF | unix.IN_DELETE | unix.IN_DELETE_SELF

    var flags uint32 = agnosticEvents

    w.mu.Lock()
    defer w.mu.Unlock()
  watchEntry := w.watches[name] // 取出上下文里的watch路径(如果存在的话)
    if watchEntry != nil {
        flags |= watchEntry.flags | unix.IN_MASK_ADD
    }
    wd, errno := unix.InotifyAddWatch(w.fd, name, flags) // 添加watch路径
    if wd == -1 {
        return errno
    }

    if watchEntry == nil { // 如果上下文里不存在此路径, 表明这是一个新的watch, 添加到上下文
        w.watches[name] = &watch{wd: uint32(wd), flags: flags}
        w.paths[wd] = name
    } else { // 如果在上下文中存在, 则更新上下文
        watchEntry.wd = uint32(wd)
        watchEntry.flags = flags
    }

    return nil
}

d. 删除watch路径

// Remove stops watching the named file or directory (non-recursively).
func (w *Watcher) Remove(name string) error {
    name = filepath.Clean(name) // 获取标准路径

    // 涉及到多协程可能同时对同一个watch项写, 所以锁住
    w.mu.Lock()
    defer w.mu.Unlock() // 最后解锁
    watch, ok := w.watches[name]

    if ... { ... } // 错误处理

    // 删除上下文里的相应watch项
    delete(w.paths, int(watch.wd))
    delete(w.watches, name)
    // 删除inotify中watch的fd
    success, errno := unix.InotifyRmWatch(w.fd, watch.wd)
    if ... { ... } // 错误处理
  
    return nil
}

e. poller部分(基于epoll)

我们上面看到在func NewWatcher() (*Watcher, error)函数中调用了poller, err := newFdPoller(fd), 这是将inotify的fd注册在epoll上, 以实现高效的fs监听, 代码如下:

为使篇幅简练, 省略冗余代码

func newFdPoller(fd int) (*fdPoller, error) {
    var errno error
    poller := emptyPoller(fd)
    defer func() {
        if errno != nil {
            poller.close()
        }
    }()
    poller.fd = fd

    // 要使用epoll的话, 需要使用EpollCreate函数为其单独创建一个fd
    poller.epfd, errno = unix.EpollCreate1(unix.EPOLL_CLOEXEC)
    if ... { return ... } // error处理
  
    // 为实现优雅退出, 需要创建一个管道, pipe[0]用来读, pipe[1]用来写
  // 在介绍watcher的Close函数时会分析这部分的逻辑
    errno = unix.Pipe2(poller.pipe[:], unix.O_NONBLOCK|unix.O_CLOEXEC)
    if ... { return ... } // error处理

    // 注册inotify的fd到epoll
    event := unix.EpollEvent{
        Fd:     int32(poller.fd),
        Events: unix.EPOLLIN,
    }
    errno = unix.EpollCtl(poller.epfd, unix.EPOLL_CTL_ADD, poller.fd, &event)
    if ... { return ... } // error处理

    // 注册管道的fd到epoll
    event = unix.EpollEvent{
        Fd:     int32(poller.pipe[0]),
        Events: unix.EPOLLIN,
    }
    errno = unix.EpollCtl(poller.epfd, unix.EPOLL_CTL_ADD, poller.pipe[0], &event)
    if ... { return ... } // error处理

    return poller, nil
}

函数func newFdPoller(fd int) (*fdPoller, error)在epoll的fd上注册了两个文件, 一个是inotify的, 另一个是其用来实现优雅退出的pipe[0].

我们在上面的*事件监听协程 func (w Watcher) readEvents()小节中提到的ok, errno = w.poller.wait()语句阻塞直到收到事件才会返回, 来看看具体poller(也就是上面的epoll)对事件的处理逻辑, 代码如下:

为使篇幅简练, 省略冗余代码

func (poller *fdPoller) wait() (bool, error) {
    // 总共监听两个fd: 1.inotify 2.优雅退出所需的pipe[0]
    // 每个fd有三种可能的事件, 所以最多可以触发6个事件
    // 准备一个大于6的slice
    events := make([]unix.EpollEvent, 7)
    for {
        // 阻塞wait在epoll的fd上, 参数-1表示不会超时
        // 一旦有事件产生, 就会发到events里
        n, errno := unix.EpollWait(poller.epfd, events, -1) 
        if ... { ... } // 各种异常处理

        // 以下就是收到正常事件的处理
        ready := events[:n]
        epollhup := false
        epollerr := false
        epollin := false
        for _, event := range ready {
            if event.Fd == int32(poller.fd) {
                if event.Events&unix.EPOLLHUP != 0 {
                    // This should not happen, but if it does, treat it as a wakeup.
                    epollhup = true
                }
                if event.Events&unix.EPOLLERR != 0 {
                    // If an error is waiting on the file descriptor, we should pretend
                    // something is ready to read, and let unix.Read pick up the error.
                    epollerr = true
                }
                if event.Events&unix.EPOLLIN != 0 {
                    // inotify有事件
                    epollin = true
                }
            }
            if event.Fd == int32(poller.pipe[0]) {
                if event.Events&unix.EPOLLHUP != 0 {
                    // Write pipe descriptor was closed, by us. This means we're closing down the
                    // watcher, and we should wake up.
                }
                if event.Events&unix.EPOLLERR != 0 {
                    // If an error is waiting on the pipe file descriptor.
                    // This is an absolute mystery, and should never ever happen.
                    return false, errors.New("Error on the pipe descriptor.")
                }
                if event.Events&unix.EPOLLIN != 0 {
                    // 收到程序发来的优雅退出事件, 将调用clearWake以使管道排空
                    err := poller.clearWake()
                    if err != nil {
                        return false, err
                    }
                }
            }
        }

        if epollhup || epollerr || epollin {
            return true, nil
        }
        return false, nil
    }
}

clearWake函数, 代码如下

func (poller *fdPoller) clearWake() error {
   // You have to be woken up a LOT in order to get to 100!
   buf := make([]byte, 100)
   n, errno := unix.Read(poller.pipe[0], buf) // 读取pipe[0]中的退出信号
   if ... { ... } // 错误处理
   return nil
}

那么pipe[0]中的信号是怎么来的呢? 也就是说必须有一个地方往pipe[1]中写数据. 其实, 我们示例代码中采用defer方式调用了watcher.Close()函数, 而其最重要的一步就是调用w.poller.wake()函数, 代码如下:

为使篇幅简练, 省略冗余代码

// Close the write end of the poller.
func (poller *fdPoller) wake() error {
    buf := make([]byte, 1)
    // 这里在pipe[1]写入了一个字符当做退出信号
    n, errno := unix.Write(poller.pipe[1], buf)
    if ... { ... } // 错误处理
    return nil
}

题外话: 关于这个优雅退出的早期设计其实不是这样的, 但是思路差不多. 有兴趣可以去看看fsnotify的早期提交

至此, 关于fsnotify对linux的实现就分析完了.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,064评论 5 466
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,606评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,011评论 0 328
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,550评论 1 269
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,465评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 47,919评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,428评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,075评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,208评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,185评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,191评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,914评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,482评论 3 302
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,585评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,825评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,194评论 2 344
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,703评论 2 339

推荐阅读更多精彩内容