4.Spark 与Iceberg 整合DDL操作
创建分区表
create table xxx (col1 xx...) using iceberg partitioned by (col1)
注意:向Iceberg分区表中插入数据之前,需要对数据按照分区列进行排序
创建隐藏分区
timestampe转换分区
years
months
days
hours
create table ... using iceberg as select ....
replace table .... as select ....
drop table ...
Alter 增加列、删除列、重命名列
alter table ... add column ...
alter table ... drop column...
alter table .... rename column to ...
Alter 增加、删除分区
alter table... add partition ...
alter table ... drop partition ...
5.Spark与Iceberg整合查询操作
1.DataFrame 读取Iceberg表
spark.read.format("iceberg").load("iceberg table path")
spark.table("iceberg tablename")
2.SparkSQL查询iceberg表快照信息
select * from ${catalog名称}.${库名称}.${表名}.snapshots
3.SparkSQL 查询iceberg表历史
select * from ${catalog名称}.${库名称}.${表名}.history
4.SparkSQL 查询iceberg datafiles
select * from ${catalog名称}.${库名称}.${表名}.files
5.SparkSQL 查询iceberg manifest
select * from ${catalog名称}.${库名称}.${表名}.manifest
6.Spark查询Iceberg指定快照数据
spark.read
.option("snapshot-id",5725035405694513291L)
.format("iceberg")
.load("hdfs://mycluster/sparkoperateiceberg/mydb/mytest")
.show()
spark.sql(
"""
| call hadoop_prod.system.set_current_snapshot('mydb.mytest',5725035405694513291)
""".stripMargin)
7.Spark 指定时间戳查询iceberg数据
spark.read
.option("as-of-timestamp","1640672100000")
.format("iceberg")
.load("hdfs://mycluster/sparkoperateiceberg/mydb/mytest")
.show()
spark.sql(
"""
| call hadoop_prod.system.rollback_to_timestamp("mydb.mytest",TIMESTAMP '2021-12-28 14:15:00.000')
""".stripMargin)
8.Spark 回滚Iceberg快照
val conf = new Configuration()
val catalog = new HadoopCatalog(conf,"hdfs://mycluster/sparkoperateiceberg")
catalog.setConf(conf)
val table: Table = catalog.loadTable(TableIdentifier.of("mydb","mytest"))
table.manageSnapshots().rollbackTo(4428163433183700863L).commit()
spark.sql(
"""
|call hadoop_prod.system.rollback_to_snapshot('mydb.mytest',4428163433183700863)
""".stripMargin)
9.合并Iceberg数据文件
val conf = new Configuration()<br>val catalog = new HadoopCatalog(conf,"hdfs://mycluster/sparkoperateiceberg")<br>val table: Table = catalog.loadTable(TableIdentifier.of("mydb","mytest"))<br>SparkActions.get(spark).rewriteDataFiles(table).option("target-file-size-bytes", "1024").execute()
10.删除历史快照
val conf = new Configuration()
val catalog = new HadoopCatalog(conf,"hdfs://mycluster/sparkoperateiceberg")
val table: Table = catalog.loadTable(TableIdentifier.of("mydb","mytest"))
table.expireSnapshots().expireOlderThan(1640677739793L).commit()
删除历史快照时对应的不再引用的parquet数据文件也会被删除
SQL 方式删除快照:CALL ${Catalog 名称}.system.expire_snapshots("${库名.表名}",TIMESTAMP '年-月-日 时-分-秒.000',N)
目前SQL 方式删除快照有bug问题
在创建iceberg表时可以指定参数处理元数据文件增多问题
CREATE TABLE ${CataLog名称}.${库名}.${表名}
( id bigint, name string ) using iceberg
PARTITIONED BY ( loc string )
TBLPROPERTIES (
'write.metadata.delete-after-commit.enabled'= true,
'write.metadata.previous-version-max' = 3
)
write.metadata.delete-after-commit.enabled - 每次表提交后是否删除旧的元数据文件
write.metadata.previous-version-max - 要保留旧的元数据文件数量
6.Spark 与Iceberg整合写操作
1.Insert into
2.Merge into
merge into hadoop_prod.default.a t1
using (select id,name,age,tp from hadoop_prod.default.b ) t2
on t1.id = t2.id
when matched and t2.tp = 'delete' then delete
when matched and t2.tp = 'update' then update set t1.name = t2.name ,t1.age = t2.age
when not matched then insert (id,name,age) values (t2.id,t2.name,t2.age)
操作的是 t1表中的数据
注意: 更新数据时,在查询的数据匹配条件中只能有一条数据匹配,否则报错。
3.Insert overwrite
普通表insert overwrite
spark.sql(<br> """<br> | insert overwrite hadoop_prod.iceberg_db.test2 select id,name,loc from hadoop_prod.iceberg_db.test3<br> """.stripMargin)
将test3表中的记录全部覆盖到test2表中<br>
分区表overwrite
动态覆盖
spark.sql(<br> """<br> | insert overwrite hadoop_prod.iceberg_db.test1 select id,name,loc from hadoop_prod.iceberg_db.test3<br> """.stripMargin)
会将test1的原数据全部删除,将test3的数据覆盖到test1中
静态覆盖
spark.sql(<br> """<br> | insert overwrite hadoop_prod.iceberg_db.test1<br> | partition (loc = 'guangzhou')<br> | select id,name from hadoop_prod.iceberg_db.test3<br> """.stripMargin)
会覆盖匹配上分区的数据,没有则新增: test1如果分区名字对应上,test3的数据会覆盖匹配上分区test1里面的数据,如果没有该分区,则新增新分区
4.delete from
delete from 如果where条件删除的是整个分区,那么只会修改元数据
delete from 如果where条件删除的是某条数据,iceberg会重写当前这条数据所在的paquet文件数据
5.Update
update .... set ...where ..
6.DataFrame 读写Iceberg
df.writeTo("hadoop_prod.iceberg_db.df_tbl1").create()
写入普通表
df.sortWithinPartitions($"loc")<br> .writeTo("hadoop_prod.default.df_tbl2")<br> .partitionedBy($"loc")<br> .create()
写入分区表 写入前一定要按照分区排序
spark.read.table("hadoop_prod.default.df_tbl2").show()
操作
df.write(tbl).create() 相当于 CREATE TABLE AS SELECT ...
df.write(tbl).replace() 相当于 REPLACE TABLE AS SELECT ..
df.write(tbl).append() 相当于 INSERT INTO ...
df.write(tbl).overwritePartitions() 相当于动态 INSERT OVERWRITE ...
7.Structured Streaming 实时向Iceberg写数据
1.StructuredStreaming 向Iceberg写数据编码
2.注意事项
写Iceberg表写出数据支持两种模式:append和complete,append是将每个微批数据行追加到表中。complete是替换每个微批数据内容。
向Iceberg中写出数据时指定的path可以是HDFS路径,可以是Iceberg表名,如果是表名,要预先创建好Iceberg表。
写出参数fanout-enabled指的是如果Iceberg写出的表是分区表,在向表中写数据之前要求Spark每个分区的数据必须排序,但这样会带来数据延迟,为了避免这个延迟,可以设置“fanout-enabled”参数为true,可以针对每个Spark分区打开一个文件,直到当前task批次数据写完,这个文件再关闭。
实时向Iceberg表中写数据时,建议trigger设置至少为1分钟提交一次,因为每次提交都会产生一个新的数据文件和元数据文件,这样可以减少一些小文件。为了进一步减少数据文件,建议定期合并“data files”(参照1.9.6.9)和删除旧的快照(1.9.6.10)。