go和java测试 nats的jetstream

由于nats-streaming废弃,nats使用jetstream可以存储消息,下面使用java和go分别测试jetstream

1、maven:

<dependency>
                <groupId>io.nats</groupId>
                <artifactId>jnats</artifactId>
                <version>2.13.2</version>
            </dependency>

2、测试代码:

先运行go run jspub.go,再运行go run jssub.go

1、go代码 jetstream

注意几点:

//1、AddStream  pub和sub不同
//pub部分
//stream StreamConfig的Subjects不支持通配符//Subjects: []string{"ORDERS.*"},
js.AddStream(&nats.StreamConfig{
        Name:     "ORDERS",
        Subjects: []string{"ORDERS.scratch"},
        //Subjects: []string{"ORDERS.*"},//jetstream不支持通配符
        Retention: nats.WorkQueuePolicy,
    })
//sub部分
//stream StreamConfig的Subjects可以使用//Subjects: []string{"ORDERS.*"},
js.AddStream(&nats.StreamConfig{
        Name:     "ORDERS",
        Subjects: []string{"ORDERS.scratch"},
        //可以Subjects:  []string{"ORDERS.*"}, 
        Retention: nats.WorkQueuePolicy,
    })

//2、三种订阅(选第一或第二种即可)
/**
第一种:js.Subscribe、这2种方式都可以"ORDERS.*"  "ORDERS.scratch"
第二种:js.SubscribeSync可以使用("ORDERS.scratch") 不能使用("ORDERS.*")
第三种:PullSubscribe无法订阅消息
*/
1、go jetstream pub
package main

import (
    "encoding/json"
    "fmt"
    "github.com/nats-io/nats.go"
    "runtime"
    "strconv"
    "time"
)

func main() {
    // Connect to NATS
    nc, _ := nats.Connect(nats.DefaultURL)

    // Create JetStream Context
    js, _ := nc.JetStream(nats.PublishAsyncMaxPending(256))
    //js, _ := nc.JetStream()

    js.DeleteConsumer("ORDERS", "MONITOR")
    js.DeleteStream("ORDERS")

    js.AddStream(&nats.StreamConfig{
        Name:     "ORDERS",
        Subjects: []string{"ORDERS.scratch"},
        //Subjects: []string{"ORDERS.*"},//jetstream不支持通配符
        Retention: nats.WorkQueuePolicy,
    })
    js.UpdateStream(&nats.StreamConfig{
        Name:     "ORDERS",
        MaxBytes: 8,
    })
    js.AddConsumer("ORDERS", &nats.ConsumerConfig{ //存消息
        Durable: "MONITOR",
    })
    //打印信息
    info, _ := js.StreamInfo("ORDERS")
    marshal, _ := json.Marshal(info)
    fmt.Println("===> StreamInfo ", string(marshal))

    consumerInfo, _ := js.ConsumerInfo("ORDERS", "MONITOR")
    marshal2, _ := json.Marshal(consumerInfo)
    fmt.Println("===> ConsumerInfo ", string(marshal2))

    // Simple Stream Publisher
    js.Publish("ORDERS.scratch", []byte("hello"))

    // Simple Async Stream Publisher
    max := 5000
    for i := 0; i < max; i++ {
        js.PublishAsync("ORDERS.scratch", []byte("hello "+strconv.Itoa(i)))
        time.Sleep(time.Duration(500) * time.Millisecond)
    }

    runtime.Goexit()
}
2、go jetstream sub
package main

import (
    "encoding/json"
    "fmt"
    "github.com/nats-io/nats.go"
    "log"
    "runtime"
)

func printnatsinfo(js nats.JetStreamContext) {
    info, _ := js.StreamInfo("ORDERS")
    marshal, _ := json.Marshal(info)
    fmt.Println("===> StreamInfo ", string(marshal))

    consumerInfo, _ := js.ConsumerInfo("ORDERS", "MONITOR")
    marshal2, _ := json.Marshal(consumerInfo)
    fmt.Println("===> ConsumerInfo ", string(marshal2))
}

