Kafka源码分析-序列2 -Producer -Metadata的数据结构与读取、更新策略

在上一篇,我们从使用方式和策略上,对消息队列做了一个宏观描述。从本篇开始,我们将深入到源码内部,仔细分析Kafka到底是如何实现一个分布式消息队列。我们的分析将从Producer端开始。

从Kafka 0.8.2开始,发布了一套新的Java版的client api, KafkaProducer/KafkaConsumer,替代之前的scala版的api。本系列的分析将只针对这套Java版的api。

多线程异步发送模型

下图是经过源码分析之后,整理出来的Producer端的架构图:


1.png

在上一篇我们讲过,Producer有同步发送和异步发送2种策略。在以前的Kafka client api实现中,同步和异步是分开实现的。而在0.9中,同步发送其实是通过异步发送间接实现,其接口如下:

public class KafkaProducer<K, V> implements Producer<K, V> {  
...  
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)  //异步发送接口  
     {  
     ...  
     }  
}

要实现同步发送,只要在拿到返回的Future对象之后,直接调用get()就可以了。

基本思路

从上图我们可以看出,异步发送的基本思路就是:send的时候,KafkaProducer把消息放到本地的消息队列RecordAccumulator,然后一个后台线程Sender不断循环,把消息发给Kafka集群。

要实现这个,还得有一个前提条件:就是KafkaProducer/Sender都需要获取集群的配置信息Metadata。所谓Metadata,也就是在上一篇所讲的,Topic/Partion与broker的映射关系:每一个Topic的每一个Partion,得知道其对应的broker列表是什么,其中leader是谁,follower是谁。

2个数据流

所以在上图中,有2个数据流:
Metadata流(A1,A2,A3):Sender从集群获取信息,然后更新Metadata; KafkaProducer先读取Metadata,然后把消息放入队列。

消息流(B1, B2, B3):这个很好理解,不再详述。

本篇着重讲述Metadata流,消息流,将在后续详细讲述。

Metadata的线程安全性

从上图可以看出,Metadata是多个producer线程读,一个sender线程更新,因此它必须是线程安全的。

Kafka的官方文档上也有说明,KafkaProducer是线程安全的,可以在多线程中调用:

The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.

从下面代码也可以看出,它的所有public方法都是synchronized:

public final class Metadata {  
  。。。  
    public synchronized Cluster fetch() {  
        return this.cluster;  
    }  
    public synchronized long timeToNextUpdate(long nowMs) {  
       。。。  
    }  
    public synchronized int requestUpdate() {  
      。。。  
    }  
    。。。      
}

Metadata的数据结构

下面代码列举了Metadata的主要数据结构:一个Cluster对象 + 1堆状态变量。前者记录了集群的配置信息,后者用于控制Metadata的更新策略。

public final class Metadata {  
...  
    private final long refreshBackoffMs;  //更新失败的情况下,下1次更新的补偿时间(这个变量在代码中意义不是太大)  
    private final long metadataExpireMs; //关键值:每隔多久,更新一次。缺省是600*1000,也就是10分种  
    private int version;         //每更新成功1次,version递增1。这个变量主要用于在while循环,wait的时候,作为循环判断条件  
    private long lastRefreshMs;  //上一次更新时间(也包含更新失败的情况)  
    private long lastSuccessfulRefreshMs; //上一次成功更新的时间(如果每次都成功的话,则2者相等。否则,lastSuccessulRefreshMs < lastRefreshMs)  
    private Cluster cluster;   //集群配置信息  
    private boolean needUpdate;  //是否强制刷新  
、  
  ...  
}  
  
public final class Cluster {  
...  
    private final List<Node> nodes;   //Node也就是Broker  
    private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;  //Topic/Partion和broker list的映射关系  
    private final Map<String, List<PartitionInfo>> partitionsByTopic;  
    private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;  
    private final Map<Integer, List<PartitionInfo>> partitionsByNode;  
    private final Map<Integer, Node> nodesById;  
}  
  
public class PartitionInfo {  
    private final String topic;  
    private final int partition;  
    private final Node leader;  
    private final Node[] replicas;  
    private final Node[] inSyncReplicas;  
}

producer读取Metadata

下面是send函数的源码,可以看到,在send之前,会先读取metadata。如果metadata读不到,会一直阻塞在那,直到超时,抛出TimeoutException

//KafkaProducer  
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {  
        try {  
     long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);  //拿不到topic的配置信息,会一直阻塞在这,直到抛异常  
  
     ... //拿到了,执行下面的send逻辑  
     } catch()  
     {}  
 }  
  
