城市直播房产教育博客汽车
快传网
汽车报价
买车新车
博客专栏
专题精品
教育留学
高考读书
房产家居
彩票视频
直播黑猫
投资微博
城市上海
政务旅游

(3)sparkstreaming从kafka接入实时数据流

10月2日 夜如影投稿
  (1)sparkstreaming从kafka接入实时数据流最终实现数据可视化展示,我们先看下整体方案架构:
  (2)方案说明:
  1)我们通过kafka与各个业务系统的数据对接,将各系统中的数据实时接到
  2)通过sparkstreaming接入kafka数据流,定义时间窗口和计算窗口大小,业务计算逻辑处理;
  3)将结果数据写入到
  4)通过可视化平台接入mysql数据库,这里使用的是NBI大数据可视化构建平台;
  5)在平台上通过拖拽式构建各种数据应用,数据展示;
  (3)代码演示:
  定义一个kafka生产者,模拟数据源packagecom。importcom。alibaba。fastjson。JSONOimportcom。pojo。WaterSimportorg。apache。kafka。clients。producer。KafkaPimportorg。apache。kafka。clients。producer。ProducerRimportorg。apache。kafka。clients。producer。RecordMimportjava。util。Pimportjava。util。RCreatedbyljon20220718。publicclassKafakaProducer{publicfinalstaticStringbootstrapServers127。0。0。1:9092;publicstaticvoidmain(String〔〕args){PropertiespropsnewProperties();设置Kafka服务器地址props。put(bootstrap。servers,bootstrapServers);设置数据key的序列化处理类props。put(key。serializer,org。apache。kafka。common。serialization。StringSerializer);设置数据value的序列化处理类props。put(value。serializer,org。apache。kafka。common。serialization。StringSerializer);KafkaProducerString,StringproducernewKafkaProducer(props);try{inti0;RandomrnewRandom();String〔〕lang{flink,spark,hadoop,hive,hbase,impala,presto,superset,nbi};while(true){Thread。sleep(2000);WaterSensorwaterSensornewWaterSensor(lang〔r。nextInt(lang。length)〕kafka,i,i);i;StringmsgJSONObject。toJSONString(waterSensor);System。out。println(msg);RecordMetadatarecordMetadataproducer。send(newProducerRecord(kafkadatawaterSensor,null,null,msg))。get();System。out。println(recordMetadata:{recordMetadata});}}catch(Exceptione){System。out。println(e。getMessage());}}}
  根据业务需要,定义各种消息对象packagecom。importjava。io。Simportjava。util。DCreatedbyljon20220713。publicclassWaterSensorimplementsSerializable{publicSpublicWaterSensor(){}publicWaterSensor(Stringid,longts,intvc){this。this。this。}publicintgetVc(){}publicvoidsetVc(intvc){this。}publicStringgetId(){}publicvoidsetId(Stringid){this。}publiclonggetTs(){}publicvoidsetTs(longts){this。}}
  sparkstreaming数据流计算packagecom。importcom。alibaba。fastjson。JSONOimportcom。pojo。WaterSimportorg。apache。kafka。clients。consumer。ConsumerRimportorg。apache。kafka。common。TopicPimportorg。apache。spark。SparkCimportorg。apache。spark。api。java。JavaRDD;importorg。apache。spark。api。java。function。Fimportorg。apache。spark。api。java。function。VoidFunction2;importorg。apache。spark。sql。Dimportorg。apache。spark。sql。Rimportorg。apache。spark。sql。SparkSimportorg。apache。spark。streaming。Dimportorg。apache。spark。streaming。Timportorg。apache。spark。streaming。api。java。JavaDSimportorg。apache。spark。streaming。api。java。JavaInputDSimportorg。apache。spark。streaming。api。java。JavaReceiverInputDSimportorg。apache。spark。streaming。api。java。JavaStreamingCimportorg。apache。spark。streaming。kafka010。ConsumerSimportorg。apache。spark。streaming。kafka010。KafkaUimportorg。apache。spark。streaming。kafka010。LocationSimportjava。util。;Createdbyljon20220718。publicclassSparkSqlKafka{privatestaticStringappNamespark。streaming。privatestaticStringmasterlocal〔〕;privatestaticStringtopicskafkadatawaterSprivatestaticStringbrokers127。0。0。1:9092;publicstaticvoidmain(String〔〕args){初始化sparkConfSparkConfsparkConfnewSparkConf()。setMaster(master)。setAppName(appName);获得JavaStreamingContextJavaStreamingContextsscnewJavaStreamingContext(sparkConf,Durations。minutes(3));设置ahrefhttps:www。bs178。comrizhitargetblankclassinfotextkey日志a的级别:避免ahrefhttps:www。bs178。comrizhitargetblankclassinfotextkey日志a重复ssc。sparkContext()。setLogLevel(ERROR);CollectionStringtopicsSetnewHashSet(Arrays。asList(topics。split(,)));kafka相关参数,必要!缺了会报错MapString,ObjectkafkaParamsnewHashMap();kafkaParams。put(metadata。broker。list,brokers);kafkaParams。put(bootstrap。servers,brokers);kafkaParams。put(group。id,group1);kafkaParams。put(key。serializer,org。apache。kafka。common。serialization。StringSerializer);kafkaParams。put(key。deserializer,org。apache。kafka。common。serialization。StringDeserializer);kafkaParams。put(value。deserializer,org。apache。kafka。common。serialization。StringDeserializer);通过KafkaUtils。createDirectStream(。。。)获得kafka数据,kafka相关参数由kafkaParams指定JavaInputDStreamConsumerRecordObject,ObjectlinesKafkaUtils。createDirectStream(ssc,LocationStrategies。PreferConsistent(),ConsumerStrategies。Subscribe(topicsSet,kafkaParams));JavaDStreamWaterSensormapDStreamlines。map(newFunctionConsumerRecordObject,Object,WaterSensor(){OverridepublicWaterSensorcall(ConsumerRecordObject,Objects)throwsException{WaterSensorwaterSensorJSONObject。parseObject(s。value()。toString(),WaterSensor。class);returnwaterS}})。window(Durations。minutes(9),Durations。minutes(6));指定窗口大小和滑动频率必须是批处理时间的整数倍;mapDStream。foreachRDD(newVoidFunction2JavaRDDWaterSensor,Time(){Overridepublicvoidcall(JavaRDDWaterSensorwaterSensorJavaRDD,Timetime)throwsException{SparkSessionsparkJavaSparkSessionSingleton。getInstance(waterSensorJavaRDD。context()。getConf());DatasetRowdataFramespark。createDataFrame(waterSensorJavaRDD,WaterSensor。class);创建临时表dataFrame。createOrReplaceTempView(log);DatasetRowresultspark。sql(selectfromlog);System。out。println(time);输出前20条数据result。show();数据写入mysqlwriteDataToMysql(result);}});开始作业ssc。start();try{ssc。awaitTermination();}catch(Exceptione){e。printStackTrace();}finally{ssc。close();}}}
  NBI大数据可视化构建平台对接mysql,构建数据应用:
  NBI可视化