func main() {
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal("connect error")
    }
    js, _ := nc.JetStream()
    printnatsinfo(js)
    //sub端如何delstream的话 消息会丢失
    //js.DeleteConsumer("ORDERS", "MONITOR")
    //js.DeleteStream("ORDERS")

    js.AddStream(&nats.StreamConfig{
        Name:     "ORDERS",
        Subjects: []string{"ORDERS.scratch"},
        //Subjects:  []string{"ORDERS.*"}, //jetstream不支持通配符
        Retention: nats.WorkQueuePolicy,
    })
    js.UpdateStream(&nats.StreamConfig{
        Name:     "ORDERS",
        MaxBytes: 8,
    })
    js.AddConsumer("ORDERS", &nats.ConsumerConfig{
        Durable: "MONITOR",
    })

    printnatsinfo(js)

    //2、三种订阅(选第一或第二种即可)
    /**
    第一种:js.Subscribe、这2种方式都可以"ORDERS.*"  "ORDERS.scratch"
    第二种:js.SubscribeSync可以使用("ORDERS.scratch") 不能使用("ORDERS.*")
    第三种:PullSubscribe无法订阅消息
    */
    //第一种订阅
    //js.Subscribe、这2种方式都可以"ORDERS.*"  "ORDERS.scratch"
    js.Subscribe("ORDERS.*", func(m *nats.Msg) {
        //js.Subscribe("ORDERS.scratch", func(m *nats.Msg) {
        fmt.Printf("Received a JetStream message: %s\n", string(m.Data))
        m.Ack()
    })

    //第二种订阅
    //js.SubscribeSync可以使用("ORDERS.scratch") 不能使用("ORDERS.*")
    // Simple Sync Durable Consumer (optional SubOpts at the end)
    /*sub, _ := js.SubscribeSync("ORDERS.scratch", nats.Durable("MONITOR"), nats.MaxDeliver(3))
    for {
        m, _ := sub.NextMsg(3 * time.Second)
        if m != nil {
            fmt.Printf("<===  Received a JetStream message: %s\n", string(m.Data))
            m.Ack()
        }
    }*/

    //第三种订阅
    /**
    PullSubscribe无法订阅消息
    */
    /*subscribe, err := js.PullSubscribe("ORDERS.scratch", "MONITOR")
    for {
        msgs, _ := subscribe.Fetch(3)
        fmt.Println("<=== ", msgs)
        for i, x := range msgs {
            fmt.Printf("第 %d 位 x 的值 = %d\n", i, x)
        }
        time.Sleep(time.Duration(3) * time.Second)
    }*/
    runtime.Goexit()
}

2、java代码 pub/sub jetstream

import io.nats.client.*;
import io.nats.client.api.*;
import lombok.extern.slf4j.Slf4j;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;

@Slf4j
public class NatsJetStreamTest {

