城市直播房产教育博客汽车
汽车报价
买车新车
博客专栏
专题精品
教育留学
高考读书
房产家居
彩票视频
直播黑猫
投资微博
城市上海
政务旅游

阿里二面RockeMQ是如何通过mmap大幅提升单机吞吐量的

11月23日 皇极城投稿
  今天抽空给大家整理了一份关于RocketMQ的高性能知识点文章总结。希望能对各位读者有所帮助。
  关于RockeMQ的基本介绍
  简介
  RocketMQ是一个纯Java、分布式、队列模型的开源消息中间件,前身是MetaQ,是阿里参考Kafka特点研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache顶级开源项目,具有高性能、高可靠、高实时、分布式特点。
  发展背景2011年:业界出现了现在被很多大数据领域所推崇的Kafka消息引擎,阿里巴巴在研究了Kafka的整体机制和架构设计之后,基于Kafka的设计使用Java进行了完全重写并推出了MetaQ1。0版本,主要是用于解决顺序消息和海量堆积的问题。2012年:阿里巴巴开源其自研的第三代分布式消息中间件RocketMQ。2016年11月:阿里将RocketMQ捐献给Apache软件基金会,正式成为孵化项目。2017年2月20日:RocketMQ正式发布4。0版本,专家称新版本适用于电商领域,金融领域,大数据领域,兼有物联网领域的编程模型。2022年:RocketMQ正式发布5。0版本,这也是目前最新版本。
  RocketMQ的各个特点单机吞吐量:10ws。可用性:支持双主双从的分布式架构,具备高可用特性。支持使用topic,tag,SQL来对消息进行筛选。底层结构通过多队列来承载消息存储等特性。可靠的FIFO和严格有序的消息队列中间件。支持多种消息传递协议,例如grpc,Mqtt,Jms。。。。源码实现:Java语言。
  PageCache和Mmap
  RocketMQ这款中间件具有着单机10w的吞吐量,其底层原因,实际上得从操作系统原理开始和大家讲起。
  顺序写入
  磁盘的写步骤通常是:CPU发送一个写信号给到磁盘磁头,接着磁头需要进行寻道操作,找到对应的磁道后,定位对应的位置进行数据写入。所以如果数据是随机写入的话,磁头就需要频繁地切换盘道进行数据的写入,整体耗时会有所提升。
  顺序写,其实是一种非常常见的提升IO写性能的方式,利用连续的写入地址,从而减少磁头的切换次数,提升性能。
  PageCache
  为了提升对文件的读写效率,Linux内核会以页大小(4KB)为单位,将文件划分为多数据块。当用户对文件中的某个数据块进行读写操作时,内核首先会申请一个内存页(称为页缓存)与文件中的数据块进行绑定。
  例如下边这张图,当我们发起一次系统调用的write方法,想要将用户态中的数据写入磁盘的时候,其实是需要发生以下操作的:
  首先,将用户地址空间的数据通过CPU拷贝,放入到内核空间中,并且写入一个PageCache里面,然后通过DMA去将PageCache的数据写入到磁盘。
  这里面由于有CPU拷贝这样的重操作,所以想要提升吞吐量,必须解决这个问题。而RocketMQ的创作团队,则是通过mmap技术来解决了它。
  什么是mmap
  mmap系统调用,让用户地址空间,跟文件做映射(实际是指向不存在的物理内存)。将内核态的一段空间地址映射到了用户态中,这样数据只需要写入到用户态的这段虚拟地址中,接着内核空间的DMA会将这段数据写入到磁盘中。这样之后,整体的写入流程就如下图所示:
  使用mmap技术之后,可以减少一次的CPU拷贝次数,提升性能。
  这里补充一些说明,其实DMA本质上是一块访问独立的芯片。由于每次访问磁盘进行IO操作都会导致CPU大量的空闲时间,而DMA则是用于提升IO操作效率的一个角色,主要用于IO的数据传输,降低CPU等待时间。
  Java里面如何使用mmap技术
  在Java语言中,其实很早就有提供mmap方面的api了,下边是一段简单的mmap使用案例。publicclassMmapUtils{publicFilecommitLogFpublicMappedByteBuffermappedByteBpublicintmappedSize0;publicintwritePos0;publicMmapUtils(StringcommitLogPath,intmappedSize){this。commitLogFilenewFile(commitLogPath);if(!commitLogFile。exists()){try{commitLogFile。createNewFile();}catch(IOExceptione){e。printStackTrace();}}this。mappedSizemappedStry{mappedByteBuffernewRandomAccessFile(commitLogFile,rw)。getChannel()。map(FileChannel。MapMode。READWRITE,0,mappedSize);}catch(IOExceptione){e。printStackTrace();}}往磁盘写数据paramcontentreturnpublicintwriteFile(Stringcontent){mappedByteBuffer。put(content。getBytes());强制刷盘mappedByteBuffer。force();writePosmappedByteBuffer。position();return1;}从磁盘中读取数据paramlenreturnpublicbyte〔〕readContent(intlen){mappedByteBuffer。position(0);byte〔〕destnewbyte〔len〕;intj0;for(inti0;imappedSi){bytebmappedByteBuffer。get();if(b!0){dest〔j〕b;}}}}
  mmap存在的缺陷
  其实使用mmap技术还是存在一些缺陷的。
  导致缺页中断问题
  我们知道,在操作系统的中,数据通常都是被放在磁盘中的,只有在需要计算的时候,才会将数据加载到内存中,而每次加载的单位都是以页作为基础,假设我们需要访问一块存在于磁盘,但是没有被加载到内存中的数据,这种情况,我们称之为软性的缺页中断。如果数据是存在于内存,但是该页的地址没有被注册到MMU中,我们则称之为硬性的缺页中断。
  总之不管是软性还是硬性的中断,都需要重新建立一次数据的内存映射,比较消耗性能。
  mmap对于文件的大小有一定要求
  使用mmap技术之后,我们不可以使用2gb大小以上的文件去做映射,同时文件的长度也不建议做变长,最好是固定的大小。最后对于小文件而言,使用mmap可能性能还不如直接的原始IO操作。
  RocketMQ对mmap的优化
  好了,现在我们了解了mmap存在的缺陷之后,来看看RocketMQ是如何解决这些不足点的。
  预映射
  在RocketMQ的源代码里,可以看到这样一份代码,它的名字叫做:org。apache。rocketmq。store。AllocateMappedFileService。这个类负责在rocketmq启动时,预先分配mmap的文件映射。
  文件预热
  调用mmap进行内存映射后,OS只是建立虚拟内存地址至物理地址的映射表,没有实际加载任何文件至内存。依靠一次缺页加载4K,1G的commitLog需要发生256次缺页中断。而在RocketMQ的源代码中,进行了madvise系统调用,其目的是使操作系统做一次内存映射后对应的文件数据尽可能多地预加载进内存,从而实现预热。
  内存锁定
  将进程使用的部分或者全部的地址空间锁定在物理内存中,并会先写入一些随机值到mmap映射出的内存空间里,防止其被交换到swap空间。基于mlock系统调用。
  堆外缓存
  到这里,我们了解了RocketMQ内部通过使用PageCache去让我们的数据写入如写入内存般轻松,但是这在极端情况下,可能会有出现频繁出现缺页中断的情况,以及PageCache阻塞,这种情况下,Broker节点会返回一个SYSTEMBUSY的信号给到客户端,不过这类场景在实际生产中很少会出现。为了避免这种情况,RocketMQ底层增加了一套堆外缓存来优化这类场景。当PageCache写入阻塞的时候,可以选择写入到堆外缓存中,之后再从堆外缓存(DirectByteBuffer)写入到PageCache。
  RocketMQ中的mmap预分配实现分析
  下边让我们来看看RocketMQ底层是如何进行mmap的文件映射预分配的。在RocketMQ的源代码中,当服务启动之后,AllocateMappedFileService这个线程类便会开始执行。
  线程启动之后,便会执行mmapOperation方法,这个方法的大致步骤如下:从优先级队列中获取AllocateRequest创建MappedFile根据配置是否预热MappedFile(填充0字节),将MappedFile放入到AllocateRequest如果出现IOException将AllocateRequest重新放入优先级队列调用AllocateRequest的CountDownLatchcountDown方法通知putRequestAndReturnMappedFile线程
  整体执行目的总结起来就是:初始化预热mappedFile。
  关于mmapOperation的源代码和其含义,我列了一份出来给各位读者查看:Onlyinterruptedbytheexternalthread,willreturnfalseprivatebooleanmmapOperation(){booleanisSAllocateRtry{从优先级队列里获取AllocateRequestreqthis。requestQueue。take();从Map里获取AllocateRequestAllocateRequestexpectedRequestthis。requestTable。get(req。getFilePath());if(nullexpectedRequest){log。warn(thismmaprequestexpired,maybecausetimeoutreq。getFilePath()req。getFileSize());}putRequestAndReturnMappedFile里map与优先级队列并不是强一致,是最终一致的if(expectedRequest!req){log。warn(neverexpectedhere,maybecausetimeoutreq。getFilePath()req。getFileSize(),req:req,expectedRequest:expectedRequest);}if(req。getMappedFile()null){longbeginTimeSystem。currentTimeMillis();MappedFilemappedF堆外内存if(messageStore。getMessageStoreConfig()。isTransientStorePoolEnable()){try{mappedFileServiceLoader。load(MappedFile。class)。iterator()。next();mappedFile。init(req。getFilePath(),req。getFileSize(),messageStore。getTransientStorePool());}catch(RuntimeExceptione){log。warn(Usedefaultimplementation。);mappedFilenewMappedFile(req。getFilePath(),req。getFileSize(),messageStore。getTransientStorePool());}}else{mappedFilenewMappedFile(req。getFilePath(),req。getFileSize());}longeclipseTimeUtilAll。computeEclipseTimeMilliseconds(beginTime);创建MappedFile花费大于10ms打印ahrefhttps:www。bs178。comrizhitargetblankclassinfotextkey日志aif(eclipseTime10){intqueueSizethis。requestQueue。size();log。warn(createmappedFilespenttime(ms)eclipseTimequeuesizequeueSizereq。getFilePath()req。getFileSize());}prewritemappedFile默认warmMapedFileEnablefalse,即默认不预热if(mappedFile。getFileSize()this。messageStore。getMessageStoreConfig()。getMapedFileSizeCommitLog()this。messageStore。getMessageStoreConfig()。isWarmMapedFileEnable()){文件预热mappedFile。warmMappedFile(this。messageStore。getMessageStoreConfig()。getFlushDiskType(),this。messageStore。getMessageStoreConfig()。getFlushLeastPagesWhenWarmMapedFile());}req。setMappedFile(mappedFile);this。hasEisS}}catch(InterruptedExceptione){log。warn(this。getServiceName()interrupted,possiblybyshutdown。);this。hasE}catch(IOExceptione){log。warn(this。getServiceName()servicehasexception。,e);this。hasEif(null!req){重新插入请求到队列requestQueue。offer(req);try{Thread。sleep(1);}catch(InterruptedExceptionignored){}}}finally{AllocateRequest计数器减一,表示MappedFile已经创建完成if(req!nullisSuccess)req。getCountDownLatch()。countDown();}}
  在这段代码里头,我们可以看到有个文件预热的方法,叫做:warmMappedFile,该方法内部最后发出了一次系统调用,mlock方法。publicvoidwarmMappedFile(FlushDiskTypetype,intpages){longbeginTimeSystem。currentTimeMillis();ByteBufferbyteBufferthis。mappedByteBuffer。slice();intflush0;longtimeSystem。currentTimeMillis();for(inti0,j0;ithis。fileSiMappedFile。OSPAGESIZE,j){byteBuffer。put(i,(byte)0);forceflushwhenflushdisktypeissyncif(typeFlushDiskType。SYNCFLUSH){if((iOSPAGESIZE)(flushOSPAGESIZE)pages){mappedByteBuffer。force();}}preventgcif(j10000){log。info(j{},costTime{},j,System。currentTimeMillis()time);timeSystem。currentTimeMillis();try{Thread。sleep(0);}catch(InterruptedExceptione){log。error(Interrupted,e);}}}forceflushwhenprepareloadfinishedif(typeFlushDiskType。SYNCFLUSH){log。info(mappedfilewarmupdone,forcetodisk,mappedFile{},costTime{},this。getFileName(),System。currentTimeMillis()beginTime);mappedByteBuffer。force();}log。info(mappedfilewarmupdone。mappedFile{},costTime{},this。getFileName(),System。currentTimeMillis()beginTime);这里是一个系统调用this。mlock();}
  这段代码的末尾处调用的mlock函数,它的内部是利用了Java的JNI机制,去进行系统调用。
  JNA的本质就是将大多数native的方法封装到jar包中的动态库中,并且提供了一系列的机制来自动加载这个动态库。例如下边这个例子就是基于JNI去调用clib中的print方法:publicclassJNAUsage{publicinterfaceCLibraryextendsLibrary{CLibraryINSTANCE(CLibrary)Native。load((Platform。isWindows()?msvcrt:c),CLibrary。class);voidprintf(Stringformat,Object。。。args);}publicstaticvoidmain(String〔〕args){CLibrary。INSTANCE。printf(Hello,World);for(inti0;iargs。i){CLibrary。INSTANCE。printf(Argumentd:s,i,args〔i〕);}}}
  这个例子中,我们想要加载系统的clib,从而使用clib中的printf方法。
  具体做法就是创建一个CLibraryinterface,这个interface继承自Library,然后使用Native。load方法来加载clib,最后在这个interface中定义要使用的lib中的方法即可。
  那么在RocketMQ中,底层又是如何通过JNI来实现mlock的调用呢,来看源代码:publicvoidmlock(){finallongbeginTimeSystem。currentTimeMillis();finallongaddress((DirectBuffer)(this。mappedByteBuffer))。address();PointerpointernewPointer(address);{intretLibC。INSTANCE。mlock(pointer,newNativeLong(this。fileSize));log。info(mlock{}{}{}ret{}timeconsuming{},address,this。fileName,this。fileSize,ret,System。currentTimeMillis()beginTime);}{intretLibC。INSTANCE。madvise(pointer,newNativeLong(this。fileSize),LibC。MADVWILLNEED);log。info(madvise{}{}{}ret{}timeconsuming{},address,this。fileName,this。fileSize,ret,System。currentTimeMillis()beginTime);}}
  上边代码中的LIbC是一个作者封装的系统调用接口文件,里面正好是映射了操作系统的mlock方法。packageorg。apache。rocketmq。store。importcom。sun。jna。Limportcom。sun。jna。Nimportcom。sun。jna。NativeLimportcom。sun。jna。Pimportcom。sun。jna。PpublicinterfaceLibCextendsLibrary{LibCINSTANCE(LibC)Native。loadLibrary(Platform。isWindows()?msvcrt:c,LibC。class);intMADVWILLNEED3;intMADVDONTNEED4;intMCLCURRENT1;intMCLFUTURE2;intMCLONFAULT4;syncmemoryasynchronouslyintMSASYNC0x0001;invalidatemappingscachesintMSINVALIDATE0x0002;synchronousmemorysyncintMSSYNC0x0004;intmlock(Pointervar1,NativeLongvar2);intmunlock(Pointervar1,NativeLongvar2);intmadvise(Pointervar1,NativeLongvar2,intvar3);Pointermemset(Pointerp,intv,longlen);intmlockall(intflags);intmsync(Pointerp,NativeLonglength,intflags);}
  通过调用mlock可以将进程使用的部分或者全部的地址空间锁定在物理内存中,防止其被交换到swap空间。对时间敏感的应用会希望全部使用物理内存,提高数据访问和操作的效率。例如,memcached就提供锁定内存的选项,保证memcached使用内存全部在物理内存中。
  通过mlock锁定对应的地址空间,防止被swap出去,这样的效果,也正是RocketMQ所需要的。
  来源:https:mp。weixin。qq。comsQrf8fCvBbNGEbqsLf3lRvA
