理解golang io.Pipe

golang的io包中,稍微有点儿晦涩的就是Pipe方法,今天我们就一起来看一看这个Pipe。

函数定义如下:

func Pipe() (*PipeReader, *PipeWriter)

它返回了一个Reader和一个Writer

起初一看是有点儿奇怪的,很少有这么用的哦,它到底能干嘛呢?
其实它返回的不仅仅是简单的一个Writer一个Reader,它返回的是息息相关的一个Writer和一个Reader。
下面我先用比较口语化的方式来讲一下它们是如何工作的。

假设

先假设我们在工地上,有两个工人,一个叫w,一个叫r,w负责搬砖,而r负责砌墙。

初始

w和r一起配合工作,一开始啥都没有,负责砌墙的r就没法工作,于是它开始睡觉(Wait)。而w只能去搬砖了。

砖来了

w深知r懒惰的习性,当它把砖搬过来后,就把r叫醒(Signal)。然后w心想,反正你砌墙也要一会儿,那我也睡会儿。于是w叫醒r后它也开始睡觉(Wait)。

砌墙

r被叫醒之后,心想着睡了这么久害怕被包工头责骂,自然就开始辛勤地砌墙了,很快就把w搬过来的砖用完了。r心想,这墙砌不完可怪不到我头上,因为没砖了,于是r叫醒了w,然后自己又去睡觉了。

继续搬砖

w被叫醒后一看,哎哟我去,这么快就没砖了?然后他又跑去搬了些转过来,然后叫醒睡得跟死猪一样的r起来砌墙,自己又开始睡觉……

周而复始,直到……

w和r两人就这么周而复始地配合,直到r发现墙砌好了,或者w发现工地上已经没有砖了。


以上大概就是Pipe的通俗的解释。不过问题也来了,这俩人瞌睡怎么这么多呢?w干活r就歇着,不能同时干吗?答案是——不能
为什么?因为Pipe就是为了某些特定场景而提出的。看看官方文档的说明:

Reads and Writes on the pipe are matched one to one except when multiple Reads are needed to consume a single Write

也就是说,Pipe适用于,产生了一条数据,紧接着就要处理掉这条数据的场景。而且因为其内部是一把大锁,因此是线程安全的。

内部实现

来看看内部实现,先看看read

func (p *pipe) read(b []byte) (n int, err error) {
    // One reader at a time.
    p.rl.Lock()
    defer p.rl.Unlock()

    p.l.Lock()
    defer p.l.Unlock()
    for {
        if p.rerr != nil {
            return 0, ErrClosedPipe
        }
        if p.data != nil {
            break
        }
        if p.werr != nil {
            return 0, p.werr
        }
        p.rwait.Wait()
    }
    n = copy(b, p.data)
    p.data = p.data[n:]
    if len(p.data) == 0 {
        p.data = nil
        p.wwait.Signal()
    }
    return
}

这段代码,我用伪代码简化一下:

func (p *pipe) read(b []byte) (n int, err error) {
    各种加锁()
    for {
        if 有数据可以读或者哪里有错 {
           break
        } 
        让出时间片等待被唤醒,如果是被正常调度回来的依然不醒,必须是被指名点姓叫醒才醒()
    }
    copy(b, p.data)
    通知writer可以继续写数据进来了()
}

write其实也是大同小异:

func (p *pipe) write(b []byte) (n int, err error) {
  各种加锁()
  p.data = b
  通知reader有数据了()
  for {
    if 数据被读完了或者哪里有错 {
      break
    }
    让出时间片等待被唤醒,如果是被正常调度回来的依然不醒,必须是被指名点姓叫醒才醒()
  }
  p.data = nil
}

看了伪代码,再看看实际代码,应该就很容易了。但是还有几个地方需要细说,第一个就是锁的问题。

在read中:

