高性能消息中间件——NATS

在介绍NATS之前先了解下什么是分布式系统和消息中间件

对于分布式系统的定义,一直以来我都没有找到或者想到特别简练而又合适的定义,这里引用一下Distributed System Concepts and Design (Thrid Edition)中的一句话A distributed system is one in which components located at networked computers communicate and coordinate their actions only by passing messages,从这句话我们可以看到几个重点,一是组件分布在网络计算机上,二是组件之间仅仅通过消息传递来通信并协调行动。消息中间件维基百科给出的定义为Message-oriented middleware(MOM) is software infrastructure focused on sending and receiving messages between distrubuted systems,意思就是面向消息的系统(消息中间件)是在分布式系统中完成消息的发送和接收的基础软件

消息中间件常被提及的好处即异步和解耦,市面上常常被使用到的中间件有RabbitMQ, ActiveMQ, Kafka等,他们的关注度和使用率都非常的高,并且使用起来也非常的方便。公司的WiseCloud产品就集成了RabbitMQ。而在下一个版本的更新中将会使用NATS来替换RabbitMQ。使用NATS的好处比较多首先就是其性能非常好,下面引用官网的性能对比图:

NATS介绍

NATS是一个开源、轻量级、高性能的分布式消息中间件,实现了高可伸缩性和优雅的Publish/Subscribe模型,使用Golang语言开发。NATS的开发哲学认为高质量的QoS应该在客户端构建,故只建立了Request-Reply,不提供 1.持久化 2.事务处理 3.增强的交付模式 4.企业级队列。

NATS消息传递模型

NATS支持各种消息传递模型,包括:

发布订阅(Publish Subscribe)

请求回复(Request Reply)

队列订阅(Queue Subscribers )

提供的功能:

纯粹的发布订阅模型(Pure pub-sub)

服务器集群(Cluster mode server)

自动精简订阅者(Auto-pruning of subscribers)

基于文本协议(Text-based protocol)

多服务质量保证(Multiple qualities of service - QoS)

发布订阅(Publish Subscribe)

NATS将publish/subscribe消息分发模型实现为一对多通信,发布者在Subject上发送消息,并且监听该Subject在任何活动的订阅者都会收到该消息

java:

//publish

Connection nc = Nats.connect("nats://127.0.0.1:4222");

nc.publish("subject", "hello world".getBytes(StandardCharsets.UTF_8));

//subscribe

Subscription sub = nc.subscribe("subject");

Message msg = sub.nextMessage(Duration.ofMillis(500));

String response = new String(msg.getData(), StandardCharsets.

UTF_8);

或者是基于回调的subscribe

//subscribe

Dispatcher d = nc.createDispatcher(msg - >{

String response = new String(msg.getData(), StandardCharsets.UTF_8)

//do something

})

d.subscribe("subject");

请求响应(Request Reply)

NATS支持两种请求响应消息:点对点或多对多。点对点涉及最快或首先响应。在一对多的消息交换中,需要限制请求响应的限制

在Request Reply过程中,发布请求发布带有响应主题的消息,期望对该subject做出响应操作

java:

// publish

Connection connection = Nats.connect("nats://127.0.0.1:4222");

String reply = "replyMsg";

//请求回应方法回调

Dispatcher d = connection.createDispatcher(msg -> 

System.out.println("reply: " + JSON.toJSONString(msg));

})

d.unsubscribe(repl , 1);

//订阅请求

d.subscribe(reply);

//发布请求

connection.publish("requestSub", reply, "request".getBytes(StandardCharsets.

UTF_8));

//subscribe

Connection nc = Nats.connect("nats://127.0.0.1:4222");

//注册订阅

Dispatcher dispatcher = nc.createDispatcher(msg -> {

System.out.println(JSON.toJSONString(msg));

nc.publish(msg.getReplyTo(), "this is reply".getBytes(StandardCharsets.UTF_8));

});

dispatcher.subscribe("requestSub");

