实时数仓建设背景 实时数仓需求 随着互联网行业的飞速发展,企业业务种类变得越来越多,数据量也变得越来越大。以ApacheHadoop生态为核心的数据看板业务一般只能实现离线的业务。在部分领域,数据实时处理的能力已经成为限制企业数据变现的重要瓶颈之一。搭建数据看板快节奏地进行数据分析,已经成为了一种必然的选择。 实时数仓发展 实时数仓有三个著名的分水岭:第一个分水岭是从无到有,ApacheStorm的出现打破了MapReduce的单一计算方式,让业务能够处理T0的数据;第二个分水岭是从有到全,Lambda与Kappa架构的出现,使离线数仓向实时数仓迈进了一步,而Lambda架构到Kappa架构的演进,实现了离线数仓模型和实时数仓模型的紧密结合;第三个分水岭是从繁到简,Flink技术栈的落地使实时数仓架构变得精简,并且是现在公认的流批一体最佳解决方案。 以Flink作为实时计算引擎实现的实时数仓,将一部分复杂的计算转嫁给OLAP分析引擎上,使得应用层的分析需求更加灵活。但仍然无法改变数据仓库变更数据的排斥。下一代的实时数仓平台,不仅要提供更为优秀的性能,同时也需要更为完善的功能以匹配不同的业务。 作为一款全平台极速MPP架构,StarRocks提供了多种性能优化手段与灵活的建模方式,在预聚合、宽表和星型雪花等多种模型上,都可以获得极致的性能体验。通过StarRocks结合Flink构建开源实时数仓的方案,可以同时提供秒级数据同步和极速分析查询的能力。同时,通过StarRocks主键模型,也可以更好地支持实时和频繁更新等场景。 基于Flink的开源实时数仓痛点 原有基于Flink构建实施数仓的方案中,由于数据源的多样性,需要使用不同的采集工具,如Flume、Canal、Logstash。对于不同的业务,我们通常会采用不同的分析引擎。比如,对于固定报表业务,根据已知的查询语句可以预先将事实表与维度表打平成宽表,充分利用ClickHouse强大的单表查询能力;对于高并发的查询请求,可以使用ApacheDruid承受大量用户高峰时期集中使用带来的并发压力。通过技术栈堆叠的方式确实可以满足业务要求,但也会让分析层变得臃肿,增加开发与运维的成本。 一般来说,StarRocksXFlink构建开源实时数仓生态架构分为五层:第一层是数据源。数据源可以是多种多样的,比如说MySQLBinlog、爬虫数据或者是平面文件;第二层是数据采集层。用户使用多种不同的CDC工具,比如Canal、Debezium拉取上游的增量数据,通常会将数据写入到Kafka中,而后在通过Flink消费Kafka中的数据;第三层是实时计算层。可以通过Flink的实时计算能力完成轻量级的ETL工作,如拼宽表或数据清洗等;第四层是数据存储层。Flink相比其他的实时技术栈更加依赖OLAP引擎;最后一层是后端应用层。可以是实时监控系统,实时报表系统,实时推荐系统以及实时数据接口服务。 我们常说,天下武功,唯快不破。以Flink为计算引擎构建的实时数仓系统,最关心的就是数据摄入速度足够快,延迟足够低。在这样一套架构中,数据从数据源到OLAP分析系统途径采集工具层,消息队列层,实时计算层。冗长的链路给开发和运维带来了极大的风险,任何一个模块的阻塞都会对实时性产生影响。同时,在数据存储层上,我们也会选择不同的存储引擎适配不同的业务。对于上面的数据链路,我们也面临着诸多的挑战,需要从时效性、功能性及可维护性上做更多的探索,由此可以总结归纳出多个方面尚待优化:CDC组件不统一,链路过长,任何组件出现瓶颈都会对时效性产生影响,组件过多,需要多部门协作维护,学习成本与维护成本成倍增长;部分同步组件,如Debezium在保证数据一致性时,需要对读取的表加锁,可能会影响业务更新;分析层使用多种数据存储产品适应不同的业务类型,难以有一种产品能够适应大部分的业务;去重操作对应逻辑复杂,需要在flink里面增加MapStat逻辑。FlinkCDC,打通端到端链路 FlinkCDC是由Flink社区开发的集数据采集、数据转换、数据装载一体的组件,可以直接从MySQL、PostgreSQL、Oracle等数据源直接读取全量或增量数据并写入下游的OLAP数据存储系统。使用FlinkCDC后,可以简单高效的抓取上游的数据变更,同步到下游的OLAP数据仓库中。 构建一体化数据传输链路 在传统的实时数仓建设中,数据采集工具是不可或缺的。由于上游的数据源不一致,通常来说我们可能会在数据采集层接入不同的同步与采集工具,比如采集Oracle中的数据时,我们通常选择GoldenGate,而对于MySQL,我们可能会选择Canal或Debezium。有些采集工具支持全量数据同步,有些支持增量数据同步。数据经过采集层后,会传输到消息队列中如Kafka,然后通过Flink消费Kafka中的增量数据再写入下游的OLAP数据仓库或者数据湖中。 在业务开发中,上游的数据源、消息中间件、Flink以及下游的分析性数据仓库通常在不同的部门进行维护。遇到业务变更或者故障调试时,可能需要多个部门协作处理,增加了业务开发与测试的难度。通过使用FlinkCDC替换上图中的数据采集组件与消息队列,将虚线框中的采集组件与消息队列合并到计算层Flink中,从而简化分析链路,降低维护成本。同时更少的组件也意味着更少的故障与传输瓶颈,数据实效性会进一步的提高。 在使用FlinkCDC之后,数据链路中的组件变得更少,架构变得清晰简单,维护变得更方便。如在上面的例子中,我们使用FlinkCDC拉取MySQL中的增量数据,通过FlinkSQL创建事实与维度的MySQLCDC表,并在Flink中进行打宽操作,将结果写入到下游的StarRocks中。通过一个FlinkCDC作业就可以完成抓取,转换,装载的全过程。 全量增量数据同步 在传统的数据同步框架中,我们通常会分为两个阶段:全量数据同步阶段:通过全量同步工具,如DataX或sqoop等,进行快照级别的表同步。增量数据同步阶段:通过增量同步工具,如Canal或GoldenGate等,实时拉取快照之后的增量数据进行同步。 在全量数据同步时,为了加快导入的速度,我们可以选择多线程的导入模式。在多线程模型下进行全量数据同步时,在对数据进行切分后,通过启动多个并发任务完成数据的同步。由于多个并发业务之间可能不属于同一个读事务,并且存在一定的时间间隔,所以不能严格的保证数据的一致性。为了保证数据的一致性,从工程学与技术实现的角度做平衡,我们有两种方案:停止数据的写入操作,通过锁表等方式保证快照数据的静态性。但这将影响在线的业务。采用单线程同步的方式,不再对数据进行切片。但导入性能无法保证。 通过FlinkCDC,可以统一全量增量的数据同步工作。FlinkCDC1。x版本中,采用Debezium作为底层的采集工具,在全量的数据读取过程中,为了保证数据的一致性,也需要对库或表进行加锁操作。为了解决这个问题,Flink2。0中引入了Chunk切分算法保证数据的无锁读取。Chunk的切分算法类似分库分表原理,通过表的主键对数据进行分片操作。 在经过Chunk数据分片后,每个Chunk只负责自己主键范围内的数据,只要保证每个Chunk的读取一致性,这也是无锁算法的基本原理。StarRocks,实时数据更新新方案 StarRocks是一款极速全场景MPP企业级数据仓库产品,具备水平在线扩缩容能力,金融级高可用,兼容MySQL协议和MySQL生态,提供全面向量化引擎与多种数据源联邦查询等重要特性。作为一款MPP架构的分析性数据仓库,StarRocks能够支撑PB级别的数据量,拥有灵活的建模方式,可以通过物化视图、位图索引、稀疏索引等优化手段构建极速统一的分析层数据存储系统。 StarRocks在1。19版本推出了主键模型(PrimaryKeymodel)。相较更新模型,主键模型可以更好地支持实时和频繁更新等场景。主键模型要求表有唯一的主键(传统数据仓库中的primarykey),支持对表中的行按主键进行更新和删除操作。 主键模型对实时数据变更的优化 在OLAP数据仓库中,可变数据通常是不受欢迎的。在传统数仓中,一般我们会使用批量更新的方式处理大量数据变更的场景。对于数据的变更我们有两种方法处理:在新的分区中插入修改后的数据,通过分区交换完成数据变更。部分OLAP数据仓库产品提供了基于MergeonRead模型的更新功能,完成数据变更。 分区交换数据更新模式 对于大部分的OLAP数据仓库产品,我们可以通过操作分区的方式,将原有的分区删除掉,然后用新的分区代替,从而实现对大量数据的变更操作。一般来说需要经历三个步骤:创建一张新的分区表,根据业务变更,将新的数据存储到新表中;卸载并删除原有的分区;将新表中的分区装载到目标表中。 通过交换分区来实现大规模数据变更是一个相对较重的操作,适用于低频的批量数据更新。由于涉及到了表定义的变更,一般来说开发人员无法通过该方案独立完成数据变更。 MergeonRead数据更新模式 部分的OLAP数据仓库提供了基于MergeonRead的数据变更模型,如ClickHouse提供了MergeTree引擎,可以完成异步更新,但无法做到数据实时同步。在指定FINAL关键字后,ClickHouse会在返回结果之前完成合并,从而实现准实时的数据更新同步操作。但由于FINAL操作高昂的代价,不足以支撑实时数仓带来的频繁维度更新需求。同时,即便是在低频的更新场景中,也无法将ClickHouseMergeTree的方案复制到其他存储系统中。 StarRocks提供了与ClickHouseMergeTree类似的更新模型(UniqueKeymodel),通过MergeonRead的模式完成数据的更新操作。在更新模型中,StarRocks内部会给每一个批次导入的数据分配一个版本号,同一主键可能存在多个版本,在查询时进行版本合并,返回最新版本的记录。 MergeonRead模式在写入时简单高效,但读取时会消耗大量的资源在版本合并上,同时由于merge算子的存在,使得谓词无法下推、索引无法使用,严重的影响了查询的性能。StarRocks提供了基于DeleteandInsert模式的主键模型,避免了因为版本合并导致的算子无法下推的问题。主键模型适合需要对数据进行实时更新的场景,可以更好的解决行级别的更新操作,支撑百万级别的TPS,特别适合MySQL或其他业务库同步到StarRocks的场景。 在TPCH标准测试集中,我们选取了部分的查询进行对比,基于DeleteandInsert模式的主键模型相较于基于MergeonRead的UniqueKey模型,性能有明显的提高: Query 数据量 PrimaryKey(DeleteandInsert) UniqueKey(MergeonRead) 性能提升 导入过程中 SELECTCOUNT()FROM 8000万 0。24sec 1。15sec 6。29x SELECTCOUNT()FROM 1。6亿 0。31sec 3。4sec 10。97x SELECTCOUNT(),SUM(quantify)FROMordersWHERErevenue2000; 1000万 0。23sec 3。49sec 15。17x 导入后 SELECTCOUNT()FROM 2亿 0。32sec 1。17sec 3。66x SELECTCOUNT(),SUM(quantify)FROMordersWHERErevenue2000; 1200万 0。34sec 1。52sec 4。47x 主键模型对去重操作的支持 消除重复数据是实际业务中经常遇到的难题。在数据仓库中,重复数据的删除有助于减少存储所消耗的容量。在一些特定的场景中,重复数据也是不可接受的,比如在客群圈选与精准营销业务场景中,为了避免重复推送营销信息,一般会根据用户ID进行去重操作。在传统的离线计算中,可以通过distinct函数完成去重操作。在实时计算业务中,去重是一个增量和长期的过程,我们可以在Flink中通过添加MapState逻辑进行去重操作。但通过MapStat,多数情况下只能保证一定的时间窗口内数据去重,很难实现增量数据与OLAP库中的存量数据进行去重。随着时间窗口的增加,Flink中的去重操作会占用大量的内存资源,同时也会使计算层变得臃肿复杂。 主键模型要求表拥有唯一的主键,支持表中的行按照主键进行更新和删除操作。主键的唯一性与去重操作的需求高度匹配,在数据导入时,主键模型就已经完成了去重操作,避免了手动去重带来的资源消耗。通过对业务逻辑的拆解,我们可以选取合适去重列作为主键,在数据导入时通过DeleteandInsert的方式完成数据根据唯一主键进行去重的需求。相比于在Flink中实现去重,StarRocks主键模型可以节省大量的硬件资源,操作更为简单,并且可以实现增量数据加存量数据的去重操作。 主键模型对宽表数据变更优化 在固定报表业务场景中,通常会根据固定的查询,在Flink中对数据进行简单的业务清洗后打平成宽表,借用宽表极佳的多维分析性能,助力查询提速,同时也简化了分析师使用的数据模型。但由于宽表需要预聚合的属性,在遇到维度数据变更的情况,需要通过重跑宽表以实现数据更新。StarRocks的主键模型不仅可以应用于数据变更场景,同时部分列更新的功能,也高度契合多种业务对宽表中不同字段进行部分更新的需求。 在宽表模型中,一般会有几十上百甚至上千列。这给通过UPSERT方式完成数据更新的主键模型带了一定的挑战。我们需要获得变更行的所有信息后,才能后完成宽表的数据更新。这使得变更操作会附带上回表读取的操作,需要从StarRocks中拉取变更的数据行,然后拼出插入的语句完成数据更新。这给StarRocks带来了极大的查询压力。部分列更新的功能(particalupdate)极大程度的简化upsert操作。在开启参数partialupdate后,我们可以根据主键,只修改部分指定的列,原有的value列保持不变。 如下面的例子中,我们可以通过RoutineLoad导入方式来消费Kafka中的数据。在properties中需要设置partialupdatetrue,指定开启部分列更新模式,并指定需要更新的列名COLUMN(id,name)。 CREATEROUTINELOADroutineloadpaticalupdatedemoonexampletableCOLUMNS(id,name),COLUMNSTERMINATEDBY,PROPERTIES(partialupdatetrue)FROMKAFKA(kafkabrokerlistbroker1:9092,broker2:9092,broker3:9092,kafkatopicmytopic,kafkapartitions0,1,2,3,kafkaoffsets101。0。0。200);StarRocksXFlinkCDC,打造极速统一的开源实时数仓平台 FlinkCDC解决了数据链路冗长的问题,而StarRocks在OLAP分析层提供了极致的性能与一体化的数据存储方案以匹配不同的业务场景。通过StarRocks结合FlinkCDC构建的实时数仓平台的方案,能够极大程度的减少开发与运维的成本。 StarRocksXFlinkCDC,宽表实时数仓架构 使用StarRocks与FlinkCDC的联合解决方案,我们可以较为清晰的将实时数仓规划成为四层结构:数据源层,实时应用层,与原有架构相同,未做调整数据传输与计算层,通过引入FlinkCDC,将数据采集层,消息队列与事实计算层都放置在FlinkCDC中,简化了数据链路,减少了开发与运维成本。数据分析与存储层,StarRocks中作为分析层数据存储引擎,能够提供不同的数据模型支撑不同类型的业务,简化了分析层数据存储复杂的技术栈。 在ETL不复杂的场景,我们可以将大部分ETL的操作放在Flink中实现。在某些场景下,业务模型相对简单,事实数据与维度数据利用Flink多流join的能力打平成宽表,在Flink中完成了DWD,DWS与ADS层模型划分。同时对于非结构化的数据,也可以增量写入到Iceberg、Hudi或Hive中,利用StarRocks的外表功能完成湖仓一体的架构。 当ETL的过程中引入较为复杂的业务逻辑是,可能会在Flink计算层占用大量的内存资源。同时,宽表的模式无法应对查询不固定的多维度分析场景。我们可以选择使用星型模型来替换宽表模型,将数据清洗与建模的操作放到StarRocks中完成。 StarRocksXFlinkCDC,实时数据变更架构 在某些复杂的业务,如自助BI报表,运营分析等场景中,分析师往往会从不同的维度进行数据探查。查询的随机性与灵活性要求OLAP分析引擎对性能和多种建模方式都有良好的支持,以满足使用者近乎随意的在页面上拉去指标和维度,下钻、上卷和关联查询。 对于StarRocks而言,可以使用更为灵活的星型模型代替大宽表。为了增强多表实时关联能力,StarRocks提供了不同的join方式,如BoardcastJoin、ShuffleJoin、BucketJoin、ReplicaShuffleJoin、ColocationJoin。CBO会根据表的统计信息选择joinreorder与join的类型。同时也提供了多种优化手段,如谓词下推、limit下推、延迟物化等功能,进行多表关联的查询加速。 基于StarRocks的实时join能力,我们可以将ETL操作后置到StarRocks中,在StarRocks通过实时join的方式完成数据建模。同时通过PrimaryKey模型对于数据变更的支持,可以在StarRocks中创建缓慢变化维实现维度数据变更。 通过星型雪花模型构建的实时数仓,可以将计算层Flink的建模操作后置到StarRocks引擎中。在Flink中,只需要做ODS层数据的清洗工作,维度表与事实表会通过FlinkCDC同步写入到StarRocks中。StarRocks中会在ODS层进行事实数据与维度数据的落地,通过聚合模型或物化视图完成与聚合操作。利用StarRocks的实时多表关联能力,配合智能CBO优化器,稀疏索引及向量化引擎等多种优化手段,能够快速计算查询结果,保证业务的在不同模型层的数据高度同源一致。 在现实生活中,维度的属性并非是静止的,会随着时间的流逝发生缓慢的变化。星型模型可以将事实表与维度表独立存储,将维度数据从宽表中解藕,从而利用StarRocks的主键模型处理缓慢变化维的问题。一般来说,我们有三种方案处理缓慢变化维的问题:使用主键模型,直接根据主键覆盖原有的维度值。这种方式较为容易实现,但是没有保留历史数据,无法分析历史维度变化信息;使用明细模型,直接添加维度行,通过version列来管理不同的维度属性版本,改种方案在查询是需要根据业务条件筛选出合适的维度version使用主键模型,在主键中引入version信息,混合使用直接修改与新添加维度行的方法,该方法较为复杂,但也能更全面的处理复杂的维度变化需求StarRocksXFlinkCDC用户案例 在某知名电商平台业务中,通过使用StarRocks与FlinkCDC极大程度的简化聊数据链路的复杂度。用户通过StarRocks构建实时数据看板平台,实现了多维度数据筛选、灵活漏斗分析、不同维度上卷下钻的灵活分析。 困难与挑战 在电商数据看板平台中,最初选择了ClickHouse作为数据分析层的存储引擎。但随着业务的发展,ClickHouse在部分场景中无法有效的支撑,主要体现在以下几个方面:根据用户下单的操作,部分订单的状态会发生变化。但一般来说,超过两周的订单状态基本不会发生变化;部分变化的数据不适合通过宽表的形式存储,部分的业务需求迭代较为频繁,宽表星型模型的建模方式可以更好的服务于业务变更;ClickHouse扩缩容操作复杂,无法自动对表进行rebalance操作,需要较长的业务维护窗口。 为了解决以上的问题,该电商平台重新做了技术选型。经过不断的对比与压测,最终选择使用StarRocks作为分析层的数据存储引擎。 系统架构 在实时看板业务中,主要可以划分成五个部分: 数据源层:数据源注意有两种,来自Web端与客户端的埋点日志,以及业务库中的订单数据; FlinkCDC:FlinkCDC抓取上游的埋点日志与业务数据,在FlinkCDC中进行数据的清洗与转换,写入到StarRocks中; 数据存储层:根据业务的需求,将DWD层中的事实数据联合维度数据拼成宽表,通过视图的方式写入到DWS层,在ADS层划分出不同的主题域; 数据服务层:包含了数据指标平台和漏斗分析平台两部分,根据内部的指标、漏斗定义进行逻辑计算,最终生成报表供分析师查看; 数据中台:围绕大数据分析平台,提供稳定性保障、数据资产管理、数据服务体系等基础服务; 选型收益 数据传输层:通过FlinkCDC可以直接拉取上游的埋点数据与MySQL订单库中的增量数据。相比于MySQLCanalKakfaFlink的链路,架构更加清晰简单。特别是对于上游的MySQL分库分表订单交易库,可以在FlinkCDC中通过Mapping的方式,将不同的库中的表和合并,经过清洗后统一写入到下游的StarRocks中。省略了Canal与Kafka组件,减少了硬件资源成本与人力维护成本。 数据存储层:通过StarRocks替换ClickHouse,可以在业务建模时,不必限制于宽表的业务模型,通过更为灵活的星型模型拓展复杂的业务。主键模型可以适配MySQL业务库中的订单数据变更,根据订单ID实时修改StarRocks中的存量数据。同时,在节点扩容时,StarRocks更为简单,对业务没有侵入性,可以完成自动的数据重分布。 性能方面:单表400亿与四张百万维度表关联,平均查询时间400ms,TP99在800ms左右,相较于原有架构有大幅的性能提升。替换StarRocks后,业务高峰期CPU使用从70下降到40。节省了硬件成本。在极速统一上更进一步 一款优秀的产品,只提供极致的性能是不够的。还需要丰富的功能适配用户多样的需求。未来我们也会对产品的功能进行进一步的拓展,同时也会在稳定性与易用性上做进一步的提升。 日前,阿里云EMapReduce与StarRocks社区合作,推出了首款StarRocks云上产品。我们也可以在EMR上选择相应规格的Flink与StarRocks。为了提供更好的使用体验,阿里云EMapReduce团队与StarRocks也在不断的对产品进行优化,在未来的几个月会提供以下的功能:多表物化视图:StarRocks将推出多表关联物化视图功能,进一步加强StarRocks的实时建模能力;湖仓一体架构:StarRocks进一步ApacheIceberg与ApacheHudi外表功能,打造StarRocks湖仓一体架构;表结构变更同步:在实时同步数据的同时,还支持将源表的表结构变更(增加列信息等)实时同步到目标表中;分库分表合并同步:支持使用正则表达式定义库名和表名,匹配数据源的多张分库分表,合并后同步到下游的一张表中;自定义计算列同步:支持在源表上新增计算列,以支持您对源表的某些列进行转换计算; 一款优秀的产品也离不开社区的生态,欢迎大家参与StarRocks与Flink社区的共建,也欢迎大家测试StarRocksPrimaryKeyXFlinkCDC的端到端实时数仓链路方案。 原文链接:http:click。aliyun。comm1000346410 本文为阿里云原创内容,未经允许不得转载。