针对高并发,可扩展的互联网架构,搭建消息队列

想开发高并发可扩展的互联网架构,消息队列是不可缺少的,目前主流的消息队列,有windows自带的MSMQ,还有跨平台的强大的ZeroMQ,这里我们就选用ZeroMQ.

ZeroMQ介绍:(也拼写作 ?MQ、 0MQ 或 ZMQ) 是个非常轻量级的开源消息队列软件。它没有独立的服务器,消息直接从一个应用程序被发送到另一个应用程序。ZeroMQ的学习和应用也非常简单,它只有一个 C++ 编写成的单个库文件libzmq.dll, 可以链接到任何应用程序中。如果要在.NET 环境中使用,我们需要用到一个C#编写的名为 clrzmq.dll 包装库。ZeroMQ可以在 Windows、 OS X 和 Linux 等多种操作系统上运行, C、 C++、C#、 Java、 Python 等语言都可以编写ZeroMQ 应用程序这使得不同平台上的不同应用程序之间可以相互通讯。

一、环境搭建:

codeproject专题,下载对应的Download binaries - 377.6 KB,解压缩到你的指定路径。

这里我们就不详细介绍,主要说一下C#封装好的版本,NetMQ,是基于ZeroMQ进行封装的。就不需要下载了,直接nuget上获取:

PM> Install-Package NetMQ

为什么不直接用ZeroMQ,而使用NetMQ,运行非托管代码的托管应用程序内可能会出现许多想不到的问题,像内存泄漏和奇怪的没有访问错误。而NetMQ使用原生的C#语言,它更容易调试原生C#代码,你可以下载代码,调试你的系统。你可以在github上贡献。

待安装好后,系统会自动添加NetMQ的引用。

可以看到,NetMQ是基于zmq进行开发的,其实就是ZeroMQ了,并且已经为我们封装了各种功能的MQ对象,比如REP/REQ ,PUB/SUB(主题式订阅),XPUB/XSUB(非主题订阅),Push/Pull,甚至还有路由模式等,从字面意义上,应该能看出个大概,后面我们一个一个进行测试使用。

先看个简单的demo,初步了解一下:

class Program
{
    static void Main(string[] args)
    {
        using (NetMQContext context = NetMQContext.Create())
        {
            Task serverTask = Task.Factory.StartNew(() =>Server(context));
            Task clientTask = Task.Factory.StartNew(() => Client(context));
            Task.WaitAll(serverTask, clientTask);
        }
    }
    static void Server(NetMQContext context)
    {
        using (NetMQSocket serverSocket = context.CreateResponseSocket())
        {
            serverSocket.Bind("tcp://*:5555");
            while (true)
            {
                string message = serverSocket.ReceiveString();
                Console.WriteLine("Receive message {0}", message);
                serverSocket.Send("World");
                 
                if (message == "exit")
                {
                    break;
                }
            }
        }
         
    }
    static void Client(NetMQContext context)
    {
        using (NetMQSocket clientSocket = context.CreateRequestSocket())
        {
            clientSocket.Connect("tcp://127.0.0.1:5555");
            while (true)
            {
                Console.WriteLine("Please enter your message:");
                string message = Console.ReadLine();
                clientSocket.Send(message);
                string answer = clientSocket.ReceiveString();
                Console.WriteLine("Answer from server: {0}", answer);
                if (message == "exit")
                {
                    break;
                }
            }
        }
    }
}

代码比较简洁的介绍了REP/REQ模式下NetMQ的使用,而且我们可以看到,这个Mq对象是可以在不同的线程间切换使用的,也许你会测试中文,那就先序列化再反序列化吧,因为可能会出现乱码哟。

这里,我先简单根据NetMQ,封装一个Server端和一个Client端,方便后面使用,当然也可以不封装,直接使用:

Server:

/// <summary>
/// Mq服务端
/// </summary>
public class OctMQServer : IDisposable
{
    public event EventHandler<DataEventArgs<NetMQSocket, NetMQMessage>> OnReceive;
     