//KafkaProducer  
    private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {  
        if (!this.metadata.containsTopic(topic))  
            this.metadata.add(topic);  
  
        if (metadata.fetch().partitionsForTopic(topic) != null)  
            return 0;   //取到topic的配置信息,直接返回  
  
        long begin = time.milliseconds();  
        long remainingWaitMs = maxWaitMs;  
        while (metadata.fetch().partitionsForTopic(topic) == null) { //取不到topic的配置信息,一直死循环wait,直到超时,抛TimeoutException  
            log.trace("Requesting metadata update for topic {}.", topic);  
            int version = metadata.requestUpdate(); //把needUpdate置为true  
            sender.wakeup(); //唤起sender  
  
            metadata.awaitUpdate(version, remainingWaitMs); //metadata的关键函数  
            long elapsed = time.milliseconds() - begin;  
            if (elapsed >= maxWaitMs)  
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");  
            if (metadata.fetch().unauthorizedTopics().contains(topic))  
                throw new TopicAuthorizationException(topic);  
            remainingWaitMs = maxWaitMs - elapsed;  
        }  
        return time.milliseconds() - begin;  
    }  
  
//Metadata  
    public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {  
        if (maxWaitMs < 0) {  
            throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");  
        }  
        long begin = System.currentTimeMillis();  
        long remainingWaitMs = maxWaitMs;  
        while (this.version <= lastVersion) {  //当Sender成功更新meatadata之后,version加1。否则会循环,一直wait  
            if (remainingWaitMs != 0  
                wait(remainingWaitMs);  //线程的wait机制,wait和synchronized的配合使用  
            long elapsed = System.currentTimeMillis() - begin;  
            if (elapsed >= maxWaitMs)  //wait时间超出了最长等待时间  
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");  
            remainingWaitMs = maxWaitMs - elapsed;  
        }  
    }

总结:从上面代码可以看出,producer wait metadata的时候,有2个条件:
(1) while (metadata.fetch().partitionsForTopic(topic) == null)
(2)while (this.version <= lastVersion)

有wait就会有notify,notify在Sender更新Metadata的时候发出。

Sender的创建

下面是KafkaProducer的构造函数,从代码可以看出,Sender就是KafkaProducer中创建的一个Thread.

private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {  
        try {  
        ...  
                    this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG)); //构造metadata  
  
this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); //往metadata中,填入初始的,配置的node列表  
  
            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());  
  
            NetworkClient client = new NetworkClient(  
                    new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", metricTags, channelBuilder),  
                    this.metadata,  
                    clientId,  
                    config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),  
                    config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),  
                    config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),  
                    config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),  
  
  
            this.sender = new Sender(client,  //构造一个sender。sender本身实现的是Runnable接口  
                    this.metadata,  
                    this.accumulator,  
                    config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),  
                    (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),  
                    config.getInt(ProducerConfig.RETRIES_CONFIG),  
                    this.metrics,  
                    new SystemTime(),  
                    clientId,  
                    this.requestTimeoutMs);  
  
            String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");  
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);  
            this.ioThread.start();  //一个线程,开启sender

Sender poll()更新Metadata

    public void run() {  
        // main loop, runs until close is called  
        while (running) {  
            try {  
                run(time.milliseconds());  
            } catch (Exception e) {  
                log.error("Uncaught error in kafka producer I/O thread: ", e);  
            }  
        }  
       。。。  
    }  
  
    public void run(long now) {  
        Cluster cluster = metadata.fetch();  
。。。  
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);   //遍历消息队列中所有的消息,找出对应的,已经ready的Node  
  
        if (result.unknownLeadersExist)  //如果一个ready的node都没有,请求更新metadata  
            this.metadata.requestUpdate();  
  
  。。。  
  
     //client的2个关键函数,一个发送ClientRequest,一个接收ClientResponse。底层调用的是NIO的poll。关于nio, 后面会详细介绍  
        for (ClientRequest request : requests)  
            client.send(request, now);  
  
        this.client.poll(pollTimeout, now);  
    }  
  
