城市直播房产教育博客汽车
投稿投诉
汽车报价
买车新车
博客专栏
专题精品
教育留学
高考读书
房产家居
彩票视频
直播黑猫
投资微博
城市上海
政务旅游

万字长文Flinkcdc源码精讲(推荐收藏)(四)

2月14日 眉梢欢投稿
  SnapshotSplitReader。submitSplit方法publicvoidsubmitSplit(MySqlSplitmySqlSplit){this。currentSnapshotSplitmySqlSplit。asSnapshotSplit();statefulTaskContext。configure(currentSnapshotSplit);拿到context的queue,在pollSplitSrecords的时候需要this。queuestatefulTaskContext。getQueue();this。nameAdjusterstatefulTaskContext。getSchemaNameAdjuster();this。hasNextElement。set(true);this。reachEnd。set(false);主要读取逻辑在readTask中this。splitSnapshotReadTasknewMySqlSnapshotSplitReadTask(statefulTaskContext。getConnectorConfig(),statefulTaskContext。getOffsetContext(),statefulTaskContext。getSnapshotChangeEventSourceMetrics(),statefulTaskContext。getDatabaseSchema(),statefulTaskContext。getConnection(),statefulTaskContext。getDispatcher(),statefulTaskContext。getTopicSelector(),StatefulTaskContext。getClock(),currentSnapshotSplit);提交一个runnable到线程中,主要是执行readTask的execute方法executor。submit((){try{currentTaskR自己实现的contextImpl主要记录高水位和低水位用finalSnapshotSplitChangeEventSourceContextImplsourceContextnewSnapshotSplitChangeEventSourceContextImpl();执行readTaskSnapshotResultsnapshotResultsplitSnapshotReadTask。execute(sourceContext);finalMySqlBinlogSplitbackfillBinlogSplitcreateBackfillBinlogSplit(sourceContext);optimizationthatskipthebinlogreadwhenthelowwatermarkequalshighwatermark如由于snapshot是并行读取的,所以当该读取该split的数据,低水位和高水位相同,说明在read数据中没有出现其他操作,所以可以退出binlog优化阶段,可以认为该split范围的数据没有变更,不需要在snapshot之后进行binlog的readfinalbooleanbinlogBackfillRequiredbackfillBinlogSplit。getEndingOffset()。isAfter(backfillBinlogSplit。getStartingOffset());if(!binlogBackfillRequired){dispatchHighWatermark(backfillBinlogSplit);currentTaskR}snapshot执行完成后,开始binlogReadTask的读取操作if(snapshotResult。isCompletedOrSkipped()){根据snapshotreadtask读取结束后,会记录高低水位,水位线作为参数构建binlogreadtaskfinalMySqlBinlogSplitReadTaskbackfillBinlogReadTaskcreateBackfillBinlogReadTask(backfillBinlogSplit);执行binlogreadtask,由于里面的处理逻辑太复杂了,我们就不直接进行阅读了我这里直接简单介绍一下流程,就是拿到snapshot的高水位,作为endOffset,在binlogreadtask中,会以endOffset作为结束条件,小宇endOffset的数据都会被read,并发送下游backfillBinlogReadTask。execute(newSnapshotBinlogSplitChangeEventSourceContextImpl());}else{readExceptionnewIllegalStateException(String。format(Readsnapshotformysqlsplitsfail,currentSnapshotSplit));}}catch(Exceptione){currentTaskRLOG。error(String。format(Executesnapshotreadtaskformysqlsplitsfail,currentSnapshotSplit),e);readE}});}MySqlSnapshotSplitReadTask。execute(sourceContext)方法OverridepublicSnapshotResultexecute(ChangeEventSourceContextcontext)throwsInterruptedException{SnapshottingTasksnapshottingTaskgetSnapshottingTask(previousOffset);就是new了一个finalSnapshotCtry{ctxprepare(context);重新new了一个context对象,比较无用}catch(Exceptione){LOG。error(Failedtoinitializesnapshotcontext。,e);thrownewRuntimeException(e);}try{上面都是无用代码,这里直接调用了doExecute方法,我们进入该方法看主要逻辑即可returndoExecute(context,ctx,snapshottingTask);}catch(InterruptedExceptione){LOG。warn(Snapshotwasinterruptedbeforecompletion);}catch(Exceptiont){thrownewDebeziumException(t);}}MySqlSnapshotSplitReadTask。doExecute(sourceContext)方法OverrideprotectedSnapshotResultdoExecute(ChangeEventSourceContextcontext,SnapshotContextsnapshotContext,SnapshottingTasksnapshottingTask)throwsException{finalRelationalSnapshotChangeEventSource。RelationalSnapshotContextctx(RelationalSnapshotChangeEventSource。RelationalSnapshotContext)snapshotCctx。offsetoffsetC一个dispatcher,用于记录水位线事件,后面会通过该dispatcher发射数据,当然是通过emitter发射了finalSignalEventDispatchersignalEventDispatchernewSignalEventDispatcher(offsetContext。getPartition(),topicSelector。topicNameFor(snapshotSplit。getTableId()),dispatcher。getQueue());其实log输出的日志就已经很清晰了记录低水位finalBinlogOffsetlowWatermarkcurrentBinlogOffset(jdbcConnection);LOG。info(Snapshotstep1Determininglowwatermark{}forsplit{},lowWatermark,snapshotSplit);((SnapshotSplitReader。SnapshotSplitChangeEventSourceContextImpl)(context))。setLowWatermark(lowWatermark);signalEventDispatcher。dispatchWatermarkEvent(snapshotSplit,lowWatermark,SignalEventDispatcher。WatermarkKind。LOW);LOG。info(Snapshotstep2Snapshottingdata);读取数据主要方法重点介绍的地方createDataEvents(ctx,snapshotSplit。getTableId());记录高水位finalBinlogOffsethighWatermarkcurrentBinlogOffset(jdbcConnection);LOG。info(Snapshotstep3Determininghighwatermark{}forsplit{},highWatermark,snapshotSplit);signalEventDispatcher。dispatchWatermarkEvent(snapshotSplit,highWatermark,SignalEventDispatcher。WatermarkKind。HIGH);((SnapshotSplitReader。SnapshotSplitChangeEventSourceContextImpl)(context))。setHighWatermark(highWatermark);returnSnapshotResult。completed(ctx。offset);}我们看看createDataEvents调用过程privatevoidcreateDataEvents(RelationalSnapshotChangeEventSource。RelationalSnapshotContextsnapshotContext,TableIdtableId)throwsException{EventDispatcher。SnapshotReceiversnapshotReceiverdispatcher。getSnapshotChangeEventReceiver();LOG。debug(Snapshottingtable{},tableId);createDataEventsForTable(snapshotContext,snapshotReceiver,databaseSchema。tableFor(tableId));receiver的逻辑我们就不看了,我这里介绍一下就好receiver通过changeRecord方法接收到数据后,通过一个成员变量(bufferedEvent)控制,如果!null加入队列,然后创建一个新的SourceRecord,直到所有的数据读取完成,所以说最后一条数据创建成功之后,如果没有新的数据了,则不会调用changeRecord该方法,也就是说成员变量记录了最后一个record这里调用completeSnapshot方法的时候会对bufferedEvent变量进行判断,如果不等于null做一些complete相关的工作最后加入队列中,如果不调用该方法,则当前split的snapshot阶段读取的数据少了一条,嘻嘻嘻snapshotReceiver。completeSnapshot();}createDataEvents中调用到本类的createDataEventsForTable,也就是开始了具体读取逻辑privatevoidcreateDataEventsForTable(RelationalSnapshotChangeEventSource。RelationalSnapshotContextsnapshotContext,EventDispatcher。SnapshotReceiversnapshotReceiver,Tabletable)throwsInterruptedException{longexportStartclock。currentTimeInMillis();LOG。info(Exportingdatafromsplit{}oftable{},snapshotSplit。splitId(),table。id());构建sqlfinalStringselectSqlStatementUtils。buildSplitScanQuery(snapshotSplit。getTableId(),snapshotSplit。getSplitKeyType(),snapshotSplit。getSplitStart()null,snapshotSplit。getSplitEnd()null);LOG。info(Forsplit{}oftable{}usingselectstatement:{},snapshotSplit。splitId(),table。id(),selectSql);try(PreparedStatementselectStatementStatementUtils。readTableSplitDataStatement(创建statement,然后查询sqljdbcConnection,selectSql,snapshotSplit。getSplitStart()null,snapshotSplit。getSplitEnd()null,snapshotSplit。getSplitStart(),snapshotSplit。getSplitEnd(),snapshotSplit。getSplitKeyType()。getFieldCount(),connectorConfig。getQueryFetchSize());然后对查询出来的数据进行封装成sourceRecord发送下游ResultSetrsselectStatement。executeQuery()){ColumnUtils。ColumnArraycolumnArrayColumnUtils。toArray(rs,table);longrows0;Threads。TimerlogTimergetTableScanLogTimer();while(rs。next()){finalObject〔〕rownewObject〔columnArray。getGreatestColumnPosition()〕;for(inti0;icolumnArray。getColumns()。i){ColumnactualColumntable。columns()。get(i);row〔columnArray。getColumns()〔i〕。position()1〕readField(rs,i1,actualColumn,table);}if(logTimer。expired()){longstopclock。currentTimeInMillis();LOG。info(Exported{}recordsforsplit{}after{},rows,snapshotSplit。splitId(),Strings。duration(stopexportStart));snapshotProgressListener。rowsScanned(table。id(),rows);logTimergetTableScanLogTimer();}这里会将数据放入队列,通过receiver接收数据,然后再将数据放入其队列的一个过程,其实不必深入,就是封装的比较好,难以理解dispatcher。dispatchSnapshotEvent(table。id(),getChangeRecordEmitter(snapshotContext,table。id(),row),就是new了一个snapshotReceiver);}LOG。info(Finishedexporting{}recordsforsplit{},totalduration{},rows,snapshotSplit。splitId(),Strings。duration(clock。currentTimeInMillis()exportStart));}catch(SQLExceptione){thrownewConnectException(Snapshottingoftabletable。id()failed,e);}}dispatcher。dispatchSnapshotEvent方法之后的流程进入evnentDisptcher。dispatchSnapshotEvent方法publicvoiddispatchSnapshotEvent(TdataCollectionId,ChangeRecordEmitterchangeRecordEmitter,SnapshotReceiverreceiver)throwsInterruptedException{DataCollectionSchemadataCollectionSchemaschema。schemaFor(dataCollectionId);if(dataCollectionSchemanull){errorOnMissingSchema(dataCollectionId,changeRecordEmitter);}changeRecordEmitter。emitChangeRecords(dataCollectionSchema,newReceiver(){OverridepublicvoidchangeRecord(DataCollectionSchemaschema,Operationoperation,Objectkey,Structvalue,OffsetContextoffset,ConnectHeadersheaders)throwsInterruptedException{eventListener。onEvent(dataCollectionSchema。id(),offset,key,value);真正的放入队列的逻辑在这里调用receiver使我们传入的对应BufferingSnapshotChangeRecordReceiver类receiver。changeRecord(dataCollectionSchema,operation,key,value,offset,headers);}});}BufferingSnapshotChangeRecordReceiver的changeRecord方法前面简单介绍过他的处理逻辑了,就不必多做介绍了OverridepublicvoidchangeRecord(DataCollectionSchemadataCollectionSchema,Operationoperation,Objectkey,Structvalue,OffsetContextoffsetContext,ConnectHeadersheaders)throwsInterruptedException{Objects。requireNonNull(value,valuemustnotbenull);LOGGER。trace(Receivedchangerecordfor{}operationonkey{},operation,key);if(bufferedEvent!null){queue。enqueue(bufferedEvent。get());}SchemakeySchemadataCollectionSchema。keySchema();StringtopicNametopicSelector。topicNameFor((T)dataCollectionSchema。id());therecordisproducedlazily,sotohavethecorrectoffsetaspertheprepostcompletioncallbacksbufferedEvent(){SourceRecordrecordnewSourceRecord(offsetContext。getPartition(),offsetContext。getOffset(),topicName,null,keySchema,key,dataCollectionSchema。getEnvelopeSchema()。schema(),value,null,headers);returnchangeEventCreator。createDataChangeEvent(record);};}
