golang nats[4] request reply模式

请求响应模式

无论是发布订阅模式还是queue模式,nats都不能保证消息一定发送到订阅方,除非订阅者发送一个响应给发布者。
所以订阅者发送一个回执给发布者,就是请求响应模式。

这种模式有什么用?

nats要求订阅者一定要先完成订阅,发布消息后,订阅者才能收到消息,类似离线消息的模式nats不支持。就算先完成订阅,后发送消息,消息发送方也不知道是否有订阅者收到了消息,请求响应模式就是应对这种情况。

基本流程

A发送消息,B收到消息,发送回执给A。这就是request reply的基本流程。

基本实现原理

  • A启用request模式发送消息(消息中包含了回执信息,replya主题),同步等待回执(有超时时间)。
  • B收到消息,在消息中取出回执信息=replay主题,对replay主题,主动发送普通消息(消息内容可自定义,比如server A上的service1收到msgid=xxxx的消息。)。
  • A在超时内收到消息,确认结束。
  • A在超时内未收到消息,超时结束。

注意

  • 因为A发送的消息中包装了回执测相关信息,订阅者B收到消息后,也要主动发送回执,所以请求响应模式,对双方都有影响。
  • A发送消息后,等待B的回执,需要给A设置超时时间,超时后,不在等待回执,直接结束,效果和不需要回执的消息发送一样,不在关心是否有订阅者收到消息。

两种模式

request reply有两种模式:

  • one to one 默认模式

1条消息,N个订阅者,消息发送方,仅会收到一条回执记录(因为消息发送方收到回执消息后,就自动断开了对回执消息的订阅。),即使N个订阅都都收到了消息。注意:pub/sub和queue模式的不同

  • one to many 非默认模式,需要自己实现

1条消息,N个订阅者,消息发送方,可以自己设定一个数量限制N,接受到N个回执消息后,断开对回执消息的订阅。

Server

package main

import (
    "github.com/nats-io/go-nats"
    "log"
    "flag"
)

const (
    //url   = "nats://192.168.3.125:4222"
    url = nats.DefaultURL
)

var (
    nc *nats.Conn

    encodeConn *nats.EncodedConn
    err        error
)

func init() {
    if nc, err = nats.Connect(url); checkErr(err) {
        //
        if encodeConn, err = nats.NewEncodedConn(nc, nats.JSON_ENCODER);
            checkErr(err) {
        }
    }
}

func main() {
    var (
        servername = flag.String("servername", "Y", "name for server")
        queueGroup = flag.String("group", "", "group name for Subscribe")
        subj       = flag.String("subj", "yasenagat", "subject name")
    )
    flag.Parse()

    mode := "queue"
    if *queueGroup == "" {
        mode = "pub/sub"
    }
    log.Printf("Server[%v] Subscribe Subject[%v] in [%v]Mode", *servername, *subj, mode)

    startService(*subj, *servername+" worker1", *queueGroup)
    startService(*subj, *servername+" worker2", *queueGroup)
    startService(*subj, *servername+" worker3", *queueGroup)

    nc.Flush()
    select {}
}

//receive message
func startService(subj, name, queue string) {
    go async(nc, subj, name, queue)
}

func async(nc *nats.Conn, subj, name, queue string) {
    replyMsg := name + " Received a msg"
    if queue == "" {
        nc.Subscribe(subj, func(msg *nats.Msg) {
            nc.Publish(msg.Reply, []byte(replyMsg))
            log.Println(name, "Received a message From Async : ", string(msg.Data))
        })
    } else {
        nc.QueueSubscribe(subj, queue, func(msg *nats.Msg) {
            nc.Publish(msg.Reply, []byte(replyMsg))
            log.Println(name, "Received a message From Async : ", string(msg.Data))
        })
    }

}

func checkErr(err error) bool {
    if err != nil {
        log.Println(err)
        return false
    }
    return true
}

Client

package main

import (
    "github.com/nats-io/go-nats"
    "log"
    "github.com/pborman/uuid"
    "flag"
    "time"
)

const (
    //url   = "nats://192.168.3.125:4222"
    url = nats.DefaultURL
)

var (
    nc         *nats.Conn
    encodeConn *nats.EncodedConn
    err        error
)

func init() {
    if nc, err = nats.Connect(url); checkErr(err, func() {

    }) {
        //
        if encodeConn, err = nats.NewEncodedConn(nc, nats.JSON_ENCODER);
            checkErr(err, func() {

            }) {

        }
    }
}

func main() {
    var (
        subj = flag.String("subj", "yasenagat", "subject name")
    )
    flag.Parse()
    log.Println(*subj)
    startClient(*subj)

    time.Sleep(time.Second)
}

//send message to server
func startClient(subj string) {
    for i := 0; i < 3; i++ {
        id := uuid.New()
        log.Println(id)
        if msg, err := nc.Request(subj, []byte(id+" hello"), time.Second); checkErr(err, func() {
            // handle err
        }) {
            log.Println(string(msg.Data))
        }
    }
}

func checkErr(err error, errFun func()) bool {
    if err != nil {
        log.Println(err)
        errFun()
        return false
    }
    return true
}

pub/sub模式启动