//NetworkClient  
    public List<ClientResponse> poll(long timeout, long now) {  
        long metadataTimeout = metadataUpdater.maybeUpdate(now); //关键点:每次poll的时候判断是否要更新metadata  
  
        try {  
            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));  
        } catch (IOException e) {  
            log.error("Unexpected error during I/O", e);  
        }  
  
        // process completed actions  
        long updatedNow = this.time.milliseconds();  
        List<ClientResponse> responses = new ArrayList<>();  
        handleCompletedSends(responses, updatedNow);  
        handleCompletedReceives(responses, updatedNow);   //在返回的handler中,会处理metadata的更新  
        handleDisconnections(responses, updatedNow);  
        handleConnections();  
        handleTimedOutRequests(responses, updatedNow);  
  
        // invoke callbacks  
        for (ClientResponse response : responses) {  
            if (response.request().hasCallback()) {  
                try {  
                    response.request().callback().onComplete(response);  
                } catch (Exception e) {  
                    log.error("Uncaught error in request completion:", e);  
                }  
            }  
        }  
  
        return responses;  
    }  
  
 //DefaultMetadataUpdater  
         @Override  
        public long maybeUpdate(long now) {  
            // should we update our metadata?  
            long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);  
            long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);  
            long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;  
            // if there is no node available to connect, back off refreshing metadata  
            long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),  
                    waitForMetadataFetch);  
  
            if (metadataTimeout == 0) {  
                // highly dependent on the behavior of leastLoadedNode.  
                Node node = leastLoadedNode(now);  //找到负载最小的Node  
                maybeUpdate(now, node); //把更新Metadata的请求,发给这个Node  
            }  
  
            return metadataTimeout;  
        }  
  
        private void maybeUpdate(long now, Node node) {  
            if (node == null) {  
                log.debug("Give up sending metadata request since no node is available");  
                // mark the timestamp for no node available to connect  
                this.lastNoNodeAvailableMs = now;  
                return;  
            }  
            String nodeConnectionId = node.idString();  
  
            if (canSendRequest(nodeConnectionId)) {  
                Set<String> topics = metadata.needMetadataForAllTopics() ? new HashSet<String>() : metadata.topics();  
                this.metadataFetchInProgress = true;  
                ClientRequest metadataRequest = request(now, nodeConnectionId, topics);  //关键点:发送更新Metadata的Request  
                log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());  
                doSend(metadataRequest, now); //这里只是异步发送,返回的response在上面的handleCompletedReceives里面处理  
            } else if (connectionStates.canConnect(nodeConnectionId, now)) {  
                log.debug("Initialize connection to node {} for sending metadata request", node.id());  
                initiateConnect(node, now);  
  
            } else { // connected, but can't send more OR connecting  
                this.lastNoNodeAvailableMs = now;  
            }  
        }  
  
     private void handleCompletedReceives(List<ClientResponse> responses, long now) {  
        for (NetworkReceive receive : this.selector.completedReceives()) {  
            String source = receive.source();  
            ClientRequest req = inFlightRequests.completeNext(source);  
            ResponseHeader header = ResponseHeader.parse(receive.payload());  
            // Always expect the response version id to be the same as the request version id  
            short apiKey = req.request().header().apiKey();  
            short apiVer = req.request().header().apiVersion();  
            Struct body = (Struct) ProtoUtils.responseSchema(apiKey, apiVer).read(receive.payload());  
            correlate(req.request().header(), header);  
            if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))  
                responses.add(new ClientResponse(req, now, false, body));  
        }  
    }  
  
  
        @Override  
        public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {  
            short apiKey = req.request().header().apiKey();  
            if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {  
                handleResponse(req.request().header(), body, now);  
                return true;  
            }  
            return false;  
        }  
  
//关键函数  
        private void handleResponse(RequestHeader header, Struct body, long now) {  
            this.metadataFetchInProgress = false;  
            MetadataResponse response = new MetadataResponse(body);  
            Cluster cluster = response.cluster();   //从response中,拿到一个新的cluster对象  
            if (response.errors().size() > 0) {  
                log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), response.errors());  
            }  
  
            if (cluster.nodes().size() > 0) {  
                this.metadata.update(cluster, now);   //更新metadata,用新的cluster覆盖旧的cluster  
            } else {  
                log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());  
                this.metadata.failedUpdate(now);  //更新metadata失败,做失败处理逻辑  
            }  
        }  
  
  
//更新成功,version+1, 同时更新其它字段  
    public synchronized void update(Cluster cluster, long now) {  
        this.needUpdate = false;  
        this.lastRefreshMs = now;  
        this.lastSuccessfulRefreshMs = now;  
        this.version += 1;  
  
        for (Listener listener: listeners)  
            listener.onMetadataUpdate(cluster);  //如果有人监听了metadata的更新,通知他们  
  
        this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;    //新的cluster覆盖旧的cluster  
  
        notifyAll();  //通知所有的阻塞的producer线程  
  
        log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);  
    }  
  
//更新失败,只更新lastRefreshMs  
    public synchronized void failedUpdate(long now) {  
        this.lastRefreshMs = now;  
    }

