城市直播房产教育博客汽车
投稿投诉
汽车报价
买车新车
博客专栏
专题精品
教育留学
高考读书
房产家居
彩票视频
直播黑猫
投资微博
城市上海
政务旅游

使用ApacheFlink对物联网实时数据分析

8月28日 满月族投稿
  1、前言
  最近项目涉及物联网设备实时数据、采集、传输、展示,以及大数据预警报警等功能(thingsBoardkafkaMongoDB)。统计分析模块采用的传统的基于数据库的统计分析,对于海量实时数据统计分析性能和响应速度压力巨大,要实现低延迟的实时计算和秒级多维实时查询有技术挑战。ApacheFlink在数据分析领域中应用广泛,其实时处理能力以及运算速度都能满足大规模数据处理的需求,可以与物联网技术结合,实现对海量传感器数据的实时分析和管理。而且社区活跃技术方面也不断的创新和优化,支持许多流行数据源,如Kafka、HadoopHDFS、ES等,具有丰富的生态系统,Flink社区也提供了丰富的工具和库。这里主要介绍基本概念,和一个实际的示例。
  2、基本概念
  (1)、定义:
  ApacheFlink是一个在有界数据流和无界数据流上进行有状态计算分布式处理引擎和框架。Flink设计旨在所有常见的集群环境中运行,以任意规模和内存级速度执行计算。
  (2)、Flink架构:
  Flink是一个分布式系统,需要有效分配和管理计算资源才能执行流应用程序。Flink运行时由两种类型的进程组成:一个JobManager和一个或者多个TaskManager。
  对于分布式执行,Flink将算子的subtasks链接成tasks。每个task由一个线程执行。将算子链接成task是个有用的优化:它减少线程间切换、缓冲的开销,并且减少延迟的同时增加整体吞吐量。
  (3)流处理:
  在自然环境中,数据的产生原本就是流式的。无论是来自Web服务器的事件数据,证券交易所的交易数据,还是来自工厂车间机器上的传感器数据,其数据都是流式的。但是当你分析数据时,可以围绕有界流(bounded)或无界流(unbounded)两种模型来组织处理数据,当然,选择不同的模型,程序的执行和处理方式也都会不同。
  批处理是有界数据流处理的范例。在这种模式下,你可以选择在计算结果输出之前输入整个数据集,这也就意味着你可以对整个数据集的数据进行排序、统计或汇总计算后再输出结果。
  流处理正相反,其涉及无界数据流。至少理论上来说,它的数据输入永远不会结束,因此程序必须持续不断地对到达的数据进行处理。
  在Flink中,应用程序由用户自定义算子转换而来的流式dataflows所组成。这些流式dataflows形成了有向图,以一个或多个源(source)开始,并以一个或多个汇(sink)结束。
  核心的API,从开发步骤的角度来讲,主要分为四大部分Environment、Source、Transform、Sink,flink执行过程(envsourcetransformsink)。
  Environment,FlinkJob在提交执行计算时,需要首先建立和Flink框架之间的联系,也就指的是当前的flink运行环境,只有获取了环境信息,才能将task调度到不同的taskManager执行。
  流式处理环境获取:StreamExecutionEnvironmentenvStreamExecutionEnvironment。getExecutionEnvironment();env。enableCheckpointing(1000,CheckpointingMode。ATLEASTONCE);
  批处理环境:valenvExecutionEnvironment。getExecutionEnvironment
  Source,Flink框架可以从不同的来源获取数据,将数据提交给框架进行处理,我们将获取数据的来源称之为数据源。数据源类型很多常用的包括集合类型(数据临时存储到内存中,形成特殊的数据结构后),文件中读取数据,Kafka中读取数据,用户可以自定义数据源。
  Transform算子,转换算子,把当前的DataStream转化为另一个DataStream。数据处理的核心,有很多种算子如下图:
  最常用的map、flatMap、Filter、keyBy、aggregation、reduce等算子,这些算子具体作用基本上看英文名字就知道了,使用方式很灵活支持拉姆达表达式。
  KeyBy
  KeyBy算子将DataStream转换成一个KeyedStream。KeyedStream是一种特殊的DataStream,事实上,KeyedStream继承了DataStream,DataStream的各元素随机分布在各TaskSlot(任务槽)中,KeyedStream的各元素按照Key分组,分配到各TaskSlot中。我们需要向keyBy算子传递一个参数,以告知Flink以什么字段作为Key进行分组。stream。keyBy(0)或者stream。keyBy(newKeySelectorString,String(){OverridepublicStringgetKey(Stringx)throwsException{returnx。();}})
  aggregation,常见的聚合操作有sum、max、min等,这些聚合操作统称为aggregation。aggregation需要一个参数来指定按照哪个字段进行聚合。跟keyBy相似,我们可以使用数字位置来指定对哪个字段进行聚合,也可以使用字段名。与批处理不同,这些聚合函数是对流数据进行聚合,流数据是依次进入Flink的,聚合操作是对之前流入的数据进行统计聚合。sum算子的功能对该字段进行加和,并将结果保存在该字段上。min操作无法确定其他字段的数值。stream。keyBy(0)。sum(1)。print()0字段分组,1字段求和
  reduce在按照同一个Key分组的数据流上生效,它接受两个输入,生成一个输出,即两两合一地进行汇总操作,生成一个同类型的新元素。
  Sink其实可以表示为将处理完成数据进行存储,或者将处理完的数据发送到指定的存储系统(比如Oracle、Kafka等)(官方提供了一部分的框架的sink,用户可自定义实现自己的sink)。
  (4)、有状态的数据操作(StatefulOperations)
  在流处理中,有些操作仅仅在某一时间针对单一事件(如事件转换map),有些操作需要记住多个事件的信息并进行处理(windowoperators),后者的这些操作称为有状态的操作。有状态的操作一般被维护在内置的keyvalue存储中。这些状态信息会跟数据流一起分区并且分布存储,并且可以通过有状态的数据操作来访问。因此这些keyvalue的状态信息仅在带key的数据流(通过keyBy()函数处理过)中才能访问到。数据流按照key排列能保证所有的状态更新都是本地操作,保证一致性且无事务问题。同时这种排列方式使Flink能够透明的再分发状态信息和调整数据流分区。
  (5)、窗口
  我们在操作无界数据流时,经常需要应对以下问题,我们经常把无界数据流分解成有界数据流聚合分析。
  Flink有一些内置的窗口分配器,如下:
  Fixedwindows(固定窗口):在Flink中被也称为Tumblingwindows(滚动窗口),将时间切割成具有固定时间长度的段。滚动窗口之间不会重叠。
  Slidingwindows(滑动窗口):滑动窗口是滚动窗口更一般化的表现的形式,由窗口大小和滑动间隔这两个属性来定义。如果滑动间隔小于窗口大小,那么不同的窗口之间就会存在重叠;如果滑动间隔大于窗口大小,不同窗口之间就会存在间隔;如果滑动间隔等于窗口大小,就相当于滚动窗口。
  SessionWindows(会话窗口):和滚动窗口与滑动窗口不同的是,会话窗口并没有固定的窗口大小;它是一种动态窗口,通常由超时间隔(timeoutgap)来定义。当超过一段时间没有新的事件到达,则可以认为窗口关闭了。
  有三种最基本的操作窗口内的事件的选项:像批量处理,ProcessWindowFunction会缓存Iterable和窗口内容,供接下来全量计算;或者像流处理,每一次有事件被分配到窗口时,都会调用ReduceFunction或者AggregateFunction来增量计算;或者结合两者,通过ReduceFunction或者AggregateFunction预聚合的增量计算结果在触发窗口时,提供给ProcessWindowFunction做全量计算。stream。keyBy(keyselector)。window(windowassigner)。reduceaggregateprocess(windowfunction);示例代码:DataStreamSensorReadinginput。。。;input。keyBy(xx。key)。window(TumblingEventTimeWindows。of(Time。minutes(1)))。process(newMyWastefulMax());publicstaticclassMyWastefulMaxextendsProcessWindowFunctionSensorReading,输入类型Tuple3String,Long,Integer,输出类型String,键类型TimeWindow{窗口类型Overridepublicvoidprocess(Stringkey,Contextcontext,IterableSensorReadingevents,CollectorTuple3String,Long,Integerout){intmax0;for(SensorReadingevent:events){maxMath。max(event。value,max);}out。collect(Tuple3。of(key,context。window()。getEnd(),max));}}
  (6)、Watermark
  怎么确定一个窗口是否已经结束,这在流式数据处理系统中并非一个很容易解决的问题。如果窗口是基于处理时间的,那么问题确实容易解决,因为处理时间是完全基于本地时钟的;但是如果窗口基于事件时间,由于分布式系统中消息可能存在延迟、乱序到达的问题,即便系统已经接收到窗口边界以外的数据了,也不能确定前面的所有数据都已经到达了。
  (7)、Checkpoint
  Flink的Checkpoint机制是其可靠性的基石。当一个任务在运行过程中出现故障时,可以根据Checkpoint的信息恢复到故障之前的某一状态,然后从该状态恢复任务的运行。在Flink中,Checkpoint机制采用的是chandylamport(分布式快照)算法,通过Checkpoint机制,保证了Flink程序内部的ExactlyOnce语义。
  3、安装部署
  (1)、下载:
  官网(archive。apache。orgdistflink),flink1。9。3binscala2。12。tgz下载后解压到硬盘目录,
  源码地址:github。comapacheflink
  (2)、运行(需要安装java运行环境)
  只需要进入到解压目录的bin目录下,运行startcluster。bat
  (3)、访问UI
  (4)、运行示例:
  命令行输入:flink。batrunD:flinkexamplesbatchWordCount。jarinputD:flinkREADME。txtoutputD:flinkREADMECountWordResult。txt
  可以在管理界面查看任务运行情况
  4、应用示例
  创建流处理环境,创建一个Javamaven项目,pom文件中引入需要包
  pom文件:?xmlversion1。0encodingUTF8?projectxmlnshttp:maven。apache。orgPOM4。0。0xmlns:xsihttp:www。w3。org2001XMLSchemainstancexsi:schemaLocationhttp:maven。apache。orgPOM4。0。0http:maven。apache。orgxsdmaven4。0。0。xsdmodelVersion4。0。0modelVersiongroupIdorg。examplegroupIdmvntestartifactIdversion1。0SNAPSHOTversionnamemvntestname!FIXMEchangeittotheprojectswebsiteurlhttp:www。example。comurlpropertiesproject。build。sourceEncodingUTF8project。build。sourceEncodingmaven。compiler。source1。7maven。compiler。sourcemaven。compiler。target1。7maven。compiler。targetpropertiesdependenciesdependencygroupIdjunitgroupIdjunitartifactIdversion4。11versionscopetestscopedependencydependencygroupIdorg。apache。flinkgroupIdflinkconnectorkafka2。12artifactIdversion1。9。3versionscopetestscopedependencydependencygroupIdorg。apache。flinkgroupIdflinkjavaartifactIdversion1。9。3versiondependencydependencygroupIdorg。apache。flinkgroupIdflinkscala2。12artifactIdversion1。9。3versiondependencydependencygroupIdorg。apache。flinkgroupIdflinkclients2。12artifactIdversion1。9。3versiondependency!dependencygroupIdorg。apache。flinkgroupIdflinkstreamingjava2。12artifactIdversion1。9。3versionscopeprovidedscopedependencydependencygroupIdorg。apache。flinkgroupIdflinkstreamingscala2。12artifactIdversion1。9。3versionscopeprovidedscopedependencydependencygroupIdorg。apache。flinkgroupIdflinkconnectorkafka2。12artifactIdversion1。9。3versionscopecompilescopedependencydependenciesbuildpluginManagement!lockdownpluginsversionstoavoidusingMavendefaults(maybemovedtoparentpom)plugins!cleanlifecycle,seehttps:maven。apache。orgrefcurrentmavencorelifecycles。htmlcleanLifecyclepluginmavencleanpluginartifactIdversion3。1。0versionplugin!defaultlifecycle,jarpackaging:seehttps:maven。apache。orgrefcurrentmavencoredefaultbindings。htmlPluginbindingsforjarpackagingpluginmavenresourcespluginartifactIdversion3。0。2versionpluginpluginmavencompilerpluginartifactIdversion3。8。0versionpluginpluginmavensurefirepluginartifactIdversion2。22。1versionpluginpluginmavenjarpluginartifactIdversion3。0。2versionpluginpluginmaveninstallpluginartifactIdversion2。5。2versionpluginpluginmavendeploypluginartifactIdversion2。8。2versionplugin!sitelifecycle,seehttps:maven。apache。orgrefcurrentmavencorelifecycles。htmlsiteLifecyclepluginmavensitepluginartifactIdversion3。7。1versionpluginpluginmavenprojectinforeportspluginartifactIdversion3。0。0versionpluginpluginspluginManagementpluginsplugingroupIdorg。apache。maven。pluginsgroupIdmavencompilerpluginartifactIdconfigurationsource8sourcetarget8targetconfigurationpluginpluginsbuildproject
  创建kafaka数据源StreamExecutionEnvironmentenvStreamExecutionEnvironment。getExecutionEnvironment();env。enableCheckpointing(1000,CheckpointingMode。ATLEASTONCE);PropertiespropertiesnewProperties();kafka连接信息properties。setProperty(bootstrap。servers,11。11。160。158:9092);properties。setProperty(group。id,kafkagroup);DataStreamStringstreamenv。addSource(newFlinkKafkaConsumer(kafkatopic,newSimpleStringSchema(),properties));
  数据处理:分组stream。keyBy(newKeySelectorString,String(){OverridepublicStringgetKey(Stringx)throwsException{}})窗口。window(TumblingEventTimeWindows。of(Time。minutes(1)))。timeWindow(Time。seconds(30))数据处理。process(newMyProcessHandler())自定义数据输出。addSink(newMySinkFunction())。name(sinktest);
  publicstaticclassMyProcessHandlerextendsProcessWindowFunctionString,inputtypeTuple3String,Long,Integer,outputtypeString,keytypeTimeWindow{windowtypeOverridepublicvoidprocess(Stringkey,Contextcontext,IterableStringevents,CollectorTuple3String,Long,Integerout)throwsException{out。collect(Tuple3。of(key。toString(),context。window()。getEnd(),max));}Overridepublicvoidclear(Contextcontext)throwsException{super。clear(context);}}publicstaticclassMySinkFunctionextendsRichSinkFunctionTuple3String,Long,Integer{Overridepublicvoidopen(Configurationparameters)throwsException{super。open(parameters);}Overridepublicvoidinvoke(Tuple3String,Long,Integervalue,Contextcontext)throwsException{System。out。println(value。f1);System。out。println(context。currentProcessingTime());System。out。println(end);}}
  上传jar包运行任务
  java命令窗口可以查看输出,也可以通过管理UI查看任务运行情况。
  5、总结
  Flink功能强大,性能优秀,在众多场景下都能发挥出巨大价值,尤其是在处理大规模数据、实时分析、机器学习等领域具有广泛的应用和前景。
