type
status
date
slug
summary
tags
category
icon
password
前提
查询cdc的函数
hudi_table_changes
还没有合并到主分支,需要基于这个pr构建hudi代码 https://github.com/apache/hudi/pull/8729。验证
建表参数
参数 | 说明 |
hoodie.table.cdc.enabled | 是否开启CDC,取值如下:
• true:开启CDC。
• false(默认值):不开启CDC。 |
hoodie.table.cdc.supplemental.logging.mode | CDC文件存储模式,共有三种等级:
• op_key_only:存储记录的主键和操作类型。
• data_before:存储记录的主键、操作类型、记录修改前的值。
• data_before_after(默认值):存储记录的主键、操作类型、记录修改前的值和修改后的值。 |
测试的建表语句如下,这里设置只记录主键和操作类型即可。
插入数据。
查询变更数据。
结果集包含四个字段:op、ts_ms、before、after。
执行变更操作,包含插入、删除、更新。
查询变更数据。
可以看到刚好有对应操作的三条数据。
应用
hudi_table_changes
的返回值里包含了操作、时间、前值和后值,但是前值、后值都是json格式的,所以我们需要调整一下,从json转换成实际的列,Spark提供了一个函数FROM_JSON。这里我添加了一个Spark存储过程
copy_cdc_to_temp_view
封装json解析过程,代码如下:这个存储过程的使用方式如下:
下游根据这个op字段执行对应的操作就可以了。