架构
Client
生产发送流程
Server:
kafka 网络架构
kafka 数据存储
kafka 副本同步
kafka 元数据管理
一个demo
1、 producer核心流程:初始化对象 05}
2、producer初始化过程 06} 0:25
1:00:00-1:18:12
\kafka-0.10.1.0-src\examples\src\main\java\kafka\examples\Producer.java
构造函数
——》Producer()
//TODO 初始化kafkaProducer
producer = new KafkaProducer<>(props);
——》KafkaProducer构造函数((ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer)
▼
//配置clientId
clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
//设置分区器
this.partitioner = config.getConfiguredInstance(
//重试时间 retry.backoff.ms 默认100ms
long retryBackoffMs = config.getLong
//设置序列化器
if (keySerializer == null) {
this.keySerializer = config.getConfiguredInstance(
//设置拦截器
//类似于一个过滤器
this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>
//metadata.max.age.ms(默认5分钟)
//生产者每隔一段时间都要去更新一下集群的元数据。
this.metadata = new Metadata(retryBackoffMs,
//max.request.size 生产者往服务端发送消息的时候,规定一条消息最大多大?
//如果你超过了这个规定消息的大小,你的消息就不能发送过去。
//默认是1M,这个值偏小,在生产环境中,我们需要修改这个值。
//经验值是10M。但是大家也可以根据自己公司的情况来。
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
//指的是缓存大小
//默认值是32M,这个值一般是够用,如果有特殊情况的时候,我们可以去修改这个值。
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
//kafka是支持压缩数据的,这儿设置压缩格式。
//提高你的系统的吞吐量,你可以设置压缩格式。
//一次发送出去的消息就更多。生产者这儿会消耗更多的cpu.
this.compressionType = CompressionType.forName(
//TODO 初始化了一个重要的管理网路的组件。
/**
* (1)connections.max.idle.ms: 默认值是9分钟 一个网络连接最多空闲多久,超过这个空闲就关闭这个网络连接。
*(2)max.in.flight.requests.per.connection:默认是5 producer向broker发送数据的时候,其实是有多个网络连接。 每个网络连接可以忍受 producer端发送给broker 消息然后没有响应的个数。
- 因为kafka有重试机制,所以有可能会造成数据乱序,如果想要保证有序,这个值要把设置为1.
*(3)send.buffer.bytes:socket发送数据的缓冲区的大小,默认值是128K - (4)receive.buffer.bytes:socket接受数据的缓冲区的大小,默认值是32K。
*/
NetworkClient client = new NetworkClient(
//这个就是一个发送线程
/*(1) retries:重试的次数
* (2) acks:
* 0:producer发送数据到broker后,就完了,没有返回值,不管写成功还是写失败都不管了。
* 1: producer发送数据到broker后,数据成功写入leader partition以后返回响应。
- -1: producer发送数据到broker后,数据要写入到leader partition里面,并且数据同步到所有的 follower partition里面以后,才返回响应。
*/
this.sender = new Sender(client,
(short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
config.getInt(ProducerConfig.RETRIES_CONFIG),
//创建了一个线程,然后里面传进去了一个sender对象。
//把业务的代码和关于线程的代码给隔离开来。
//关于线程的这种代码设计的方式,其实也值得大家积累的。
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
//启动线程。
this.ioThread.start();
(2)元数据管理 07} 0:6
1:18:00一1:25:00
入口
——》KafkaProducer((ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer)
▼
//生产者从服务端那儿拉取过来的kafka的元数据。
//生产者要想去拉取元数据, 发送网络请求,重试,
//metadata.max.age.ms(默认5分钟)
//生产者每隔一段时间都要去更新一下集群的元数据。
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, clusterResourceListeners);
//去更新元数据
//addresses 这个地址其实就是我们写producer代码的时候,传参数的时候,传进去了一个broker的地址。
//所以这段代码看起来像是去服务端拉取元数据,所以我们去验证一下,是否真的去拉取元数据。
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
//TODO update方法初始化的时候并没有去服务端拉取元数据。
this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
(2.1)元数据管理2 元数据结构 07} 0:15
java/org/apache/kafka/clients/Metadata.java
java/org/apache/kafka/common/Cluster.java
java/org/apache/kafka/common/Node.java
java/org/apache/kafka/common/PartitionInfo.java
(3.1)KafkaProducer 核心流程:发送 08} 0:14
1:25:00—1:32:00
java/kafka/examples/Producer.java
//isAsync , kafka发送数据的时候,有两种方式
//1: 异步发送
//2: 同步发送
//isAsync: true的时候是异步发送,false就是同步发送
if (isAsync) { // Send asynchronously
//异步发送,一直发送,消息响应结果交给回调函数处理
//这样的方式,性能比较好,我们生产代码用的就是这种方式。
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr), new DemoCallBack(startTime, messageNo, messageStr));
主类 org/apache/kafka/clients/producer/KafkaProducer.java
——》KafkaProducer.send()
——》★ KafkaProducer.doSend()八大步骤
▼
/**
* 步骤一:
* 同步等待拉取元数据。
* maxBlockTimeMs 最多能等待多久。
/
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(),
/*
* 步骤二:
* 对消息的key和value进行序列化。
/
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.key());
/*
* 步骤三:
* 根据分区器选择消息应该发送的分区。
*
* 因为前面我们已经获取到了元数据
* 这儿我们就可以根据元数据的信息
* 计算一下,我们应该要把这个数据发送到哪个分区上面。
*/
int partition = partition(record, serializedKey, serializedValue, cluster);
int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
/**
* 步骤四:
* 确认一下消息的大小是否超过了最大值。
* KafkaProdcuer初始化的时候,指定了一个参数,代表的是Producer这儿最大能发送的是一条消息能有多大
* 默认最大是1M,我们一般都回去修改它。
*/
ensureValidRecordSize(serializedSize);
/**
* 步骤五:
* 根据元数据信息,封装分区对象
*/
tp = new TopicPartition(record.topic(), partition);
/**
* 步骤六:
* 给每一条消息都绑定他的回调函数。因为我们使用的是异步的方式发送的消息。
/
Callback interceptCallback = this.interceptors == null ? callback :
/*
* 步骤七:
* 把消息放入accumulator(32M的一个内存)
* 然后有accumulator把消息封装成为一个批次一个批次的去发送。
/
RecordAccumulator.RecordAppendResult result = accumulator.append(
/*
* 步骤八:
* 唤醒sender线程。他才是真正发送数据的线程。
*/
this.sender.wakeup();
(3.2)KafkaProducer核心流程2:异常体系 08 02} 0:6
异常体系构建 ,自定义异常+ 多层抛出
(4.1 )KafkaProducer 加载元数据1 09 01} 0:15
——》★ KafkaProducer.doSend()里面
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata
——》KafkaProducer .waitOnMetadata()
/**
* TODO 这个步骤重要
* 我们发现这儿去唤醒sender线程。
* 其实是因为,拉取有拉取元数据这个操作是有sender线程去完成的。
* 这个地方把线程给唤醒了以后
* 我们知道sender线程肯定就开始进行干活了 至于怎么我们后面在继续分析。
* 很明显,真正去获取元数据是这个线程完成。
*/
sender.wakeup();
(4.2)KafkaProducer 加载元数据2 09 02} 0:7
——》KafkaProducer .waitOnMetadata()
//TODO 等待元数据
//同步的等待
//等待这sender线程获取到元数据。
metadata.awaitUpdate(version, remainingWaitMs);
(4.3)KafkaProducer 加载元数据3 09 03} 0:25
——》KafkaProducer .waitOnMetadata()
——》 sender.wakeup();
sender线程 在KafkaProducer 的 构造函数里面
//创建了一个线程,然后里面传进去了一个sender对象。
//把业务的代码和关于线程的代码给隔离开来。
//关于线程的这种代码设计的方式,其实也值得大家积累的。
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
//启动线程。
this.ioThread.start();
——》 java/org/apache/kafka/clients/producer/internals/Sender.java
——》 run 方法
——》 void run(long now) {
//TODO 重点就是去看这个方法
//就是用这个方法拉取的元数据。
/**
* 步骤八:
* 真正执行网络操作的都是这个NetWordClient这个组件
* 包括:发送请求,接受响应(处理响应)
*
* 拉取元数据信息,靠的就是这段代码
*/
//我们猜这儿可能就是去建立连接。
this.client.poll(pollTimeout, now);
↓
java/org/apache/kafka/clients/NetworkClient.java
—— ★》 NetworkClient.poll ()
//步骤一:封装了一个要拉取元数据请求
long metadataTimeout = metadataUpdater.maybeUpdate(now);
//步骤二: 发送请求,进行复杂的网络操作
//但是我们目前还没有学习到kafka的网络
//所以这儿大家就只需要知道这儿会发送网络请求。
//TODO 执行网络IO的操作。 NIO
this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
//步骤三:处理响应,响应里面就会有我们需要的元数据。
/**
* 这个地方是我们在看生产者是如何获取元数据的时候,看的。
* 其实Kafak获取元数据的流程跟我们发送消息的流程是一模一样。
* 获取元数据 -》 判断网络连接是否建立好 -》 建立网络连接
* -》 发送请求(获取元数据的请求) -》 服务端发送回来响应(带了集群的元数据信息)
*
*/
handleCompletedReceives(responses, updatedNow);
——》NetworkClient.DefaultMetadataUpdater.handleResponse()
(4.4)KafkaProducer 加载元数据4 流程图 09 04} 0:7
(5)分区选择逻辑 10 } 0:10
2:11:00---2:21:22
—— ★》 KafkaProducer.doSend()里面 步骤三
▼
/**
* 步骤三:
* 根据分区器选择消息应该发送的分区。
* 因为前面我们已经获取到了元数据
* 这儿我们就可以根据元数据的信息
* 计算一下,我们应该要把这个数据发送到哪个分区上面。
*/
int partition = partition(record, serializedKey, serializedValue, cluster);
——》KafkaProducer.partition()
//如果你的这个消息已经分配了分区号,那直接就用这个分区号就可以了
//但是正常情况下,消息是没有分区号的。
//使用分区器进行选择合适的分区
partitioner.partition( record.topic(), record.key(),
↓
——》DefaultPartitioner.partition()
(6.1)封装recordAccumulator 11.01 } 0:11
2:21:00---2:26:00
—— ★》 KafkaProducer.doSend()里面 步骤四、五、六、七
/**
* 步骤四:
* 确认一下消息的大小是否超过了最大值。
* KafkaProdcuer初始化的时候,指定了一个参数,代表的是Producer这儿最大能发送的是一条消息能有多大
* 默认最大是1M,我们一般都回去修改它。
*/
ensureValidRecordSize(serializedSize);
/**
* 步骤五:
* 根据元数据信息,封装分区对象
/
tp = new TopicPartition(record.topic(), partition);
/*
* 步骤六:
* 给每一条消息都绑定他的回调函数。因为我们使用的是异步的方式发送的消息。
/
Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
/*
* 步骤七:
* 把消息放入accumulator(32M的一个内存)
* 然后有accumulator把消息封装成为一个批次一个批次的去发送。
*/
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
转到
——》 RecordAccumulator.append()
(6.2)封装recordAccumulator 2 11.02 } 0:15
——》 RecordAccumulator.append()详解7大步骤
/**
- 步骤一:先根据分区找到应该插入到哪个队列里面。
- 如果有已经存在的队列,那么我们就使用存在队列
- 如果队列不存在,那么我们新创建一个队列
Deque<RecordBatch> dq = getOrCreateDeque(tp);
/**
* 步骤二:
* 尝试往队列里面的批次里添加数据
* 一开始添加数据肯定是失败的,我们目前只是以后了队列
* 数据是需要存储在批次对象里面(这个批次对象是需要分配内存的)
* 我们目前还没有分配内存,所以如果按场景驱动的方式,
* 代码第一次运行到这儿其实是不成功的。
*/
RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
/**
* 步骤三:计算一个批次的大小
* 在消息的大小和批次的大小之间取一个最大值,用这个值作为当前这个批次的大小。
* 有可能我们的一个消息的大小比一个设定好的批次的大小还要大。
* 默认一个批次的大小是16K。
int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
/**
* 步骤四:
* 根据批次的大小去分配内存
ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
/**
* 步骤五:
* 尝试把数据写入到批次里面。
RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
/**
* 步骤六:
* 根据内存大小封装批次
* 线程一到这儿 会根据内存封装出来一个批次。
*/
MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
/**
* 步骤七:
* 把这个批次放入到这个队列的队尾
* 线程一把批次添加到队尾
*/
dq.addLast(batch);
incomplete.add(batch);
(7) 读写分离 copyOnwrite数据结构使用 12} 0:26
2:26:00---2:41:00
入口
——》 RecordAccumulator.append()
——》 getOrCreateDeque()
Deque<RecordBatch> d = this.batches.get(tp);
//把这个空的队列存入batches 这个数据结构里面
Deque<RecordBatch> previous = this.batches.putIfAbsent(tp, d);
batches 是 RecordAccumulator 的 成员变量
ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
在RecordAccumulator构造函数里面赋值
this.batches = new CopyOnWriteMap<>();
CopyOnWriteMap是自定义类
——》CopyOnWriteMap.putIfAbsent()
▼
//如果我们传进来的这个key不存在
if (!containsKey(k))
//那么就调用里面内部的put方法
return put(k, v);
else
//返回结果
return get(k);
}
//新的内存空间
//读写分离
//往新的内存空间里面插入
//读,读数据就老读空间里面去
Map<K, V> copy = new HashMap<K, V>(this.map);
//插入数据
V prev = copy.put(k, v);
//赋值给map
this.map = Collections.unmodifiableMap(copy);
(8) 数据写入对应批次(分段加锁) 13} 0:21
2:41:42-----2:53:11
入口
——》 RecordAccumulator.append()
分段加锁 步骤1~ 6 可以最大并发
synchronized (dq) {
}
(9) 发送者内存池设计 14} 0:28
人口——》 RecordAccumulator.append()
* 步骤四:
* 根据批次的大小去分配内存
——》BufferPool.allocate()
/**
*
* 总的分配的思路,可能一下子分配不了这么大的内存,但是可以先有点分配一点。
*
*/
//如果分配的内存的大小 还是没有要申请的内存大小大。
//内存池就会一直分配的内存,一点一点的去分配。
//等着别人会释放内存。
//accumulated 5K+16K=21K 16K
// size 32K
while (accumulated < size) {
(9) 发送者线程sender详解1 15} 0:15
入口 KafkaProducer 构造函数
//这个就是一个线程
this.sender = new Sender(client,
——》Sender.run(long now) 方法 , 8个步骤
- 步骤一:(上面章节讲了)
获取元数据
Cluster cluster = metadata.fetch();
- 步骤二:首先是判断哪些partition有消息可以发送:
* 获取到这个partition的leader partition对应的broker主机(根据元数据信息来就可以了)
* 哪些broker上面需要我们去发送消息?
*/
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
/**
* 步骤三:
* 标识还没有拉取到元数据的topic
*/
if (!result.unknownLeaderTopics.isEmpty()) {
/**
* 步骤四:检查与要发送数据的主机的网络是否已经建立好。
*/
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
}
/**
* 步骤五:
*
* 我们有可能要发送的partition有很多个,
* 很有可能有一些partition的leader partition是在同一台服务器上面。
- 按照broker进行分组,同一个broker的partition为同一组
* 0:{p0,p1} -> 批次
* 1:{p2}
* 2:{p3}
/**
* 步骤六:
* 对超时的批次是如何处理的?
*
/
List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
/*
* 步骤七:
* 创建发送消息的请求
* 我们往partition上面去发送消息的时候,有一些partition他们在同一台服务器上面
* ,如果我们一分区一个分区的发送我们网络请求,那网络请求就会有一些频繁
* 我们要知道,我们集群里面网络资源是非常珍贵的。
* 会把发往同个broker上面partition的数据 组合成为一个请求。
* 然后统一一次发送过去,这样子就减少了网络请求。
*/
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
//如果网络连接没有建立好 batches其实是为空。
//也就说其实这段代码也是不会执行。
/**
* 步骤八:
* 真正执行网络操作的都是这个NetWordClient这个组件
* 包括:发送请求,接受响应(处理响应)
* 拉取元数据信息,靠的就是这段代码
*/
//我们猜这儿可能就是去建立连接。
this.client.poll(pollTimeout, now);
(10) 发送者线程sender详解2 batch什么条件下发送? 16} 0:25
——》Sender.run(long now) 方法 里面
/**
* 步骤二:
* 首先是判断哪些partition有消息可以发送:
* 我们看一下一个批次可以发送出去的条件
* 获取到这个partition的leader partition对应的broker主机(根据元数据信息来就可以了)
* 哪些broker上面需要我们去发送消息?
*/
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
——》RecordAccumulator.ready()
▼
//waiters里面有数据,说明我们的这个内存池里面内存不够了。
//如果exhausted的值等于true,说明内存池里面的内存不够用了。
boolean exhausted = this.free.queued() > 0;
//首先从队列的队头获取到批次
RecordBatch batch = deque.peekFirst();
//如果这个catch不null,我们判断一下是否可以发送这个批次。
if (batch != null) {
(11) 发送者线程sender详解3 筛选可以发送消息的broker(1) 17 1} 0:16
——》Sender.run(long now) 方法 里面
/**
* 步骤三:
* 标识还没有拉取到元数据的topic
*/
if (!result.unknownLeaderTopics.isEmpty()) {
// The set of topics with unknown leader contains topics with leader election pending as well as
// topics which may have expired. Add the topic again to metadata to ensure it is included
// and request metadata update, since there are messages to send to the topic.
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);
this.metadata.requestUpdate();
}
/**
* 步骤四:检查与要发送数据的主机的网络是否已经建立好。
*/
if (!this.client.ready(node, now)) {
(12) 发送者线程sender详解3 筛选可以发送消息的broker(2) 17 2} 0:16
(13) 发送者 源码之kafka网络设计 18 1 2 3} 0:10 0:5 0:8
——》Sender.run(long now) 方法 里面
/**
* 步骤四:检查与要发送数据的主机的网络是否已经建立好。
*/
if (!this.client.ready(node, now)) {
↓
——》NetworkClient.ready()
——》NetworkClient. initiateConnect()
//TODO 尝试建立连接
selector.connect(nodeConnectionId
1 ——》Selector类 各成员变量 详解
▼
* TODO 这个selector是kafka主机封装的一个selector
* 他是基于java NIO里面的selector去封装的。
//这个对象就是javaNIO里面的Selector
//Selector是负责网络的建立,发送网络请求,处理实际的网络IO。
//所以他是最最核心的这么样的一个组件。
private final java.nio.channels.Selector nioSelector;
//broker 和 KafkaChannel(SocketChnnel)的映射
//这儿的kafkaChannel大家暂时可以理解为就是SocketChannel
//代表的就是一个网络连接。
private final Map<String, KafkaChannel> channels;
//已经完成发送的请求
private final List<Send> completedSends;
//已经接收到的,并且处理完了的响应。
private final List<NetworkReceive> completedReceives;
//已经接收到了,但是还没来得及处理的响应。
//一个连接,对应一个响应队列
private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
private final Set<SelectionKey> immediatelyConnectedKeys;
//没有建立连接的主机
private final List<String> disconnected;
//完成建立连接的主机
private final List<String> connected;
//建立连接失败的主机。
private final List<String> failedSends;
2 ——》KafkaChannel类 各成员变量 详解
//一个broker就对应一个KafkaChannel
//这个id就是broker的id
private final String id;
//接收到的响应
private NetworkReceive receive;
//发送出去的请求
private Send send;
//我们推测这个里面应该会有SocketChannel
private final TransportLayer transportLayer;
▼
* returns underlying socketChannel
* 这个核心的组件,就是javaNIO里面的SocketChannel
SocketChannel socketChannel();
(14) 发送者 源码 Selector.connect 与broker建立连接 20 1 } 0:13
——》Sender.run(long now) 方法 里面
/**
* 步骤四:检查与要发送数据的主机的网络是否已经建立好。
*/
if (!this.client.ready(node, now)) {
↓
——》NetworkClient.ready()
——》NetworkClient. initiateConnect()
//TODO 尝试建立连接
selector.connect(nodeConnectionId
——》Selector.connect()
▼
//获取到SocketChannel
SocketChannel socketChannel = SocketChannel.open();
//设置为非阻塞的模式
socketChannel.configureBlocking(false);
//设置一些参数
//这些网络的参数,我们之前在分析Producer的时候给大家看过
//都有一些默认值。
//这个的默认值是false,代表要开启Nagle的算法
//它会把网络中的一些小的数据包收集起来,组合成一个大的数据包
//然后再发送出去。因为它认为如果网络中有大量的小的数据包在传输
//其实是会影响网络拥塞。
//kafka一定不能把这儿设置为false,因为我们有些时候可能有些数据包就是比较
//小,他这儿就不帮我们发送了,显然是不合理的。
socket.setTcpNoDelay(true);
//尝试去服务器去连接。
//因为这儿非阻塞的
//有可能就立马连接成功,如果成功了就返回true
//也有可能需要很久才能连接成功,返回false。
connected = socketChannel.connect(address);
//SocketChannel往Selector上注册了一个OP_CONNECT
SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
//根据根据SocketChannel 封装出来一个KafkaChannel
KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
//把key和KafkaChannel关联起来
//后面使用起来会比较方便
//我们可以根据key就找到KafkaChannel
//也可以根据KafkaChannel找到key
key.attach(channel);
//缓存起来了
this.channels.put(id, channel);
//所以正常情况下,这儿网络不能完成连接。
//如果这儿不能完成连接。大家猜一下
//kafka会在哪儿完成网络最后的连接呢?
//如果里面就连接上了
if (connected) {
// OP_CONNECT won't trigger for immediately connected channels
log.debug("Immediately connected to node {}", channel.id());
immediatelyConnectedKeys.add(key);
// 取消前面注册 OP_CONNECT 事件。
key.interestOps(0);
}
(15) 发送者 源码 Selector.connect 与broker建立连接2 完成最后的 连接 20 2 } 0:10
——》Sender.run(long now) 方法 里面
/**
* 步骤八:
* 真正执行网络操作的都是这个NetWordClient这个组件
-
包括:发送请求,接受响应(处理响应)
* 拉取元数据信息,靠的就是这段代码
//我们猜这儿可能就是去建立连接。
this.client.poll(pollTimeout, now);
↓
——》NetworkClient.poll();
▼
//步骤二: 发送请求,进行复杂的网络操作
//但是我们目前还没有学习到kafka的网络
//所以这儿大家就只需要知道这儿会发送网络请求。
//TODO 执行网络IO的操作。 NIO
this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
↓
——》Selector.poll()
▼
//从Selector上找到有多少个key注册了
int readyKeys = select(timeout);
//因为我们用场景驱动的方式
//我们刚刚确实是注册了一个key
if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
//立马就要对这个Selector上面的key要进行处理。
pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);——》Selector.pollSelectionKeys() ▼
//遍历所有的key
while (iterator.hasNext()) {
//根据key找到对应的KafkaChannel
KafkaChannel channel = channel(key);
/**
*
* 我们代码第一次进来应该要走的是这儿分支,因为我们前面注册的是
* SelectionKey key = socketChannel.register(nioSelector,
* SelectionKey.OP_CONNECT);
*
*/
if (isImmediatelyConnected || key.isConnectable()) {
//TODO 核心的代码
//去最后完成网络的连接
//如果我们之前初始化的时候,没有完成网络连接的话,这儿一定会帮你
//完成网络的连接。
1:》 if (channel.finishConnect()) {
//网络连接已经完成了以后,就把这个channel存储到
this.connected.add(channel.id());
(16) 发送者 源码 Selector.connect 与broker建立连接3 连接示意图 20 3 } 0:17
(17) 发送者 源码产生者 发送网络请求了 21 1 } 0:20
接上期 1:》
//完成网络的连接。
if (channel.finishConnect()) {
——》PlaintextTransportLayer.finishConnect()
▼
//完成的最后的网络的连接
boolean connected = socketChannel.finishConnect();
//如果连接完成了以后。
if (connected)
//取消了OP_CONNECT事件
//增加了OP_READ 事件 我们这儿的这个key对应的KafkaChannel是不是就可以接受服务
//端发送回来的响应了。
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
▲
回到 ——》Sender.run(long now) 方法 里面 步骤4 ,现在网络已经建立连接
进入下面
/**
* 步骤七:
* 创建发送消息的请求
* 我们往partition上面去发送消息的时候,有一些partition他们在同一台服务器上面
* ,如果我们一分区一个分区的发送我们网络请求,那网络请求就会有一些频繁
* 我们要知道,我们集群里面网络资源是非常珍贵的。
* 会把发往同个broker上面partition的数据 组合成为一个请求。
* 然后统一一次发送过去,这样子就减少了网络请求。
*/
List<ClientRequest> requests = createProduceRequests(batches, now);
//TODO 发送请求的操作
for (ClientRequest request : requests)
//绑定 op_write
client.send(request, now);
↓
——》 NetworkClient..send()
——》NetworkClient..doSend()
▼
//这儿往inFlightRequests 组件里存 Request请求。
//存储的就是还没有收到响应的请求。
//这个里面默认最多能存5个请求。
//其实我们可以猜想一个事,如果我们的请求发送出去了
//然后也成功的接受到了响应,后面就会到这儿把这个请求移除。
this.inFlightRequests.add(request);
//TODO
selector.send(request.request());
↓
——》Selector.send()
//获取到一个KafakChannel
KafkaChannel channel = channelOrFail(send.destination());
try {
//TODO
channel.setSend(send);
——》KafkaChannel .setSend()
▼
//关键的代码来了
//这儿绑定了一个OP_WRITE事件。
//一旦绑定了这个事件以后,我们就可以往服务端发送请求了。
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
▲
回到 ——》Sender.run(long now) 方法 里面 步骤8 ,
this.client.poll(pollTimeout, now);
↓
——》NetworkClient.poll();
↓
——》Selector.poll()
▼
//因为我们用场景驱动的方式
//我们刚刚确实是注册了一个key
if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
//立马就要对这个Selector上面的key要进行处理。
pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
——》Selector. pollSelectionKeys()
▼
//核心代码,处理发送请求的事件
//selector 注册了一个OP_WRITE
//selector 注册了一个OP_READ
if (channel.ready() && key.isWritable()) {
//获取到我们要发送的那个网络请求。
//是这句代码就是要往服务端发送数据了。
//TODO:服务端
//里面我们发现如果消息被发送出去了,然后移除OP_WRITE
Send send = channel.write();
//已经完成响应消息的发送
if (send != null) {
this.completedSends.add(send);
this.sensors.recordBytesSent(channel.id(), send.size());
}
(18) 发送者 源码 产生者 发送网络请求2 21 2 } 0:10
流程图图示如下:
(19) 发送者 源码 producer是如何处理粘包问题的1 22 1} 0:5
粘包: 一个请求里面有多个响应,多个消息粘在一起回来
转到这里
——》NetworkClient..doSend()
▼
//这儿往inFlightRequests 队列组件里存 Request请求。
//存储的就是还没有收到响应的请求。
//这个里面默认最多能存5个请求。
//其实我们可以猜想一个事,如果我们的请求发送出去了
//然后也成功的接受到了响应,后面就会到这儿把这个请求移除。
this.inFlightRequests.add(request);
inFlightRequests 类里面有 Map 成员变量
private final Map<String, Deque<ClientRequest>> requests = new HashMap<>();
(20) 发送者 源码 producer是如何处理粘包问题的2 22 2} 0:12
接上面:发送代码
——》Selector. pollSelectionKeys()
//里面不断的读取数据,读取数据的代码我们之前就已经分析过
//里面还涉及到粘包和拆包的一些问题。
while ((networkReceive = channel.read()) != null)
addToStagedReceives(channel, networkReceive);
——》NetworkReceive.readFromReadableChannel()
▼
//先读取4字节的数据,(代表的意思就是后面跟着的消息体的大小)
int bytesRead = channel.read(size);
//一直要读取到当这个size没有剩余空间
//说明已经读取到了一个4字节的int类型的数了。
if (!size.hasRemaining()) {
//分配一个内存空间,这个内存空间的大小
//就是刚刚读出来的那个4字节的int的大小。
//10
this.buffer = ByteBuffer.allocate(receiveSize);
if (buffer != null) {
//去读取数据
int bytesRead = channel.read(buffer);
?? 如何判断读完了消息呢? 回到
while ((networkReceive = channel.read()) != null)
——》KafkaChannel.read()
▼
//一直在读取数据。
receive(receive);
//是否读完一个完整的响应消息
if (receive.complete()) {
——》NetworkReceive.complete()
//size 没有剩余空间(50) &&
return !size.hasRemaining() && !buffer.hasRemaining();
(21) 发送者 源码 producer是如何处理拆包问题 23 } 0:5
拆包:
和粘包类似
——》NetworkReceive.readFromReadableChannel()
▼
if (buffer != null) {
//去读取数据
int bytesRead = channel.read(buffer);
?? 如何判断读完了消息呢? 回到
while ((networkReceive = channel.read()) != null)
——》KafkaChannel.read()
▼
//一直在读取数据。
receive(receive);
//是否读完一个完整的响应消息
if (receive.complete()) {
(22) 发送者 源码 如何处理暂存状态的响应消息 24 } 0:32
——》Sender.run(long now) 方法入口
——》NetworkClient.poll()
——》NetworkClient.handleCompletedReceives()
(23) 发送者 源码 响应消息处理流程 25 } 0:25
——》Sender.run(long now) 方法入口
——》NetworkClient.poll()
▼
//步骤三:处理响应,响应里面就会有我们需要的元数据。
/**
* 这个地方是我们在看生产者是如何获取元数据的时候,看的。
* 其实Kafak获取元数据的流程跟我们发送消息的流程是一模一样。
* 获取元数据 -》 判断网络连接是否建立好 -》 建立网络连接
* -》 发送请求(获取元数据的请求) -》 服务端发送回来响应(带了集群的元数据信息)
*
*/
//调用的响应的里面的我们之前发送出去的请求的回调函数
//看到了这儿,我们回头再去看一下
//我们当时发送请求的时候,是如何封装这个请求。
//不过虽然目前我们还没看到,但是我们可以大胆猜一下。
//当时封装网络请求的时候,肯定是给他绑定了一个回调函数。
response.request().callback().onComplete(response);
回调函数在封装时发送
——》Sender.produceRequest()
▼
public void onComplete(ClientResponse response) {
//回调函数要是被调用
//其实就是这个方法被执行了。
handleProduceResponse(response, recordsByPartition, time.milliseconds());
}
——》Sender.handleProduceResponse()回调函数
▼
//如果处理成功那就是成功了,但是如果服务端那儿处理失败了
//是不是也要给我们发送回来异常的信息。
//error 这个里面存储的就是服务端发送回来的异常码
Errors error = Errors.forCode(partResp.errorCode);
//获取到当前分区的响应。
RecordBatch batch = batches.get(tp);
//对响应进行处理
completeBatch(batch, error, partResp.baseOffset, partResp.timestamp, correlationId, now);
——》Sender.completeBatch()
▼
//如果响应里面带有异常 并且 这个请求是可以重试的
if (error != Errors.NONE && canRetry(batch, error)) {
// retry
} else {
//TODO 这儿过来的数据:(1)带有异常,但是不可以重试(1:压根就不让重试2:重试次数超了。
// (2)没有异常
//TODO 核心代码 把异常的信息也给带过去了
//我们刚刚看的就是这儿的代码
//里面调用了用户传进来的回调函数
//回调函数调用了以后
//说明我们的一个完整的消息的发送流程就结束了。
batch.done(baseOffset, timestamp, exception);
//看起来这个代码就是要回收资源的。
this.accumulator.deallocate(batch);
}
——》RecordBatch.done()
▼
//如果没有异常
if(exception == null){
//调用我们发送的消息的回调函数
//大家还记不记得我们在发送数据的时候
//还不是绑定了一个回调函数。
//这儿说的调用的回调函数
//就是我们开发,生产者代码的时候,我们用户传进去的那个
//回调函数。
thunk.callback.onCompletion(metadata, null);//带过去的就是没有异常
//也就是说我们生产者那儿的代码,捕获异常的时候就是发现没有异常。
} else {
//如果有异常就会把异常传给回调函数。
//由我们用户自己去捕获这个异常。
//然后对这个异常进行处理
//大家根据自己公司的业务规则进行处理就可以了。
//如果走这个分支的话,我们的用户的代码是可以捕获到timeoutexception
//这个异常,如果用户捕获到了,做对应的处理就可以了。
thunk.callback.onCompletion(null, exception);
(24) 发送者 源码 消息发送完了以后内存如何处理 26} 0:6
接上
——》Sender.completeBatch()
▼
batch.done(baseOffset, timestamp, exception);
//看起来这个代码就是要回收资源的。
this.accumulator.deallocate(batch);
——》RecordAccumulator.deallocate()
▼
//从某个数据结构里面移除 已经成功处理的批次
incomplete.remove(batch);
//释放内存,回收到内存池里面
free.deallocate(batch.records.buffer(), batch.records.initialCapacity());
——》BufferPool.deallocate()
▼
//如果你还回来的内存的大小 就等于一个批次的大小,
//我们的参数设置的内存是16K,你计算出来一个批次的大小也是16,申请的内存也是16k
//16K 32K
if (size == this.poolableSize && size == buffer.capacity()) {
//内存里面的东西清空
buffer.clear();
//把内存放入到内存池
this.free.add(buffer);
} else {
//但是如果 我们释放的内存的大小
//不是一个批次的大小,那就把归为可用内存
//等着垃圾回收即可
this.availableMemory += size;
}
(25) 发送者 源码 消息有异常是如何处理的 27} 0:6 ?
接上
——》Sender.completeBatch()
▼ 1、如果异常可以重新发
//重新把发送失败等着批次 加入到队列里面。
this.accumulator.reenqueue(batch, now);
——》RecordAccumulator.reenqueue()
synchronized (deque) {
//重新放入到队列里面
//放入到队头
deque.addFirst(batch);
}
▼2、如果异常不可以重新发
exception = error.exception();
batch.done(baseOffset, timestamp, exception);
——》RecordBatch.done()
//如果走这个分支的话,我们的用户的代码是可以捕获到
//这个异常,如果用户捕获到了,做对应的处理就可以了。
thunk.callback.onCompletion(null, exception);
(26) 发送者 源码 如何处理超时的批次 28} 0:12
回到
——》Sender.run()
/* * 步骤六:
* 对超时的批次是如何处理的?
*/
List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(
——》RecordAccumulator.abortExpiredBatches()
//迭代的看每个分区里面的每个批次
Iterator<RecordBatch> batchIterator = dq.iterator();
//判断一下是否超时
if (batch.maybeExpire(requestTimeout,, retryBackoffMs, now, this.lingerMs, isFull))
——》RecordBatch.maybeExpire()
/**
* requestTimeoutMs:代表的是请求发送的超时的时间。默认值是30.
* now:当前时间
* lastAppendTime:批次的创建的时间(上一次重试的时间)
* now - this.lastAppendTime 大于30秒,说明批次超时了 还没发送出去。
*/
//调用done方法
//方法里面传过去了一个TimeoutException的异常。(超时了)
//TODO 处理超时的批次
this.done(-1L, Record.NO_TIMESTAMP,
(27) 发送者 源码 如何处理长时间没有接受到响应的消息 29} 0:10
回到
—— ★》 NetworkClient.poll ()
//步骤三:处理响应,响应里面就会有我们需要的元数据。
//TODO 处理长时间没有接受到响应
handleTimedOutRequests(responses, updatedNow);
▼
——》InFlightRequests.getNodesWithTimedOutRequests()
//是否超时,如果有主机超时了
if (timeSinceSend > requestTimeout)
//把超时的主机的信息加入到nodeIds里面
nodeIds.add(nodeId);
▲
for (String nodeId : nodeIds) {
// close connection to the node
//关闭请求超时的主机的连接
this.selector.close(nodeId);
processDisconnection(responses, nodeId, now);
//对这些请求进行处理
//大家会看到一个比较有意思的事
//自己封装了一个响应。这个响应里面没有服务端响应消息(服务端没给响应)
//失去连接的状态表标识为true
responses.add(new ClientResponse(request, now, true, null));
—— 》 NetworkClient.processDisconnection()
//修改连接状态
connectionStates.disconnected(nodeId, now);
—— 》 ClusterConnectionStates.disconnected()
(28) 发送者 源码 生产者源码精华总结 30} 0:10
【截图】