🗒️Debezium binlog 处理流程
2024-3-24
| 2024-3-25
字数 975阅读时长 3 分钟
type
status
date
slug
summary
tags
category
icon
password

建立连接

BinaryLogClient 类用于建立与 MySQL 服务器的连接。下面的操作都在其 connect 方法中。
  1. 在连接前,需要配置好 hostname、keepAlive、connectTimeout 等参数,BinaryLogClient 会建立一个 Socket 连接。
  1. BinaryLogClient 根据之前配置的 binlog 文件发送一个 dumpBinaryLogCommand 请求。可以看到,请求里包含了 serverId, binlogFilename, binlogPosition 三项。
  1. 开始在 listenForEventPackets 方法中监听和处理来自 binlog 中的内容。
在开始接受 binlog 网络流以后,BinaryLogClient 会将流数据解析成一个个的 Event
EventEventHeaderEventData 组成:EventHeader 包含了这个事件的类型信息和 EventData 的长度信息,EventData 就包含了具体的表变更信息,有多个实现类。以 WriteRowsEventData 为例,包含这几个成员变量。

事件解析

EventDeserializer 负责从流中解析出 Event
大致上就是从 inputStream 中先解析出 EventHeader,然后根据其描述的类型使用对应的 EventDataDeserializer 进行解析得到 EventData。对于类型为 TABLE_MAP 的 Event,它包含了表结构的信息,解析后会将表结构和其对应的 tableid 放到 Map<Long, TableMapEventData> tableMapEventByTableId map 中,用来辅助数据变更事件的解析。
 
WriteRowsEventDataDeserializer 为例,其 EventData 的反序列化实现如下:
其实就是读取并反序列化出一个个成员变量。

事件处理

在拿到 Event 以后,BinaryLogClient 会用预先注册好的 EventListener 进行处理。
以 FlinkCDC 注册的 EventListener 为例,为不同的事件类型提供了不同的处理方法。(以下代码在 MySqlStreamingChangeEventSource 类中)
以处理写入事件的 handleInsert 方法为例:
handleChange 方法会检验这个表是否在被捕获列表中,如果不在的话,就不会继续处理。
SourceRecord 会最终放到一个 queue 中。
这个队列的大小是可以通过 max.queue.size 参数配置的,增加这个队列的大小可以一定程度提升性能。
 
自定义实现 Paimon Flink SinkCanal 学习笔记
Loading...