Librdkafka的Transport层

  • Librdkafka要和kakfa集群通讯, 网络操作肯定是少不了的,这就需要封装transport数据传输层;

  • Librdkafka毕竟是SDK, 作为访问kafka集群的客户端,不需要支持大并发, 在网络IO模型 上选用了 poll;

  • IO模型确定后, 发送和接收数据必不可少的缓冲区buffer, 我们前面已经介绍过, 请参考Librdkafka的基础数据结构 3 -- Buffer相关 ;

  • 以上介绍的librdkafka中的poll模型和buffer, 完全可以独立出来, 用在其他项目上, 作者封装得很好;

  • Librdkafka与kafka broker间是tcp连接, 在接收数据后就涉及到一个拆包组包的问题, 这个就和kafka的协议有关了, kafka的二进制协议:

    1. 前4个字节是payload长度;
    2. 后面紧接着是payload具体内容,这部分又分为header和body两部分;
  • 收包时就可以先收4字节,拿到payload长度, 再根据这个长度收够payload内容, 这样一个完整的response就接收到了;

  • 下面我们来结合代码分析一下, 其中有一部分是windows下的实现代码,我们将忽略掉


rd_kafka_transport_s
  • 所在文件: src/rdkafka_transport_int.h srd/rdkafka_transport.c(.h)
  • 定义:
struct rd_kafka_transport_s {   
    int rktrans_s; // 与broker通讯的socket
    
    rd_kafka_broker_t *rktrans_rkb; // 与之通讯的broker, 一个broker一个tcp连接,也就对应着一个transport对象

#if WITH_SSL
    SSL *rktrans_ssl; //ssl连接句柄
#endif

        // sasl权限验证相关
    struct {
                void *state;               /* SASL implementation
                                            * state handle */

                int           complete;    /* Auth was completed early
                        * from the client's perspective
                        * (but we might still have to
                                            *  wait for server reply). */

                /* SASL framing buffers */
        struct msghdr msg;
        struct iovec  iov[2];

        char          *recv_buf;
        int            recv_of;    /* Received byte count */
        int            recv_len;   /* Expected receive length for
                        * current frame. */
    } rktrans_sasl;

        //  response接收buffer
    rd_kafka_buf_t *rktrans_recv_buf;  /* Used with framed_recvmsg */

        // poll模型使用
        /* Two pollable fds:
         * - TCP socket
         * - wake-up fd
         */
        struct pollfd rktrans_pfd[2];
        int rktrans_pfd_cnt;

        size_t rktrans_rcvbuf_size;    /**< Socket receive buffer size */
        size_t rktrans_sndbuf_size;    /**< Socket send buffer size */
};

typedef struct rd_kafka_transport_s rd_kafka_transport_t
  • 异步连接到broker rd_kafka_transport_connect:
rd_kafka_transport_t *rd_kafka_transport_connect (rd_kafka_broker_t *rkb,
                          const rd_sockaddr_inx_t *sinx,
                          char *errstr,
                          size_t errstr_size) {
    rd_kafka_transport_t *rktrans;
    int s = -1;
    int on = 1;
        int r;

        rkb->rkb_addr_last = sinx;

        // 建socket, 可以调用socket_cb, 用用户自定义的方式来得到socket, 默认用socket函数
    s = rkb->rkb_rk->rk_conf.socket_cb(sinx->in.sin_family,
                       SOCK_STREAM, IPPROTO_TCP,
                       rkb->rkb_rk->rk_conf.opaque);
    if (s == -1) {
        rd_snprintf(errstr, errstr_size, "Failed to create socket: %s",
                socket_strerror(socket_errno));
        return NULL;
    }


#ifdef SO_NOSIGPIPE
    /* Disable SIGPIPE signalling for this socket on OSX */
    if (setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, &on, sizeof(on)) == -1) 
        rd_rkb_dbg(rkb, BROKER, "SOCKET",
               "Failed to set SO_NOSIGPIPE: %s",
               socket_strerror(socket_errno));
