RocketMQ异步刷盘
周杰伦本人 人气:0异步刷盘方式:在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE
,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入
RocketMQ
默认采用异步刷盘,异步刷盘两种策略:开启缓冲池,不开启缓冲池
CommitLog的handleDiskFlush方法:
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { // Synchronization flush if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; if (messageExt.isWaitStoreMsgOK()) { GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); if (!flushOK) { log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); } } else { service.wakeup(); } } // Asynchronous flush else { if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { flushCommitLogService.wakeup(); } else { commitLogService.wakeup(); } } }
不开启缓冲池:默认不开启,刷盘线程FlushRealTimeService
会每间隔500毫秒尝试去刷盘。
class FlushRealTimeService extends FlushCommitLogService { private long lastFlushTimestamp = 0; private long printTimes = 0; public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed(); //每次Flush间隔500毫秒 int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog(); //每次Flush最少4页内存数据(16KB) int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages(); //距离上次刷盘时间阈值为10秒 int flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval(); boolean printFlushProgress = false; // Print flush progress long currentTimeMillis = System.currentTimeMillis(); if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) { this.lastFlushTimestamp = currentTimeMillis; flushPhysicQueueLeastPages = 0; printFlushProgress = (printTimes++ % 10) == 0; } try { if (flushCommitLogTimed) { Thread.sleep(interval); } else { this.waitForRunning(interval); } if (printFlushProgress) { this.printFlushProgress(); } long begin = System.currentTimeMillis(); CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages); long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } long past = System.currentTimeMillis() - begin; if (past > 500) { log.info("Flush data to disk costs {} ms", past); } } catch (Throwable e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); this.printFlushProgress(); } } // Normal shutdown, to ensure that all the flush before exit boolean result = false; for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { result = CommitLog.this.mappedFileQueue.flush(0); CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); } this.printFlushProgress(); CommitLog.log.info(this.getServiceName() + " service end"); } @Override public String getServiceName() { return FlushRealTimeService.class.getSimpleName(); } private void printFlushProgress() { // CommitLog.log.info("how much disk fall behind memory, " // + CommitLog.this.mappedFileQueue.howMuchFallBehind()); } @Override public long getJointime() { return 1000 * 60 * 5; } }
- 判断是否超过10秒没刷盘了,如果超过强制刷盘
- 等待Flush间隔500ms
- 通过
MappedFile
刷盘 - 设置
StoreCheckpoint
刷盘时间点 - 超过500ms的刷盘记录日志
- Broker正常停止前,把内存page中的数据刷盘
开启缓冲池:
class CommitRealTimeService extends FlushCommitLogService { private long lastCommitTimestamp = 0; @Override public String getServiceName() { return CommitRealTimeService.class.getSimpleName(); } @Override public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { //每次提交间隔200毫秒 int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog(); //每次提交最少4页内存数据(16KB) int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages(); //距离上次提交时间阈值为200毫秒 int commitDataThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval(); long begin = System.currentTimeMillis(); if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) { this.lastCommitTimestamp = begin; commitDataLeastPages = 0; } try { boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages); long end = System.currentTimeMillis(); if (!result) { this.lastCommitTimestamp = end; // result = false means some data committed. //now wake up flush thread. flushCommitLogService.wakeup(); } if (end - begin > 500) { log.info("Commit data to file costs {} ms", end - begin); } this.waitForRunning(interval); } catch (Throwable e) { CommitLog.log.error(this.getServiceName() + " service has exception. ", e); } } boolean result = false; for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { result = CommitLog.this.mappedFileQueue.commit(0); CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); } CommitLog.log.info(this.getServiceName() + " service end"); } }
RocketMQ
申请一块和CommitLog
文件相同大小的堆外内存来做缓冲池,数据会先写入缓冲池,提交线程CommitRealTimeService也每间隔500毫秒尝试提交到文件通道等待刷盘,刷盘最终由FlushRealTimeService
来完成,和不开启缓冲池的处理一致。使用缓冲池的目的是多条数据合并写入,从而提高io性能。
- 判断是否超过200毫秒没提交,需要强制提交
- 提交到
MappedFile
,此时还未刷盘 - 然后唤醒刷盘线程
- 在Broker正常停止前,提交内存page中的数据
加载全部内容