$ ./main
2018/08/18 18:54:10 Server[Y] Subscribe Subject[yasenagat] in [pub/sub]Mode
2018/08/18 18:54:26 Y worker2 Received a message From Async :  b035d7c2-e7e9-4337-bb8a-a23ec85fc31a hello
2018/08/18 18:54:26 Y worker1 Received a message From Async :  b035d7c2-e7e9-4337-bb8a-a23ec85fc31a hello
2018/08/18 18:54:26 Y worker3 Received a message From Async :  b035d7c2-e7e9-4337-bb8a-a23ec85fc31a hello
2018/08/18 18:54:26 Y worker2 Received a message From Async :  2d8dfe75-8fee-4b4c-8599-1824638dfa8c hello
2018/08/18 18:54:26 Y worker1 Received a message From Async :  2d8dfe75-8fee-4b4c-8599-1824638dfa8c hello
2018/08/18 18:54:26 Y worker3 Received a message From Async :  2d8dfe75-8fee-4b4c-8599-1824638dfa8c hello
2018/08/18 18:54:26 Y worker2 Received a message From Async :  fe9f773a-129b-4919-9bc4-c8a4571fef6e hello
2018/08/18 18:54:26 Y worker1 Received a message From Async :  fe9f773a-129b-4919-9bc4-c8a4571fef6e hello
2018/08/18 18:54:26 Y worker3 Received a message From Async :  fe9f773a-129b-4919-9bc4-c8a4571fef6e hello

发送消息

$ ./main
2018/08/18 18:54:26 yasenagat
2018/08/18 18:54:26 b035d7c2-e7e9-4337-bb8a-a23ec85fc31a
2018/08/18 18:54:26 Y worker3 Received a msg
2018/08/18 18:54:26 2d8dfe75-8fee-4b4c-8599-1824638dfa8c
2018/08/18 18:54:26 Y worker2 Received a msg
2018/08/18 18:54:26 fe9f773a-129b-4919-9bc4-c8a4571fef6e
2018/08/18 18:54:26 Y worker2 Received a msg

queue模式启动

$ ./main -group=test
2018/08/18 19:14:31 Server[Y] Subscribe Subject[yasenagat] in [queue]Mode
2018/08/18 19:14:33 Y worker2 Received a message From Async :  4ecf2728-b3a7-4181-893a-aefde3bc8d2e hello Y worker2 Received a msg
2018/08/18 19:14:33 Y worker3 Received a message From Async :  4e7f1363-9a47-4705-b87a-4aaeb80164f0 hello Y worker3 Received a msg
2018/08/18 19:14:33 Y worker2 Received a message From Async :  38b1f74b-8a3b-46ba-a10e-62e50efbc127 hello Y worker2 Received a msg

发送消息

$ ./main
2018/08/18 19:14:33 yasenagat
2018/08/18 19:14:33 4ecf2728-b3a7-4181-893a-aefde3bc8d2e
2018/08/18 19:14:33 Y worker2 Received a msg
2018/08/18 19:14:33 4e7f1363-9a47-4705-b87a-4aaeb80164f0
2018/08/18 19:14:33 Y worker3 Received a msg
2018/08/18 19:14:33 38b1f74b-8a3b-46ba-a10e-62e50efbc127
2018/08/18 19:14:33 Y worker2 Received a msg

queue模式下,发送3条消息,3个订阅者有相同的queue,每条消息只有一个订阅者收到。

pub/sub模式下,发送3条消息,3个订阅者都收到3条消息,一共9条。

总结:

回执主要解决:订阅者是否收到消息的问题、有多少个订阅者收到消息的问题。(不是具体业务是否执行完成的回执!)
基于事件的架构模式可以构建于消息机制之上,依赖消息机制。异步调用的其中一种实现方式,就是基于事件模式。异步调用又是分布式系统中常见的任务处理方式。

业务模式

  • 业务A发送eventA给事件中心,等待回执
  • 事件中心告知A收到了消息,开始对外发送广播
  • 订阅者B订阅了eventA主题
  • 事件中心对eventA主题发送广播,等待回执
  • B收到消息,告知事件中心,收到eventA,开始执行任务taskA
  • B异步执行完taskA,通知事件中心taskAComplete,等待回执
  • 事件中心发送回执给B,对外发送广播,taskAComplete
  • ........

如果超时,未能收到回执,需要回执信息的确认方可以主动调用相关接口,查询任务执行状态,根据任务状态做后续的处理。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,271评论 5 466
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,725评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,252评论 0 328
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,634评论 1 270
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,549评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 47,985评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,471评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,128评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,257评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,233评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,235评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,940评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,528评论 3 302
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,623评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,858评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,245评论 2 344
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,790评论 2 339

推荐阅读更多精彩内容

  • 点击查看原文 Web SDK 开发手册 SDK 概述 网易云信 SDK 为 Web 应用提供一个完善的 IM 系统...
    layjoy阅读 13,607评论 0 15
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,559评论 18 139
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 15,836评论 2 11
  • 今天早晨,處在霧霾中的北京朋友發來一张照片: 是褚時健的橙子,於是我们有了以下的對話: "褚橙啊,我還沒吃過,好吃...
    MZ_梅枝阅读 403评论 0 0
  • ——舍与得,珍惜现在拥有 电影《人在囧途泰囧》有三位主...
    行走在学习的路上阅读 1,008评论 4 51