type
status
date
slug
summary
tags
category
icon
password
建立连接
BinaryLogClient
类用于建立与 MySQL 服务器的连接。下面的操作都在其 connect 方法中。- 在连接前,需要配置好 hostname、keepAlive、connectTimeout 等参数,
BinaryLogClient
会建立一个 Socket 连接。
BinaryLogClient
根据之前配置的 binlog 文件发送一个dumpBinaryLogCommand
请求。可以看到,请求里包含了 serverId, binlogFilename, binlogPosition 三项。
- 开始在
listenForEventPackets
方法中监听和处理来自 binlog 中的内容。
在开始接受 binlog 网络流以后,
BinaryLogClient
会将流数据解析成一个个的 Event
。Event
由 EventHeader
和 EventData
组成:EventHeader
包含了这个事件的类型信息和 EventData
的长度信息,EventData
就包含了具体的表变更信息,有多个实现类。以
WriteRowsEventData
为例,包含这几个成员变量。事件解析
EventDeserializer
负责从流中解析出 Event
。大致上就是从 inputStream 中先解析出
EventHeader
,然后根据其描述的类型使用对应的 EventDataDeserializer
进行解析得到 EventData
。对于类型为 TABLE_MAP 的 Event
,它包含了表结构的信息,解析后会将表结构和其对应的 tableid 放到
Map<Long, TableMapEventData> tableMapEventByTableId
map 中,用来辅助数据变更事件的解析。以
WriteRowsEventDataDeserializer
为例,其 EventData
的反序列化实现如下:其实就是读取并反序列化出一个个成员变量。
事件处理
在拿到
Event
以后,BinaryLogClient
会用预先注册好的 EventListener
进行处理。以 FlinkCDC 注册的
EventListener
为例,为不同的事件类型提供了不同的处理方法。(以下代码在 MySqlStreamingChangeEventSource
类中)以处理写入事件的 handleInsert 方法为例:
handleChange
方法会检验这个表是否在被捕获列表中,如果不在的话,就不会继续处理。SourceRecord
会最终放到一个 queue 中。这个队列的大小是可以通过 max.queue.size 参数配置的,增加这个队列的大小可以一定程度提升性能。