#endif

    /* Enable TCP keep-alives, if configured. */
    if (rkb->rkb_rk->rk_conf.socket_keepalive) {
#ifdef SO_KEEPALIVE
        if (setsockopt(s, SOL_SOCKET, SO_KEEPALIVE,
                   (void *)&on, sizeof(on)) == SOCKET_ERROR)
            rd_rkb_dbg(rkb, BROKER, "SOCKET",
                   "Failed to set SO_KEEPALIVE: %s",
                   socket_strerror(socket_errno));
#else
        rd_rkb_dbg(rkb, BROKER, "SOCKET",
               "System does not support "
               "socket.keepalive.enable (SO_KEEPALIVE)");
#endif
    }

        /* Set the socket to non-blocking */
        if ((r = rd_fd_set_nonblocking(s))) {
                rd_snprintf(errstr, errstr_size,
                            "Failed to set socket non-blocking: %s",
                            socket_strerror(r));
                goto err;
        }

    /* Connect to broker */
       // 可以用用户自定义的连接方式connect, 回调没set的话调用connect
        if (rkb->rkb_rk->rk_conf.connect_cb) {
                r = rkb->rkb_rk->rk_conf.connect_cb(
                        s, (struct sockaddr *)sinx, RD_SOCKADDR_INX_LEN(sinx),
                        rkb->rkb_name, rkb->rkb_rk->rk_conf.opaque);
        } else {
                if (connect(s, (struct sockaddr *)sinx,
                            RD_SOCKADDR_INX_LEN(sinx)) == SOCKET_ERROR &&
                    (socket_errno != EINPROGRESS
                            ))
                        r = socket_errno;
                else
                        r = 0;
        }

        if (r != 0) {
        rd_snprintf(errstr, errstr_size,
                "Failed to connect to broker at %s: %s",
                rd_sockaddr2str(sinx, RD_SOCKADDR2STR_F_NICE),
                socket_strerror(r));
        goto err;
    }

    /* Create transport handle */
        // 创建并初始化rd_kafka_transport_s
    rktrans = rd_calloc(1, sizeof(*rktrans));
    rktrans->rktrans_rkb = rkb;
    rktrans->rktrans_s = s;
    rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt++].fd = s;

        // 添加poll事件
        if (rkb->rkb_wakeup_fd[0] != -1) {
                rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt].events = POLLIN;
                rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt++].fd = rkb->rkb_wakeup_fd[0];
        }

    /* Poll writability to trigger on connection success/failure. */
    rd_kafka_transport_poll_set(rktrans, POLLOUT);

    return rktrans;

 err:
    if (s != -1)
                rd_kafka_transport_close0(rkb->rkb_rk, s);

    return NULL;
}
  • 对 poll方法的封装 rd_kafka_transport_poll:
int rd_kafka_transport_poll(rd_kafka_transport_t *rktrans, int tmout) {
        int r;
        // 调用poll, 失败直接return
    r = poll(rktrans->rktrans_pfd, rktrans->rktrans_pfd_cnt, tmout);
    if (r <= 0)
        return r;

        rd_atomic64_add(&rktrans->rktrans_rkb->rkb_c.wakeups, 1);
        // 没什么实际用处
        if (rktrans->rktrans_pfd[1].revents & POLLIN) {
                /* Read wake-up fd data and throw away, just used for wake-ups*/
                char buf[512];
                if (rd_read((int)rktrans->rktrans_pfd[1].fd,
                            buf, sizeof(buf)) == -1) {
                        /* Ignore warning */
                }
        }
         // 返回poll到的有效事件
         return rktrans->rktrans_pfd[0].revents;
}
  • transport io 事件循环rd_kafka_transport_io_serve:
void rd_kafka_transport_io_serve (rd_kafka_transport_t *rktrans,
                                  int timeout_ms) {
    rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
    int events;

        // 如果没收到response的请求个数没超过最大限制, 并且有需要发送的buf, 就把POLLOUT事件加入
    if (rd_kafka_bufq_cnt(&rkb->rkb_waitresps) < rkb->rkb_max_inflight &&
        rd_kafka_bufq_cnt(&rkb->rkb_outbufs) > 0)
        rd_kafka_transport_poll_set(rkb->rkb_transport, POLLOUT);

        // poll啊poll
    if ((events = rd_kafka_transport_poll(rktrans, timeout_ms)) <= 0)
                return;
        // 暂时去掉POLLOUT
        rd_kafka_transport_poll_clear(rktrans, POLLOUT);

         //  处理poll到的io events
    rd_kafka_transport_io_event(rktrans, events);
}
  • IO事件处理 rd_kafka_transport_io_event:
    broker有一个简单的状态机转换,有如下几个状态:
enum {
        RD_KAFKA_BROKER_STATE_INIT,
        RD_KAFKA_BROKER_STATE_DOWN,
        RD_KAFKA_BROKER_STATE_CONNECT,
        RD_KAFKA_BROKER_STATE_AUTH,

        /* Any state >= STATE_UP means the Kafka protocol layer
         * is operational (to some degree). */
        RD_KAFKA_BROKER_STATE_UP,
                RD_KAFKA_BROKER_STATE_UPDATE,
        RD_KAFKA_BROKER_STATE_APIVERSION_QUERY,
        RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE
    } rkb_state;

