城市直播房产教育博客汽车
汽车报价
买车新车
博客专栏
专题精品
教育留学
高考读书
房产家居
彩票视频
直播黑猫
投资微博
城市上海
政务旅游

JDK9响应式流使用详解

9月9日 栀璃鸢投稿
  上文中咱们简单提到了JDK9中Flow接口中的静态内部类实现了响应式流的JAVAAPI,并且提供了一个一个Publisher的实现类SubmissionPublisher。本文将先梳理一下接口中具体的处理流程,然后再以几个调用者的例子来帮助大家理解。JDK9中的实现
  再放上一下上文中的响应式流的交互流程:订阅者向发布者发送订阅请求。发布者根据订阅请求生成令牌发送给订阅者。订阅者根据令牌向发布者发送请求N个数据。发送者根据订阅者的请求数量返回M(MN)个数据重复3,4数据发送完毕后由发布者发送给订阅者结束信号
  该流程的角度是以接口调用的交互来说的,而考虑实际的coding工作中,我们的调用流程其实为:创建发布者创建订阅者订阅令牌交互发送信息
  接下来我们按照这个流程来梳理一下代码细节。创建发布者
  对于实现响应流的最开始的步骤,便是创建一个发布者。之前提到在JDK9中提供了一个发布者的简单实现SubmissionPublisher。SubmissionPublisher继承自Flow。Publisher,他有三种构造函数:publicSubmissionPublisher(){this(ASYNCPOOL,Flow。defaultBufferSize(),null);}publicSubmissionPublisher(Executorexecutor,intmaxBufferCapacity){this(executor,maxBufferCapacity,null);}publicSubmissionPublisher(Executorexecutor,intmaxBufferCapacity,BiC?superS?superT,?superThrowablehandler)
  SubmissionPublisher将使用Executor作为线程池向订阅者发送信息。如果需要需要设置线程池的话可以自己传入,否则的话再无参的构造函数中将默认使用ForkJoinPool类的commonPool()方法获取,即无餐构造方法中的ASYNCPOOL静态变量。
  SubmissionPublisher会为每一个订阅者单独的建立一个缓冲空间,其大小由入参maxBufferCapacity决定。默认情况下直接使用Flow。defaultBufferSize()来设置,默认为256。如果缓冲区满了之后会根据发送信息时候的策略确定是阻塞等待还是抛弃数据。
  SubmissionPublisher会在订阅者发生异常的时候(onNext处理中),会调用最后一个参数handler方法,然后才会取消订阅。默认的时候为null,也就是不会处理异常。
  最简单的创建SubmissionPublisher的方法就是直接使用无参构造方法:SubmissionPublisherIntegerpublishernewSubmissionPublisher();
  上文书说到,因为SubmissionPublisher实现了AutoCloseable接口,所以可以用try来进行资源回收可以省略close()的调用:try(SubmissionPublisherIntegerpublishernewSubmissionPublisher()){}
  但是也可以手动的调用close()方法来显示的关闭发布者,关闭后再发送数据就会抛出异常:if(complete)thrownewIllegalStateException(Closed);创建订阅者
  上文中咱们没有手动创建订阅者,而是直接调用SubmissionPublisher中的consume方法使用其内部的订阅者来消费消息。在本节可以实现接口Flow。Subscriber创建一个SimpleSubscriber类:publicclassSimpleSubscriberimplementsFlow。SubscriberInteger{privateFlow。S订阅者名称privateS定义最大消费数量privatefinallongmaxC计数器publicSimpleSubscriber(Stringname,longmaxCount){this。this。maxCountmaxCount0?1:maxC}OverridepublicvoidonSubscribe(Flow。Subscriptionsubscription){this。System。out。printf(订阅者:s,最大消费数据:d。n,name,maxCount);实际上是等于消费全部数据subscription。request(maxCount);}OverridepublicvoidonNext(Integeritem){System。out。printf(订阅者:s接收到数据:d。n,name,item);if(countermaxCount){System。out。printf(准备取消订阅者:s。已处理数据个数:d。n,name,counter);处理完毕,取消订阅subscription。cancel();}}OverridepublicvoidonError(Throwablet){System。out。printf(订阅者:s,出现异常:s。n,name,t。getMessage());}OverridepublicvoidonComplete(){System。out。printf(订阅者:s处理完成。n,name);}}
  SimpleSubscriber是一个简单订阅者类,其逻辑是根据构造参数可以定义其名称name与最大处理数据值maxCount,最少处理一个数据。
  当发布者进行一个订阅的时候会生成一个令牌Subscription作为参数调用onSubscribe方法。在订阅者需要捕获该令牌作为后续与发布者交互的纽带。一般来说在onSubscribe中至少调用一次request且参数需要0,否则发布者将无法向订阅者发送任何信息,这也是为什么maxCount需要大于0。
  当发布者开始发送数据后,会异步的调用onNext方法并将数据传入。该类中使用了一个计数器对数据数量进行校验,当达到最大值的时候,则会通过令牌(subscription)异步通知发布者订阅结束,然后发送者再异步的调用发订阅者的onComplete方法,以处理完成流程。
  其中的onError和onComplete方法只进行打印,这里就不再说了。
  以上的这个订阅者可以看作是一个push模型的实现,因为当开始订阅时订阅者就约定了需要接受的数量,然后在后续的处理(onNext)中不再请求新数据。
  我们可以用以下的代码创建一个名称为S1,消费2个元素的订阅者:SimpleSubscribersub1newSimpleSubscriber(S1,2);订阅令牌交互
  当我们可以创建了发送者和订阅者之后,我们需要确认一下进行交互的顺序,由于响应流的处理就是对于事件的处理,所以事件的顺序十分重要,具体顺序如下:我们创建一个发布者publisher一个订阅者subscriber订阅者subscriber通过调用发布者的subscribe()方法进行信息订阅。如果订阅成功,则发布者将生成一个令牌(Subscription)并作为入参调用订阅者的订阅事件方法onSubscribe()。如果调用异常则会直接调用订阅者的onError错误处理方法,并抛出IllegalStateException异常然后结束订阅。在onSubscribe()中,订阅者需要通过调用令牌(Subscription)的请求方法request(long)来异步的向发布者请求数据。当发布者有数据可以发布的时候,则会异步的调用订阅者的onNext()方法,直到所有消息的总数已经满足了订阅者调用request的数据请求上限。所以当订阅者请求订阅的消息数为Long。MAXVALUE时,实际上是消费所有数据,即push模式。如果发布者没有数据要发布了,则可以会调用发布者自己的close()方法并异步的调用所有订阅者的onComplete()方法来通知订阅结束。发布者可以随时向发布者请求更多的元素请求(一般在onNext里),而不用等到之前的处理完毕,一般是与之前的数据数量进行累加。放发布者遇到异常的时候会调用订阅者的onError()方法。
  上面的描述中是只使用的一个订阅者来进行描述的,后面的例子中将说明发布者可以拥有多个订阅者(甚至0个订阅者)。发送信息
  当发布者需要推送消息的时候会调用submit方法或者offer方法,上文中我们提到submit实际上是offer的一种简单实现,本节咱们自己比较一下。
  首先他们的方法签名为:intoffer(Titem,longtimeout,TimeUnitunit,BiPredicateFlow。S?superT,?superTonDrop)intoffer(Titem,BiPredicateFlow。S?superT,?superTonDrop)intsubmit(Titem)
  而submit和offer的直接方法为:publicintsubmit(Titem){returndoOffer(item,Long。MAXVALUE,null);}publicintoffer(Titem,BiPredicateS?superT,?superTonDrop){returndoOffer(item,0L,onDrop);
  可以看到他们的底层调用的都是doOffer方法,而doOffer的方法签名为:privateintdoOffer(Titem,longnanos,BiPredicateS?superT,?superTonDrop)
  所以我们可以直接看doOffer()方法。doOffer()方法是可选阻塞时长的,而时长根据入参数nanos来决定。而onDrop()是一个删除判断器,如果调用BiPredicate的test()方法结果为true则会再次重试(根据令牌中的nextRetry属性与发布器中的retryOffer()方法组合判断,但是具体实现还没梳理明白);如果结果为flase则直接删除内容。doOffer()返回的结果为正负两种,正数的结果为发送了数据,但是订阅者还未消费的数据(估计值,因为是异步多线程的);如果为负数,则返回的是重拾次数。
  所以,根据submit()的参数我们可以发现,submit会一直阻塞直到数据可以被消费(因为不会阻塞超时,所以不需要传入onDrop()方法)。而我们可以根据需要配置offer()选择器。如果必须要求数据都要被消费的话,那就可以直接选择submit(),如果要设置重试次数的话就可以选择使用offer()异步调用的例子
  下面看一个具体的程序例子,程序将以3秒为周期进行数据发布:publicclassPeriodicPublisher{publicstaticfinalintWAITTIME2;publicstaticfinalintSLEEPTIME3;publicstaticvoidmain(String〔〕args){SubmissionPublisherIntegerpublishernewSubmissionPublisher();创建4订阅者SimpleSubscribersubscriber1newSimpleSubscriber(S1,2);SimpleSubscribersubscriber2newSimpleSubscriber(S2,4);SimpleSubscribersubscriber3newSimpleSubscriber(S3,6);SimpleSubscribersubscriber4newSimpleSubscriber(S4,10);前三个订阅者直接进行订阅publisher。subscribe(subscriber1);publisher。subscribe(subscriber2);publisher。subscribe(subscriber3);第四个方法延迟订阅delaySubscribeWithWaitTime(publisher,subscriber4);开始发送消息ThreadpubThreadpublish(publisher,5);try{等待处理完成pubThread。join();}catch(InterruptedExceptione){e。printStackTrace();}}publicstaticThreadpublish(SubmissionPublisherIntegerpublisher,intcount){ThreadtnewThread((){IntStream。range(1,count)。forEach(item{publisher。submit(item);sleep(item);});publisher。close();});t。start();}privatestaticvoidsleep(Integeritem){try{System。out。printf(推送数据:d。休眠3秒。n,item);TimeUnit。SECONDS。sleep(SLEEPTIME);}catch(InterruptedExceptione){e。printStackTrace();}}privatestaticvoiddelaySubscribeWithWaitTime(SubmissionPublisherIntegerpublisher,Flow。SubscriberIntegersub){newThread((){try{TimeUnit。SECONDS。sleep(WAITTIME);publisher。subscribe(sub);}catch(InterruptedExceptione){e。printStackTrace();}})。start();}}
  代码后是运行结果如下:订阅者:S1,最大消费数据:2。推送数据:1。休眠3秒。订阅者:S3,最大消费数据:6。订阅者:S2,最大消费数据:4。订阅者:S2接收到数据:1。订阅者:S3接收到数据:1。订阅者:S1接收到数据:1。订阅者:S4,最大消费数据:10。推送数据:2。休眠3秒。订阅者:S2接收到数据:2。订阅者:S3接收到数据:2。订阅者:S1接收到数据:2。订阅者:S4接收到数据:2。准备取消订阅者:S1。已处理数据个数:2。推送数据:3。休眠3秒。订阅者:S4接收到数据:3。订阅者:S2接收到数据:3。订阅者:S3接收到数据:3。推送数据:4。休眠3秒。订阅者:S4接收到数据:4。订阅者:S3接收到数据:4。订阅者:S2接收到数据:4。准备取消订阅者:S2。已处理数据个数:4。推送数据:5。休眠3秒。订阅者:S3接收到数据:5。订阅者:S4接收到数据:5。订阅者:S3处理完成。订阅者:S4处理完成。
  由于是异步执行,所以在接收数据部分的顺序可能不同。
  我们分析一下程序的执行流程。创建一个发布者实例创建四个订阅者实例S1、S2、S3、S4,可以接收数据的数量分别为:2、4、6、10。前三个订阅者立即订阅消息。S4的订阅者单独创建一个线程等待WAITTIME秒(2秒)之后进行数据的订阅。新建一个线程来以SLEEPTIME秒(3秒)为间隔发布5个数据。将publish线程join()住等待流程结束。
  执行的日志满足上述流程而针对一些关键点为:S4在发送者推送数据1的时候还未订阅,所以S4没有接收到数据1。当发送数据2的时候S1已经接收够了预期数据2个,所以取消了订阅。之后只剩下S2、S3、S4。当发送数据4的时候S2已经接收够了预期数据4个,所以取消了订阅。之后只剩下S3、S4。当发送数据5的时候只剩下S3、S4,当发送完毕后publisher调用close()方法,通知S3、S4数据处理完成。
  需要注意的是,如果在最后submit完毕之后直接close()然后结束进行的话可能订阅者并不能执行完毕。但是由于在任意一次submit()之后都有一次3秒的等待,所以本程序是可以执行完毕的。最后
  本文中的例子是是简单的实现,可以通过调整订阅者中的request的参数,与在onNext中添加request调用来测试背压的效果,还可以将submit调整为offer并添加onDrop方法以观察抛弃信息时的流程。同时本文没有提供Processor的例子,各位也可以自行学习。
  总结一下流程:订阅者向发布者进行订阅,然后发布者向订阅者发送令牌。订阅者使用令牌请求消息,发送者根据请求消息的数量推送消息。订阅者可以随时异步追加需要的更多信息。
  JDK9中在Flow接口中实现了JavaAPI的4个接口,并提供了SubmissionPublisher作为Publisher接口的简单实现。
搜索 投诉 评论 转载

有哪些让学生党相见恨晚的app?作为学生党,好用的app我也一直在搜集,接下来就给大家分享一波。中国大学mooc(俗称慕课)包揽了多方面知识,机械制图,计算机,经济学,高数,大物都是重点大学,名师……i9RTX3080!雷蛇灵刃17挑战笔记本性能上限不久前,Razer发布了2021款高性能游戏本《灵刃17》,搭载了Intel11第11代酷睿i911900H移动标准压力处理器和NVIDIAGeForceRTX3080笔记本G……5G时代的到来,短视频时代的开始!你觉得是好事还是坏事?只要是发展就是好事现今社会时代发展得很快,我们迎来了5G时代的到来。短视频时代的开始,开扩了我的眼界和见识。各行各业平凡的人把他们的所见所想发到网上,让大家足不出户,一部……专注音质阻击评价JEETMARS真无线蓝牙耳机前言蓝牙耳机现在是年轻人的标配,手机的更新换代加剧了行业的进化,尤其AirPods的产品线不断的推陈出新,Pro的发布和廉价版的AirPodslite让苹果赚的盆满钵满,……JDK9响应式流使用详解上文中咱们简单提到了JDK9中Flow接口中的静态内部类实现了响应式流的JAVAAPI,并且提供了一个一个Publisher的实现类SubmissionPublisher。本文……寻找致命蛇蝎毒克星生物学家齐普里亚诺阿尔特米拉诺手上这种有毒的穴居蝎,原产于墨西哥南部瓦哈卡州。重力将血浆从血液中分离,血浆是富含抗体的部分。蛇蝎等动物蜇咬产生的毒液,是一种无声的杀……区块链数字资产,未来财富的新起点易观分析:《数字经济全景白皮书》浓缩了易观分析对于数字经济各行业经验和数据的积累,并结合数字时代企业的实际业务和未来面临的挑战,以及数字技术的创新突破等因素,最终从数字经济发展……人工智能预测蛋白质结构来源:人民网人民日报近日,人工智能企业上海天壤智能科技有限公司宣布,其自主研发的深度学习蛋白质折叠预测平台在国际蛋白质结构预测竞赛蛋白质测试集的评估中获得优异成绩,位居全……富士康70的订单都来自苹果,为什么感觉苹果没有无限压价呢?富士康70的订单都来自苹果,为什么感觉苹果没有无限压价呢?富士康加工苹果手机也有成本还要赚钱,如果无限压价难道让富士康倒贴么?不可能。苹果手机贵,并不单只是软硬件贵,而是……争端再起,华为鸿蒙来到新十字路口,接下来市场会改变吗?随着市场逐渐的发展,鸿蒙系统在现在的手机市场上也难以避免的再次跟安卓系统发生了市场摩擦了,在手机市场上进入三月之后,安卓突然给软件开发者公开发了一个警告,那就是所有谷歌上面发布……iPhone13跌至新低价,256GB售价更亲民了,幸福来得关注手机圈的都知道,iPhone13诞生之后,在国内市场拥有极高的热度,哪怕就是现在,也是最畅销的5G手机之一。截止到目前,该机在某东平台的评价量已经达到了200万,销量极其火……36氪首发搭建AI销售辅助系统,循环智能完成3800万美元新近日,企业服务公司循环智能(RecurrentAI)宣布完成由博裕投资和红杉中国联合领投的3800万美元新一轮融资,老股东金沙江创投、靖亚资本、真格基金、万物资本等跟投。新一轮……
智慧城市,让生活更美好国家计算机病毒应急处理中心监测发现15款移动App及1款SD同比缩短超一小时重点地区快递时限五十七小时春节不停歇!2022京东年货节开启坚果零食居贵州特色年货榜首比特币是否能变成一种储备货币?恒大如果真的倒闭了,损失最大的是谁?不吹不黑,了不起的华为只有初中学历他却造出仿真机器人iQOO8Pro一手用机体验分享!真的有那么绝吗?印度官员富士康在印iPhone工厂1月7日前不太可能重开最危险小行星阿波菲斯将在2029年春季接近地球客厅怎能不装电视?三翼鸟智慧客厅大屏大智慧
春日山居即事十首其二宝华韦健第五代飞艇初体验,最具科技感的音响,音质吊打同行产品有哪些全国都欠它一个985211的大学?樱雪热水器怎么使用樱雪热水器使用说明详解图文孙膑翻译(孙膑原文及翻译注释)公司的通知每日省思如何改善嫉妒之心幸福在很近的地方作文踏青结婚2月男方起诉离婚怎么办理?女人私处发黑如何能改善图解如何折普通小船和有蓬小船

友情链接:中准网聚热点快百科快传网快生活快软网快好知文好找江西南阳嘉兴昆明铜陵滨州广东西昌常德梅州兰州阳江运城金华广西萍乡大理重庆诸暨泉州安庆南充武汉辽宁