队列订阅&分享工作(Queue Subscribers & Sharing Work)

NATS提供称为队列订阅的负载均衡功能,虽然名字为queue(队列)但是并不是我们所认为的那样。他的主要功能是将具有相同queue名字的subject进行负载均衡。使用队列订阅功能消息发布者不需要做任何改动,消息接受者需要具有相同的对列名

// Subscribe

Connection nc = Nats.connect();

Dispatcher d = nc.createDispatcher(msg -> {

//do something

System.out.println("msg: " + new String(msg.getData(),StandardCharsets.UTF_8));

});

d.subscribe("queSub", "queName");

Nats-Spring集成

NATS虽说是一个性能非常好的消息中间键,但是和Spring的集成不是很好。这里提供两个集成的思路

CloudFoundry-Community/java-nats

Wanlinus/nats-spring

java-nats

这是一个由CloudFoundry主导的一个NATS java客户端。提供了区别于官方的nats客户端,支持注解配置,对Spring有比较好的支持,但是此项目已经有1年多没有更新且不支持NATS Streaming。相应用法参考Github,这里不做详细讲解.

nats-spring

由于开源社区只提供一个简单的NATS Client,缺少对注解和Spring的支持,所以我基于官方jnats客户端写了一个SpringBoot的兼容插件.主要是为了兼容spring boot amqp开发模式,尽量使用注解解决问题开发出来的,所以使用方法类似于在代码中使用@RabbitListener.具体使用方法如下

{{git clone

cd nats-spring

mvn clean install}}}

<dependency>

<groupId>cn.wanlinus</groupId>

<artifactId>nats-spring</artifactId>

<version>1.0.0.RELEASE</version>

</dependency>

application.yml

spring:

nats:

urls:

- nats://127.0.0.1:4222

@EnableNats

@SpringBootApplication

public class NatsDemo2Application {

public static void main(String[] args) {

  SpringApplication.run(NatsDemo2Application.class, args);

}

}

@Component

public class Foo{

@NatsSubscribe("haha")

public void message(Message message) {

System.out.println(message.getSubject() + " : " + new String(message.getData()));

}

}

NATS Streaming介绍

NATS由于不能保证消息的投递正确性和存在其他的缺点,NATS Streaming就孕育而生.他是一个由NATS提供支持的数据流系统,采用Go语言编写,NATS Streaming与核心NATS平台无缝嵌入,扩展和互操作.除了核心NATS平台的功能外,他还提供了以下功能:

NATS Streaming特征

增强消息协议(Enhanced message protocol)

消息/事件持久化(Message/event persistence)

至少一次数据传输(At-least-once-delivery)

Publisher限速(Publisher rate limiting)

Subscriber速率匹配(Rate matching/limiting per subscriber)

按主题重发消息(Historical message replay by subject)

持续订阅(Durable subscriptions)

基本用法

在使用NATS Streaming之前首先要启动服务器,在这里我选择使用docker容器

# 4222 client默认连接端口

8222 Web端口

6222 集群通信端口

$ docker run -d -p 4222:4222 -p 8222:8222 -p 6222:6222 nats-streaming

STREAM: Starting nats-streaming-server[test-cluster] version 0.11.0

STREAM: ServerID: bzkKJL3jI4KW9Hqb0bC1Ae

STREAM: Go version: go1.11

Starting nats-server version 1.3.0

Git commit [not set]

Starting http monitor on 0.0.0.0:8222

Listening for client connections on 0.0.0.0:4222

Server is ready

STREAM: Recovering the state...

STREAM: No recovered state

STREAM: Message store is MEMORY

STREAM: ---------- Store Limits ----------

STREAM: Channels:                  100 *

STREAM: --------- Channels Limits --------

STREAM:   Subscriptions:          1000 *

STREAM:   Messages     :       1000000 *

STREAM:   Bytes        :     976.56 MB *

STREAM:   Age          :     unlimited *

