type
status
date
slug
summary
tags
category
icon
password
在 Flink 1.19 引进的特性。
背景描述
在 Apache Flink 的某些使用场景中,一个 Flink 作业可能会首先处理一段有界的积压数据流,然后再处理无限的数据流。在这两个阶段,用户对检查点(Checkpoint)频率的要求有所不同。
例如,一个作业可能希望从由 HDFS Source 和 Kafka Source 组成的 HybridSource 中消费数据,并使用 Paimon Sink(或任何具有"精确一次"语义的 Sink,其吞吐量随着检查点间隔减小而增加)来生成数据。用户可能希望在第一个阶段每 30 分钟触发一次检查点,以便在故障恢复后最多处理 30 分钟的工作; 而在第二阶段希望每 30 秒触发一次检查点,以便 Sink 每 30 秒将数据提交给下游应用程序(保证数据的延迟)。
目前很难满足这种需求,因为用户只能配置一个静态的全局检查点间隔。如果在上述示例中将检查点间隔设置为 30 秒,那么由于不必要的高频检查点,Flink 作业在第一个阶段的吞吐量会很低。而如果将检查点间隔设置为 30 分钟,在第二阶段生成记录的延迟又会远高于预期。
在这个 FLIP(Flink Improvement Proposal)中,提议允许 Source 算子报告它是否正在处理积压数据,并允许用户指定一个较长的检查点间隔,当任何 Source 正在处理积压数据时,可以使用这个间隔。
这也适用于 Source 是 MySql CDC 的场景。MySQL CDC 内部有两个阶段,分别是从有界快照读取和从无限 binlog 读取。读取快照时通常不需要低延迟。因此,我们希望支持用户为这两个阶段指定不同的检查点间隔。
核心概念
引入一个名为
IsProcessingBacklog
的一级概念。定义
每个源都有一个属性,名为
isProcessingBacklog
,它是一个布尔值,可以在作业执行期间动态变化。每条记录(逻辑上)也有一个属性,名为 isBacklog
,这是一个布尔值,在记录生成时确定。如果记录是在 Source 的 isProcessingBacklog
为 true
时生成的,或者用于派生此记录(通过操作符)的某些记录的 isBacklog
为 true
,那么此记录的 isBacklog
应该为 true
。否则,记录的 isBacklog
应该为 false
。注意:我们不会将
isBacklog
添加到每个物理行/记录/事件。每条记录的 isBacklog
信息将通过 RecordAttributes
(将在 FLIP-327 中引入)进行传播,因此其开销将远低于 WaterMark,假设 isBacklog
状态不会频繁变化。语义
isBacklog
信息有效地告知 Flink 运行时,记录是否应该以低处理延迟生成和发送。在流模式中,如果记录的
isBacklog = false
,则应生成和发送记录,并保持低处理延迟,除非 Flink 运行时另有指示(例如,由于两阶段提交 Sink 的检查点触发或执行端到端延迟 > 0)。如果记录的 isBacklog = true
,则此记录没有处理延迟要求,这意味着操作符可以任意长时间地缓存此记录,直到输入结束。设置 isProcessingBacklog = true
对 Source 算子的原则:
源应在记录允许以任意长的延迟处理时设置
isProcessingBacklog = true
。这可能是因为源中的记录已经是历史数据,或者因为 Flink 作业本身滞后,端到端延迟受吞吐量而非每记录处理延迟的瓶颈影响。一些示例:
- MySQL CDC 源将从有界的快照记录流读取,然后再读取无限的 binlog 记录流。快照记录概念上被认为是旧的,不需要低延迟处理。因此,当读取快照时,MySQL CDC 源可以将
isProcessingBacklog
设置为true
;读取 binlog 时设置为false
。
- 一个 Flink 作业从一个 Kafka 主题的 5 天前的偏移量开始读取记录。没有必要以低延迟处理 5 天前的记录。因此,Kafka 源可以允许用户指定一个阈值(例如 5 分钟),并在
current_system_time - watermark > threshold
时设置isProcessingBacklog
为true
。
对非 Source 算子设置 isBacklog = true
原则:
操作符应当发出
RecordAttributes(isBacklog=true)
(在 FLIP-327 中引入),前提是其发出的记录允许以任意长的延迟处理。如果操作符的所有输入的 isBacklog
值为 true
,则其输出记录的 isBacklog
值应为 true
。否则,如果所有输入的 isBacklog
值为 false
,则其输出记录的 isBacklog
值应为 false
。否则,由操作符根据其具体日志确定其输出记录的 isBacklog
值。一些示例:
- 一个 map 算子应当发出
RecordAttributes(isBacklog=true)
,前提是从其输入处接收到RecordAttributes(isBacklog=true)
。
- 一个 join 算子应当发出
RecordAttributes(isBacklog=true)
,前提是从其任一输入接收到RecordAttributes(isBacklog=true)
。这是因为 join 生成的记录只有在两侧的记录都"低延迟"时才是"低延迟"的。
- 一个 union 算子应当发出
RecordAttributes(isBacklog=true)
,前提是从其所有输入接收到RecordAttributes(isBacklog=true)
。这是因为如果任何输入有"低延迟"记录,union 操作符仍然能够输出"低延迟"记录。
传播 isProcessingBacklog/isBacklog
信息:
isBacklog
信息应由 Source 算子生成,并通过 RecordAttributes
传播到作业图中的其他各处(类似于 WaterMark 的传播方式)。每个操作符应基于其输入的 isBacklog
状态来确定其输出记录的 isBacklog
状态,具体规则和原则如上所述。新增接口
- 引入一个新概念,名为
isProcessingBacklog
,并添加Source 算子在作业执行期间报告其isProcessingBacklog
值的 API。
- 定义积压:一个作业在且仅在其任何一个 Source 报告
isProcessingBacklog=true
时被认为处于积压状态。一个源在且仅在其发出的记录不需要下游操作符优化处理延迟时报告isProcessingBacklog=true
。当一个源报告isProcessingBacklog=true
时,它有效地允许下游操作符专注于吞吐量而不是中间结果的延迟。是否利用此信息由下游操作符决定。例如,一个聚合操作符可以优化吞吐量,通过在处理和发出结果之前缓冲/排序接收到的记录。
例如,HybridSource 在执行其第一个 Source 时应报告
isProcessingBacklog=true
;MySQL CDC Source 在快照阶段应报告 isProcessingBacklog=true
。- 添加作业配置参数
execution.checkpointing.interval-during-backlog
: - 配置参数:
- 名称:
execution.checkpointing.interval-during-backlog
- 类型:
Duration
- 默认值:
null
- 描述: 如果不为 null 且任何 Source 设置
isProcessingBacklog=true
,则这是定期调度检查点的间隔。检查点触发可能会因execution.checkpointing.max-concurrent-checkpoints
和execution.checkpointing.min-pause
设置而被延迟。注意,如果不为 null,则值必须为 0(表示在积压状态期间禁用检查点)或大于等于execution.checkpointing.interval
。
- 更新
execution.checkpointing.interval
的描述,说明它可以被execution.checkpointing.interval-during-backlog
覆盖。
- 更新描述:
- 获取定期调度检查点的间隔。此设置定义基线间隔。检查点触发可能会因
execution.checkpointing.max-concurrent-checkpoints
、execution.checkpointing.min-pause
以及execution.checkpointing.interval-during-backlog
设置而被延迟。
变更
- 更新
CheckpointCoordinator
,在存在任何设置isProcessingBacklog=true
的 Source时,使用execution.checkpointing.interval-during-backlog
作为检查点间隔。注意,如果 Source 算子多次设置isProcessingBacklog
值,则使用最后一次设置的值。
- 更新
HybridSource
,基于正在运行的 Source 报告isProcessingBacklog
。假设HybridSource
由 N 个 Source 组成,其中 N >= 2。它应在开始运行第一个 Source 时报告isProcessingBacklog=true
。在开始运行最后一个 Source 时,应设置isProcessingBacklog=false
。
- 更新社区维护的其他 Source(例如 MySQL CDC Source),以适当地设置
isProcessingBacklog
。例如,MySQL CDC Source 在全量阶段开始时应设置isProcessingBacklog=true
。在增量阶段开始时,应设置isProcessingBacklog=false
。
配置示例
假设一个作业需要从由 HDFS Source 和 Kafka Source组成的 HybridSource (或 MySql CDC Source)消费数据,并将数据生成到 Paimon Sink(或任何具有"精确一次"语义的 Sink)。用户希望在第一个阶段每 30 分钟触发一次检查点,以便在故障恢复后最多处理 30 分钟的工作; 而在第二阶段希望每 30 秒触发一次检查点,以便 Sink 每 30 秒将数据提交给下游应用程序(保证数据的低延迟)。
这个用例可以通过在
flink-conf.yaml
中使用以下配置来解决:参考资料: https://apppejqdt9q1797.h5.xiaoeknow.com/p/course/ecourse/course_2ZTFDp7b0NmtssQtobnZkCbM0rv 流批融合部分的第一节分享