🗒️Paimon dynamic bucket 的实现
2024-7-29
| 2024-7-29
字数 807阅读时长 3 分钟
type
status
date
slug
summary
tags
category
icon
password

介绍

dynamic bucket 是和 fixed bucket 相对的:
  • fixed bucket
需要在创建表的时候指定 bucket 的数量,读取写入都根据数据的 hash 计算得到 bucket,bucket 数量不能修改,每个 bucket 的数据量会持续增长。
  • dynamic bucket
建表的时候不需要指定 bucket 的数量,只需要指定每个 bucket 的容量,每个 bucket 的大小受 dynamic-bucket.target-row-num 控制,默认值是 2000000。当达到了这个数据量就需要写入一个新的 bucket。

实现

Paimon dynamic bucket 实现的核心是一个 HashBucketAssigner 类,其作用是为每一条数据提供一个固定的 bucket 值。
notion image
Paimon 会将数据的 hash 值和映射的 bucket 信息写入到一个索引文件中。在 BucketAssigner 启动时,BucketAssigner 会读取这些文件,将之前记录的映射关系加载到一个 Int2ShortHashMap 类中(这个变量名为 hash2Bucket),并且将每个分区的数据量记录到一个 Map<Integer, Long> 的变量 nonFullBucketInformation 中。
对于每个到来的数据,HashBucketAssigner 会按照下面三个步骤来确定其 bucket 值:
notion image
  1. 在 hash2Bucket 中找到了 hash 值对应的 bucket,那么就直接返回。
    1. 这种情况存在不同的数据计算得到相同的 hash 值的场景,这样每个 bucket 可能会被设定值大一点,不过这是可以接受的。
  1. 遍历 nonFullBucketInformation,找到第一个没有满的 bucket,返回这个 bucket 值,并且更新 hash2Bucket。
  1. 从 0 到 Short.MAX_VALUE 遍历所有数字,找到没有被使用的编号,返回这个 bucket 值,并且更新 hash2Bucket 和 nonFullBucketInformation。
如果只有一个 BucketAssigner 去做这件事情,那么性能肯定是不好的,可是如果有多个 BucketAssigner,那么在往 bucket 添加元素和创建新 bucket 肯定会有冲突的情况,怎么解决这个问题呢?
其实很简单,在发给 BucketAssigner 处理数据之前,先按照并行度进行一次 hash 分发,而在创建 bucket 时,也只创建按照总并行度整除后余数等于当前并行度的 bucket 值,这样就可以避免互相影响了。例如,总共有 5 个并行度,那么并行度编号为 1 的节点只处理 hash 值能被 5 整除余数为 1 的数据,也只能创建编号为 1、6、11、16 等的 bucket。
最后,贴一下代码:

拓扑

Flink Paimon Sink完整的支持 dynamic bucket 的拓扑链路应该为:
Source ⇒ partition by Hash of primary key ⇒ assign bucket ⇒ partition by bucket ⇒ Sink
  • Paimon
  • Paimon 数据写入流程FLIP 309: 在处理堆积数据时支持设置更大的 checkpoint 间隔
    Loading...