type
status
date
slug
summary
tags
category
icon
password
以 MYSQL 的 binlog 为例,说明 FlinkCDC 是如何读取增量数据的。
FlinkCDC 读取 binlog 依赖 Debezium 的实现,这里会包括 Debezium 的源码。
直接进入到
MySqlStreamingChangeEventSource
类的 execute
方法,这是核心入口。基本流程是,配置好一个
BinaryLogClient
后,注册监听器,创建连接获取变更事件,由监听器负责具体事件处理。这个
listener
是一个 lambda 方法,所以实际上是调用 handleEvent
方法。这里的
eventHandlers
是一个 map 结构,保存着EventType
到 BlockingConsumer<Event>
的映射关系,而 BlockingConsumer
作为消费者,通过 accept
方法处理事件。eventHandlers
保存的关系就很多了,包括了 MYSQL 里面的多种事件,这里列举出 insert 和 update 操作映射。我们看一下
EventType.UPDATE_ROWS
这个事件的处理逻辑,进入 handleUpdate
方法,再进入 handleChange
方法:可以看到,数据是通过 changeEmitter.emit 发送出去的。
对于 Update 类型的数据, changeEmitter 的发送逻辑如下:
数据经过
streamingReceiver.changeRecord(partition, schema, operation, key, value, offset, headers)
方法被放入队列中 queue.enqueue(changeEventCreator.createDataChangeEvent(record))
。