根据这个broker当前的状态,有不同的操作和状态转换

static void rd_kafka_transport_io_event (rd_kafka_transport_t *rktrans,
                     int events) {
    char errstr[512];
    int r;
    rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;

    switch (rkb->rkb_state)
    {
    case RD_KAFKA_BROKER_STATE_CONNECT:
#if WITH_SSL
        if (rktrans->rktrans_ssl) {
            /* Currently setting up SSL connection:
             * perform handshake. */
            rd_kafka_transport_ssl_handshake(rktrans);
            return;
        }
#endif

        /* Asynchronous connect finished, read status. */
        if (!(events & (POLLOUT|POLLERR|POLLHUP)))
            return;

        if (rd_kafka_transport_get_socket_error(rktrans, &r) == -1) {
            rd_kafka_broker_fail(
                                rkb, LOG_ERR, RD_KAFKA_RESP_ERR__TRANSPORT,
                                "Connect to %s failed: "
                                "unable to get status from "
                                "socket %d: %s",
                                rd_sockaddr2str(rkb->rkb_addr_last,
                                                RD_SOCKADDR2STR_F_PORT |
                                                RD_SOCKADDR2STR_F_FAMILY),
                                rktrans->rktrans_s,
                                rd_strerror(socket_errno));
        } else if (r != 0) {
            /* Connect failed */
                        errno = r;
            rd_snprintf(errstr, sizeof(errstr),
                    "Connect to %s failed: %s",
                                    rd_sockaddr2str(rkb->rkb_addr_last,
                                                    RD_SOCKADDR2STR_F_PORT |
                                                    RD_SOCKADDR2STR_F_FAMILY),
                                    rd_strerror(r));

            rd_kafka_transport_connect_done(rktrans, errstr);
        } else {
            /* Connect succeeded */
            rd_kafka_transport_connected(rktrans);
        }
        break;

    case RD_KAFKA_BROKER_STATE_AUTH:
        /* SASL handshake */
        if (rd_kafka_sasl_io_event(rktrans, events,
                       errstr, sizeof(errstr)) == -1) {
            errno = EINVAL;
            rd_kafka_broker_fail(rkb, LOG_ERR,
                         RD_KAFKA_RESP_ERR__AUTHENTICATION,
                         "SASL authentication failure: %s",
                         errstr);
            return;
        }
        break;

    case RD_KAFKA_BROKER_STATE_APIVERSION_QUERY:
    case RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE:
    case RD_KAFKA_BROKER_STATE_UP:
    case RD_KAFKA_BROKER_STATE_UPDATE:

        if (events & POLLIN) {
            while (rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP &&
                   rd_kafka_recv(rkb) > 0)
                ;
        }

        if (events & POLLHUP) {
            rd_kafka_broker_fail(rkb,
                                             rkb->rkb_rk->rk_conf.
                                             log_connection_close ?
                                             LOG_NOTICE : LOG_DEBUG,
                                             RD_KAFKA_RESP_ERR__TRANSPORT,
                         "Connection closed");
            return;
        }

        if (events & POLLOUT) {
            while (rd_kafka_send(rkb) > 0)
                ;
        }
        break;

    case RD_KAFKA_BROKER_STATE_INIT:
    case RD_KAFKA_BROKER_STATE_DOWN:
        rd_kafka_assert(rkb->rkb_rk, !*"bad state");
    }
}

最主要的是以下两点:

  1. 有可读事件:
循环读, 读到不能读为止
if (events & POLLIN) {
            while (rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP &&
                   rd_kafka_recv(rkb) > 0)
                ;
        }

rd_kafka_recv按kafka的协议来收包, 先收4字节,拿到payload长度, 再根据这个长度收够payload内容, 这样一个完整的response就接收到了

  1. 有可写事件:
if (events & POLLOUT) {
            while (rd_kafka_send(rkb) > 0)
                ;
        }

循环写, 写到不能写为止

  • 关闭transport
void rd_kafka_transport_close (rd_kafka_transport_t *rktrans) {
#if WITH_SSL
    if (rktrans->rktrans_ssl) {
        SSL_shutdown(rktrans->rktrans_ssl);
        SSL_free(rktrans->rktrans_ssl);
    }
#endif

        rd_kafka_sasl_close(rktrans);

    if (rktrans->rktrans_recv_buf)
        rd_kafka_buf_destroy(rktrans->rktrans_recv_buf);

    if (rktrans->rktrans_s != -1)
                rd_kafka_transport_close0(rktrans->rktrans_rkb->rkb_rk,
                                          rktrans->rktrans_s);

    rd_free(rktrans);
}
  • 封装sendmsg方法
