【基础】Flink -- Fault Tolerance
创始人
2025-05-29 06:57:59

Flink -- Fault Tolerance

  • 检查点 Checkpoint
    • 检查点的保存
    • 从检查点恢复状态
    • 检查点算法
      • 检查点分界线 Barrier
      • 分布式快照算法
    • 检查点的配置
      • 检查点启用
      • 检查点存储
      • 其他高级配置
    • 保存点 Savepoint
  • 状态一致性
    • 端到端的状态一致性
  • 端到端的精确一次
    • 输入端
    • 输出端
    • Flink 和 Kafka 的精确一次

检查点 Checkpoint

在 Flink 当中,流处理当中的任务都是有状态的,为了实现对状态的快速读写,一般将状态存储在内存当中。但是当程序发生故障时,内存中存储的状态将全部丢失,这意味着之前计算的数据将全部丢失,同时会影响未来数据的整合计算。

为解决上述问题,Flink 提供了一种“存档”的机制,即检查点。当产生故障重启时,程序可以从检查点中读档,恢复故障之前的状态,继续数据的处理。

检查点的保存

检查点保存的关键是要等待所有任务将同一个数据源处理完毕,这样可以保证检查点存储的各个状态是一致的,从而避免上游状态计算了某数据而下游状态没有计算某数据的问题。

下面以统计词频的程序为例,描述检查点保存的流程:

在这里插入图片描述

  1. 源算子从数据源读取数据,将读取的偏移量作为算子状态记录;

  2. 数据经 map 算子转换为二元组(“hello”, 1)的形式,map 为无状态算子,因此直接向下游发送数据;

  3. 数据通过按键分区,最终由 sum 算子进行统计,并将每条数据处理后的结果作为状态存储;

  4. 当所有的任务都完成了对某一条数据的处理时,可以将当前的状态作为一个检查点写入外部存储;

从检查点恢复状态

在进行流式处理的过程中,Flink 会周期性的保存检查点,当程序发生故障时,需要找到最近一次的检查点来恢复状态。我们通过模拟一次故障,描述一下状态的恢复流程。

如下图所示,源算子已经处理了四个数据,在读取到第五条数据“world”时,源算子保存的偏移量置为 5,经 map 算子向下游传递至 sum 算子,在进行计算时 sum 出现故障,对“world”的处理结果未保存到状态当中。

在这里插入图片描述

从检查点恢复状态的流程如下:

  1. 重启应用,应用重启之后不存在任何状态;

    在这里插入图片描述

  2. 读取检查点,重置状态。查询最近的一次检查点中存储的状态进行恢复,假设最近一次检查点存储的状态是处理完前三个数据时的状态;

    在这里插入图片描述

  3. 重放数据。根据源算子存储的偏移量,向外部数据源重新提交偏移量,从当前检查点开始重新读取检查点到故障之间处理的数据,即第四第五条数据,进行数据的重放;

    在这里插入图片描述

  4. 继续进行数据处理,将重放数据以及新流入的数据按照既定任务逻辑处理即可,程序恢复完成;

    在这里插入图片描述

检查点算法

检查点算法主要用于解决何时触发检查点保存的问题的。如上文所描述的,应该在所有的任务都处理完同一条数据时触发检查点的保存,而关键的问题就是如何使不同的任务识别出一条相同的数据。

检查点分界线 Barrier

为解决上述的问题,Flink 提供了检查点分界线 Barrier 用于触发检查点的保存。其作用类似于水位线,即当需要触发检查点的保存时,就在数据流中插入一个特殊的数据结构。之后所有的任务在获取到这个分界线时,就意味着在该标识之前的所有数据均已经处理完毕,此时就会执行状态的持久化快照保存。

检查点的分界线将数据流进行了截流,在检查点之前处理的数据所导致的状态变化,都保存在当前检查点中;而在分界线之后处理的数据所导致的状态变化,都保存在下一个检查点中。

在这里插入图片描述