从上面可以看出,Metadata的更新,是在while循环,每次调用client.poll()的时候更新的。

更新机制又有以下2种:

Metadata的2种更新机制

(1)周期性的更新: 每隔一段时间更新一次,这个通过 Metadata的lastRefreshMs, lastSuccessfulRefreshMs 这2个字段来实现

对应的ProducerConfig配置项为:
metadata.max.age.ms //缺省300000,即10分钟1次

(2) 失效检测,强制更新:检查到metadata失效以后,调用metadata.requestUpdate()强制更新。 requestUpdate()函数里面其实什么都没做,就是把needUpdate置成了false

每次poll的时候,都检查这2种更新机制,达到了,就触发更新。

那如何判定Metadata失效了呢?这个在代码中很分散,有很多地方,会判定Metadata失效。

Metadata失效检测

条件1:initConnect的时候

private void initiateConnect(Node node, long now) {  
    String nodeConnectionId = node.idString();  
    try {  
        log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());  
        this.connectionStates.connecting(nodeConnectionId, now);  
        selector.connect(nodeConnectionId,  
                         new InetSocketAddress(node.host(), node.port()),  
                         this.socketSendBuffer,  
                         this.socketReceiveBuffer);  
    } catch (IOException e) {  
        connectionStates.disconnected(nodeConnectionId, now);  
        metadataUpdater.requestUpdate(); //判定metadata失效  
        log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);  
    }  
}

条件2:poll里面IO的时候,连接断掉了

private void handleDisconnections(List<ClientResponse> responses, long now) {  
    for (String node : this.selector.disconnected()) {  
        log.debug("Node {} disconnected.", node);  
        processDisconnection(responses, node, now);  
    }  
    if (this.selector.disconnected().size() > 0)  
        metadataUpdater.requestUpdate();  //判定metadata失效  
}

条件3:有请求超时

private void handleTimedOutRequests(List<ClientResponse> responses, long now) {  
    List<String> nodeIds = this.inFlightRequests.getNodesWithTimedOutRequests(now, this.requestTimeoutMs);  
    for (String nodeId : nodeIds) {  
        this.selector.close(nodeId);  
        log.debug("Disconnecting from node {} due to request timeout.", nodeId);  
        processDisconnection(responses, nodeId, now);  
    }  
  
    if (nodeIds.size() > 0)  
        metadataUpdater.requestUpdate();  //判定metadata失效  
}

条件4:发消息的时候,有partition的leader没找到

public void run(long now) {  
    Cluster cluster = metadata.fetch();  
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);  
  
    if (result.unknownLeadersExist)  
        this.metadata.requestUpdate();

条件5:返回的response和请求对不上的时候

private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches, long now) {  
    int correlationId = response.request().request().header().correlationId();  
    if (response.wasDisconnected()) {  
        log.trace("Cancelled request {} due to node {} being disconnected", response, response.request()  
                                                                                              .request()  
                                                                                              .destination());  
        for (RecordBatch batch : batches.values())  
            completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlationId, now);

总之1句话:发生各式各样的异常,数据不同步,都认为metadata可能出问题了,要求更新。

Metadata其他的更新策略

除了上面所述,Metadata的更新,还有以下几个特点:

1.更新请求MetadataRequest是nio异步发送的,在poll的返回中,处理MetadataResponse的时候,才真正更新Metadata。

这里有个关键点:Metadata的cluster对象,每次是整个覆盖的,而不是局部更新。所以cluster内部不用加锁。

2.更新的时候,是从metadata保存的所有Node,或者说Broker中,选负载最小的那个,也就是当前接收请求最少的那个。向其发送MetadataRequest请求,获取新的Cluster对象。

欢迎加入QQ群:104286694

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,711评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,932评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,770评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,799评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,697评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,069评论 1 276
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,535评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,200评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,353评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,290评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,331评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,020评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,610评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,694评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,927评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,330评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,904评论 2 341

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,565评论 18 139
  • kafka的定义:是一个分布式消息系统,由LinkedIn使用Scala编写,用作LinkedIn的活动流(Act...
    时待吾阅读 5,286评论 1 15
  • 转帖:原文地址http://www.infoq.com/cn/articles/depth-interpretat...
    端木轩阅读 2,308评论 0 19
  • 本文转载自http://dataunion.org/?p=9307 背景介绍Kafka简介Kafka是一种分布式的...
    Bottle丶Fish阅读 5,420评论 0 34
  • 背景介绍 Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下: 以时间复杂度为O...
    高广超阅读 12,812评论 8 167