type
status
date
slug
summary
tags
category
icon
password
介绍
dynamic bucket 是和 fixed bucket 相对的:
- fixed bucket
需要在创建表的时候指定 bucket 的数量,读取写入都根据数据的 hash 计算得到 bucket,bucket 数量不能修改,每个 bucket 的数据量会持续增长。
- dynamic bucket
建表的时候不需要指定 bucket 的数量,只需要指定每个 bucket 的容量,每个 bucket 的大小受 dynamic-bucket.target-row-num 控制,默认值是 2000000。当达到了这个数据量就需要写入一个新的 bucket。
实现
Paimon dynamic bucket 实现的核心是一个 HashBucketAssigner 类,其作用是为每一条数据提供一个固定的 bucket 值。
Paimon 会将数据的 hash 值和映射的 bucket 信息写入到一个索引文件中。在 BucketAssigner 启动时,BucketAssigner 会读取这些文件,将之前记录的映射关系加载到一个 Int2ShortHashMap 类中(这个变量名为 hash2Bucket),并且将每个分区的数据量记录到一个 Map<Integer, Long> 的变量 nonFullBucketInformation 中。
对于每个到来的数据,HashBucketAssigner 会按照下面三个步骤来确定其 bucket 值:
- 在 hash2Bucket 中找到了 hash 值对应的 bucket,那么就直接返回。
这种情况存在不同的数据计算得到相同的 hash 值的场景,这样每个 bucket 可能会被设定值大一点,不过这是可以接受的。
- 遍历 nonFullBucketInformation,找到第一个没有满的 bucket,返回这个 bucket 值,并且更新 hash2Bucket。
- 从 0 到 Short.MAX_VALUE 遍历所有数字,找到没有被使用的编号,返回这个 bucket 值,并且更新 hash2Bucket 和 nonFullBucketInformation。
如果只有一个 BucketAssigner 去做这件事情,那么性能肯定是不好的,可是如果有多个 BucketAssigner,那么在往 bucket 添加元素和创建新 bucket 肯定会有冲突的情况,怎么解决这个问题呢?
其实很简单,在发给 BucketAssigner 处理数据之前,先按照并行度进行一次 hash 分发,而在创建 bucket 时,也只创建按照总并行度整除后余数等于当前并行度的 bucket 值,这样就可以避免互相影响了。例如,总共有 5 个并行度,那么并行度编号为 1 的节点只处理 hash 值能被 5 整除余数为 1 的数据,也只能创建编号为 1、6、11、16 等的 bucket。
最后,贴一下代码:
拓扑
Flink Paimon Sink完整的支持 dynamic bucket 的拓扑链路应该为:
Source ⇒ partition by Hash of primary key ⇒ assign bucket ⇒ partition by bucket ⇒ Sink