在 Flink 当中,流处理当中的任务都是有状态的,为了实现对状态的快速读写,一般将状态存储在内存当中。但是当程序发生故障时,内存中存储的状态将全部丢失,这意味着之前计算的数据将全部丢失,同时会影响未来数据的整合计算。
为解决上述问题,Flink 提供了一种“存档”的机制,即检查点。当产生故障重启时,程序可以从检查点中读档,恢复故障之前的状态,继续数据的处理。
检查点保存的关键是要等待所有任务将同一个数据源处理完毕,这样可以保证检查点存储的各个状态是一致的,从而避免上游状态计算了某数据而下游状态没有计算某数据的问题。
下面以统计词频的程序为例,描述检查点保存的流程:
源算子从数据源读取数据,将读取的偏移量作为算子状态记录;
数据经 map 算子转换为二元组(“hello”, 1)的形式,map 为无状态算子,因此直接向下游发送数据;
数据通过按键分区,最终由 sum 算子进行统计,并将每条数据处理后的结果作为状态存储;
当所有的任务都完成了对某一条数据的处理时,可以将当前的状态作为一个检查点写入外部存储;
在进行流式处理的过程中,Flink 会周期性的保存检查点,当程序发生故障时,需要找到最近一次的检查点来恢复状态。我们通过模拟一次故障,描述一下状态的恢复流程。
如下图所示,源算子已经处理了四个数据,在读取到第五条数据“world”时,源算子保存的偏移量置为 5,经 map 算子向下游传递至 sum 算子,在进行计算时 sum 出现故障,对“world”的处理结果未保存到状态当中。
从检查点恢复状态的流程如下:
重启应用,应用重启之后不存在任何状态;
读取检查点,重置状态。查询最近的一次检查点中存储的状态进行恢复,假设最近一次检查点存储的状态是处理完前三个数据时的状态;
重放数据。根据源算子存储的偏移量,向外部数据源重新提交偏移量,从当前检查点开始重新读取检查点到故障之间处理的数据,即第四第五条数据,进行数据的重放;
继续进行数据处理,将重放数据以及新流入的数据按照既定任务逻辑处理即可,程序恢复完成;
检查点算法主要用于解决何时触发检查点保存的问题的。如上文所描述的,应该在所有的任务都处理完同一条数据时触发检查点的保存,而关键的问题就是如何使不同的任务识别出一条相同的数据。
为解决上述的问题,Flink 提供了检查点分界线 Barrier 用于触发检查点的保存。其作用类似于水位线,即当需要触发检查点的保存时,就在数据流中插入一个特殊的数据结构。之后所有的任务在获取到这个分界线时,就意味着在该标识之前的所有数据均已经处理完毕,此时就会执行状态的持久化快照保存。
检查点的分界线将数据流进行了截流,在检查点之前处理的数据所导致的状态变化,都保存在当前检查点中;而在分界线之后处理的数据所导致的状态变化,都保存在下一个检查点中。
在 Flink 的 JobManager 中存在一个检查点协调器 checkpoint coordinator,其专门用于处理检查点的相关操作。检查点协调器会定期的向 TaskManager 发出指令,要求执行检查点的保存,并发送检查点的 ID。TaskManager 接收到指令,首先控制所有的源算子将当前的偏移量作为算子状态保存起来,并将携带者检查点 ID 的分界线插入当前数据流中并向下游传递,随后源算子可以正常的读取新数据并送入下游算子。
而对于下游的算子来说,在收到分界线之前,仍然正常的进行数据的处理;当接收到分界线时,触发对当前状态的持久化操作。这样就可以保证所有算子所保存的状态都是处理完相同数据的。
对于通过插入分界线来触发检查点存储的方法,在单一的流中,数据依次进行处理,顺序保持不变;但对于分布式流处理来说,很难保证数据按顺序执行处理。
为解决分布式处理中的问题,Flink 使用 Chandy-Lamport 算法的变体,即异步分界线快照算法 asynchronous barrier snapshotting。该算法的核心原则有两个:
当单个上游任务向多个并行的下游任务发送 barrier 时,需要进行广播;
当多个上游任务向同一个下游任务传递 barrier 时,下游任务需要执行分界线对齐,即等待所有上游任务的 barrier 都到齐,才能执行状态的保存;
默认情况下 Flink 禁用检查点,若需要启用检查点则需要调用执行环境的enableCheckpointing()
方法:
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.enableCheckpointing(5000L);
该方法需要传入一个长整型的毫秒数,表示周期性保存检查点的时间。检查点间隔时间的设置需要权衡处理性能和故障恢复速度:若希望对性能的影响更小,则应该设置较大的检查点间隔;若希望故障重启后迅速赶上实时的数据处理,则应该设置较小的检查点间隔。
检查点持久化存储的位置取决于检查点存储的设置。默认情况下检查段存储在 JobManager 的堆内存 heap 当中。
可以调用检查点配置的setCheckpointStorage()
方法来指定,需要传入一个 CheckpointStorage 的实现类,Flink 为我们提供了两种 CheckpointStorage:
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// 配置检查点存储在 JobManager 的内存中
environment.getCheckpointConfig().setCheckpointStorage(
new JobManagerCheckpointStorage());
// 配置检查点存储到文件系统
environment.getCheckpointConfig().setCheckpointStorage(
new FileSystemCheckpointStorage("hdfs://namenode:40010/flink/checkpoints"));
在实际的生产应用当中,一般会将检查点存储在高可用的分布式文件系统当中。
通过检查点的配置对象可以进行更多配置:
检查点模式 checkpointingMode:设置检查点一致性的保证级别,Flink 提供两种级别,分别为精确一次 exactly-once 和至少一次 at-least-once;
超时时间 checkpointTimeout:用于指定检查点保存的超时时间,超时未完成的将会被丢弃。需要传入长整型的毫秒数作为参数,表示超时时间;
最小间隔时间 minPauseBetweenCheckpoints:用于指定两次检查点触发的最小时间间隔。即使已经到达了周期性的触发时间,若两次触发时间间隔小于最小时间间隔,仍不能进行下一次检查点的保存,该参数的设置可以为正常数据的处理留下充足的时间。当指定该参数时,maxConcurrentCheckpoints 强制为 1;
最大并发检查点数量 maxConcurrentCheckpoints:用于指定同时运行的检查点的数量。因为可能存在下游任务还未完成上一轮的检查点存储,上游任务就已经开启了新一轮的检查点存储;
开启外部持久化存储 enableExternalizedCheckpoints:用于开启检查点的外部持久化,该设置默认在任务失败时不会自动清理外部存储,需手动清理。传入参数 ExternalizedCheckpointCleanup 可设置任务取消时外部检查点的清理模式。Flink 提供两种清理模式,即 DELETE_ON_CANCELLATION 模式(任务取消时自动删除外部检查点)和 RETAIN_ON_CANCELLATION 模式(任务取消时保留外部检查点);
检查点异常时是否让整个任务失败 failOnCheckpointingErrors:用于指定检查点异常是是否让任务直接失败退出,默认为 true,若检查点失败则任务直接退出;
不对齐检查点 enableUnalignedCheckpoints:不执行检查点的分界线对齐操作,启用之后可以大大减少产生背压时的检查点保存时间。这个设置要求检查点模式必须为 exctly-once,并且并发的检查点个数为 1;
示例:
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// 启用检查点,间隔时间为1s
environment.enableCheckpointing(1000L);
// 获取检查点设置对象
CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
// 设置检查点模式为精确一次模式
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 设置检查点最小时间间隔
checkpointConfig.setMinPauseBetweenCheckpoints(500L);
// 设置检查点超时时间
checkpointConfig.setCheckpointTimeout(60000L);
// 设置检查点最大并发数量为 1
checkpointConfig.setMaxConcurrentCheckpoints(1);
// 开启检查点的外部持久化保存,作业取消后保留检查点
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 不执行检查点的分界线对齐操作
checkpointConfig.enableUnalignedCheckpoints();
除检查点外,Flink 还提供了另一个镜像保存的功能,即保存点 Savepoint。其原理以及算法与检查点完全一致,只是多保存了一些额外的元数据。
保存点与检查点最大的不同在于两者的触发机制。检查点是由 Flink 自动进行管理的,其定期触发,故障后自动读取并进行恢复,类似于游戏中的”自动存档“;而保存点不会自动创建,必须由用户手动触发,即”手动存档“。
保存点的适用场景包括:
版本管理和归档存储;
更新 Flink 版本,更新应用程序;
调整并行度;
暂停应用程序;
保存点需要使用命令行工具。为运行的作业创建一个保存点镜像,命令行如下:
bin/flink savepoint :jobId [:targetDirectory]
其中:
jobId 为要做经i选哪个保存的作业的 ID;
targetDirectory 为可选参数,表示保存点存储的路径;
对于保存点的存储路径,可以通过 flink-conf.yaml 中的 state.savepoints.dir 属性来设置:
state.savepoints.dir: hdfs:///flink/savepoints
同样的,也可以通过在程序代码中对单独的任务进行存储路径的配置:
env.setDefaultSavepointDir("hdfs:///flink/savepoints");
除了可以对正在运行的任务创建保存点,还可以在停止一个作业的同时创建保存点:
bin/flink stop --savepointPath [:targetDirectory] :jobId
从保存点重启一个应用的命令如下:
bin/flink run -s :savepointPath [:runArgs]
简单来说,一致性就是结果的正确性。在分布式系统中,一致性表现为在各个节点读取同一个数据得到的值是相同的;在事务中一致性则表现为事务提交之后,便能够读取到新的数据。
在 Flink 中,数据按流式处理,故在正常的处理过程当中处理结果肯定是正确的;但是如果程序发生故障,那么就需要通过检查点回滚状态来确保一致性。
笼统来讲,一致性存在三种级别:
最多一次 AT-MOST-ONCE:每个数据最多被处理一次,即发生故障时直接重启,不进行任何操作,继续处理数据;
至少一次 AT-LEAST-ONCE:每个数据最少被处理一次,即不会存在数据的丢失,可以确保所有的数据都被处理过,但不保证每条数据只被处理一次,数据可能被重复处理;
精确一次 EXACTLY-ONCE:每个数据只被处理一次,可以保证处理结果的绝对正确;
在完整的流式处理当中,总体包括数据源、流处理算子以及外部存储系统三大部分,三者组合起来的完整应用的一致性就叫做“端到端的状态一致性”,其取决于三个组件中最弱的一环。
一般来说,能否达到 at-least-once 一致性级别,主要看数据源能够重放数据;而能否达到 exactly-once 级别,流处理器内部、数据源、外部存储都要有相应的保证机制。
在 Flink 中,存在检查点机制可以保持一致性,因此端到端的精确一次的关键点在于输入的数据源端和输出的外部存储端。
若想保证精确一次,则外部数据源必须拥有数据重放的能力,即对数据进行持久化保存,并且可以重置偏移量。这样在发生故障从检查点恢复状态后,便可以根据 Flink 源算子存储的偏移量状态重新读取未处理的数据。
若想保证精确一次,对输出端也有额外的要求,即需要在输出端避免数据的重复写入。能够保证精确一次一致性要求的写入方式有两种:
幂等写入:幂等操作,即一个操作重复多次都只会导致一次更改。如数学中e的x方求导,无论求导几次,结果都是自身;
事务写入:当 sink 算子获取到分界线时,在开启状态的保存的同时开启一个事务,此后所有的数据都将写入这个事务。当检查点保存完毕时,提交该事务,此时所有数据便可正常使用。若在保存状态时出现故障,状态将回退到上一个检查点同时回滚事务,此时写入到外部的数据便会被撤销。事务写入又能分为两种方式:
预写日志 WAL:在检查点完成时将数据批量写入外部系统
首先将结果数据作为日志状态进行持久化存储;
进行检查点保存时同样对这些数据进行保存;
检查点保存完成时,将所有结果一次性写入外部系统;
两阶段提交 2PC:先进行预提交,再进行正式提交。Flink 提供了 TwoPhaseCommitSinkFunction 接口,方便我们自定义实现两阶段提交的 SinkFunction 的实现,提供了真正端到端的 exactly-once 保证。
当收到第一条数据或分界线标志时,sink 算子开启一个事务;
接下来收到的所有数据都通过事务的方式写入外部系统,此时数据已经写入外部系统但不可用,属于预提交状态;
检查点保存完成时,正式提交事务,结果真正可用;
输入数据源端的 Kafka 可以对数据进行持久化保存,并可以重置偏移量(offset)。所以我
们可以在 Source 任务(FlinkKafkaConsumer)中将当前读取的偏移量保存为算子状态,写入到检查点中。
Flink 官方实现的 Kafka 连接器中,提供了写入到 Kafka 的 FlinkKafkaProducer,它实现
了 TwoPhaseCommitSinkFunction 接口。
具体的流程如下所示:
在具体应用中,实现真正的端到端 exactly-once,还需要有一些额外的配置:
必须启用检查点;
在 FlinkKafkaProducer 的构造函数中传入参数 Semantic.EXACTLY_ONCE;
配置 Kafka 读取数据的消费者的隔离级别为 read_committed;
Flink 的 Kafka连接器中配置的事务超时时间 transaction.timeout.ms 默认是 1小时,而Kafka 集群配置的事务最大超时时间 transaction.max.timeout.ms 默认是 15 分钟。两个超时时间,前者应该小于后者;
要有一些额外的配置: