🗒️Hudi|CDC构建增量数仓
2023-6-11
| 2023-6-12
字数 797阅读时长 2 分钟
type
status
date
slug
summary
tags
category
icon
password

前提

查询cdc的函数hudi_table_changes还没有合并到主分支,需要基于这个pr构建hudi代码 https://github.com/apache/hudi/pull/8729

验证

建表参数
参数
说明
hoodie.table.cdc.enabled
是否开启CDC,取值如下:true:开启CDC。false(默认值):不开启CDC。
hoodie.table.cdc.supplemental.logging.mode
CDC文件存储模式,共有三种等级: • op_key_only:存储记录的主键和操作类型。 • data_before:存储记录的主键、操作类型、记录修改前的值。 • data_before_after(默认值):存储记录的主键、操作类型、记录修改前的值和修改后的值。
测试的建表语句如下,这里设置只记录主键和操作类型即可。
插入数据。
查询变更数据。
结果集包含四个字段:op、ts_ms、before、after。
执行变更操作,包含插入、删除、更新。
查询变更数据。
可以看到刚好有对应操作的三条数据。

应用

hudi_table_changes的返回值里包含了操作、时间、前值和后值,但是前值、后值都是json格式的,所以我们需要调整一下,从json转换成实际的列,Spark提供了一个函数FROM_JSON。
这里我添加了一个Spark存储过程copy_cdc_to_temp_view封装json解析过程,代码如下:
这个存储过程的使用方式如下:
下游根据这个op字段执行对应的操作就可以了。
  • Hudi
  • SparkSQL内核剖析 第五、六章Hudi|删除数据实现
    Loading...