    private Connection nc;
    private JetStream jetStream;
    @Before
    public void pre() throws InterruptedException {
        Options options = new Options.Builder().server("nats://127.0.0.1:4222"))
                .connectionListener(getConnectionListener()).build();
        Nats.connectAsynchronously(options, true);
    }

    private ConnectionListener getConnectionListener() {
        return (conn, type) -> {
            try {
                log.info("nats event {}.", type);
                if (type == ConnectionListener.Events.CONNECTED) {
                    nc = conn;
                    String subject = this.getChannelSubjectName(null);
                    String stream = this.getChannelStreamName(null);
                    this.delConsumer(stream);
                    this.delStream(stream);
                    this.addChannelStream(null);

                    ConsumerConfiguration cc = ConsumerConfiguration.builder().maxAckPending(1).durable("MONITOR").build();
                    PushSubscribeOptions so = PushSubscribeOptions.builder().stream(stream).configuration(cc).build();
                    jetStream = nc.jetStream();
                    Dispatcher dispatcher = nc.createDispatcher();

                    /*jetStream.subscribe(subject, dispatcher, (msg) -> {
                        log.info("Nats receive room Message {}", msg);
                        msg.ack();
                    }, false, so);*/
                    log.info("初始化完成,可以发送数据");
                }
            } catch (Exception e) {
                log.error("ConnectionListener error", e);
            }
        };
    }

    @Test
    public void jetstreamtest() throws InterruptedException, JetStreamApiException, IOException {
        while (jetStream == null) {
            Thread.sleep(1000);
        }
        log.info("开始发送数据");
        int max = 500;
        for (int i = 0; i < max; i++) {
            jetStream.publish(this.getChannelSubjectName(null), ("hello === " + i).getBytes(StandardCharsets.UTF_8));
            Thread.sleep(1000);
        }
    }

    public String getChannelStreamName(String channelId) {
        return "ORDERS";
    }

    public String getChannelSubjectName(String channelId) {
        return "ORDERS.scratch";
    }

    public void publishToStream(String subject, String message) {
        try {
            nc.jetStream().publish(subject, message.getBytes(StandardCharsets.UTF_8));
        } catch (IOException | JetStreamApiException e) {
            log.error("nats publishToStream error", e);
        }
    }

    public static StreamInfo getStreamInfoOrNullWhenNotExist(JetStreamManagement jsm, String streamName) throws IOException, JetStreamApiException {
        try {
            return jsm.getStreamInfo(streamName);
        }
        catch (JetStreamApiException jsae) {
            if (jsae.getErrorCode() == 404) {
                return null;
            }
            throw jsae;
        }
    }
    public static void printStreamInfo(StreamInfo si) {
        printObject(si, "StreamConfiguration", "StreamState", "ClusterInfo", "Mirror", "subjects", "sources");
    }

    public static void printObject(Object o, String... subObjectNames) {
        String s = o.toString();
        for (String sub : subObjectNames) {
            boolean noIndent = sub.startsWith("!");
            String sb = noIndent ? sub.substring(1) : sub;
            String rx1 = ", " + sb;
            String repl1 = (noIndent ? ",\n": ",\n    ") + sb;
            s = s.replace(rx1, repl1);
        }

        log.info(s);
    }

    public void addChannelStream(String channelId) {
        try {
            String channelSubject = getChannelSubjectName(channelId);
            String channelStream = getChannelStreamName(channelId);
            StreamInfo streamInfo = this.createOrGetChannelStream(channelStream);
            StreamConfiguration configuration = streamInfo.getConfiguration();
            List<String> subjects = configuration.getSubjects();
            if (Ts.hv(subjects) && subjects.contains(channelSubject)) {
                return;
            }
            StreamConfiguration streamConfig = StreamConfiguration.builder(configuration).addSubjects(channelSubject).build();
            nc.jetStreamManagement().updateStream(streamConfig);
        } catch (IOException | JetStreamApiException e) {
            log.error("nats addSubjectToStream error", e);
        }
    }

    public void delStream(String channelStream) throws IOException, JetStreamApiException {
        JetStreamManagement jsm = nc.jetStreamManagement();
        try {
            jsm.deleteStream(channelStream);
        } catch (JetStreamApiException e) {
            if (e.getErrorCode() != 404) {
                throw e;
            }
        }
    }

    public StreamInfo createOrGetChannelStream(String streamName) throws IOException, JetStreamApiException {
        JetStreamManagement jsm = nc.jetStreamManagement();
        StreamInfo streamInfo = getStreamInfoOrNullWhenNotExist(jsm, streamName);
        if (!Ts.hv(streamInfo)) {
            StreamConfiguration streamConfig = StreamConfiguration.builder().name(streamName)
//                                .subjects(subject1)
                    .retentionPolicy(RetentionPolicy.WorkQueue)
                    // .maxConsumers(...)
                    // .maxBytes(...)
                    .maxAge(Duration.ofHours(1))
                    // .maxMsgSize(...)
                    .storageType(StorageType.Memory)
                    // .replicas(...)
                    // .noAck(...)
                    // .template(...)
//                                 .discardPolicy(...)
                    .build();
            streamInfo = jsm.addStream(streamConfig);
        }
        printStreamInfo(streamInfo);
        return streamInfo;
    }

    public void delConsumer(String channelStream) throws IOException, JetStreamApiException {
        JetStreamManagement jsm = nc.jetStreamManagement();
        try {
            List<String> consumerNames = jsm.getConsumerNames(channelStream);
            if (Ts.hv(consumerNames)) {
                log.info("Nats stream {} has consumers need del.", channelStream);
                for (String consumerName : consumerNames) {
                    jsm.deleteConsumer(channelStream, consumerName);
                }
            }
        } catch (JetStreamApiException e) {
            if (e.getErrorCode() != 404) {
                throw e;
            }
        }
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,636评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,890评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,680评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,766评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,665评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,045评论 1 276
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,515评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,182评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,334评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,274评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,319评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,002评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,599评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,675评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,917评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,309评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,885评论 2 341

推荐阅读更多精彩内容