type
status
date
slug
summary
tags
category
icon
password
FlinkCDC 允许在一个任务中同步多张表,通过正则表达式的形式传入多表表名,通过这种方式实现了对分库分表的支持。但在一些场景下,我们还是希望知道每一条记录所属的库和表,这就需要使用到元数据列了。
使用方式
元数据列是虚拟列,即它在表格式中没有定义,但是可以通过一些信息提取出来,在 FlinkCDC 的 MySQL 插件中,允许定义下面三种元数据列:
Key | DataType | Description |
table_name | STRING NOT NULL | Name of the table that contain the row. |
database_name | STRING NOT NULL | Name of the database that contain the row. |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | It indicates the time that the change was made in the database.If the record is read from snapshot of the table instead of the binlog, the value is always 0. |
在使用的时候,需要加上特定的语法,例如:
这样,在查询时就会为每一行额外加上
db_name
table_name
operation_ts
三个字段。下游任务可以根据这三个字段,进行业务处理。读取流程
首先,FlinkCDC 程序需要识别出哪些字段使用了元数据列,这需要通过解析
METADATA FROM
VIRTUAL
实现,不过 Flink 已经提供了一个很简便的接口 SupportsReadingMetadata
, MySqlTableSource
只需要实现这个接口的以下方法就可以了:- listReadableMetadata
表明这个数据源插件支持哪些元数据字段。
- applyReadableMetadata
保存查询表已经使用到了哪些元数据字段。metadataKeys 就包含了元数据字段的名字。
从 applyReadableMetadata 方法获取到了 metadataKeys,FlinkCDC 根据它生成一系列 MetadataConverter。
MetadataConverter 负责从数据中提取出元数据列信息,以
table_name
为例,对应的 MetadataConverter 实现如下。从 MySQL 的 MetadataConverter
实现可以看到,它是提取了 record 里的 SOURCE 和 TABLE_NAME_KEY 信息,所以实际上这些元数据列只能在处理 binlog 的过程中提取到。MetadataConverter
在 RowDataDebeziumDeserializeSchema
生效,RowDataDebeziumDeserializeSchema
是负责将每条数据处理成下游需要的格式的工具类。RowDataDebeziumDeserializeSchema
创建了一个 AppendMetadataCollector
类。在发送数据时,可以看到
RowDataDebeziumDeserializeSchema
会根据是否包含元数据列决定是否使用 AppendMetadataCollector
类。最后,看下
AppendMetadataCollector
类的 collect 方法:从 inputRecord 中生成新的列,并且结合之前的 physicalRow,构建出新的输出记录。总结
得益于 Flink 的良好设计,为 FlinkCDC 数据源插件增加元数据列是不太困难的事,主要包括两个步骤:
1)实现
SupportsReadingMetadata
接口,定义和获取元数据字段;2)定义从原始数据获取元数据的方法,在
RowDataDebeziumDeserializeSchema
类中解析出元数据列,并下发新的记录。