    protected virtual void OnOnReceive(DataEventArgs<NetMQSocket, NetMQMessage> e)
    {
        EventHandler<DataEventArgs<NetMQSocket, NetMQMessage>> handler = OnReceive;
        if (handler != null) handler(this, e);
    }
    private int _port;
    private NetMQSocket _serverSocket;
    private ServerType _type;
    private NetMQContext _context;
    public void Init(int port, ServerType type)
    {
        _type = type;
        _port = port;
        _context = NetMQContext.Create();
        CreateServer();
    }
    void CreateServer()
    {
        switch (_type)
        {
            case ServerType.Response:
            _serverSocket = _context.CreateResponseSocket();
            break;
            case ServerType.Pub:
            _serverSocket = _context.CreatePushSocket();
            break;
            case ServerType.Router:
            _serverSocket = _context.CreateRouterSocket();
            break;
            case ServerType.Stream:
            _serverSocket = _context.CreateStreamSocket();
            break;
            case ServerType.Push:
            _serverSocket = _context.CreatePushSocket();
            break;
            case ServerType.XPub:
            _serverSocket = _context.CreateXPublisherSocket();
            break;
            default:
            _serverSocket = _context.CreateResponseSocket();
            break;
        }
        _serverSocket.Bind("tcp://*:" + _port);
        Task.Factory.StartNew(() =>
        AsyncRead(_serverSocket), TaskCreationOptions.LongRunning);
    }
    private void AsyncRead(NetMQSocket serverSocket)
    {
        while (true)
        {
            var msg = serverSocket.ReceiveMessage();
            OnOnReceive(new DataEventArgs<NetMQSocket, NetMQMessage>(serverSocket, msg));
        }
    }
    public NetMQSocket Server
    {
        get {
            return _serverSocket;
        }
    }
    public void Dispose()
    {
        _serverSocket.Dispose();
        _context.Dispose();
    }
    public void Send(NetMQMessage msg)
    {
        _serverSocket.SendMessage(msg);
    }
    public NetMQMessage CreateMessage()
    {
        return new NetMQMessage();
    }
} 

这样,使用者就可以根据枚举进行服务端的创建, 不用纠结到底用哪一种服务端,并且封装了一些消息的异步事件,方便在开发中使用,可以使用多播委托,针对不同的消息进行不同的处理,我这里使用的while循环,当然,在netmq内部提供了循环器和心跳等,都可以在实际的开发中进行扩展和使用:Poller和NetMQTimer。

Client:

/// <summary>
/// MQ客户端
/// </summary>
public class OctMQClient:IDisposable
{
    public event EventHandler<DataEventArgs<NetMQSocket, NetMQMessage>> OnReceive;
     