ssize_t rd_kafka_transport_socket_sendmsg (rd_kafka_transport_t *rktrans,
                                           rd_slice_t *slice,
                                           char *errstr, size_t errstr_size) {
        struct iovec iov[IOV_MAX];
        struct msghdr msg = { .msg_iov = iov };
        size_t iovlen;
        ssize_t r;

       // 从slice产生iovec数组, 然后再发送
        rd_slice_get_iov(slice, msg.msg_iov, &iovlen, IOV_MAX,
                         /* FIXME: Measure the effects of this */
                         rktrans->rktrans_sndbuf_size);
        msg.msg_iovlen = (typeof(msg.msg_iovlen))iovlen;

#ifdef sun
        /* See recvmsg() comment. Setting it here to be safe. */
        socket_errno = EAGAIN;
#endif

        r = sendmsg(rktrans->rktrans_s, &msg, MSG_DONTWAIT
#ifdef MSG_NOSIGNAL
                    | MSG_NOSIGNAL
#endif
                );

        if (r == -1) {
                if (socket_errno == EAGAIN)
                        return 0;
                rd_snprintf(errstr, errstr_size, "%s", rd_strerror(errno));
        }

        /* Update buffer read position */
        rd_slice_read(slice, NULL, (size_t)r);

        return r;
}
  • 封装 send方法:
rd_kafka_transport_socket_send0 (rd_kafka_transport_t *rktrans,
                                 rd_slice_t *slice,
                                 char *errstr, size_t errstr_size) {
        ssize_t sum = 0;
        const void *p;
        size_t rlen;

       // rd_slice_peeker相当于一个iterator接口,每次返回一个slice中有效的segment
        while ((rlen = rd_slice_peeker(slice, &p))) {
                ssize_t r;
                //实际的发送
                r = send(rktrans->rktrans_s, p, rlen, 0
                );

                // 如果非eagain错误的话,是真的有错误发生了,return -1
                if (unlikely(r <= 0)) {
                        if (r == 0 || errno == EAGAIN)
                                return 0;
                        rd_snprintf(errstr, errstr_size, "%s",
                                    socket_strerror(socket_errno));
                        return -1;
                }

                /* Update buffer read position */
               //更新slice的读位置 
                rd_slice_read(slice, NULL, (size_t)r);

                sum += r;

                /* FIXME: remove this and try again immediately and let
                 *        the next write() call fail instead? */
                if ((size_t)r < rlen)
                        break;
        }

        return sum;
}
  • 封装 recv方法:
rd_kafka_transport_socket_recv0 (rd_kafka_transport_t *rktrans,
                                 rd_buf_t *rbuf,
                                 char *errstr, size_t errstr_size) {
        ssize_t sum = 0;
        void *p;
        size_t len;
        
       // rbuf可写就一直写
        while ((len = rd_buf_get_writable(rbuf, &p))) {
                ssize_t r;

                r = recv(rktrans->rktrans_s, p,
                         len,
                         0);

                if (unlikely(r <= 0)) {
                        if (r == -1 && socket_errno == EAGAIN)
                                return 0;
                        else if (r == 0) {
                                /* Receive 0 after POLLIN event means
                                 * connection closed. */
                                rd_snprintf(errstr, errstr_size,
                                            "Disconnected");
                                return -1;
                        } else if (r == -1) {
                                rd_snprintf(errstr, errstr_size, "%s",
                                            rd_strerror(errno));
                                return -1;
                        }
                }

                /* Update buffer write position */
                // 更新buf的写位置
                rd_buf_write(rbuf, NULL, (size_t)r);

                sum += r;

                /* FIXME: remove this and try again immediately and let
                 *        the next recv() call fail instead? */
                if ((size_t)r < len)
                        break;
        }
        return sum;
}

Librdkafka源码分析-Content Table

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,033评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,725评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,473评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,846评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,848评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,691评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,053评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,700评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,856评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,676评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,787评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,430评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,034评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,990评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,218评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,174评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,526评论 2 343

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,598评论 18 139
  • 姓名:周小蓬 16019110037 转载自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw阅读 34,703评论 13 425
  • kafka的定义:是一个分布式消息系统,由LinkedIn使用Scala编写,用作LinkedIn的活动流(Act...
    时待吾阅读 5,302评论 1 15
  • 本文转载自http://dataunion.org/?p=9307 背景介绍Kafka简介Kafka是一种分布式的...
    Bottle丶Fish阅读 5,428评论 0 34
  • 为了取这个标题我一度纠结了很久,就像我对一些不吐不快的东西一度纠结了很久一样。 我曾幻想过这篇伟大的文章诞生,它可...
    ___海阅读 156评论 0 1