🗒️FlinkCDC 创建元数据列流程
2023-10-24
| 2023-10-24
字数 826阅读时长 3 分钟
type
status
date
slug
summary
tags
category
icon
password
FlinkCDC 允许在一个任务中同步多张表,通过正则表达式的形式传入多表表名,通过这种方式实现了对分库分表的支持。但在一些场景下,我们还是希望知道每一条记录所属的库和表,这就需要使用到元数据列了。

使用方式

元数据列是虚拟列,即它在表格式中没有定义,但是可以通过一些信息提取出来,在 FlinkCDC 的 MySQL 插件中,允许定义下面三种元数据列:
Key
DataType
Description
table_name
STRING NOT NULL
Name of the table that contain the row.
database_name
STRING NOT NULL
Name of the database that contain the row.
op_ts
TIMESTAMP_LTZ(3) NOT NULL
It indicates the time that the change was made in the database.If the record is read from snapshot of the table instead of the binlog, the value is always 0.
在使用的时候,需要加上特定的语法,例如:
这样,在查询时就会为每一行额外加上 db_name table_name operation_ts 三个字段。下游任务可以根据这三个字段,进行业务处理。

读取流程

首先,FlinkCDC 程序需要识别出哪些字段使用了元数据列,这需要通过解析 METADATA FROM VIRTUAL 实现,不过 Flink 已经提供了一个很简便的接口 SupportsReadingMetadataMySqlTableSource 只需要实现这个接口的以下方法就可以了:
  • listReadableMetadata
表明这个数据源插件支持哪些元数据字段。
  • applyReadableMetadata
保存查询表已经使用到了哪些元数据字段。metadataKeys 就包含了元数据字段的名字。
从 applyReadableMetadata 方法获取到了 metadataKeys,FlinkCDC 根据它生成一系列 MetadataConverter。
MetadataConverter 负责从数据中提取出元数据列信息,以 table_name 为例,对应的 MetadataConverter 实现如下。从 MySQL 的 MetadataConverter 实现可以看到,它是提取了 record 里的 SOURCE 和 TABLE_NAME_KEY 信息,所以实际上这些元数据列只能在处理 binlog 的过程中提取到。
MetadataConverterRowDataDebeziumDeserializeSchema 生效,RowDataDebeziumDeserializeSchema 是负责将每条数据处理成下游需要的格式的工具类。RowDataDebeziumDeserializeSchema 创建了一个 AppendMetadataCollector 类。
在发送数据时,可以看到 RowDataDebeziumDeserializeSchema 会根据是否包含元数据列决定是否使用 AppendMetadataCollector 类。
最后,看下 AppendMetadataCollector 类的 collect 方法:从 inputRecord 中生成新的列,并且结合之前的 physicalRow,构建出新的输出记录。

总结

得益于 Flink 的良好设计,为 FlinkCDC 数据源插件增加元数据列是不太困难的事,主要包括两个步骤:
1)实现 SupportsReadingMetadata 接口,定义和获取元数据字段;
2)定义从原始数据获取元数据的方法,在 RowDataDebeziumDeserializeSchema 类中解析出元数据列,并下发新的记录。
 
  • FlinkCDC
  • FLIP 中文翻译汇总Flink 网络流控与反压剖析
    Loading...