在 Flink 的 JobManager 中存在一个检查点协调器 checkpoint coordinator,其专门用于处理检查点的相关操作。检查点协调器会定期的向 TaskManager 发出指令,要求执行检查点的保存,并发送检查点的 ID。TaskManager 接收到指令,首先控制所有的源算子将当前的偏移量作为算子状态保存起来,并将携带者检查点 ID 的分界线插入当前数据流中并向下游传递,随后源算子可以正常的读取新数据并送入下游算子。

而对于下游的算子来说,在收到分界线之前,仍然正常的进行数据的处理;当接收到分界线时,触发对当前状态的持久化操作。这样就可以保证所有算子所保存的状态都是处理完相同数据的。

分布式快照算法

对于通过插入分界线来触发检查点存储的方法,在单一的流中,数据依次进行处理,顺序保持不变;但对于分布式流处理来说,很难保证数据按顺序执行处理。

为解决分布式处理中的问题,Flink 使用 Chandy-Lamport 算法的变体,即异步分界线快照算法 asynchronous barrier snapshotting。该算法的核心原则有两个:

  • 当单个上游任务向多个并行的下游任务发送 barrier 时,需要进行广播;

  • 当多个上游任务向同一个下游任务传递 barrier 时,下游任务需要执行分界线对齐,即等待所有上游任务的 barrier 都到齐,才能执行状态的保存;

检查点的配置

检查点启用

默认情况下 Flink 禁用检查点,若需要启用检查点则需要调用执行环境的enableCheckpointing()方法:

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.enableCheckpointing(5000L);

该方法需要传入一个长整型的毫秒数,表示周期性保存检查点的时间。检查点间隔时间的设置需要权衡处理性能和故障恢复速度:若希望对性能的影响更小,则应该设置较大的检查点间隔;若希望故障重启后迅速赶上实时的数据处理,则应该设置较小的检查点间隔。

检查点存储

检查点持久化存储的位置取决于检查点存储的设置。默认情况下检查段存储在 JobManager 的堆内存 heap 当中。

可以调用检查点配置的setCheckpointStorage()方法来指定,需要传入一个 CheckpointStorage 的实现类,Flink 为我们提供了两种 CheckpointStorage:

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// 配置检查点存储在 JobManager 的内存中
environment.getCheckpointConfig().setCheckpointStorage(
new JobManagerCheckpointStorage());
// 配置检查点存储到文件系统
environment.getCheckpointConfig().setCheckpointStorage(
new FileSystemCheckpointStorage("hdfs://namenode:40010/flink/checkpoints"));

在实际的生产应用当中,一般会将检查点存储在高可用的分布式文件系统当中。

其他高级配置

通过检查点的配置对象可以进行更多配置:

  • 检查点模式 checkpointingMode:设置检查点一致性的保证级别,Flink 提供两种级别,分别为精确一次 exactly-once 和至少一次 at-least-once;

  • 超时时间 checkpointTimeout:用于指定检查点保存的超时时间,超时未完成的将会被丢弃。需要传入长整型的毫秒数作为参数,表示超时时间;

  • 最小间隔时间 minPauseBetweenCheckpoints:用于指定两次检查点触发的最小时间间隔。即使已经到达了周期性的触发时间,若两次触发时间间隔小于最小时间间隔,仍不能进行下一次检查点的保存,该参数的设置可以为正常数据的处理留下充足的时间。当指定该参数时,maxConcurrentCheckpoints 强制为 1;

  • 最大并发检查点数量 maxConcurrentCheckpoints:用于指定同时运行的检查点的数量。因为可能存在下游任务还未完成上一轮的检查点存储,上游任务就已经开启了新一轮的检查点存储;

  • 开启外部持久化存储 enableExternalizedCheckpoints:用于开启检查点的外部持久化,该设置默认在任务失败时不会自动清理外部存储,需手动清理。传入参数 ExternalizedCheckpointCleanup 可设置任务取消时外部检查点的清理模式。Flink 提供两种清理模式,即 DELETE_ON_CANCELLATION 模式(任务取消时自动删除外部检查点)和 RETAIN_ON_CANCELLATION 模式(任务取消时保留外部检查点);

  • 检查点异常时是否让整个任务失败 failOnCheckpointingErrors:用于指定检查点异常是是否让任务直接失败退出,默认为 true,若检查点失败则任务直接退出;

  • 不对齐检查点 enableUnalignedCheckpoints:不执行检查点的分界线对齐操作,启用之后可以大大减少产生背压时的检查点保存时间。这个设置要求检查点模式必须为 exctly-once,并且并发的检查点个数为 1;

