type
status
date
slug
summary
tags
category
icon
password
Paimon 仓库本身没有基于 Flink Sink v2 版本的 API 进行实现,FlinkCDC 接入新的 Sink 需要使用这个 API,因此需要自行实现,这里记录下实现过程和踩过的坑。
基本实现
在版本迭代中,Flink Sink 经历过 SinkFunction、Sink V1、 Sink V2 的 API 演进过程。对于 Flink 1.17/1.18 版本的 connector 来说,更建议使用 Sink V2 的 API 实现。
Sink V2 API
Sink V2 提供了 Sink、StateFulSink、TwoPhaseCommittingSink 三个基本的接口:
Sink | 普通 Sink。需要提供一个写入数据的 Writer。 |
StateFulSink | 有状态的 Sink。需要提供一个写入数据的有状态的 Writer,并且提供 Writer 如何从状态恢复的方法。 |
TwoPhaseCommittingSink | 两阶段提交的 Sink。需要提供一个写入数据的能够产生预提交信息的 Writer,并且提供一个执行事务提交的 Committer。 |
对于 Paimon 来说,需要进行事务提交来让数据可见,因此需要实现一个 TwoPhaseCommittingSink,这里列出一下接口。
Writer
Paimon Writer 需要实现 TwoPhaseCommittingSink#PrecommittingSinkWriter,因此需要实现 write 和 prepareCommit 两个方法。
这肯定是需要借助 Paimon 仓库的 API 来写入。具体来说,在 Paimon 中的 StoreSinkWrite 接口提供了这两个能力,因此我们只需要创建出 StoreSinkWrite 的实现类就可以了。
Committer
Paimon Committer 需要实现 commit 方法,这也需要借助 Paimon 仓库的 API 来写入。具体来说,在 Paimon 中的 StoreMultiCommitter 接口提供了这个能力,因此我们只需要创建出 StoreMultiCommitter 的实现就可以了。
Writer 踩坑
背景
在 Paimon 中,数据是按照 bucket 粒度分布的,每个数据对应的 bucket 是固定的,这也就要求了某一个时刻里同一个 bucket 是不能够有多个 writer 去写入的,否则会出现冲突。但是在 Flink Sink 中,并发度大于 1 是很正常的,不同的 subtask 可能需要写入同一个 bucket 的数据,这样在 commit 阶段就会出现冲突。
解决方案
我们需要在 Sink 写入数据之前就满足“某一个时刻里同一个 bucket 是不能够有多个 writer 去写入的”这种限制,这在 Source ⇒ Sink 的简单链路里是不能实现的,我们需要增加一个中间算子,这就要求数据需要进行 shuffle,让上游相同 bucket 的数据下发到同一个并行度里,即 Source ⇒ Shuffle ⇒ Sink。
这个算子怎么加进来呢,在 Flink Sink V2 提供了这种能力,我们可以通过实现 WithPreWriteTopology 接口来增加数据重新分发的能力。
具体来说,我们可以在 addPreWriteTopology 里加上一个 datastream.partitionCustom 转换,将上游的数据按照对应的 bucket 的哈希值发送给对应的下游,这样相同 bucket 的数据始终发送到同一个 writer 来处理,就不会出现多个 writer 写同一个 bucket 的情况了。
Committer 踩坑
背景
在作业运行的过程中,出现失败重启是很正常的情况,但是在测试作业中,发现当前实现的 Paimon Sink 重启之后总是会出现 commit 冲突的报错,并且不能够恢复到正常运行的状态。
解决方案
通过打印日志的方式,定位到是有相同的文件被提交到 Paimon 快照里导致的,说明 commit 信息被提交了两次。这是因为 Flink 的实现中考虑到了重启的情况,将 Committer 需要提交的信息已经预先保存到状态了,在重启的时候,会将之前没完成的 commit 信息继续提交,而 Paimon 的 storeMultiCommitter.commit 并没有进行文件是否已经提交过的校验,因此会导致同一个文件被提交了两次。
Paimon 仓库中其实已经考虑到了这种场景,提供了 filterAndCommit 来支持这种情况,会将已经处理过的文件给过滤掉。因此只需要将 storeMultiCommitter.commit 的调用改成 storeMultiCommitter.filterAndCommit 即可,当然这样性能会有下滑,这是有待后续优化的地方。
总结
在对接 Paimon 时,如何避免 commit 冲突是一件需要认真考虑的事情。