    protected virtual void OnOnReceive(DataEventArgs<NetMQSocket, NetMQMessage> e)
    {
        EventHandler<DataEventArgs<NetMQSocket, NetMQMessage>> handler = OnReceive;
        if (handler != null) handler(this, e);
    }
    private int _port;
    private NetMQSocket _clientSocket;
    private ClientType _type;
    private NetMQContext _context;
    private string _ip;
    private Task task;
    public void Init(string ip, int port, ClientType type)
    {
        _type = type;
        _ip = ip;
        _port = port;
        _context = NetMQContext.Create();
        CreateClient();
    }
    void CreateClient()
    {
        switch (_type)
        {
            case ClientType.Request:
            _clientSocket = _context.CreateRequestSocket();
            break;
            case ClientType.Sub:
            _clientSocket = _context.CreateSubscriberSocket();
            break;
            case ClientType.Dealer:
            _clientSocket = _context.CreateDealerSocket();
            break;
            case ClientType.Stream:
            _clientSocket = _context.CreateStreamSocket();
            break;
            case ClientType.Pull:
            _clientSocket = _context.CreatePullSocket();
            break;
            case ClientType.XSub:
            _clientSocket = _context.CreateXSubscriberSocket();
            break;
            default:
            _clientSocket = _context.CreateRequestSocket();
            break;
        }
        _clientSocket.Connect("tcp://" + _ip + ":" + _port);
    }
    public void StartAsyncReceive()
    {
        task = Task.Factory.StartNew(() =>
        AsyncRead(_clientSocket), TaskCreationOptions.LongRunning);
    }
    private void AsyncRead(NetMQSocket cSocket)
    {
        while (true)
        {
            var msg = cSocket.ReceiveMessage();
            OnOnReceive(new DataEventArgs<NetMQSocket, NetMQMessage>(cSocket, msg));
        }
    }
    public NetMQSocket Client
    {
        get {
            return _clientSocket;
        }
    }
    public T GetClient<T>() where T : NetMQSocket
    {
        return (T)_clientSocket;
    }
    public void Send(NetMQMessage msg)
    {
        _clientSocket.SendMessage(msg);
    }
    public NetMQMessage CreateMessage()
    {
        return new NetMQMessage();
    }
    public NetMQMessage ReceiveMessage()
    {
        return _clientSocket.ReceiveMessage();
    }
    public void Dispose()
    {
        _clientSocket.Dispose();
        _context.Dispose();
        if (task != null)
        {
            task.Dispose();
        }
    }
}

客户端提供了,同步接受消息和异步接收消息两种方式,当启动异步时,就开始循环的读取消息了,当读到消息时抛出事件,并且针对任务等做了资源的释放。并提供创建消息和返回MQ对象等公共方法,可以在开发过程中快速的入手和使用。

先简单说一下response和request模式,就是响应模式,当mq客户端向mq的服务端发送消息时,需要得到及时的响应,并返回给使用者或者是用户,这就需要及时响应的服务端程序,一般的MQ都会有这种功能,也是使用最广泛的,我们就先写一个这种类型的demo,基于我们前面提供的客户端和服务端。

Server Console

这里我提供了2种也是最常用的2种服务端方式,并且提供了不同的处理方式。

class Program
{
    private static OctMQServer _server;
    static ServerType _type;
    static void Main(string[] args)
    {
        AppDomain.CurrentDomain.UnhandledException += CurrentDomain_UnhandledException;
        CreateCmd();
         
         
    }
    /// <summary>
    /// 创建mq对象
    /// </summary>
    static void Create()
    {
        _server = new OctMQServer();
        _server.OnReceive += server_OnReceive;
        _server.Init(5555, _type);
    }
    /// <summary>
    /// 选择类型
    /// </summary>
    private static void CreateCmd()
    {
        Csl.Wl(ConsoleColor.Red, "请选择您要创建的MQ服务端类型");
        Csl.Wl(ConsoleColor.Yellow, "1.PUB 2.REP");
        var key = System.Console.ReadLine();
        switch (key)
        {
            case "1":
            {
                _type = ServerType.Pub;
                Create();
                Cmd();
            }
            break;
            case "2":
            _type = ServerType.Response;
            Create();
            Cmd();
            break;
            default:
            {
                CreateCmd();
            }
            break;
        }
    }
    static void CurrentDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e)
    {
        Csl.WlEx((Exception)e.ExceptionObject);
    }
    /// <summary>
    /// 接收消息
    /// </summary>
    private static void Cmd()
    {
        if (_type == ServerType.Pub)
        {
            Csl.Wl(ConsoleColor.Red, "请输入您要发个订阅者的信息主题与信息用空格分开");
        } else
        {
            Csl.Wl(ConsoleColor.Red, "等待消息");
        }
        var cmd = System.Console.ReadLine();
        switch (cmd)
        {
            case "exit":
            Csl.Wl("正在关闭应用程序。。。等待最后一个心跳执行完成。。。");
            _server.Dispose();
            break;
            default:
            {
                var str = cmd.Split(' ');
                var msg = _server.CreateMessage();
                msg.Append(str[0],Encoding.UTF8);
                msg.Append(str[1],Encoding.UTF8);
                _server.Send(msg);
                Cmd();
                break;
            }
            return;
        }
    }
    static void server_OnReceive(object sender, DataEventArgs<NetMQ.NetMQSocket, NetMQ.NetMQMessage> e)
    {
        var msg = e.Arg2;
        var server = e.Arg1;
        Csl.Wl(msg.Pop().ConvertToString(Encoding.UTF8));
        server.Send("你好,您的请求已处理,并返回消息及处理结果",Encoding.UTF8);
    }
}