示例:

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// 启用检查点,间隔时间为1s
environment.enableCheckpointing(1000L);
// 获取检查点设置对象
CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
// 设置检查点模式为精确一次模式
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 设置检查点最小时间间隔
checkpointConfig.setMinPauseBetweenCheckpoints(500L);
// 设置检查点超时时间
checkpointConfig.setCheckpointTimeout(60000L);
// 设置检查点最大并发数量为 1
checkpointConfig.setMaxConcurrentCheckpoints(1);
// 开启检查点的外部持久化保存,作业取消后保留检查点
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 不执行检查点的分界线对齐操作
checkpointConfig.enableUnalignedCheckpoints();

保存点 Savepoint

除检查点外,Flink 还提供了另一个镜像保存的功能,即保存点 Savepoint。其原理以及算法与检查点完全一致,只是多保存了一些额外的元数据。

保存点与检查点最大的不同在于两者的触发机制。检查点是由 Flink 自动进行管理的,其定期触发,故障后自动读取并进行恢复,类似于游戏中的”自动存档“;而保存点不会自动创建,必须由用户手动触发,即”手动存档“。

保存点的适用场景包括:

  • 版本管理和归档存储;

  • 更新 Flink 版本,更新应用程序;

  • 调整并行度;

  • 暂停应用程序;

保存点需要使用命令行工具。为运行的作业创建一个保存点镜像,命令行如下:

bin/flink savepoint :jobId [:targetDirectory]

其中:

  • jobId 为要做经i选哪个保存的作业的 ID;

  • targetDirectory 为可选参数,表示保存点存储的路径;

对于保存点的存储路径,可以通过 flink-conf.yaml 中的 state.savepoints.dir 属性来设置:

state.savepoints.dir: hdfs:///flink/savepoints

同样的,也可以通过在程序代码中对单独的任务进行存储路径的配置:

env.setDefaultSavepointDir("hdfs:///flink/savepoints");

除了可以对正在运行的任务创建保存点,还可以在停止一个作业的同时创建保存点:

bin/flink stop --savepointPath [:targetDirectory] :jobId

从保存点重启一个应用的命令如下:

bin/flink run -s :savepointPath [:runArgs]

状态一致性

简单来说,一致性就是结果的正确性。在分布式系统中,一致性表现为在各个节点读取同一个数据得到的值是相同的;在事务中一致性则表现为事务提交之后,便能够读取到新的数据。

在 Flink 中,数据按流式处理,故在正常的处理过程当中处理结果肯定是正确的;但是如果程序发生故障,那么就需要通过检查点回滚状态来确保一致性。

笼统来讲,一致性存在三种级别:

  • 最多一次 AT-MOST-ONCE:每个数据最多被处理一次,即发生故障时直接重启,不进行任何操作,继续处理数据;

  • 至少一次 AT-LEAST-ONCE:每个数据最少被处理一次,即不会存在数据的丢失,可以确保所有的数据都被处理过,但不保证每条数据只被处理一次,数据可能被重复处理;

  • 精确一次 EXACTLY-ONCE:每个数据只被处理一次,可以保证处理结果的绝对正确;

端到端的状态一致性

在完整的流式处理当中,总体包括数据源、流处理算子以及外部存储系统三大部分,三者组合起来的完整应用的一致性就叫做“端到端的状态一致性”,其取决于三个组件中最弱的一环。

一般来说,能否达到 at-least-once 一致性级别,主要看数据源能够重放数据;而能否达到 exactly-once 级别,流处理器内部、数据源、外部存储都要有相应的保证机制。

端到端的精确一次

在 Flink 中,存在检查点机制可以保持一致性,因此端到端的精确一次的关键点在于输入的数据源端和输出的外部存储端。

输入端