投诉 评论 转载

毛主席逝世后,李敏打电话给高智高叔叔,你为何还不来悼念爸爸1976年9月9日,伟大领袖毛主席永远离开了他深爱的人民,伟人长眠,举国悲恸,成千上万的百姓,乃至全世界的人民都沉浸在悲痛中,人们纷纷上京,要去参加毛主席的悼念活动,送别他们敬……唐山事件警示旁观者如何才敢见义勇为?6月10日凌晨两点,河北唐山某烧烤店,一黑衣男子进入餐厅转悠了一圈,走到几位女士桌前,不知嘴里说了句什么,并用手碰了那位白衣女子的背部。白衣女子回应了一句,你有病呀,并且……国内航空公司2498亿人民币购买欧洲飞机2022年7月1日,国航、东航、南航三大航空公司均宣布与欧洲空客公司签订协议,购买空客A320NEO系列飞机。据统计,三家航空公司,此次合计购买292架,总价格约为372。57……三国98马超素质三连戳,让曹操大喊吾命休矣,直言这仗不好打马超:曹贼,你也有脸提朝廷二字吗。你欺君罔上,杀我父兄。我誓当生擒汝,食汝肉寝汝皮。曹操:哈哈哈,怎么有这么多的人,想食吾肉,寝吾皮呢。视频加载中。。。随着曹……民主党派成员共话更加均衡协调的经济发展新格局图为2021年3月,在宁夏银川市永宁县闽宁镇,福建农林大学教授林占熺(右二)与农户交流菌草种植技术。(王鹏摄)中共二十大报告提出,促进区域协调发展,深入实施区域协调发展战……资治通鉴为何要治理中层阶级?【原文1】秋,匈奴入右北平、定襄,各数万骑,杀略千馀人。山东大水,民多饥乏。天子遣使者虚郡国仓廥以振贫民,犹不足,又募豪富吏民能假贷贫民者以名闻,尚不能相救。乃徙贫……香蕉人杨安泽称以亚裔的身份感到羞耻,以华人身份竞选美国总统俗话说,老乡见老乡,两眼泪汪汪,大家都是远在异国他乡求生的同胞,应该互相帮助。就算不帮忙,也不能在旁边冷嘲热讽。而杨安泽,则是一副高高在上的样子,对待华裔群体和亚裔群体,他的所……老故事新说生前哥俩好,死后却掘坟?你不知道的李世民和魏征【老故事新说】魏征,中国人尽皆知的一代贤臣,犯颜直谏。他与李世民的君臣之道,成为一段佳话,传颂至今。在他死后,唐太宗李世民十分痛苦惋惜,这才有了那有名的一段话:夫以铜为镜……把幸福到到万家演技出色的7位演员,放在一起看,名次就有了《幸福到万家》即将大结局,而这部剧中的几位演员,也确实只凭演技就让人印象深刻了。特别是这几位之前很多人都不熟悉的演员。包括饰演王庆来的唐曾,饰演王秀玉的林思意,以及……万字长文Flinkcdc源码精讲(推荐收藏)(四)SnapshotSplitReader。submitSplit方法publicvoidsubmitSplit(MySqlSplitmySqlSplit){this。curren……无锡到绩溪两日游多少钱?线路住宿如何安排比较合适?无锡到绩溪两日游,怎么游玩,那看您是自驾车还是高铁呢,那两种方式可以有不同游玩的方式。如果自驾游,首先开车全程也就300公里左右,走锡宜高速和宁绩高速,约4小时车程。……联合国报告预计全球人口11月15日将达80亿据联合国《世界人口展望2022》报告预计,到11月15日,全球人口将达到80亿。预计到2030年,全球人口将增长至85亿左右,2050年达到97亿,21世纪80年代达到约104……
猖獗造句用猖獗造句大全最短命首相纪录或被刷新,英国人要求苏纳克下台,原因是太有钱了我想改变工作的制度,我想创办个公司,让员工干完活就走,不想占MacPro非常好用的软件有哪些?新面板上线后百分比职业全部沦陷第剑蔡林记一天卖面碗大牌男装里的奇葩恶俗装盘点组图抠图技巧如何快速制作手写签名效果软文标题怎么写更有吸引力?身强造句用身强造句大全红岩的读书笔记与心得体会塔罗牌占卜你跟的感情真的无法挽回了吗
12岁女孩实现财务自由孩子,正确的努力,让一切皆有可能东莞民事律师父亲欠钱子女有义务还吗?父亲欠钱去世子女需要还钱 什么样的邮票收藏价值高不小心逆向行驶扣几分今日寒露,停止秋冻!这5条禁忌比进补更重要疯狂高三作文梦想公共图书馆图书宣传工作的困惑与策略怎么样才能快速锻炼肌肉呢长篇小说《女记者》第二部第十七章热评聚热点网 聊斋志异狐嫁女白话文故事内容腮红帮你解决各种脸部问题

友情链接:中准网聚热点快百科快传网快生活快软网快好知文好找江西南阳嘉兴昆明铜陵滨州广东西昌常德梅州兰州阳江运城金华广西萍乡大理重庆诸暨泉州安庆南充武汉辽宁