func (p *pipe) read(b []byte) (n int, err error) {
    // One reader at a time.
    p.rl.Lock()
    defer p.rl.Unlock()

    p.l.Lock()
    defer p.l.Unlock()
        // ...

而在write中:

func (p *pipe) write(b []byte) (n int, err error) {
    // pipe uses nil to mean not available
    if b == nil {
        b = zero[:]
    }

    // One writer at a time.
    p.wl.Lock()
    defer p.wl.Unlock()

    p.l.Lock()
    defer p.l.Unlock()
    if p.werr != nil {
        err = ErrClosedPipe
        return
    }
        // ...

可能你注意到了,read和write都会去取同一把锁p.l
假设我们writer和reader在两个不同的goroutine中执行,并且write先执行,那么依照上面的代码,write会先拿锁,当执行完

p.data = b

之后会通知reader,然后自己进入一个死循环里进行Wait,直到reader把p.data读完。但是问题来了,writer进入死循环时并没有释放锁p.l,然后reader一直等待p.l释放然后去读取数据,而writer一直在等reader读取完数据才能跳出去释放锁。看起来这是一个死锁?
我只能说“Naive”,官方标准库怎么会犯这么低级的错误呢?但是代码就这样,该如何解释?
其实,关键在于那个sync.Cond

type pipe struct {
    rl    sync.Mutex // gates readers one at a time
    wl    sync.Mutex // gates writers one at a time
    l     sync.Mutex // protects remaining fields
    data  []byte     // data remaining in pending write
    rwait sync.Cond  // waiting reader
    wwait sync.Cond  // waiting writer
    rerr  error      // if reader closed, error to give writes
    werr  error      // if writer closed, error to give reads
}

rwait和wwait都是sync.Cond,这是什么东东呢?
看下它的文档:

// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.
//
// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
// which must be held when changing the condition and
// when calling the Wait method.
//
// A Cond can be created as part of other structures.
// A Cond must not be copied after first use.
type Cond struct {
    noCopy noCopy

    // L is held while observing or changing the condition
    L Locker

    notify  notifyList
    checker copyChecker
}

Cond如果要细说的话,又得写另一篇文章了。在这里你只要知道sync.Cond其内部依赖于一个Locker。
而且在初始化时:

func Pipe() (*PipeReader, *PipeWriter) {
    p := new(pipe)
    p.rwait.L = &p.l
    p.wwait.L = &p.l
    r := &PipeReader{p}
    w := &PipeWriter{p}
    return r, w
}

可以看到rwait和wwait都是依赖于用一把锁,而且这把锁就是p.l。可能有点儿绕,其实就是:

  • p.l.Lock()
  • p.rwait.Wait()
  • p.wwait.Wait()
    都是依赖于同一把锁。这有什么玄机吗?——有的!
    如前所述,当writer拿到锁p.l,然后开始在死循环中p.wwait.Wait()等着reader读完数据时,表面上看起来p.l锁没有被释放,会发生死锁。但是,玄机就在p.wwait.Wait上。
    不卖关子了,p.wwait.Wait被调用时,会在内部释放锁,而由于p.l和p.wwait.L是同一把锁,因此reader进去时是可以获取到锁的。
func (c *Cond) Wait() {
    c.checker.check()
    t := runtime_notifyListAdd(&c.notify)
    c.L.Unlock()
    runtime_notifyListWait(&c.notify, t)
    c.L.Lock()
}

Cond这个东西,要说起来比较复杂,它涉及到runtime,下次会写一篇文章具体讲讲。本文主要是讲Pipe,所以就不扩展了。

例子

Pipe的使用场景,我觉得极少数场景可能才会需要用到,我目前没有想到非常需要Pipe的场景。因为每次Read需要等Write写完,是串行的场景。不过Pipe的好处是,由于它把Write的slice放到p.data中,这是一次引用赋值。之后Read时,把p.data copy出去,本质上相当于copy了write的原始数据,并没有用临时slice存储,减少了内存使用量。
我感觉也就那么回事儿吧,为此你不得不再开个goroutine,gotoutine虽然轻量级,但也不是没有开销,当然它的开销和分配内存比就小巫见大巫了。我个人感觉,如果你的应用没有对内存要求严苛到这种级别,Pipe也没什么卵用。
如果你发现了Pipe比较合适的场景,非常希望告诉我!
下面给出一个强行使用Pipe的代码:起了多个goroutine作为writer,每个writer内部随机生成字符串写进去。唯一的reader读取数据并打印:

var r = rand.New(rand.NewSource(time.Now().UnixNano()))

func generate(writer *PipeWriter) {
    arr := make([]byte, 32)
    for {
        for i := 0; i < 32; i++ {
            arr[i] = byte(r.Uint32() >> 24)
        }
        n, err := writer.Write(arr)
        if nil != err {
            log.Fatal(err)
        }
        time.Sleep(200 * time.Millisecond)
    }
}

func main() {
    rp, wp := Pipe()
    for i := 0; i < 20; i++ {
        go generate(wp)
    }
    time.Sleep(1 * time.Second)
    data := make([]byte, 64)
    for {
        n, err := rp.Read(data)
        if nil != err {
            log.Fatal(err)
        }
        if 0 != n {
            log.Println("main loop", n, string(data))
        }
        time.Sleep(1 * time.Second)
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,905评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,140评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,791评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,483评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,476评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,516评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,905评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,560评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,778评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,557评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,635评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,338评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,925评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,898评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,142评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,818评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,347评论 2 342

推荐阅读更多精彩内容