投诉 评论 转载

产品升级成为趋势第二十三届中国塑料博览会见闻28日上午,第二十三届中国塑料博览会在浙江省宁波市余姚市开幕,占地4。2万平方米的余姚市中塑国际会展中心,8个展馆里挤满了400余家国内外参展企业。第二十三届中国塑料博览……HCL网络设备模拟器安装教程为你呈现最详细的操作步骤自我设限,固步自封,唯有突破极限,才能发掘潜能。大家好,我是每天分享《网络技术》和《系统运维技术》的网络系统技艺者,右上角点关注陪你一起成长,见证更强大的自己。前言:欢迎……80岁还是小又就好了陈小齐又又快12岁了,但妈妈还喊他小又。小又当儿童当得真是好。吃喝拉撒睡非常省心,节约大人的体力。从婴儿时期就是如此。如果是古代,一定乐意立他为太子。但家里并没有皇……使用ApacheFlink对物联网实时数据分析1、前言最近项目涉及物联网设备实时数据、采集、传输、展示,以及大数据预警报警等功能(thingsBoardkafkaMongoDB)。统计分析模块采用的传统的基于数据库的……青训又出瑰宝!布莱顿18岁神锋横空出世,未来将被豪门疯抢?对阵利物浦的赛事,布莱顿最终是在主场30轻松取胜,与伤兵满营且状态低迷的利物浦相比,布莱顿这边状态无疑是更好的,麦卡利斯特、凯塞多、格罗斯、马希、三笘薫等人状态都打出了优秀的表……2023年月子中心30天真实经历现在随着经济条件越来越好,生完孩子,特别流行去月子中心或者家里请月嫂。现在刚从月子中心回家,说说我的真实经历,供大家参考,在选择月子中心时可以擦亮眼睛,结合自己情况选择合适的月……一个孤独症患儿父亲的讲述参与孩子的世界,完成他的游戏4月1日上午,湖南中医药大学第一附属医院一楼大厅里,喔喔提着一袋新玩具向爸爸胡毅(化名)招手示意,嘴里支支吾吾地想说些什么。喔喔,告诉爸爸,这是什么东西?胡毅耐心地比划着……如何正确的治疗痤疮痤疮的治疗方法有多种,其中包括药物治疗、非药物治疗和联合治疗。药物治疗是治疗痤疮的主要方法之一,包括外用药物和口服药物。外用药物包括过氧苯甲酰凝胶、克林霉素凝胶、阿达帕林……棋差一着23惜败JDG4月5号的BLG打得有点出乎意料,但又有点不尽人意,这场比赛打满了整个bo5,而BLG几乎每局前期都能拿到优势,这是让人没想到的,而拿到优势的BLG却没有稳住,中期总是会浪几波……澳门3月入境旅客环比增加日均达6。1万至6。2万人次智通财经APP获悉,4月3日,中国澳门旅游业持续加快复苏,旅游局长文绮华表示,截至上周数据,3月日均入境旅客有6。1万至6。2万人次,较二月增加,平日与周末的客量落差不大。截至……科技进步不能各自为战要靠统筹规划官方将组建中央科技委员会头条热榜一直以来中国的科技发展受一些经济学家的影响很大,他们认为市场万能,市场可以解决一切,我们国家成功的举国体制被他们说成落后的计划经济。……光屁股拍戏一天8000,女生更贵,横漂高收入背后,没那么简单文小昕编辑小昕娱乐圈是个大染缸,常年在这个圈子里生活,很容易瞎了眼,被掌声、欢呼声毒害。Beyond的吉他手黄贯中对年轻人的寄语,也撕开了娱乐圈的遮羞布。……
天太热自然避暑夺冠靠砸钱?勇士夺冠成本曝光,狼王批评乔丹自私,拉尔萨秀恩爱过去十年全球气候投融资实现两位数增长我的孩子抑郁了,我却以为他只是不开心家庭自助机制建立最关键华为从代理销售到通信巨头的传奇之路我发现前一天晚上只要用这个方法!第二天真的绝了!护肤后厨无厨师,全靠料包加水!和府捞面凭啥卖40一碗?iPhone15最新消息爆料汇总A17性能提升大,一项设计遗二十四岁定律塞尔达传说王国之泪的粉丝利用预告片线索找到公主的位置手机市场疲了没有新故事,没有换机潮周星驰为什么后悔,没对达叔亲口说声ILoveYou?

友情链接:中准网聚热点快百科快传网快生活快软网快好知文好找