.net core kafka封装

新建一个.net core类库项目

安装第三方依赖库,如下图所示:


41.png

新建一个SUPKafkaTopicConsumer类

这是用来创建并初始化消费者,接下来看看这个类里面包含了什么。

  • 首先声明一个委托,用来接收订阅消息
public delegate void OnReceivedHandle(object data);

初始化消费者,构造函数中传入kafka地址,以及要订阅的组groupId,另外注入了log4net记录日志信息。
init()方法用来初始化,新建一个消费者,具体代码如下。

 public class SUPKafkaTopicConsumer<TKey, TValue>
    {
        private IConsumer<TKey, TValue> consumer;
        private SUPLogger logger_;
        private string BootStrapServer;
        private string GroupId;
      
        public SUPKafkaTopicConsumer(string bootStrapServer, string groupId, SUPLogger logger = null)
        {
            BootStrapServer = bootStrapServer;
            GroupId = groupId;
            logger_ = logger;
        }

        public bool Init()
        {
            try
            {
                var conf = new ConsumerConfig
                {
                    GroupId = GroupId,
                    BootstrapServers = BootStrapServer,
                    AutoOffsetReset = AutoOffsetReset.Earliest,
                    EnableAutoCommit = false // 设置非自动偏移,业务逻辑完成后手动处理偏移,防止数据丢失
                };
                consumer = new ConsumerBuilder<TKey, TValue>(conf)
                    .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
                    .Build();

                return true;
            }
            catch (Exception ex)
            {
                throw;
            }
        }
  • 定义回调事件,用以处理用户自定义方法。
public event OnReceivedHandle onReceivedHandle;
  • 定义一个订阅的方法,传入topic,以及是否需要提交偏移量。
    其实看init()方法中我把EnableAutoCommit=false,取消了自动提交,让应用程序决定何时提交 偏移量,为什么这么做呢?
    自动提交虽然方便,但是也有一些弊端,自动提交的弊端是通过间隔时间。 一般是默认5s提交时间间隔,在最近一次提交之后的 3s发生了再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后 了 3s,所以在这 3s 内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗,不过这种情况是无也完全避免的 。
    大部分开发者通过控制偏移量提交时间来消除丢失消息的可能性,井在发生再均衡时减少 重复消息的数量。消费者 API提供了另一种提交偏移量的方式 , 开发者可以在必要的时候 提交当前偏移盘,而不是基于时间间隔。
public void Subscribe(string topic, bool isCommit)
        {
            try
            {
                if (consumer != null)
                {
                    consumer.Subscribe(topic);
                    while (true)
                    {
                        var consume = consumer.Consume();
                        if (onReceivedHandle != null)
                        {
                            onReceivedHandle(consume);

                            if (isCommit)
                            {
                                consumer.Commit(consume);
                            }
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                //consumer.Close();
                throw ex;
            }
        }
  • 取消订阅
 public void UnSubscribe()
        {
            if (consumer != null)
            {
                consumer.Unsubscribe();
            }
        }

新建生产者类

  • 首先定义了ISUPKafkaProducer<Tkey, TValue>接口,包含四个方法
 public interface ISUPKafkaProducer<Tkey,TValue>
    {
        ISendResult Send(Tkey key, TValue value, string topic,Action<DeliveryReport<Tkey, TValue>> sendCallBack = null);
        ISendResult Send(Tkey key, TValue value, TopicPartition topicPartition, Action<DeliveryReport<Tkey, TValue>> sendCallBack = null);

        ISendResult AsyncSend(Tkey key, TValue value,string topic);
        ISendResult AsyncSend(Tkey key, TValue value, TopicPartition topicPartition);
    }
  • 接口的实现,初始化过程类似消费者
internal class SUPKafkaTopicProducer<Tkey, TValue> : ISUPKafkaProducer<Tkey, TValue>
    {
        private IProducer<Tkey, TValue> producer;
        private SUPLogger logger_;
        private string m_bootStrapServer;

        public SUPKafkaTopicProducer(string bootStrapServer,SUPLogger logger = null)
        {
            m_bootStrapServer = bootStrapServer;
            logger_ = logger;
        }
        public bool Init()
        {
            try
            {
                var config = new ProducerConfig
                {
                    BootstrapServers = m_bootStrapServer
                };
                producer = new ProducerBuilder<Tkey, TValue>(config)
                    .SetErrorHandler((producer, error) =>
                    {
                        logger_.Fatal(string.Format("Kafka Error Handler {0},ErrorCode:{2},Reason:{3}",
                            m_bootStrapServer, error.Code, error.Reason));
                    })
                    .SetLogHandler((producer, msg) =>
                    {
                        logger_.Info(string.Format("Kafka Log Handler {0}-{1},Name:{2},Message:{3}",
                            m_bootStrapServer, msg.Name, msg.Message));
                    })
                    .Build();

                return true;
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }

实现继承至ISUPKafkaProducer<Tkey, TValue>的方法

 public ISendResult Send(Tkey key, TValue value,string topic, Action<DeliveryReport<Tkey, TValue>> sendCallBack = null)
        {
            try
            {
                if (producer != null)
                {
                    var message = new Message<Tkey, TValue>
                    {
                        Value = value,
                        Key = key
                    };
                    producer.Produce(topic, message, sendCallBack);
                    return new SendResult(true);
                }
                else
                {
                    return new SendResult(true, "没有初始化生产者");
                }
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }

        public ISendResult Send(Tkey key, TValue value, TopicPartition topicPartition, Action<DeliveryReport<Tkey, TValue>> sendCallBack = null)
        {
            try
            {
                if (producer != null)
                {
                    var message = new Message<Tkey, TValue>
                    {
                        Value = value,
                        Key = key
                    };
                    producer.Produce(topicPartition, message, sendCallBack);
                    return new SendResult(true);
                }
                else
                {
                    return new SendResult(true, "没有初始化生产者");
                }
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }

        public ISendResult AsyncSend(Tkey key, TValue value,string topic)
        {
            try
            {
                if (producer != null)
                {
                    var message = new Message<Tkey, TValue>
                    {
                        Value = value,
                        Key = key
                    };
                    var deliveryReport = producer.ProduceAsync(topic, message);
                    deliveryReport.ContinueWith(task =>
                   {
                       Console.WriteLine("Producer: " + producer.Name + "\r\nTopic: " + topic + "\r\nPartition: " + task.Result.Partition + "\r\nOffset: " + task.Result.Offset);
                   });
                    producer.Flush(TimeSpan.FromSeconds(10));
                    return new SendResult(true);
                }
                else
                {
                    return new SendResult(true, "没有初始化生产者");
                }
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }

        public ISendResult AsyncSend(Tkey key, TValue value, TopicPartition topicPartition)
        {
            try
            {
                if (producer != null)
                {
                    var message = new Message<Tkey, TValue>
                    {
                        Value = value,
                        Key = key
                    };

                    var deliveryReport = producer.ProduceAsync(topicPartition, message);
                    deliveryReport.ContinueWith(task =>
                    {
                        Console.WriteLine("Producer: " + producer.Name + "\r\nTopic: " + topicPartition.Topic + "\r\nPartition: " + task.Result.Partition + "\r\nOffset: " + task.Result.Offset);
                    });

                    producer.Flush(TimeSpan.FromSeconds(10));
                    return new SendResult(true);
                }
                else
                {
                    return new SendResult(true, "没有初始化生产者");
                }
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }

新建一个SUPKafkaMessageCenter类

这个类是对外开放的,我们利用这个类来管理生产者和消费者,看下代码非常简单。

public static class SUPKafkaMessageCenter<Tkey, TValue>
    {
        private static SUPLogger logger = null;
        static SUPKafkaMessageCenter()
        {
            SUPLoggerManager.Configure();
            logger = new SUPLogger("KafkaCenter");
        }
        /// <summary>
        /// 创建生产者
        /// </summary>
        /// <param name="bootstrapServer"></param>
        /// <param name="topicName"></param>
        /// <returns></returns>
        public static ISUPKafkaProducer<Tkey, TValue> CreateTopicProducer(string bootstrapServer)
        {
            if (string.IsNullOrEmpty(bootstrapServer))
            {
                return null;
            }
            var producer = new SUPKafkaTopicProducer<Tkey, TValue>(bootstrapServer, logger);
            if (!producer.Init())
            {
                return null;
            }
            return producer;
        }

        /// <summary>
        /// 创建消费者
        /// </summary>
        /// <param name="bootstrapServer"></param>
        /// <param name="groupId"></param>
        /// <returns></returns>
        public static SUPKafkaTopicConsumer<Tkey, TValue> CreateTopicConsumer(string bootstrapServer, string groupId= "default-consumer-group")
        {
            if (string.IsNullOrEmpty(bootstrapServer))
            {
                return null;
            }
            var consumer = new SUPKafkaTopicConsumer<Tkey, TValue>(bootstrapServer, groupId,logger);
            if (!consumer.Init())
            {
                return null;
            }
            return consumer;
        }

测试

新建一个测试的控制台程序,调用代码如下

  • 消费者
var consumer = SUPKafkaMessageCenter<string, string>.CreateTopicConsumer("localhost:9092");
            //绑定接收信息,回调函数
            consumer.onReceivedHandle += CallBack;

            var topics = new List<string>();
            topics.Add("kafka-default-topic");
            topics.Add("test");
            //订阅主题
            consumer.Subscribe(topics, false);
  • 生产者
ISUPKafkaProducer<string, string> kafkaCenter = SUPKafkaMessageCenter<string, string>.CreateTopicProducer("localhost:9092");
kafkaCenter.Send(i.ToString(), "", "kafka-default-topic",deliveryReport =>{...});

除了上面写的这些方法,其实对于kafka还有很多功能,比如topic的增删改查,我把它认为是管理类的,这里就不贴代码了,有兴趣的朋友可以进gitee上下载来看看。https://gitee.com/zhanwei103/Kafka.Net

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,236评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,867评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,715评论 0 340
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,899评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,895评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,733评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,085评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,722评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,025评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,696评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,816评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,447评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,057评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,254评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,204评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,561评论 2 343

推荐阅读更多精彩内容