【Kafka 源码解读】之 【代码没报错但是消息却发送失败!】
不送花的程序猿 人气:7聊聊最近,2020年,在2019年的年尾时,大家可谓对这年充满新希望,特别是有20200202这一天。可是澳洲长达几个月的大火,新型冠状病毒nCoV的发现,科比的去世等等事情,让大家感到相当的无奈,生命是如此的脆弱,明天又是如此的未知。但是人应当活在当下,勇敢的面对疫情,和大家和政府一起打赢这场没硝烟的战争!
作为程序员,我必定不能停止工作,不能停止学习,现在在家办公,完全配合现在提倡的隔离战术,对自己负责,对社会负责。下面我会和大家分享一篇我之前写的笔记,和大家一起讨论关于 Kafka 的一个问题:为什么 Kafka 发送消息失败?
一、问题:消息发送失败
虽然在使用资源后关闭资源是非常正常的操作,但是确实我们也是经常会缺少调用 close()
关闭资源的的代码,特别是在自己写 demo 的时候。而且,以前写关于文件 IO 的例子时,不写 close()
方法确实也不会报错或者出现问题,但是那为啥到了 Kafka 这里,不写就会出现问题呢?
Properties properties = new Properties();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
KafkaProducer<String, String> producer =
new KafkaProducer<>(properties);
ProducerRecord<String, String> record =
new ProducerRecord<>(topic, "hello, Kafka!");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
//不写导致消息发送失败
//producer.close();
二、猜测
1.首先我们看一下close()方法。
注释:此方法会一直阻塞直到之前所有的发送请求都完成。
/**
* Close this producer. This method blocks until all previously sent requests complete.
* This method is equivalent to <code>close(Long.MAX_VALUE, TimeUnit.MILLISECONDS)</code>.
* <p>
* <strong>If close() is called from {@link Callback}, a warning message will be logged and close(0, TimeUnit.MILLISECONDS)
* will be called instead. We do this because the sender thread would otherwise try to join itself and
* block forever.</strong>
* <p>
*
* @throws InterruptException If the thread is interrupted while blocked
*/
@Override
public void close() {
close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
2.然后再到KafkaProducer中的注释
生产者由一个缓冲区空间池组成,其中保存尚未传输到服务器的记录,以及一个后台I/O线程,该线程负责将这些记录转换为请求并将它们传输到集群。使用后不关闭生产商将泄露这些资源。
/*
* <p>
* The producer consists of a pool of buffer space that holds records that haven't yet been transmitted to the server
* as well as a background I/O thread that is responsible for turning these records into requests and transmitting them
* to the cluster. Failure to close the producer after use will leak these resources.
* <p>
*/
3.猜测总结
到这里我们可以稍微总结一下,KafkaProduce r发送消息并不是立刻往 Kafka 中发送,而是先存在一个缓冲区里,然后有一条后台线程去不断地读取消息,然后再往Kafka中发送。我们也可以总结一下,之所以不写 close() 方法,我们的 main() 方法中,发送完 main() 方法就执行完了,而此时消息可能只是刚到缓冲区中,还没被后台线程去读取然后发送。
三、验证
下面我们要阅读 KafkaProducer 的源码,来验证上面的总结。
1.KafkaProducer创建
最要关注的是:创建了存放消息的队列,并且创建了一条后台线程,主要是从队列中获取消息,往 kafka 中发送。
KafkaProducer(ProducerConfig config,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
Metadata metadata,
KafkaClient kafkaClient) {
try {
// ..... 省略掉很多其他代码,主要是关于Producer的配置,例如clientId、序列化和拦截器等等。
// 这里就是存放消息的队列。
this.accumulator = new RecordAccumulator(logContext,
config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.totalMemorySize,
this.compressionType,
config.getLong(ProducerConfig.LINGER_MS_CONFIG),
retryBackoffMs,
metrics,
time,
apiVersions,
transactionManager);
// .... 继续省略无关配置
// 这个Sender实现了Runnable,是一条后台线程,处理向Kafka集群发送生产请求的后台线程
this.sender = new Sender(logContext,
client,
this.metadata,
this.accumulator,
maxInflightRequests == 1,
config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
acks,
retries,
metricsRegistry.senderMetrics,
Time.SYSTEM,
this.requestTimeoutMs,
config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
this.transactionManager,
apiVersions);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
// 启动线程
this.ioThread.start();
this.errors = this.metrics.sensor("errors");
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
log.debug("Kafka producer started");
} catch (Throwable t) {
// call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
close(0, TimeUnit.MILLISECONDS, true);
// now propagate the exception
throw new KafkaException("Failed to construct kafka producer", t);
}
}
2.KafkaProducer发送消息
发送消息并不是直接就往 kafka 发送,而是存放到我们上面提及到的队列 accumulator。
/**
* Asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>.
* 异步发送消息记录到指定主题
* See {@link #send(ProducerRecord, Callback)} for details.
*/
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return send(record, null);
}
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
/**
* Implementation of asynchronously send a record to a topic.
* 实现异步发送消息到对应的主题
*/
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
// ... 省略了一些代码,主要是关于序列化、分区、事务、CallBack等等的配置
// 下面是往创建KafkaProducer时创建的accumulator里添加消息记录。
// RecordAccumulator,private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;是存放消息的变量。
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
// 下面是异常处理
} catch (ApiException e) {
log.debug("Exception occurred during message send:", e);
if (callback != null)
callback.onCompletion(null, e);
this.errors.record();
this.interceptors.onSendError(record, tp, e);
return new FutureFailure(e);
}
// ....省略一堆异常处理
}
3.Sender发送消息
接下来我们得看一下后台线程 Sender 是怎么从队列 accumulator 里面获取消息记录,然后发往 Kafka 的。
/**
* The main run loop for the sender thread
* 循环执行
*/
public void run() {
log.debug("Starting Kafka producer I/O thread.");
// main loop, runs until close is called
while (running) {
try {
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
// okay we stopped accepting requests but there may still be
// requests in the accumulator or waiting for acknowledgment,
// wait until these are completed.
// 我们已停止接受请求(调用了close()方法),但accumulator中可能仍有请求或等待确认,那么就wait直到这些请求完成
while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {
try {
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
// 当等待时长用完,要强制关闭时,我们要让所有未完成的批处理失败
if (forceClose) {
// We need to fail all the incomplete batches and wake up the threads waiting on
// the futures.
log.debug("Aborting incomplete batches due to forced shutdown");
this.accumulator.abortIncompleteBatches();
}
try {
// 关闭客户端
this.client.close();
} catch (Exception e) {
log.error("Failed to close network client", e);
}
log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
4.sendProducerData方法
发送数据前的准备
private long sendProducerData(long now) {
Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
// 获取准备发送数据的分区列表
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
// if there are any partitions whose leaders are not known yet, force metadata update
// 处理没有可用的leader副本的问题,强制更新元数据
if (!result.unknownLeaderTopics.isEmpty()) {
// The set of topics with unknown leader contains topics with leader election pending as well as
// topics which may have expired. Add the topic again to metadata to ensure it is included
// and request metadata update, since there are messages to send to the topic.
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);
log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}", result.unknownLeaderTopics);
this.metadata.requestUpdate();
}
// remove any nodes we aren't ready to send to
// 移除还没准备好发送的节点
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
}
}
// 创建请求
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
this.maxRequestSize, now);
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}
// ..... 省略其他处理
// 真正发送请求的方法
sendProduceRequests(batches, now);
return pollTimeout;
}
5.sendProduceRequest方法
最后是使用 NetworkClient 发送数据到 Kafka 的。
/**
* Create a produce request from the given record batches
*/
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
if (batches.isEmpty())
return;
Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
// find the minimum magic version used when creating the record sets
byte minUsedMagic = apiVersions.maxUsableProduceMagic();
for (ProducerBatch batch : batches) {
if (batch.magic() < minUsedMagic)
minUsedMagic = batch.magic();
}
for (ProducerBatch batch : batches) {
TopicPartition tp = batch.topicPartition;
MemoryRecords records = batch.records();
// down convert if necessary to the minimum magic used. In general, there can be a delay between the time
// that the producer starts building the batch and the time that we send the request, and we may have
// chosen the message format based on out-dated metadata. In the worst case, we optimistically chose to use
// the new message format, but found that the broker didn't support it, so we need to down-convert on the
// client before sending. This is intended to handle edge cases around cluster upgrades where brokers may
// not all support the same message format version. For example, if a partition migrates from a broker
// which is supporting the new magic version to one which doesn't, then we will need to convert.
if (!records.hasMatchingMagic(minUsedMagic))
records = batch.records().downConvert(minUsedMagic, 0, time).records();
produceRecordsByPartition.put(tp, records);
recordsByPartition.put(tp, batch);
}
String transactionalId = null;
if (transactionManager != null && transactionManager.isTransactional()) {
transactionalId = transactionManager.transactionalId();
}
ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
produceRecordsByPartition, transactionalId);
RequestCompletionHandler callback = new RequestCompletionHandler() {
public void onComplete(ClientResponse response) {
handleProduceResponse(response, recordsByPartition, time.milliseconds());
}
};
String nodeId = Integer.toString(destination);
ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
requestTimeoutMs, callback);
// 最后是利用NetworkClient发送的。
client.send(clientRequest, now);
log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}
四、最后:
如果不写 Producer.close()
,确实可能会导致消息的发送失败,而注释中也提醒了我们一定要 close
掉生产者,避免资源泄漏。而这其中最主要的原因是 KafkaProducer
实现异步发送的逻辑。它是先将消息存放到RecordAccumulator
队列中,然后让 KafkaThread
线程后台不断地从 RecordAccumulator
中读取已准备好发送的消息,最后发送到 Kafka
中。而我们的代码中,如果不写 Producer.close()
,就不会进行超时 wait
,而当 main()
方法执行完后,KafkaThread
线程还没来得及从 RecordAccumulator
队列中获取消息也跟着被销毁了,所以导致消息最后还是没发送成功。
加载全部内容