投诉 评论 转载

一只令我心生连串问号的鸟儿我家门外有一棵树,树顶刚好比我二楼房间的窗台略高一些。此时正值暮春,树似乎生怕辜负了春的热情和希望,所以长得枝繁叶茂。花若盛开,蝴蝶自来。也许是树的繁茂华美,每年春天的清……关于睡眠的诸多解答,睡眠不好的一定要看看人的一生有三分之一的时间在睡眠中度过,可见睡眠这件事情有多重要。良好的睡眠是心身健康的保障,可以说,其他的任何一种方式都不能代替睡眠。但是,面对睡眠一些常见的问题,却都很……甘肃省十五运群众组中国象棋围棋桥牌比赛第二日赛况集锦6月3日是甘肃省第十五届运动会群众组中国象棋、围棋、桥牌比赛的第二天,赛场上依旧战况激烈焦灼。中国象棋进行了第三、四、五轮的比赛;围棋进行了第三、四轮的比拼。桥牌则进行男子团体……嘉兴博物馆与班布里市博物馆线上文化交流活动圆满完成【来源:嘉兴市文化广电旅游局文旅要闻】12月17日下午,嘉兴博物馆会议室很是热闹,中英文自如切换,还不时有笑声响起。原来,现场正在举行一场连线直播活动,连接的另一端是远在……1400万人次围观热议,近500部魅力短视频参赛!2022东近日,由东营市文化和旅游局主办,东营周刊承办,东营区文化和旅游局、河口区文化和旅游局、垦利区文化和旅游局、广饶县文化和旅游局、利津县文化和旅游局、东营经济技术开发区管委会综合部……(3)sparkstreaming从kafka接入实时数据流(1)sparkstreaming从kafka接入实时数据流最终实现数据可视化展示,我们先看下整体方案架构:(2)方案说明:1)我们通过kafka与各个业务系统的数……超越苦难,莫辜负父母给予的生命超越苦难,莫辜负父母给予的生命董连辉启一元复始,待四序更新。在这个悲风呼啸的寒冬,元旦暖阳无法带来温暖,随着慈母去世,我彻底成为没有根的孤儿游子。长城脚下,那座拥有……换新电池的特斯拉ModelY在美国上市,真比较好?根据外媒报道,日前美国特斯拉官网被发现有着疑似被称为StandardRangeAWD的ModelY车型代码,而且外媒指出该车型会采用4680电池组,等于搭载4680电池车型不再……虚增超9成利润模塑科技和高管集体收行政处罚来源:中国经济网模塑科技的信披违规案终于等来了处罚落地,12月27日晚间,模塑科技(000700。SZ)公告收到了行政处罚事先告知书。截至12月28日收盘,模塑科技股价为……肾病综合征,想要病情不反复,就要注意三点肾病综合征具有一定的复发率,有的病人复发,是因为疾病本身引起的,但也有部分患者的复发,是因为患者自身因素所导致的,肾病综合征反复发作,可能会加快疾病的进展,这也是很多患者所焦虑……电脑超简单之电脑如何远程开机ampampamp访问远程文件很多人肯定遇到过这样的问题,有时候需要访问家里电脑上的文件资料或者在家办公时需要访问公司电脑上的文件资料。这个时候如果可以远程开机并且访问到电脑上的文件资料那是一件多么好的事情……30万亿资产,迎来历史巨变,像极了2014年大牛市前夜前不久,高层对央企估值偏低的情况,发表了看法,认为这是市场的偏见,应该建立中国特色的估值体系。这番表态之后,中字头的央企股价大幅飙升,甚至连中国建筑这样万年乌龟行情的,都……
张天爱分手后,最新照大杀四方,美艳如她却翻车了?好身材就是嚣张的本钱!横野堇写真集,来个力求低谷大翻身沙漠枭雄,让万氏兄弟一秒带你穿越荒漠泰山队现在是七名外援有合同,但能确定下个赛季留队的是四名外援50!斯诺克四强诞生丁俊晖苦主横扫晋级,世界冠军爆冷出局ELLE风尚大典红毯娜扎晓彤唐嫣杨幂周笔畅谁更胜一筹?火线签约!朱芳雨拿下4221魔兽,杜锋掐辽宁队软肋,杨鸣被动说说我家阳的经历医疗类主动基金投资阶段总结爱得起放得下电子书市场做不下去了?s29免费皮肤曝光,有388点券巨赚,孙悟空传说帅哭,吕布玩
爱成殇痛依旧探究桂平挖蛇事件经过揭秘金色大蟒背后的故事房屋出租合同谁将成为德国新总理?吴亦凡都美竹蹭流量事件何时能停止七夕会美食白虎汤的妙用顽固咽喉炎哮喘持续状态的诊断方法出租租房合同发型也可以让你变得更美吴尊老婆私服挺显嫩,T恤配半裙远看似小姑娘,一看就挺受宠的蓝莓吃多了有什么害处?蓝莓可以一天吃一盒吗?

友情链接:中准网聚热点快百科快传网快生活快软网快好知文好找江西南阳嘉兴昆明铜陵滨州广东西昌常德梅州兰州阳江运城金华广西萍乡大理重庆诸暨泉州安庆南充武汉辽宁