另外,博主出书了《Kafka并不难学》和《Hadoop大数据挖掘从入门到进阶实战》,喜欢的朋友或同学, 可以在公告栏那里点击购买链接购买博主的书进行学习,在此感谢大家的支持。关注下面公众号,根据提示,可免费获取书籍的教学视频。
最近有些同学在学习Kafka时,问到Kafka的日志压缩(Log Compaction)问题,对于Kafka的日志压缩有些疑惑,今天笔者就为大家来剖析一下Kafka的日志压缩的相关内容。
/** * Clean the given log * * @param cleanable The log to be cleaned * * @return The first offset not cleaned and the statistics for this round of cleaning */ private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = { // figure out the timestamp below which it is safe to remove delete tombstones // this position is defined to be a configurable time beneath the last modified time of the last clean segment val deleteHorizonMs = cleanable.log.logSegments(0, cleanable.firstDirtyOffset).lastOption match { case None => 0L case Some(seg) => seg.lastModified - cleanable.log.config.deleteRetentionMs } doClean(cleanable, deleteHorizonMs) } private[log] def doClean(cleanable: LogToClean, deleteHorizonMs: Long): (Long, CleanerStats) = { info("Beginning cleaning of log %s.".format(cleanable.log.name)) val log = cleanable.log val stats = new CleanerStats() // build the offset map info("Building offset map for %s...".format(cleanable.log.name)) val upperBoundOffset = cleanable.firstUncleanableOffset buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap, stats) val endOffset = offsetMap.latestOffset + 1 stats.indexDone() // determine the timestamp up to which the log will be cleaned // this is the lower of the last active segment and the compaction lag val cleanableHorizonMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L) // group the segments and clean the groups info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs))) val transactionMetadata = new CleanedTransactionMetadata val groupedSegments = groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize, cleanable.firstUncleanableOffset) for (group <- groupedSegments) cleanSegments(log, group, offsetMap, deleteHorizonMs, stats, transactionMetadata) // record buffer utilization stats.bufferUtilization = offsetMap.utilization stats.allDone() (endOffset, stats) }
trait OffsetMap { def slots: Int def put(key: ByteBuffer, offset: Long): Unit def get(key: ByteBuffer): Long def updateLatestOffset(offset: Long): Unit def clear(): Unit def size: Int def utilization: Double = size.toDouble / slots def latestOffset: Long }
这基本就是一个普通的MuTable Map,在Kafka代码中,它的实现只有一个叫做SkimpyOffsetMap
/** * Associate this offset to the given key. * @param key The key * @param offset The offset */ override def put(key: ByteBuffer, offset: Long): Unit = { require(entries < slots, "Attempt to add a new entry to a full offset map.") lookups += 1 hashInto(key, hash1) // probe until we find the first empty slot var attempt = 0 var pos = positionOf(hash1, attempt) while(!isEmpty(pos)) { bytes.position(pos) bytes.get(hash2) if(Arrays.equals(hash1, hash2)) { // we found an existing entry, overwrite it and return (size does not change) bytes.putLong(offset) lastOffset = offset return } attempt += 1 pos = positionOf(hash1, attempt) } // found an empty slot, update it--size grows by 1 bytes.position(pos) bytes.put(hash1) bytes.putLong(offset) lastOffset = offset entries += 1 }
/** * Get the offset associated with this key. * @param key The key * @return The offset associated with this key or -1 if the key is not found */ override def get(key: ByteBuffer): Long = { lookups += 1 hashInto(key, hash1) // search for the hash of this key by repeated probing until we find the hash we are looking for or we find an empty slot var attempt = 0 var pos = 0 //we need to guard against attempt integer overflow if the map is full //limit attempt to number of slots once positionOf(..) enters linear search mode val maxAttempts = slots + hashSize - 4 do { if(attempt >= maxAttempts) return -1L pos = positionOf(hash1, attempt) bytes.position(pos) if(isEmpty(pos)) return -1L bytes.get(hash2) attempt += 1 } while(!Arrays.equals(hash1, hash2)) bytes.getLong() }
Kafka Eagle源代码地址:https://github.com/smartloli/kafka-eagle
另外,博主出书了《Kafka并不难学》和《Hadoop大数据挖掘从入门到进阶实战》,喜欢的朋友或同学, 可以在公告栏那里点击购买链接购买博主的书进行学习,在此感谢大家的支持。关注下面公众号,根据提示,可免费获取书籍的教学视频。