STREAM:   Inactivity   :     unlimited *

STREAM: ----------------------------------

java:

// 第一个参数表示clusterId,在启动NATS Streaming容器的时候确定

// 第二个参数表示clientID,连接客户端的唯一标识符

StreamingConnectionFactory cf = new StreamingConnectionFactory

("test-cluster", "bar");

//设置Nats服务器地址和端口,默认是nats://127.0.0.1:4222

cf.setNatsConnection(Nats.connect("nats://127.0.0.1:4222"));

StreamingConnection sc = cf.createConnection();

Publish: sc.publish("foo", "Hello World".getBytes());

Subscribe:

sc.subscribe("foo", msg -> {

System.out.println(new String(msg.getData(), StandardCharsets.UTF_8));

}, new SubscriptionOptions.Builder()

        .durableName("aa")

        .deliverAllAvailable().build());

在使用NATS Streaming的时候需要注意订阅主题不支持通配符,在订阅消息时传入MessageHandler函数是接口实现和SubscriptionOptions对象.MessageHandler提供消息回调处理, SubscriptionOptions用于设置订阅选项,比如设置Queue, durableName, ack等。

Streaming-Spring集成

作为一款优秀的消息中间件,却没有对Spring做集成,这是非常的可惜的事情.所以为了在工作中方便的使用他,我开发了一个很小的插件.虽然还有很大的改进空间,不过在公司的项目中却能够很好的运行.他开发思路和nats-spring差不多,所以使用方式也是大同小异,具体如下:

{{git clonehttps://github.com/wanlinus/na ... g.git

cd nats-streaming-spring

mvn clean install}}}

<dependency>

<groupId>cn.wanlinus</groupId>

<artifactId>nats-streaming-spring</artifactId>

<version>1.0.0-SNAPSHOT</version>

</dependency>

application.yml

spring:

nats:

streaming:

nats-url: nats://127.0.0.1:4222

cluster-id: test-cluster

@EnableNatsStreaming

@SpringBootApplication

public class StreamingDemoApplication {

public static void main(String[] args) {

  SpringApplication.run(StreamingDemoApplication.class, args);

}

//发布消息只需要注入StreamingConnection

@Autowired

private StreamingConnection sc;

public void sendMsg(){

  sc.publish("foo", "publish message".getBytes())

}

}

@Service

public class A {

@Subscribe(value = "foo", durableName = "dname", queue = "queue")

public void asd(Message message) throws IOException {

  System.out.println(new String(message.getData(), StandardCharsets.UTF_8));

}

}

两个插件由于是为了结合项目所写的,所以里面有些部分并不通用。后续的开发中我将会继续进行抽象和改进。

欢迎工作一到五年的Java工程师朋友们加入Java架构开发: 855835163

群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用自己每一分每一秒的时间来学习提升自己,不要再用"没有时间“来掩饰自己思想上的懒惰!趁年轻,使劲拼,给未来的自己一个交代!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,636评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,890评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,680评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,766评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,665评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,045评论 1 276
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,515评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,182评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,334评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,274评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,319评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,002评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,599评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,675评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,917评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,309评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,885评论 2 341

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,563评论 18 139
  • Spring Web MVC Spring Web MVC 是包含在 Spring 框架中的 Web 框架,建立于...
    Hsinwong阅读 22,263评论 1 92
  • -----只灯片笺(简书ID、有读故事ID、小说阅读网ID) 日升月落,斗转星移,时光流逝的速度,总让人在不经意的...
    寒山半秋阅读 484评论 2 12
  • 新生儿首日的胃容量大约是5-7ml,所以麻麻们不用担心你的初乳不够宝宝吃,也让你的婆婆和妈妈别担心会饿着他们。到了...
    糖果粉阅读 149评论 0 0
  • bonds连体衣质量很好,手感柔软,做工真心没得说。单层连体衣120rmb一条,两条包邮。四条打包价450rmb包...
    sherly_72a4阅读 435评论 0 0