这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!
另外,博主出书了《Kafka并不难学》和《Hadoop大数据挖掘从入门到进阶实战》,喜欢的朋友或同学, 可以在公告栏那里点击购买链接购买博主的书进行学习,在此感谢大家的支持。关注下面公众号,根据提示,可免费获取书籍的教学视频。
最近有些同学在学习Kafka时,问到Kafka的日志压缩(Log Compaction)问题,对于Kafka的日志压缩有些疑惑,今天笔者就为大家来剖析一下Kafka的日志压缩的相关内容。
Kafka是一个基于Log的流处理系统,一个Topic可以有若干个Partition,Partition是复制的基本单元,在一个Broker节点上,一个Partition的数据文件可以存储在若干个独立磁盘目录中,每个Partition的日志文件存储的时候又会被分成一个个的Segment,默认的Segment的大小是1GB,有属性offsets.topic.segment.bytes来控制。Segment是日志清理的基本单元,当前正在使用的Segment是不会被清理的,对于每一个Partition的日志,以Segment为单位,都会被分为两部分,已清理和未清理的部分。同时,未清理的那部分又分为可以清理和不可清理。日志压缩是Kafka的一种机制,可以提供较为细粒度的记录保留,而不是基于粗粒度的基于时间的保留。
Kafka中的每一条数据都包含Key和Value,数据存储在磁盘上,一般不会永久保留,而是在数据达到一定的量或者时间后,对最早写入的数据进行删除。日志压缩在默认的删除规则之外提供了另一种删除过时数据(或者说是保留有价值的数据)的方式,就是对于具有相同的Key,而数据不同,值保留最后一条数据,前面的数据在合适的情况下删除。
日志压缩特性,就实时计算来说,可以在异常容灾方面有很好的应用途径。比如,我们在Spark、Flink中做实时计算时,需要长期在内存里面维护一些数据,这些数据可能是通过聚合了一天或者一周的日志得到的,这些数据一旦由于异常因素(内存、网络、磁盘等)崩溃了,从头开始计算需要很长的时间。一个比较有效可行的方式就是定时将内存里的数据备份到外部存储介质中,当崩溃出现时,再从外部存储介质中恢复并继续计算。
使用日志压缩来替代这些外部存储有哪些优势及好处呢?这里为大家列举并总结了几点:
当Topic中的cleanup.policy(默认为delete)设置为compact时,Kafka的后台线程会定时将Topic遍历两次,第一次将每个Key的哈希值最后一次出现的offset记录下来,第二次检查每个offset对应的Key是否在较为后面的日志中出现过,如果出现了就删除对应的日志。
日志压缩是允许删除的,这个删除标记将导致删除任何先前带有该Key的消息,但是删除标记的特殊之处在于,它们将在一段时间后从日志中清理,以释放空间。这些需要注意的是,日志压缩是针对Key的,所以在使用时应注意每个消息的Key值不为NULL。
压缩是在Kafka后台通过定时的重新打开Segment来完成的,Segment的压缩细节如下图所示:
日志压缩可以确保的内容,这里笔者总结了以下几点:
日志压缩的核心实现代码大部分的功能在CleanerThread中,核心实现逻辑在Cleaner中的clean方法中,实现细节如下:
/** * Clean the given log * * @param cleanable The log to be cleaned * * @return The first offset not cleaned and the statistics for this round of cleaning */ private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = { // figure out the timestamp below which it is safe to remove delete tombstones // this position is defined to be a configurable time beneath the last modified time of the last clean segment val deleteHorizonMs = cleanable.log.logSegments(0, cleanable.firstDirtyOffset).lastOption match { case None => 0L case Some(seg) => seg.lastModified - cleanable.log.config.deleteRetentionMs } doClean(cleanable, deleteHorizonMs) } private[log] def doClean(cleanable: LogToClean, deleteHorizonMs: Long): (Long, CleanerStats) = { info("Beginning cleaning of log %s.".format(cleanable.log.name)) val log = cleanable.log val stats = new CleanerStats() // build the offset map info("Building offset map for %s...".format(cleanable.log.name)) val upperBoundOffset = cleanable.firstUncleanableOffset buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap, stats) val endOffset = offsetMap.latestOffset + 1 stats.indexDone() // determine the timestamp up to which the log will be cleaned // this is the lower of the last active segment and the compaction lag val cleanableHorizonMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L) // group the segments and clean the groups info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs))) val transactionMetadata = new CleanedTransactionMetadata val groupedSegments = groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize, cleanable.firstUncleanableOffset) for (group <- groupedSegments) cleanSegments(log, group, offsetMap, deleteHorizonMs, stats, transactionMetadata) // record buffer utilization stats.bufferUtilization = offsetMap.utilization stats.allDone() (endOffset, stats) }
日志压缩通过两次遍历所有的数据来实现,两次遍历之间交流的通道就是一个OffsetMap,下面是OffsetMap的内容:
trait OffsetMap { def slots: Int def put(key: ByteBuffer, offset: Long): Unit def get(key: ByteBuffer): Long def updateLatestOffset(offset: Long): Unit def clear(): Unit def size: Int def utilization: Double = size.toDouble / slots def latestOffset: Long }
这基本就是一个普通的MuTable Map,在Kafka代码中,它的实现只有一个叫做SkimpyOffsetMap
PUT方法会为每个Key生成一份信息,默认使用MD5方法生成一个Byte,根据这个信息在Byte中哈希的到一个下标,如果这个下标已经被别的占用,则线性查找到下个空余的下标为止,然后对应位置插入该Key的Offset,实现代码如下:
/** * Associate this offset to the given key. * @param key The key * @param offset The offset */ override def put(key: ByteBuffer, offset: Long): Unit = { require(entries < slots, "Attempt to add a new entry to a full offset map.") lookups += 1 hashInto(key, hash1) // probe until we find the first empty slot var attempt = 0 var pos = positionOf(hash1, attempt) while(!isEmpty(pos)) { bytes.position(pos) bytes.get(hash2) if(Arrays.equals(hash1, hash2)) { // we found an existing entry, overwrite it and return (size does not change) bytes.putLong(offset) lastOffset = offset return } attempt += 1 pos = positionOf(hash1, attempt) } // found an empty slot, update it--size grows by 1 bytes.position(pos) bytes.put(hash1) bytes.putLong(offset) lastOffset = offset entries += 1 }
GET方法使用和PUT同样的算法获取Key的信息,通过信息获得Offset的存储位置,实现代码如下:
/** * Get the offset associated with this key. * @param key The key * @return The offset associated with this key or -1 if the key is not found */ override def get(key: ByteBuffer): Long = { lookups += 1 hashInto(key, hash1) // search for the hash of this key by repeated probing until we find the hash we are looking for or we find an empty slot var attempt = 0 var pos = 0 //we need to guard against attempt integer overflow if the map is full //limit attempt to number of slots once positionOf(..) enters linear search mode val maxAttempts = slots + hashSize - 4 do { if(attempt >= maxAttempts) return -1L pos = positionOf(hash1, attempt) bytes.position(pos) if(isEmpty(pos)) return -1L bytes.get(hash2) attempt += 1 } while(!Arrays.equals(hash1, hash2)) bytes.getLong() }
默认情况下,启动日志清理器,若需要启动特定Topic的日志清理,请添加特定的属性。配置日志清理器,这里为大家总结了以下几点:
Kafka的日志压缩原理并不复杂,就是定时把所有的日志读取两遍,写一遍,而CPU的速度超过磁盘完全不是问题,只要日志的量对应的读取两遍和写入一遍的时间在可接受的范围内,那么它的性能就是可以接受的。
另外,笔者开源的一款Kafka监控关系系统Kafka-Eagle,喜欢的同学可以Star一下,进行关注。
Kafka Eagle源代码地址:https://github.com/smartloli/kafka-eagle
这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!
另外,博主出书了《Kafka并不难学》和《Hadoop大数据挖掘从入门到进阶实战》,喜欢的朋友或同学, 可以在公告栏那里点击购买链接购买博主的书进行学习,在此感谢大家的支持。关注下面公众号,根据提示,可免费获取书籍的教学视频。
加载全部内容