Iceberg总结
2023-01-12 11:04:16 0 举报
AI智能生成
Iceberg数据湖总结
作者其他创作
大纲/内容
什么是数据湖
概念:数据湖就是集中式的数据存储库,可以存储各种数据格式数据,例如:非结构化、结构化数据、文本、视频...
大数据中为什么需要数据湖
数据湖做到了离线和实时底层数据存储的统一,解决了Kappa架构的痛点问题
Kappa架构痛点问题
1.Kafka不支持海量数据存储
2.Kappa架构中使用Kafka做分层,Kafka不支持SQL OLAP分析
3.Kafka做分层不能很好的集成原有的数据血缘关系系统、数据质量管理系统
4.Kafka不支持数据的更新,只支持数据的Append
Iceberg概念及特点
概念:Apache Iceberg是用于海量数据分析场景的表格式(Table Format),单表可以支持数十PB数据存储。可以和Hive、Presto、Spark、Flink做高效整合。Iceberg是一种数据湖解决方案。
特点
iceberg支持实时/批量数据写入和读取,支持Spark/Flink计算引擎。
Iceberg支持事务ACID,支持添加、删除、更新数据。
不绑定任何底层存储,支持Parquet、ORC、Avro格式兼容行存储和列存储。
Iceberg支持隐藏分区和分区变更,方便业务进行数据分区策略。
Iceberg支持快照数据重复查询,具备版本回滚功能。
Iceberg扫描计划很快,读取表或者查询文件可以不需要分布式SQL引擎。
Iceberg通过表元数据来对查询进行高效过滤。
基于乐观锁的并发支持,提供多线程并发写入能力并保证数据线性一致。
注意:Iceberg非常轻量级,与Spark、Flink进行整合时就是一个jar包,官网:https://iceberg.apache.org
Iceberg数据存储格式
Iceberg术语
Data file - 数据文件
Snapshot - 表快照
Manifest list - 清单列表
Manifest file - 清单文件
Iceberg表格式
Table Format 表格式就是指元数据与数据组织方式
Iceberg表格式处于底层存储(例如:HDFS)和上层计算框架(Flink、Spark)之间
Iceberg底层表格式
分支主题
Iceberg特点
Iceberg支持分区隐藏分区
Iceberg支持表演化
Iceberg支持Schema演化
Iceberg支持分区演化
Iceberg支持列顺序的演化
Iceberg支持数据类型
boolean 布尔类型,true或者false
int 32位有符号整形 可以转换成long类型
long 64位有符号整形
float 单精度浮点型 可以转换成double类型
double 双精度浮点型
decimal(P,S) decimal(P,S) P代表精度,决定总位数,S代表规模,决定小数位数。P必须小于等于38。
date 日期,不含时间和时区
time 时间,不含日期和时区 以微秒存储,1000微秒 = 1毫秒
timestamp 不含时区的timestamp 以微秒存储,1000微秒 = 1毫秒
timestamptz 含时区的timestamp 以微秒存储,1000微秒 = 1毫秒
string 任意长度的字符串类型 UTF-8编码
fixed(L) 长度为L的固定长度字节数组
binary 任意长度的字节数组
struct<...> 任意数据类型组成的一个结构化字段
list <E> 任意数据类型组成的List
map<K,V>
Hive与Iceberg整合
版本匹配
Iceberg0.12.1 与Hive 2.x & Hive3.1.2匹配
配置
1.将 iceberg-hive-runtime.jar 与 libfb303-0.9.3.jar 上传到Hive服务端与客户端对应的lib目录中
2.在Hive客户端 $HIVE_HOME/conf/hive-site.xml中配置开启Iceberg支持
<property> <name>iceberg.engine.hive.enabled</name> <value>true</value> </property>
Hive 操作Iceberg格式表
Hive操作Iceberg支持多种Catalog元数据管理方式:Hive、Hadoop、第三方、自定义
配置Catalog 属性
iceberg.catalog.<catalog_name>.type -- 指定catalog类型
iceberg.catalog.<catalog_name>.warehouse -- 指定数据路径
Hive&Iceberg 整合Catalog使用情况
1.不设置iceberg.catalog默认使用Hive catalog
2.设置iceberg.catalog ,类型可以是Hive、Hadoop,设置hadoop必须指定warehouse路径
3.iceberg.catalog指定成location_based_table,默认是将iceberg数据对应的路径加载成iceberg表
注意
1.如果Iceberg表有分区,使用Hive创建时直接指定分区就可以,Hive创建不支持隐藏分区
2.如果加载iceberg数据路径是分区表,创建iceberg表时分区字段当做普通字段处理就可以
Iceberg表数据组织与查询
Iceberg底层数据存储结构
Iceberg如何根据元数据查询数据
查询当前最新数据
查询指定快照数据
根据时间戳查询数据
Spark3.1.2与Iceberg0.12.1整合
1.Spark与Iceberg 版本匹配,iceberg0.12.1支持Spark2.4+,但是不完善,建议使用Spark3.x以上版本
2.需要导入maven pom相关依赖
3.SparkSQL中设置Catalog
Hive Catalog
配置
.config("spark.sql.catalog.hive_prod", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.hive_prod.type", "hive")
.config("spark.sql.catalog.hive_prod.uri", "thrift://node1:9083")
.config("iceberg.engine.hive.enabled", "true")
代码实操
Hadoop Catalog
配置
.config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.hadoop_prod.type", "hadoop")
.config("spark.sql.catalog.hive_prod.warehouse", "hdfs://mycluster/sparkoperateiceberg")
代码实操
4.Spark 与Iceberg 整合DDL操作
创建分区表
create table xxx (col1 xx...) using iceberg partitioned by (col1)
注意:向Iceberg分区表中插入数据之前,需要对数据按照分区列进行排序
创建隐藏分区
timestampe转换分区
years
months
days
hours
create table ... using iceberg as select ....
replace table .... as select ....
drop table ...
Alter 增加列、删除列、重命名列
alter table ... add column ...
alter table ... drop column...
alter table .... rename column to ...
Alter 增加、删除分区
alter table... add partition ...
alter table ... drop partition ...
5.Spark与Iceberg整合查询操作
1.DataFrame 读取Iceberg表
spark.read.format("iceberg").load("iceberg table path")
spark.table("iceberg tablename")
2.SparkSQL查询iceberg表快照信息
select * from ${catalog名称}.${库名称}.${表名}.snapshots
3.SparkSQL 查询iceberg表历史
select * from ${catalog名称}.${库名称}.${表名}.history
4.SparkSQL 查询iceberg datafiles
select * from ${catalog名称}.${库名称}.${表名}.files
5.SparkSQL 查询iceberg manifest
select * from ${catalog名称}.${库名称}.${表名}.manifest
6.Spark查询Iceberg指定快照数据
spark.read
.option("snapshot-id",5725035405694513291L)
.format("iceberg")
.load("hdfs://mycluster/sparkoperateiceberg/mydb/mytest")
.show()
spark.sql(
"""
| call hadoop_prod.system.set_current_snapshot('mydb.mytest',5725035405694513291)
""".stripMargin)
7.Spark 指定时间戳查询iceberg数据
spark.read
.option("as-of-timestamp","1640672100000")
.format("iceberg")
.load("hdfs://mycluster/sparkoperateiceberg/mydb/mytest")
.show()
spark.sql(
"""
| call hadoop_prod.system.rollback_to_timestamp("mydb.mytest",TIMESTAMP '2021-12-28 14:15:00.000')
""".stripMargin)
8.Spark 回滚Iceberg快照
val conf = new Configuration()
val catalog = new HadoopCatalog(conf,"hdfs://mycluster/sparkoperateiceberg")
catalog.setConf(conf)
val table: Table = catalog.loadTable(TableIdentifier.of("mydb","mytest"))
table.manageSnapshots().rollbackTo(4428163433183700863L).commit()
spark.sql(
"""
|call hadoop_prod.system.rollback_to_snapshot('mydb.mytest',4428163433183700863)
""".stripMargin)
9.合并Iceberg数据文件
val conf = new Configuration()
val catalog = new HadoopCatalog(conf,"hdfs://mycluster/sparkoperateiceberg")
val table: Table = catalog.loadTable(TableIdentifier.of("mydb","mytest"))
SparkActions.get(spark).rewriteDataFiles(table).option("target-file-size-bytes", "1024").execute()
val catalog = new HadoopCatalog(conf,"hdfs://mycluster/sparkoperateiceberg")
val table: Table = catalog.loadTable(TableIdentifier.of("mydb","mytest"))
SparkActions.get(spark).rewriteDataFiles(table).option("target-file-size-bytes", "1024").execute()
10.删除历史快照
val conf = new Configuration()
val catalog = new HadoopCatalog(conf,"hdfs://mycluster/sparkoperateiceberg")
val table: Table = catalog.loadTable(TableIdentifier.of("mydb","mytest"))
table.expireSnapshots().expireOlderThan(1640677739793L).commit()
删除历史快照时对应的不再引用的parquet数据文件也会被删除
SQL 方式删除快照:CALL ${Catalog 名称}.system.expire_snapshots("${库名.表名}",TIMESTAMP '年-月-日 时-分-秒.000',N)
目前SQL 方式删除快照有bug问题
在创建iceberg表时可以指定参数处理元数据文件增多问题
CREATE TABLE ${CataLog名称}.${库名}.${表名}
( id bigint, name string ) using iceberg
PARTITIONED BY ( loc string )
TBLPROPERTIES (
'write.metadata.delete-after-commit.enabled'= true,
'write.metadata.previous-version-max' = 3
)
write.metadata.delete-after-commit.enabled - 每次表提交后是否删除旧的元数据文件
write.metadata.previous-version-max - 要保留旧的元数据文件数量
6.Spark 与Iceberg整合写操作
1.Insert into
2.Merge into
merge into hadoop_prod.default.a t1
using (select id,name,age,tp from hadoop_prod.default.b ) t2
on t1.id = t2.id
when matched and t2.tp = 'delete' then delete
when matched and t2.tp = 'update' then update set t1.name = t2.name ,t1.age = t2.age
when not matched then insert (id,name,age) values (t2.id,t2.name,t2.age)
操作的是 t1表中的数据
注意: 更新数据时,在查询的数据匹配条件中只能有一条数据匹配,否则报错。
3.Insert overwrite
普通表insert overwrite
spark.sql(
"""
| insert overwrite hadoop_prod.iceberg_db.test2 select id,name,loc from hadoop_prod.iceberg_db.test3
""".stripMargin)
"""
| insert overwrite hadoop_prod.iceberg_db.test2 select id,name,loc from hadoop_prod.iceberg_db.test3
""".stripMargin)
将test3表中的记录全部覆盖到test2表中
分区表overwrite
动态覆盖
spark.sql(
"""
| insert overwrite hadoop_prod.iceberg_db.test1 select id,name,loc from hadoop_prod.iceberg_db.test3
""".stripMargin)
"""
| insert overwrite hadoop_prod.iceberg_db.test1 select id,name,loc from hadoop_prod.iceberg_db.test3
""".stripMargin)
会将test1的原数据全部删除,将test3的数据覆盖到test1中
静态覆盖
spark.sql(
"""
| insert overwrite hadoop_prod.iceberg_db.test1
| partition (loc = 'guangzhou')
| select id,name from hadoop_prod.iceberg_db.test3
""".stripMargin)
"""
| insert overwrite hadoop_prod.iceberg_db.test1
| partition (loc = 'guangzhou')
| select id,name from hadoop_prod.iceberg_db.test3
""".stripMargin)
会覆盖匹配上分区的数据,没有则新增: test1如果分区名字对应上,test3的数据会覆盖匹配上分区test1里面的数据,如果没有该分区,则新增新分区
4.delete from
delete from 如果where条件删除的是整个分区,那么只会修改元数据
delete from 如果where条件删除的是某条数据,iceberg会重写当前这条数据所在的paquet文件数据
5.Update
update .... set ...where ..
6.DataFrame 读写Iceberg
df.writeTo("hadoop_prod.iceberg_db.df_tbl1").create()
写入普通表
df.sortWithinPartitions($"loc")
.writeTo("hadoop_prod.default.df_tbl2")
.partitionedBy($"loc")
.create()
.writeTo("hadoop_prod.default.df_tbl2")
.partitionedBy($"loc")
.create()
写入分区表 写入前一定要按照分区排序
spark.read.table("hadoop_prod.default.df_tbl2").show()
操作
df.write(tbl).create() 相当于 CREATE TABLE AS SELECT ...
df.write(tbl).replace() 相当于 REPLACE TABLE AS SELECT ..
df.write(tbl).append() 相当于 INSERT INTO ...
df.write(tbl).overwritePartitions() 相当于动态 INSERT OVERWRITE ...
7.Structured Streaming 实时向Iceberg写数据
1.StructuredStreaming 向Iceberg写数据编码
2.注意事项
写Iceberg表写出数据支持两种模式:append和complete,append是将每个微批数据行追加到表中。complete是替换每个微批数据内容。
向Iceberg中写出数据时指定的path可以是HDFS路径,可以是Iceberg表名,如果是表名,要预先创建好Iceberg表。
写出参数fanout-enabled指的是如果Iceberg写出的表是分区表,在向表中写数据之前要求Spark每个分区的数据必须排序,但这样会带来数据延迟,为了避免这个延迟,可以设置“fanout-enabled”参数为true,可以针对每个Spark分区打开一个文件,直到当前task批次数据写完,这个文件再关闭。
实时向Iceberg表中写数据时,建议trigger设置至少为1分钟提交一次,因为每次提交都会产生一个新的数据文件和元数据文件,这样可以减少一些小文件。为了进一步减少数据文件,建议定期合并“data files”(参照1.9.6.9)和删除旧的快照(1.9.6.10)。
Flink 与 Iceberg整合
Flink 支持DataStream API 和 SQL API 批量/实时操作Iceberg
DataStream API 操作Iceberg
DataStream API 实时读取Kafka 数据写入Iceberg表
注意问题
需要设置Checkpoint,Flink向Iceberg中写入Commit数据时,只有Checkpoint成功之后才会Commit数据,否则后期在Hive中查询不到数据。
读取Kafka数据后需要包装成RowData或者Row对象,才能向Iceberg表中写出数据。写出数据时默认是追加数据,如果指定overwrite就是全部覆盖数据。
在向Iceberg表中写数据之前需要创建对应的Catalog、表Schema,否则写出时只指定对应的路径会报错找不到对应的Iceberg表。
不建议使用DataStream API 向Iceberg中写数据,建议使用SQL API。
可以创建Hive 对应的Iceberg表,可以实时看到表中数据增加,但是注意:虽然loc是分区列,创建时忽略分区列就可以,此外映射表的路径要保持与保存Iceberg数据路径一致。
DataStream API 批量/实时读取Iceberg表数据
DataStream<RowData> batchData = FlinkSource.forRowData().env(env)
.tableLoader(tableLoader)
.streaming(true)
.build();
如果.streaming(true)就是实时读取,默认false ,批量读取
DataStream API 基于快照来批量/实时读取Iceberg表数据
DataStream<RowData> batchData = FlinkSource.forRowData().env(env)
.tableLoader(tableLoader)
.startSnapshotId(5001264950716649450L)
.streaming(true)
.build();
定义提交Flink任务合并Data files
//合并data files
Actions.forTable(table)
.rewriteDataFiles()
.targetSizeInBytes(536870912L)
.execute();
Flink SQL API 操作Iceberg
SQL API 创建Iceberg表并写入数据
1.创建Catalog
2.使用当前Catalog
3.创建数据库
4.使用当前库
5.创建Iceberg 表
6.向iceberg表中插入数据
SQL API 批量/实时查询Iceberg表数据
批量查询就是写SQL 直接查询
实时查询需要设置table.dynamic-table-options.enabled 为true,并且在SQL中指定Option
SQL API 基于某个快照ID 实时增量读取iceberg数据
实时查询需要设置table.dynamic-table-options.enabled 为true,并且在SQL中指定Option : start-snapshot-id
案例:SQL API 实现实时读取Kafka 数据写入Iceberg表
Flink与Iceberg整合不足
Iceberg目前不支持Flink SQL 查询表的元数据信息,需要使用Java API 实现。
Flink不支持创建带有隐藏分区的Iceberg表
Flink不支持带有WaterMark的Iceberg表
Flink不支持添加列、删除列、重命名列操作。
Flink对Iceberg Connector支持并不完善。
Iceberg 与 Hudi数据湖技术对比
相同点
都是构建于存储格式之上的数据组织方式
提供ACID能力,提供一定的事务、并行执行能力
提供行级别数据修改能力。
提供一定的Schema扩展能力,例如:新增、修改、删除列操作。
支持数据合并,处理小文件。
支持Time travel 查询快照数据。
支持批量和实时数据读写
不同点
Iceberg支持Parquet、avro、orc数据格式,Hudi支持Parquet和Avro格式。
两者数据存储和查询机制不同
对于处理小文件合并时,Iceberg只支持API方式手动处理合并小文件,Hudi对于小文件合并处理可以根据配置自动的执行。
Spark与Iceberg和Hudi整合时,Iceberg对SparkSQL的支持目前来看更好。Spark与Hudi整合更多的是Spark DataFrame API 操作。
关于Schema方面,Iceberg Schema与计算引擎是解耦的,不依赖任何的计算引擎,而Hudi的Schema依赖于计算引擎Schema。
0 条评论
下一页