若想保证精确一次,则外部数据源必须拥有数据重放的能力,即对数据进行持久化保存,并且可以重置偏移量。这样在发生故障从检查点恢复状态后,便可以根据 Flink 源算子存储的偏移量状态重新读取未处理的数据。

输出端

若想保证精确一次,对输出端也有额外的要求,即需要在输出端避免数据的重复写入。能够保证精确一次一致性要求的写入方式有两种:

  • 幂等写入:幂等操作,即一个操作重复多次都只会导致一次更改。如数学中e的x方求导,无论求导几次,结果都是自身;

  • 事务写入:当 sink 算子获取到分界线时,在开启状态的保存的同时开启一个事务,此后所有的数据都将写入这个事务。当检查点保存完毕时,提交该事务,此时所有数据便可正常使用。若在保存状态时出现故障,状态将回退到上一个检查点同时回滚事务,此时写入到外部的数据便会被撤销。事务写入又能分为两种方式:

    • 预写日志 WAL:在检查点完成时将数据批量写入外部系统

      • 首先将结果数据作为日志状态进行持久化存储;

      • 进行检查点保存时同样对这些数据进行保存;

      • 检查点保存完成时,将所有结果一次性写入外部系统;

    • 两阶段提交 2PC:先进行预提交,再进行正式提交。Flink 提供了 TwoPhaseCommitSinkFunction 接口,方便我们自定义实现两阶段提交的 SinkFunction 的实现,提供了真正端到端的 exactly-once 保证。

      • 当收到第一条数据或分界线标志时,sink 算子开启一个事务;

      • 接下来收到的所有数据都通过事务的方式写入外部系统,此时数据已经写入外部系统但不可用,属于预提交状态;

      • 检查点保存完成时,正式提交事务,结果真正可用;

Flink 和 Kafka 的精确一次

输入数据源端的 Kafka 可以对数据进行持久化保存,并可以重置偏移量(offset)。所以我
们可以在 Source 任务(FlinkKafkaConsumer)中将当前读取的偏移量保存为算子状态,写入到检查点中。

Flink 官方实现的 Kafka 连接器中,提供了写入到 Kafka 的 FlinkKafkaProducer,它实现
了 TwoPhaseCommitSinkFunction 接口。

具体的流程如下所示:

在这里插入图片描述
在这里插入图片描述

在具体应用中,实现真正的端到端 exactly-once,还需要有一些额外的配置:

  1. 必须启用检查点;

  2. 在 FlinkKafkaProducer 的构造函数中传入参数 Semantic.EXACTLY_ONCE;

  3. 配置 Kafka 读取数据的消费者的隔离级别为 read_committed;

  4. Flink 的 Kafka连接器中配置的事务超时时间 transaction.timeout.ms 默认是 1小时,而Kafka 集群配置的事务最大超时时间 transaction.max.timeout.ms 默认是 15 分钟。两个超时时间,前者应该小于后者;
    要有一些额外的配置:

相关内容

热门资讯

