1、前言 最近项目涉及物联网设备实时数据、采集、传输、展示,以及大数据预警报警等功能(thingsBoardkafkaMongoDB)。统计分析模块采用的传统的基于数据库的统计分析,对于海量实时数据统计分析性能和响应速度压力巨大,要实现低延迟的实时计算和秒级多维实时查询有技术挑战。ApacheFlink在数据分析领域中应用广泛,其实时处理能力以及运算速度都能满足大规模数据处理的需求,可以与物联网技术结合,实现对海量传感器数据的实时分析和管理。而且社区活跃技术方面也不断的创新和优化,支持许多流行数据源,如Kafka、HadoopHDFS、ES等,具有丰富的生态系统,Flink社区也提供了丰富的工具和库。这里主要介绍基本概念,和一个实际的示例。 2、基本概念 (1)、定义: ApacheFlink是一个在有界数据流和无界数据流上进行有状态计算分布式处理引擎和框架。Flink设计旨在所有常见的集群环境中运行,以任意规模和内存级速度执行计算。 (2)、Flink架构: Flink是一个分布式系统,需要有效分配和管理计算资源才能执行流应用程序。Flink运行时由两种类型的进程组成:一个JobManager和一个或者多个TaskManager。 对于分布式执行,Flink将算子的subtasks链接成tasks。每个task由一个线程执行。将算子链接成task是个有用的优化:它减少线程间切换、缓冲的开销,并且减少延迟的同时增加整体吞吐量。 (3)流处理: 在自然环境中,数据的产生原本就是流式的。无论是来自Web服务器的事件数据,证券交易所的交易数据,还是来自工厂车间机器上的传感器数据,其数据都是流式的。但是当你分析数据时,可以围绕有界流(bounded)或无界流(unbounded)两种模型来组织处理数据,当然,选择不同的模型,程序的执行和处理方式也都会不同。 批处理是有界数据流处理的范例。在这种模式下,你可以选择在计算结果输出之前输入整个数据集,这也就意味着你可以对整个数据集的数据进行排序、统计或汇总计算后再输出结果。 流处理正相反,其涉及无界数据流。至少理论上来说,它的数据输入永远不会结束,因此程序必须持续不断地对到达的数据进行处理。 在Flink中,应用程序由用户自定义算子转换而来的流式dataflows所组成。这些流式dataflows形成了有向图,以一个或多个源(source)开始,并以一个或多个汇(sink)结束。 核心的API,从开发步骤的角度来讲,主要分为四大部分Environment、Source、Transform、Sink,flink执行过程(envsourcetransformsink)。 Environment,FlinkJob在提交执行计算时,需要首先建立和Flink框架之间的联系,也就指的是当前的flink运行环境,只有获取了环境信息,才能将task调度到不同的taskManager执行。 流式处理环境获取:StreamExecutionEnvironmentenvStreamExecutionEnvironment。getExecutionEnvironment();env。enableCheckpointing(1000,CheckpointingMode。ATLEASTONCE); 批处理环境:valenvExecutionEnvironment。getExecutionEnvironment Source,Flink框架可以从不同的来源获取数据,将数据提交给框架进行处理,我们将获取数据的来源称之为数据源。数据源类型很多常用的包括集合类型(数据临时存储到内存中,形成特殊的数据结构后),文件中读取数据,Kafka中读取数据,用户可以自定义数据源。 Transform算子,转换算子,把当前的DataStream转化为另一个DataStream。数据处理的核心,有很多种算子如下图: 最常用的map、flatMap、Filter、keyBy、aggregation、reduce等算子,这些算子具体作用基本上看英文名字就知道了,使用方式很灵活支持拉姆达表达式。 KeyBy KeyBy算子将DataStream转换成一个KeyedStream。KeyedStream是一种特殊的DataStream,事实上,KeyedStream继承了DataStream,DataStream的各元素随机分布在各TaskSlot(任务槽)中,KeyedStream的各元素按照Key分组,分配到各TaskSlot中。我们需要向keyBy算子传递一个参数,以告知Flink以什么字段作为Key进行分组。stream。keyBy(0)或者stream。keyBy(newKeySelectorString,String(){OverridepublicStringgetKey(Stringx)throwsException{returnx。();}}) aggregation,常见的聚合操作有sum、max、min等,这些聚合操作统称为aggregation。aggregation需要一个参数来指定按照哪个字段进行聚合。跟keyBy相似,我们可以使用数字位置来指定对哪个字段进行聚合,也可以使用字段名。与批处理不同,这些聚合函数是对流数据进行聚合,流数据是依次进入Flink的,聚合操作是对之前流入的数据进行统计聚合。sum算子的功能对该字段进行加和,并将结果保存在该字段上。min操作无法确定其他字段的数值。stream。keyBy(0)。sum(1)。print()0字段分组,1字段求和 reduce在按照同一个Key分组的数据流上生效,它接受两个输入,生成一个输出,即两两合一地进行汇总操作,生成一个同类型的新元素。 Sink其实可以表示为将处理完成数据进行存储,或者将处理完的数据发送到指定的存储系统(比如Oracle、Kafka等)(官方提供了一部分的框架的sink,用户可自定义实现自己的sink)。 (4)、有状态的数据操作(StatefulOperations) 在流处理中,有些操作仅仅在某一时间针对单一事件(如事件转换map),有些操作需要记住多个事件的信息并进行处理(windowoperators),后者的这些操作称为有状态的操作。有状态的操作一般被维护在内置的keyvalue存储中。这些状态信息会跟数据流一起分区并且分布存储,并且可以通过有状态的数据操作来访问。因此这些keyvalue的状态信息仅在带key的数据流(通过keyBy()函数处理过)中才能访问到。数据流按照key排列能保证所有的状态更新都是本地操作,保证一致性且无事务问题。同时这种排列方式使Flink能够透明的再分发状态信息和调整数据流分区。 (5)、窗口 我们在操作无界数据流时,经常需要应对以下问题,我们经常把无界数据流分解成有界数据流聚合分析。 Flink有一些内置的窗口分配器,如下: Fixedwindows(固定窗口):在Flink中被也称为Tumblingwindows(滚动窗口),将时间切割成具有固定时间长度的段。滚动窗口之间不会重叠。 Slidingwindows(滑动窗口):滑动窗口是滚动窗口更一般化的表现的形式,由窗口大小和滑动间隔这两个属性来定义。如果滑动间隔小于窗口大小,那么不同的窗口之间就会存在重叠;如果滑动间隔大于窗口大小,不同窗口之间就会存在间隔;如果滑动间隔等于窗口大小,就相当于滚动窗口。 SessionWindows(会话窗口):和滚动窗口与滑动窗口不同的是,会话窗口并没有固定的窗口大小;它是一种动态窗口,通常由超时间隔(timeoutgap)来定义。当超过一段时间没有新的事件到达,则可以认为窗口关闭了。 有三种最基本的操作窗口内的事件的选项:像批量处理,ProcessWindowFunction会缓存Iterable和窗口内容,供接下来全量计算;或者像流处理,每一次有事件被分配到窗口时,都会调用ReduceFunction或者AggregateFunction来增量计算;或者结合两者,通过ReduceFunction或者AggregateFunction预聚合的增量计算结果在触发窗口时,提供给ProcessWindowFunction做全量计算。stream。keyBy(keyselector)。window(windowassigner)。reduceaggregateprocess(windowfunction);示例代码:DataStreamSensorReadinginput。。。;input。keyBy(xx。key)。window(TumblingEventTimeWindows。of(Time。minutes(1)))。process(newMyWastefulMax());publicstaticclassMyWastefulMaxextendsProcessWindowFunctionSensorReading,输入类型Tuple3String,Long,Integer,输出类型String,键类型TimeWindow{窗口类型Overridepublicvoidprocess(Stringkey,Contextcontext,IterableSensorReadingevents,CollectorTuple3String,Long,Integerout){intmax0;for(SensorReadingevent:events){maxMath。max(event。value,max);}out。collect(Tuple3。of(key,context。window()。getEnd(),max));}} (6)、Watermark 怎么确定一个窗口是否已经结束,这在流式数据处理系统中并非一个很容易解决的问题。如果窗口是基于处理时间的,那么问题确实容易解决,因为处理时间是完全基于本地时钟的;但是如果窗口基于事件时间,由于分布式系统中消息可能存在延迟、乱序到达的问题,即便系统已经接收到窗口边界以外的数据了,也不能确定前面的所有数据都已经到达了。 (7)、Checkpoint Flink的Checkpoint机制是其可靠性的基石。当一个任务在运行过程中出现故障时,可以根据Checkpoint的信息恢复到故障之前的某一状态,然后从该状态恢复任务的运行。在Flink中,Checkpoint机制采用的是chandylamport(分布式快照)算法,通过Checkpoint机制,保证了Flink程序内部的ExactlyOnce语义。 3、安装部署 (1)、下载: 官网(archive。apache。orgdistflink),flink1。9。3binscala2。12。tgz下载后解压到硬盘目录, 源码地址:github。comapacheflink (2)、运行(需要安装java运行环境) 只需要进入到解压目录的bin目录下,运行startcluster。bat (3)、访问UI (4)、运行示例: 命令行输入:flink。batrunD:flinkexamplesbatchWordCount。jarinputD:flinkREADME。txtoutputD:flinkREADMECountWordResult。txt 可以在管理界面查看任务运行情况 4、应用示例 创建流处理环境,创建一个Javamaven项目,pom文件中引入需要包 pom文件:?xmlversion1。0encodingUTF8?projectxmlnshttp:maven。apache。orgPOM4。0。0xmlns:xsihttp:www。w3。org2001XMLSchemainstancexsi:schemaLocationhttp:maven。apache。orgPOM4。0。0http:maven。apache。orgxsdmaven4。0。0。xsdmodelVersion4。0。0modelVersiongroupIdorg。examplegroupIdmvntestartifactIdversion1。0SNAPSHOTversionnamemvntestname!FIXMEchangeittotheprojectswebsiteurlhttp:www。example。comurlpropertiesproject。build。sourceEncodingUTF8project。build。sourceEncodingmaven。compiler。source1。7maven。compiler。sourcemaven。compiler。target1。7maven。compiler。targetpropertiesdependenciesdependencygroupIdjunitgroupIdjunitartifactIdversion4。11versionscopetestscopedependencydependencygroupIdorg。apache。flinkgroupIdflinkconnectorkafka2。12artifactIdversion1。9。3versionscopetestscopedependencydependencygroupIdorg。apache。flinkgroupIdflinkjavaartifactIdversion1。9。3versiondependencydependencygroupIdorg。apache。flinkgroupIdflinkscala2。12artifactIdversion1。9。3versiondependencydependencygroupIdorg。apache。flinkgroupIdflinkclients2。12artifactIdversion1。9。3versiondependency!dependencygroupIdorg。apache。flinkgroupIdflinkstreamingjava2。12artifactIdversion1。9。3versionscopeprovidedscopedependencydependencygroupIdorg。apache。flinkgroupIdflinkstreamingscala2。12artifactIdversion1。9。3versionscopeprovidedscopedependencydependencygroupIdorg。apache。flinkgroupIdflinkconnectorkafka2。12artifactIdversion1。9。3versionscopecompilescopedependencydependenciesbuildpluginManagement!lockdownpluginsversionstoavoidusingMavendefaults(maybemovedtoparentpom)plugins!cleanlifecycle,seehttps:maven。apache。orgrefcurrentmavencorelifecycles。htmlcleanLifecyclepluginmavencleanpluginartifactIdversion3。1。0versionplugin!defaultlifecycle,jarpackaging:seehttps:maven。apache。orgrefcurrentmavencoredefaultbindings。htmlPluginbindingsforjarpackagingpluginmavenresourcespluginartifactIdversion3。0。2versionpluginpluginmavencompilerpluginartifactIdversion3。8。0versionpluginpluginmavensurefirepluginartifactIdversion2。22。1versionpluginpluginmavenjarpluginartifactIdversion3。0。2versionpluginpluginmaveninstallpluginartifactIdversion2。5。2versionpluginpluginmavendeploypluginartifactIdversion2。8。2versionplugin!sitelifecycle,seehttps:maven。apache。orgrefcurrentmavencorelifecycles。htmlsiteLifecyclepluginmavensitepluginartifactIdversion3。7。1versionpluginpluginmavenprojectinforeportspluginartifactIdversion3。0。0versionpluginpluginspluginManagementpluginsplugingroupIdorg。apache。maven。pluginsgroupIdmavencompilerpluginartifactIdconfigurationsource8sourcetarget8targetconfigurationpluginpluginsbuildproject 创建kafaka数据源StreamExecutionEnvironmentenvStreamExecutionEnvironment。getExecutionEnvironment();env。enableCheckpointing(1000,CheckpointingMode。ATLEASTONCE);PropertiespropertiesnewProperties();kafka连接信息properties。setProperty(bootstrap。servers,11。11。160。158:9092);properties。setProperty(group。id,kafkagroup);DataStreamStringstreamenv。addSource(newFlinkKafkaConsumer(kafkatopic,newSimpleStringSchema(),properties)); 数据处理:分组stream。keyBy(newKeySelectorString,String(){OverridepublicStringgetKey(Stringx)throwsException{}})窗口。window(TumblingEventTimeWindows。of(Time。minutes(1)))。timeWindow(Time。seconds(30))数据处理。process(newMyProcessHandler())自定义数据输出。addSink(newMySinkFunction())。name(sinktest); publicstaticclassMyProcessHandlerextendsProcessWindowFunctionString,inputtypeTuple3String,Long,Integer,outputtypeString,keytypeTimeWindow{windowtypeOverridepublicvoidprocess(Stringkey,Contextcontext,IterableStringevents,CollectorTuple3String,Long,Integerout)throwsException{out。collect(Tuple3。of(key。toString(),context。window()。getEnd(),max));}Overridepublicvoidclear(Contextcontext)throwsException{super。clear(context);}}publicstaticclassMySinkFunctionextendsRichSinkFunctionTuple3String,Long,Integer{Overridepublicvoidopen(Configurationparameters)throwsException{super。open(parameters);}Overridepublicvoidinvoke(Tuple3String,Long,Integervalue,Contextcontext)throwsException{System。out。println(value。f1);System。out。println(context。currentProcessingTime());System。out。println(end);}} 上传jar包运行任务 java命令窗口可以查看输出,也可以通过管理UI查看任务运行情况。 5、总结 Flink功能强大,性能优秀,在众多场景下都能发挥出巨大价值,尤其是在处理大规模数据、实时分析、机器学习等领域具有广泛的应用和前景。