Client Form

客户端,我使用winform来处理,并且配合控制台使用,这个用法有些巧妙,不会的同学可以私密我,嘿嘿,先上截图,也是可以同时处理两种方式,给个demo,方便大家在实际项目中使用:

响应式

订阅者式

不会做gif ,我就逐步说吧,从订阅者模式中我们可以看到,我的打开顺序1→2→3,先打开1,订阅了t的主题,发了2个消息,内容1和内容2,第一个程序均收到,这时我启动另外一个程序,同样订阅t这个主题,发现消息是通过轮询的方式分别向两个订阅者发送,这样,我们在处理一些比较耗时的业务逻辑,并且不会因为并发出现问题时,就可以使用多个订阅者,分别处理业务从而大大的提高我们的系统性能。

然后打开第三个,订阅y这个主题,这时发送y的主题消息,前2个订阅者就无法收到了,这样我们还可以区分业务,进行多进程的处理,更高的提高可用性和可扩展性,并结合高性能的缓存解决方案处理高并发的业务逻辑。

贴出客户端代码

public partial class Form1 : Form
{
    public Form1()
    {
        InitializeComponent();
        Csl.Init();
    }
     
     /// <summary>
    /// mq客户端
    /// </summary>
    private OctMQClient _client;
    /// <summary>
    /// 订阅者模式连接
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e"></param>
    private void btnConn_Click(object sender, EventArgs e)
    {
        _client = new OctMQClient();
        _client.OnReceive += _client_OnReceive;
        _client.Init(txtip.Text,int.Parse(txtport.Text),
        ClientType.Sub);
        var sub = (SubscriberSocket) _client.Client;
        sub.Subscribe(txtTop.Text);
        _client.StartAsyncReceive();
    }
    /// <summary>
    /// 订阅者模式受到消息
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e"></param>
    void _client_OnReceive(object sender, Core.Args.DataEventArgs<NetMQ.NetMQSocket,
    NetMQ.NetMQMessage> e)
    {
        var msg = e.Arg2;
        Csl.Wl("主题:"+msg.Pop().ConvertToString
        (Encoding.UTF8));
        Csl.Wl("内容:" + msg.Pop().ConvertToString
        (Encoding.UTF8));
    }
    /// <summary>
    /// 发送响应消息
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e"></param>
    private void btnSend_Click(object sender, EventArgs e)
    {
        using (_client = new OctMQClient())
        {
            _client.Init(txtip.Text, int.Parse(txtport.Text), ClientType.Request);
            var content = txtContent.Text;
            var msg = _client.CreateMessage();
            msg.Append(content, Encoding.UTF8);
            _client.Send(msg);
            var rmsg = _client.ReceiveMessage();
            var reqStr = rmsg.Pop().ConvertToString(Encoding.UTF8);
            Csl.Wl(reqStr);
        }
    }
    /// <summary>
    /// 释放资源
    /// </summary>
    /// <param name="e"></param>
    protected override void OnClosed(EventArgs e)
    {
        base.OnClosed(e);
        if (_client != null)
        {
            _client.Dispose();
             
        }
    }
}

Java_苏先生:专注于Java开发技术的研究与知识分享!
————END————

  • 点赞(感谢)
  • ...
  • 转发(感谢)
  • ...
  • 关注(感谢)
  • ...
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容