首页 >企业动态 > > 正文

今日热讯:大数据Flink进阶(十七):Apache Flink术语

腾讯云 2023-04-09 22:17:45

Apache Flink术语

Flink计算框架可以处理批数据也可以处理流式数据,Flink将批处理看成是流处理的一个特例,认为数据原本产生就是实时的数据流,这种数据叫做无界流(unbounded stream),无界流是持续不断的产生没有边界,批数据只是无界流中的一部分叫做有界流(bounded stream),针对无界流数据处理叫做实时处理,这种程序一般是7*24不间断运行的;针对有界流数据处理叫做批处理,这种程序处理完当前批数据就停止。下面我们结合一些代码介绍Flink中的一些重要的名词术语。

一、Application与Job

无论处理批数据还是处理流数据我们都可以使用Flink提供好的Operator(算子)来转换处理数据,一个完整的Flink程序代码叫做一个Flink Application,像前面章节我们编写的Flink读取Socket数据实时统计WordCount代码就是一个完整的Flink Application:


(资料图片仅供参考)

/** * 读取Socket数据进行实时WordCount统计 */public class SocketWordCount {    public static void main(String[] args) throws Exception {        //1.准备环境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        //2.读取Socket数据        DataStreamSource ds = env.socketTextStream("node5", 9999);        //3.准备K,V格式数据        SingleOutputStreamOperator> tupleDS = ds.flatMap((String line, Collector> out) -> {            String[] words = line.split(",");            for (String word : words) {                out.collect(Tuple2.of(word, 1));            }        }).returns(Types.TUPLE(Types.STRING, Types.INT));        //4.聚合打印结果        tupleDS.keyBy(tp -> tp.f0).sum(1).print();        //5.execute触发执行        env.execute();    }}

一个完整的Flink Application一般由Source(数据来源)、Transformation(转换)、Sink(数据输出)三部分组成,Flink中一个或者多个Operator(算子)组合对数据进行转换形成Transformation,一个Flink Application 开始于一个或者多个Source,结束于一个或者多个Sink。

编写Flink代码要符合一定的流程,首先我们需要创建Flink的执行环境(Execution Environment),然后再加载数据源Source,对加载的数据进行Transformation转换,进而对结果Sink输出,最后还要执行env.execute()来触发整个Flink程序的执行,编写代码时将以上完整流程放在main方法中形成一个完整的Application。

一个Flink Application中可以有多个Flink Job,每次调用execute()或者executeAsyc()方法可以触发一个Flink Job ,一个Flink Application中可以执行多次以上两个方法来触发多个job执行。但往往我们在编写一个Flink Application时只需要一个Job即可。

二、DataFlow数据流图

一个Flink Job 执行时会按照Source、Transformatioin、Sink顺序来执行,这就形成了Stream DataFlow(数据流图),数据流图是整体展示Flink作业执行流程的高级视图,通过WebUI我们可以看到提交应用程序的DataFlow。

像之前提交的Flink 读取Socket数据实时统计WordCount在WebUI中形成的DataFlow如下,可以看到对应的Source、各个转换算子、Sink部分。

通常Operator算子和Transformation转换之间是一对一的关系,有时一个Transformation转换中包含多个Operator,形成一个算子链,这主要取决于数据之间流转关系和并行度是否相同,关于算子链内容在再做介绍。

三、Subtask子任务与并行度

在集群中运行Flink代码本质上是以并行和分布式方式来执行,这样可以提高处理数据的吞吐量和速度,处理一个Flink流过程中涉及多个Operator,每个Operator有一个或者多个Subtask(子任务),不同的Operator的Subtask个数可以不同,一个Operator有几个Subtask就代表当前算子的并行度(Parallelism)是多少,Subtask在不同的线程、不同的物理机或不同的容器中完全独立执行。

上图下半部分是多并行度DataFlow视图,Source、Map、KeyBy等操作有2个并行度,对应2个subtask分布式执行,Sink操作并行度为1,只有一个subtask,一共有7个Subtask,每个Subtask处理的数据也经常说成处理一个分区(Stream Partition)的数据。一个Flink Application的并行度通常认为是所有Operator中最大的并行度。上图中的Application并行度就为2。

Flink中并行度可以从以下四个层面指定:

Operator Level (算子层面)

算子层面设置并行度是给每个算子设置并行度,直接在算子后面调用.setparallelism()方法,写入并行度即可,只是针对当前算子有效,注意一些算子不能设置并行度,例如:keyBy 返回的对象是KeyedStream,这种分组操作无法设置并行度,socketTextStream是非并行source,只支持1个并行度,也不能设置并行度。

#算子层面设置并行度ds.flatMap(line=>{line.split(" ")}).setParallelism(2)
Execution Environment Level(执行环境层面)

执行环境层面设置并行度直接调用env.setParallelism()写入并行度即可,全局代码有效。

#执行环境层面设置并行度val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(3)
Client Level(客户端层面)

以上无论是算子层面还是执行环境层面设置并行度都会导致硬编码问题,修改并行度时不灵活,我们也可以在客户端提交Flink任务时通过指定命令参数-p来动态设置并行度,并行度作用于全局代码。

如果是基于WebUI提交任务,我们也可以基于WebUI指定并行度:

System Level(系统层面)

我们也可以直接在提交Flink任务的节点配置$FLINK_HOME/conf/flink-conf.yaml文件配置并行度,这个设置对于在客户端提交的所有任务有效,默认值为1。

#配置flink-conf.yaml文件parallelism.default: 5

以上四种不同方式指定Flink并行度的优先级为:Operator Level>Execution Environment Level>Client Level>System Level,本地编写代码时如果没有指定并行度,默认的并行度是当前机器的cpu core数。

四、Operator Chains 算子链

在Flink作业中,用户可以指定Operator Chains(算子链)将相关性非常强的算子操作绑定在一起,这样能够让转换过程上下游的Task数据处理逻辑由一个Task执行,进而避免因为数据在网络或者线程间传输导致的开销,减少数据处理延迟提高数据吞吐量。默认情况下,Flink开启了算子链。例如:下图流处理程序Source/map就形成了一个算子链,keyBy/window/apply形成了以算子链,分布式执行中原本需要多个task执行的情况由于有了算子链减少到由5个Subtask分布式执行即可。

我们在集群中提交Flink任务后,可以通过Flink WebUI中查看到形成的算子链:

那么在Flink中哪些算子操作可以合并在一起形成算子链进行优化?这主要取决于算子之间的并行度与算子之间数据传递的模式。一个数据流在算子之间传递数据可以是一对一(One-to-one)的模式传递,也可以是重分区(Redistributing)的模式传递,两者区别如下:

One-to-one

一对一传递模式(例如上图中的Source和map()算子之间)保留了元素的分区和顺序,类似Spark中的窄依赖。这意味着map()算子的subtask[1]处理的数据全部来自Source的subtask[1]产生的数据,并且顺序保持一致。例如:map、filter、flatMap这些算子都是One-to-one数据传递模式。

Redistributing

重分区模式(如上面的map()和keyBy/window之间,以及keyBy/window和Sink之间)改变了流的分区,这种情况下数据流向的分区会改变,类似于Spark中的宽依赖。每个算子的subtask将数据发送到不同的目标subtask,这取决于使用了什么样的算子操作,例如keyBy()是分组操作,会根据key的哈希值对数据进行重分区,再如,window/apply算子操作的并行度为2,流向了并行度为1的sink操作,这个过程需要通过rebalance操作将数据均匀发送到下游Subtask中。这些传输方式都是重分区模式(Redistributing)。

在Flink中One-to-one的算子操作且并行度一致,默认自动合并在一起形成一个算子链,由一个task执行对应逻辑。我们也可以通过代码禁用算子链或者进行细粒度的控制哪些算子可以合并形成算子链。

通过以下方式来禁用算子链
#禁用算子链StreamExecutionEnvironment.disableOperatorChaining()

编写代码,首先对数据进行过滤,然后进行转换操作,实时统计WordCount,代码中我们可以禁用算子链:

//1.准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.disableOperatorChaining();//2.读取Socket数据DataStreamSource ds = env.socketTextStream("node5", 9999);//3.对数据进行过滤SingleOutputStreamOperator filterDS = ds.filter(s -> s.startsWith("a"));//4.对数据进行单词切分SingleOutputStreamOperator wordDS = filterDS.flatMap((String line, Collector collector) -> {    String[] words = line.split(",");    for (String word : words) {        collector.collect(word);    }}).returns(Types.STRING);//5.对单词进行设置PairWordSingleOutputStreamOperator> pairWordDS =        wordDS.map(s -> new Tuple2<>(s, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));//6.统计单词SingleOutputStreamOperator> result = pairWordDS.keyBy(tp -> tp.f0).sum(1);//7.打印结果result.print();//8.execute触发执行env.execute();

禁用算子链之后,打包执行,提交任务:

#提交任务命令./flink run -m node1:8081 -p 2 -c com.lanson.flinkjava.code.chapter4.TestOperatorChain /root/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar

我们禁用算子链之后再执行任务可以通过WebUI看到算子不再合并在一起执行,而是每个算子都由一个task执行。

默认开启算子链:

关闭算子链:

设置新的算子链
#从当前算子开始一个新的算子链someStream.filter(...).map(...).startNewChain().map(...);

以上是想从哪个算子开始新的算子链就在该算子后调用startNewChain()方法即可。修改代码:

//1.准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.读取Socket数据DataStreamSource ds = env.socketTextStream("node5", 9999);//3.对数据进行过滤SingleOutputStreamOperator filterDS = ds.filter(s -> s.startsWith("a"));//4.对数据进行单词切分SingleOutputStreamOperator wordDS = filterDS.flatMap((String line, Collector collector) -> {    String[] words = line.split(",");    for (String word : words) {        collector.collect(word);    }}).returns(Types.STRING);//5.对单词进行设置PairWordSingleOutputStreamOperator> pairWordDS =        wordDS.map(s -> new Tuple2<>(s, 1)).returns(Types.TUPLE(Types.STRING, Types.INT)).startNewChain();//6.统计单词SingleOutputStreamOperator> result = pairWordDS.keyBy(tp -> tp.f0).sum(1);//7.打印结果result.print();//8.execute触发执行env.execute();

查看WebUI,展示的算子链结果如下:

在算子上禁用算子链

如果我们不想关闭整体作业的算子链,只想关闭某些算子的算子链,我们可以在某个算子后调用disableChaining()方法来打断Flink自动合并算子链。

#打断算子链someStream.map(...).disableChaining();

向从哪个算子开始不再自动合并算子链就在该算子上调用disableChaining()方法。根据以上代码执行的结果,我们看到FaltMap和Map自动合并形成了算子链,我们可以在map算子后调用disableChaining来切断两者形成算子链:

//1.准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.读取Socket数据DataStreamSource ds = env.socketTextStream("node5", 9999);//3.对数据进行过滤SingleOutputStreamOperator filterDS = ds.filter(s -> s.startsWith("a"));//4.对数据进行单词切分SingleOutputStreamOperator wordDS = filterDS.flatMap((String line, Collector collector) -> {    String[] words = line.split(",");    for (String word : words) {        collector.collect(word);    }}).returns(Types.STRING).startNewChain();//5.对单词进行设置PairWordSingleOutputStreamOperator> pairWordDS =        wordDS.map(s -> new Tuple2<>(s, 1)).returns(Types.TUPLE(Types.STRING, Types.INT)).disableChaining();//6.统计单词SingleOutputStreamOperator> result = pairWordDS.keyBy(tp -> tp.f0).sum(1);//7.打印结果result.print();//8.execute触发执行env.execute();

在map算子上打断算子链,将以上代码打包执行,提交任务:

#提交任务命令./flink run -m node1:8081 -p 2 -c com.mashibing.flinkjava.code.chapter4.TestOperatorChain /root/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar

查看WebUI,展示的算子链结果如下:

在Flink编程中默认开启算子链即可,如果遇到一些算子操作非常复杂,我们想让处理该业务逻辑的task独占cpu资源这时可以细粒度管理算子链,大多数情况选择让Flink默认划分算子链即可。

上一篇:朝阳街道组织开展主干道环境卫生整治行动-世界今日讯 下一篇:最后一页
x
推荐阅读

今日热讯:大数据Flink进阶(十七):Apache Flink术语

2023-04-09

朝阳街道组织开展主干道环境卫生整治行动-世界今日讯

2023-04-09

滚动:WTT新乡冠军赛开幕

2023-04-09

热门:全流程服务 政策找企“更快更准” 护航高新企业转型升级

2023-04-09

用好党建引领“指挥棒”,奏响宝山社区治理“和谐曲” 天天新要闻

2023-04-09

合同拒绝履行怎么办 世界热讯

2023-04-09

光大证券:深南电路(002916.SZ)2022年业绩持续增长 服务器市场带动长期成长-环球通讯

2023-04-09

2023年4月9日1卢布能换多少新币_天天新资讯

2023-04-09

“链动2+1模式”快速促进老用户与新用户的裂变-世界观热点

2023-04-09

年轻有为的近义词_年轻有为

2023-04-09

当前焦点!Hystrix请求合并的使用(一)

2023-04-08

送母亲的生日礼物排行榜_每日播报

2023-04-08

百事通!搭建青年创新创业的广阔平台——“互联网+”大学生创新创业大赛综述

2023-04-08

天天即时看!城市“昆虫家谱”邀你编写,上海自然博物馆启动公民科学项目

2023-04-08

【全球播资讯】紫牛拍咖丨紫藤花开,不能错过的音乐台春光

2023-04-08

苏科大凌晨4点发考研待录取通知并要求半小时内确认,校方:已电话通知考生

2023-04-08

立刻暂停!马斯克等千人紧急发声 A股最火赛道跳水 全球头条

2023-04-08

市场热闹消费不火,这个家电困局怎么破?_实时焦点

2023-04-08

开放共享 共创美好——写在第三届消博会即将开幕之际-重点聚焦

2023-04-08

人工智能机器人医疗股票(纪念币预约官网)

2023-04-08

山茄子 全球观天下

2023-04-08

ukiss官网_u kiss

2023-04-08

二年级体育教学计划进度表_二年级体育教学计划 全球新要闻

2023-04-07

海南矿业赴多地招聘高校毕业生 助力“百万人才进海南”

2023-04-07

辽宁省举办“心系中小微 助民企发展”用工保障专项行动专场招聘会-全球即时

2023-04-07

环球观察:嘉泽新能(601619):营业收入7.54亿元,与上期同比增加5.87%

2023-04-07

160架飞机!空中客车公司再签中国大订单

2023-04-07

《鱼和它的自行车》:疲惫生活中的英雄梦想|速看

2023-04-07

赓续友谊 开拓合作 | 滇越经贸合作奔赴美好前路_全球今日报

2023-04-07

十大不建议买的纯牛奶排行?纯牛奶好不好看什么指标

2023-04-07

当前热讯:北航2023年强基计划4月11日起网上报名

2023-04-07

每日聚焦:郭浩任中原银行党委书记

2023-04-07

实时焦点:云南杞麓湖治污弄虚作假案细节披露:副市长、专家、商人沆瀣一气

2023-04-07

1ml粉底液能用多久?

2023-04-07

首付款交了可以退房吗

2023-04-07

中公入局直播带货?网传账号暂未开通直播,或仍将聚焦职教 快看

2023-04-07

全球观点:我国移动网络IPv6流量首次突破50%

2023-04-07

直接税和间接税的例子_直接税和间接税_全球新视野

2023-04-07

每日动态!逆战女角色死亡动作视频_逆战女角色死亡动作

2023-04-07

冰箱耗电量怎么算_冰箱耗电

2023-04-07

证监会同意新莱福、经纬股份、普莱得、同星科技创业板IPO注册|环球今亮点

2023-04-06

【权威部门话开局】多项税收数据反映经济发展实现较好开局_天天新动态

2023-04-06

【当前热闻】西班牙国家德比进球榜,据433数据统计,梅西以2...

2023-04-06

头条焦点:长春高新: 关于2022年限制性股票与股票期权激励计划部分股票期权注销完成的公告

2023-04-06

“冠军”基金经理离职,英大基金何去何从?

2023-04-06

大兴安岭呼中区汇聚“税力量”构建共治“大网格”_天天快资讯

2023-04-06

成交环比增长超五成,3月深圳二手住宅过户量迎23个月来最高值 每日热讯

2023-04-06

新能源ETF(516160)年内份额扩容近6成续刷历史新高 近4亿资金加速净流入|世界观焦点

2023-04-06

一拳超人:最弱S级英雄,不是King和童帝,而是一身肌肉的他-天天观焦点

2023-04-06

【新要闻】欧文末节砍19分!东契奇:当我们最需要他的时候 他出现了

2023-04-06

加快建设乡村数字经济体系 世界讯息

2023-04-06

日创立对外军援新机制,首次明确背离禁止将国际援助用于军事目的规定

2023-04-06

美国滥施单边制裁阻碍国际人权事业发展

2023-04-06

常规赛全部结束 广东队锁定第二-全球速读

2023-04-06

四车道被挤成双车道 郑州多部门联合“会诊”普罗旺世停车乱

2023-04-06

驻美国使馆发言人就蔡英文窜美并会见美众议长麦卡锡发表谈话|全球快报

2023-04-06

高飞张杰伴奏_高飞张杰

2023-04-06

洛阳治毛囊炎的医院哪家好

2023-04-05

天天热文:关于礼物的环境描写

2023-04-05

快手小店怎么评价商品?快手小店如何添加商品?_重点聚焦

2023-04-05

过去24小时全国雷电活动15032次,湘粤赣三省占全国70%以上-环球今热点

2023-04-05

为减少打嗝、放屁!英国计划给奶牛喂甲烷抑制剂 每日快播

2023-04-05

张家港经开区(杨舍镇)塘市办事处:“敢”字当头,释放基层党建新动能 天天微动态

2023-04-05

起亚:争取2026年销售100万辆电动汽车_快播报

2023-04-05

程潇告诉我们:将白T恤折起来5cm,能展示小蛮腰,时尚好看!_当前热讯

2023-04-05

可研报告包括哪些内容_个人主页包括哪些内容-世界微动态

2023-04-05

当前快报:乳房多发结节怎么治疗_如何最佳治疗乳房多发结节

2023-04-05

嘴哥to勇士球迷:你们是全世界最棒的球迷 谢谢你们的爱和支持 每日资讯

2023-04-05

世界实时:15万特斯拉车型要来了 计划先产400万辆

2023-04-05

Steam新一周游戏销量榜:《生化危机4RE》卫冕,《渔帆暗涌》上榜

2023-04-05

每日信息:和黄医药涨超10% 启动HMPL-453治疗肝内胆管癌及赛沃替尼治疗胃癌患者招募

2023-04-05

廖文伟被查

2023-04-04

江苏盐城:以“力量之聚”成就“工业之强”-环球新视野

2023-04-04

天天快播:社区老年大学让“夕阳红”更灿烂

2023-04-04

有没有那么一首歌歌词是什么_有没有那么一首歌歌词|环球微资讯

2023-04-04

2023五一国内旅游好去处|环球热点

2023-04-04

大宗交易:垒知集团成交498.53万元,折价8.85%(04-04)

2023-04-04

意杯前瞻:半决赛上演国家德比 老妇人三线力争下赛季欧战资格

2023-04-04

沪指四连阳收复3300点 中字头股票表现活跃 当前看点

2023-04-04

山海英雄传:精卫的石子

2023-04-04

环球热点!读万卷书重要还是行千里路重要辩论-行千里路 读万卷书的解释 谁会呀

2023-04-04

每日消息!电势能与电场强度的关系_电势能

2023-04-04

全球头条:abab式的词语有哪些四字词语大全_abab式的词语有哪些

2023-04-04

突发!以色列凌晨突袭邻国首都_环球时讯

2023-04-04

暴雨蓝色预警:9省区有大到暴雨 江西福建局地大暴雨

2023-04-04

制造业数据不及预期 美债收益率周一下滑|速递

2023-04-04

今日观点!李宇嘉专栏丨“带押过户”的降成本效应 将助力商品房市场良性循环

2023-04-04

打印机怎么扫描京瓷(京瓷打印扫描一体机怎么扫描) 天天速看

2023-04-04

【T-ARA】《第九领娱》--- 第433章 小忙内大智慧_天天滚动

2023-04-04

被16家品牌紧急切割,张继科的商业版图「塌了」

2023-04-03

第二届中国国际培育钻石产业发展与创新大会将于5月25-26日在广州举办

2023-04-03

长春市开展青少年“清明祭英烈”系列活动

2023-04-03

浙江诸暨:“借”春天之约“开”发展大会“引”200亿回归 焦点

2023-04-03

环球热门:昆仑万维龙虎榜:三个交易日机构净卖出3.24亿元

2023-04-03

焦点热门:倾力守护候鸟生命线,“千年鸟道”一路平安——归程万里 湖南有爱

2023-04-03

当前关注:江西润达地产所持赣州润达企管1亿元股权遭冻结

2023-04-03

保障房电梯选哪个?罗湖安居答案是:居民说了算|全球热点评

2023-04-03

焦点报道:003航母再曝新照,烟囱怪异,核动力?

2023-04-03

古魂手游兑换码魂核抽卡配置推荐冲榜攻略|热门

2023-04-03

当前速递!宿松县司法局推进“合事点”建设

2023-04-03