Hudi源码分析之Upsert在Hudi中的实现分析
2023-09-19 20:52:24 0 举报
此图主要是描述Hudi 0.8.x版本的upsert功能源码流程
作者其他创作
大纲/内容
并生成UPDATE类型的桶信息
SparkLazyInsertIterable
计算该小文件中还可插入多少条记录
缓存record
handleUpsertPartition
BucketType
构造该对象时会利用profile信息来进行必要的初始化,在构造方法中就有assignUpdates、assignInserts两个关键方法
handleUpdateInternal
优先插入小文件,对于剩余的记录则写入新文件
getPartitioner()
INSERT
构建WorkloadProfile
Hudi支持Upsert语义,即将数据插入更新至Hudi数据集中,在借助索引机制完成数据查询后(查找记录位于哪个文件),再将该记录的位置信息回推至记录本身,然后对于已经存在于文件的记录使用UPDATE,而未存在于文件中的记录使用INSERT。
handleUpdate()
assignUpdates
tagLocation()
遍历record需要插入的每个分区下所有小数据文件
upsert
Bloom Filter在Hudi中应用已经过源码分析,此次着重分析将位置信息回推到记录中,然后进行record的upsert
此处upsert代码中并没有此方法,为了源码分段分析自行规整
遍历需要更新的记录
AbstractWriteHelper.write()
此处关键处理就是将所有的record进行key-->record转换,然后与已经通过Bloom Filter找到文件位置的record进行left join,从而tag记录退回record中
获取当前要插入的record所涉及的partition
BaseSparkCommitActionExecutor.execute()
Hudi Upsert总结:对于Upsert而言,Hudi总体的处理流程是先根据索引给记录打标签,然后进行一次重新分区,对于新插入的记录,会优先插入小文件中,避免出现太多小文件,而且也会根据数据文件的具体配置控制数据文件的大小;而对于更新的记录,则会与旧记录进行合并、必要时复制旧记录到新文件(FileId与旧文件的FileId相同,commitTime不同)中。
不管是对于INSERT还是UPDATE,其都会借助BoundedInMemoryExecutor来转发对记录的处理(涉及生产者-消费者-队列模型,后续会单独分析)
clusteringHandleUpdate()
如果旧记录(文件中的旧记录)在新纪录(新写入的记录)中存在,将旧记录与新纪录合并(合并策略可以自定义实现,默认新记录覆盖旧记录),合并后再写入新文件(与原来FileId相同,但是commitTime不同,commitTime越大,文件越新),如果旧记录不存在,那么需要复制旧记录,然后写入新文件中。这样便完成了文件中已存在记录的更新和文件中未存在记录的复制,保证无记录丢失。
handleInsert()
SparkBoundedInMemoryExecutor
assignInserts
END
HoodieMergeHandle.write()
UPDATE
对于值存在的记录,则表示插入,写入数据文件,然后释放记录的内容,当然在调用该write方法写入之前,需要先判断该文件还能不能写入(当前文件大小是否大于配置的最大数据文件大小和分区路径是否一致),若不能写入,则会在原来FileId上加从0开始的递增序列以生成新文件写入
getUpsertPartitioner()
HoodieCreateHandle.write()
如果分区无记录,则直接返回空迭代器,否则会创建一个迭代器进行处理。
BaseCommitActionExecutor.execute()
SparkMergeHelper.runMerge()
然后根据不同的表类型(Merge On Read/Copy On Write)来获取对应的Partitioner进行重新分区
若record文件分布在集群中,则在update或者insert之前需要进行check
文件名 -> 桶序号的映射、桶序号与桶信息的映射进行保存
计算当前record大小
不同引擎(Spark/Flink)实现各自不同的具体upsert逻辑,这里主要分析BaseSparkCommitActionExecutor
0 条评论
下一页