欧佩克+同意7月再增产41.1... 为了增产惩罚超产国并争夺市场份额,欧佩克+连续第三个月大幅增产,美国页岩油生产商或首当其冲,美油一度...
更名!“天府证券”来了 天府证... 【导读】宏信证券更名为天府证券中国基金报记者 吴君这家券商,历史上第二次更名。5月末,工商信息显示,...
两家A股公司,收终止上市决定 ... 又有两家A股上市公司收到股票终止上市决定,6月10日进入退市整理期。*ST鹏博(600804)公告称...
瑞幸降价迈入“6块9”时代?瑞... 说起最近几年的咖啡茶饮市场,相信每个人都不会陌生,各家咖啡茶饮企业的各种降价消息是此起彼伏,就在最近...
主次节奏:6.1黄金 - 每周... 本文每周初更新发布梳理各级别走势分析和预期主次节奏:做有品质的三方服务黄金月线图(超长线) 月线图...
超400亿资金狂涌!这类ETF... 债券ETF市场持续扩容。今年以来,债券市场表现震荡,债券类基金回报远不及预期,但这并未妨碍债券型ET...
坚定信心 行稳致远(记者手记) 侯琳良 最近一段时间,海尔集团上世纪90年代投资制作的《海尔兄弟》动画片,在多个视频平台上线高清重制...
世纪大辩论2——哈耶克与凯恩斯... 本来节后决定启动一个项目,但家里临时有事,需要陪家人去一趟北京,节后拉群的事,因此要推迟一周左右(具...
4月广州消费品市场表现强劲 1-4月,随着消费品以旧换新等促消费政策持续发力和各类会展活动陆续开展,政策相关消费快速增长,升级类...
金价,又跌了! 人民财讯5月31日电,5月30日,COMEX黄金期货收跌0.92%,报3313.1美元/盎司。 从高...
10万吨改性项目!巴斯夫、金发... 【DT新材料】获悉,6月3日,沪市主板新股海阳科技将启动申购,上市在即! 资料显示,海阳科技前身为南...
湾财周报|大事记 比亚迪驳斥“... 一周大事记(5月26日-6月1日) 头条 比亚迪驳斥! 长城“车圈恒大论”是行业警示还是危言耸听?...
通源石油跌1.96%,成交额1... 5月30日,通源石油跌1.96%,成交额1.03亿元,换手率4.40%,总市值23.54亿元。 异动...
中国邮储银行浙江分行2025校... 点这里 ↑ 老满说高考 作者 l 老满 生涯规划师l 升学顾问l 拆书家 这是 老满说高考公众号 的...
公募基金规模首次突破33万亿元... 每经记者:肖芮冬 每经编辑:叶峰 天赐良基日报第654期 一、今日基金新闻速览 1、华润元大基金贾...
湾财周报 大事记 比亚迪驳斥“... 一周大事记(5月26日-6月1日)头条比亚迪驳斥!长城“车圈恒大论”是行业警示还是危言耸听?近日,关...
EL表达式JSTL标签库 EL表达式     EL:Expression Language 表达式语言     ...
关于测试,我发现了哪些新大陆 关于测试 平常也只是听说过一些关于测试的术语,但并没有使用过测试工具。偶然看到编程老师...
工信部、中汽协紧急发声!汽车“... 文/刘育英新一轮汽车价格战再起。近日,工信部、中汽协纷纷发声表示反对。工业和信息化部表示,将加大对汽...
3 ROS1通讯编程提高(1) 3 ROS1通讯编程提高3.1 使用VS Code编译ROS13.1.1 VS Code的安装和配置...
募资39亿,全亏光了,账上不到... 关于天然气,用户的感觉是价格一直在上涨,但很奇怪,不管怎么涨,天然气企业仍然亏,还亏得一塌糊涂。这是...
资阳房产评估公司 这是(tel-15828298733)整理的信息,希望能帮助到大家 在当今社会,随着经济的发展和城...
华桥汇利(中国)投资基金管理有... 今年第一季度,美国企业利润出现大幅下降,且面临着来自关税上升的持续压力,这一局面可能会在今年进一步加...
ESG 报告合规与鉴证:全球政... 在当下全球经济格局里,ESG(环境、社会和公司治理)已然成为衡量企业可持续发展能力的关键指标。随着全...
【Unity 手写PBR】Bu... 写在前面 前期积累: GAMES101作业7提高-实现微表面模型你需要了解的知识 【技...
与锤巨子生物的大嘴博士持股同一... 医美龙头巨子生物“成分争议”风波持续发酵。日前,美妆博主大嘴博士(香港大学化学博士郝宇)发文,质疑巨...
Linux之进程间通信 目录 进程间通信介绍 一、为什么要进行进程间通信? 二、进程间通信目的 三、进程间通信...
从“造城”到“留客”,文旅局长... 你有没有刷到最近各地文旅局局长全体“尬舞”的视频?领导们放下架子开始跳魔性舞蹈,这场舞的背后啊,可不...
Hazel引擎学习(十一) 我自己维护引擎的github地址在这里,里面加了不少注释,有需要的可以看...
孩子的教育金,分享3个「有效」... 点击 “简七读财” ,发送消息“ 理财小工具 ”免费领取“40个赚钱工具资源包”晚上好,我是简七编...