搜索 投诉 评论 转载

当爱已成过往,又何必苦苦追寻?当爱已成过往,何必苦苦寻找?强扭的瓜不甜,强求只能加深你我的裂痕,只能让你我的痛楚更深。如果你真的要走,那我又怎么挽留。学会大度,让生活简单;学着忙碌,让生活得到充实;其……重庆开州区2022年第1批重点建设项目03前期准备(135项注:标的项目为年度投资亿元以上重点项目;单位:万元。序号,项目名称,项目业主,建设性质,投资类别,建设年限,主要建设内容及规模,计划总投资,2022年计划投资,2023年……封神的遗憾文案(2)1。花都只能开一季,我怎么不能只爱你一个?2。我们总在一边学习相爱,一边预习离别!3。对我冷冰冰的时候,你又在捂热谁?4。念念不忘必有回响,是电影的桥段!5。时间和我都在往前走……阅读的力量聆听绘本故事,守护心暖花开,让我们一起来聆听中国福利会托儿所陶紫妍老师给大家带来的绘本故事《爸爸的手提箱》。点击文末阅读原文播放音频《爸爸的手提箱》内容简介:……春季艾草正当时,坚持用它泡脚,可能收获3个好处都说:三月艾,四月蒿,七月八月当柴烧。清明前后,正好是艾草生长最旺盛的阶段,除了用它做青团、做野菜之外,还可以用来泡脚,激发它的养生功效。这是大自然的馈赠,千万不要浪费哦。……世界杯梅西的几个争议行为,恐要遭到FIFA重罚2022世界杯在世界杯上,球星的个人发挥,是比赛的核心看点之一。而本届世界杯,万众期待的梅西和C罗的对决,却因葡萄牙在摩洛哥身上翻了船再也无法上演了。梅西、C……阿里二面RockeMQ是如何通过mmap大幅提升单机吞吐量的今天抽空给大家整理了一份关于RocketMQ的高性能知识点文章总结。希望能对各位读者有所帮助。关于RockeMQ的基本介绍简介RocketMQ是一个纯Java……12月最佳旅游地推荐在寒冷的冬季寻找一份宁静浪漫的风光当第一片雪花飘下来时,我就知道,冬天是真的来了。作为一个南方人,最期待的便是,睁眼就看到千树万树梨花开的美景。回首这一年,我们都生活在紧迫与担忧之中,……国产替代的王者题材,5G滤波器近期,有一家公司在短短半个月内股价就完成了翻倍,原因是做出了一个匹配华为的手机壳,通过eSIM卡服务和TypeC接口连接华为手机后,即可解决华为手机因5G芯片缺失而无法使用5G……悲催了战友掉线四打五怎么玩出了泉水,大家都是一级,买了把小野刀后,点选学习二技能的猴子习惯性奔向上路野区蓝buff。中路安琪拉还没跑到一塔边,猴子早就猫在草丛里等待蓝爸爸的出生了。下路后羿也是正准……生活的意义在哪里?为什么会觉得生活没有意义,这一切的一切来源于无聊的循环。每日做着重复没有尽头的工作,就像禁锢在一个古老的小镇上。所有人都习惯了这种生活,它成了人们前进的阻力,但又是人们无法离开……安徽黄山市发改委最新审批通过的建设工程项目清单来了1、徽州区丰乐鞋业150kW分布式光伏发电项目,时间:202210282、谭家桥镇乡村振兴及旅游基础设施提升项目,时间:202210263、黄山太平湖流域水生态保护……
新闻播报(September14)草帽姐,你要坚持下去杜兰特强于单打,库里自带体系!事实胜于狡辩!杜兰特是大将中国哪里的啤酒最好喝?这5个地方很出名,看看有你家乡的吗?那些现实又温柔的句子35岁还这般精力旺盛,赵奕欢才是真正的时间管理大师二十大10月16日召开,广电5G192将正式商用,看看有啥新荣耀出于华为,能否胜于华为奥陌陌的加速离开,越来越引发科学家怀疑,追赶奥陌陌被提上日程第46周新能源与智能化板块两部委发威锂业公司地震拉里。惠尔斯没有魔山和埃迪,我将成为世界最强壮的男人央行再次降息!央行已经尽力了!经济该怎么办?
安全在我心周海媚现身秀场,发型简单穿着朴素,钟丽缇走出自己的风格如何用折纸折有爱心的小船?要写5000字日记的一天抱怨没有用一切靠自己作文羽绒服出绒怎么补救拒绝冬日尴尬这真的是林心如吗?妆容属实不敢恭维外出旅游用什么样望远镜好宝妈洗澡时身上这两个部位再难受也别碰。别因无知害了宝宝怎样处理个人债务动力煤期货价格回落超700元吨机构资金大撤退,贸易商囤货待涨敷完面膜用洗脸吗怎么正确清洗脸部

友情链接:中准网聚热点快百科快传网快生活快软网快好知文好找江西南阳嘉兴昆明铜陵滨州广东西昌常德梅州兰州阳江运城金华广西萍乡大理重庆诸暨泉州安庆南充武汉辽宁