太一项目图计算SparkGraphX思维导图
2025-07-11 08:39:12 0 举报
AI智能生成
1
作者其他创作
大纲/内容
概念
点
边
点和边都可以设置值
截图
步骤
第一步
// 创建一个 RDD 用于表示顶点
val users: RDD[(VertexId, (String, String))] = sc.parallelize(
Array(
(3L, ("zhangsan", "student")), (7L, ("lisi", "postdoctoral")),
(5L, ("p1", "professor")), (2L, ("p2", "professor"))
)
)
val users: RDD[(VertexId, (String, String))] = sc.parallelize(
Array(
(3L, ("zhangsan", "student")), (7L, ("lisi", "postdoctoral")),
(5L, ("p1", "professor")), (2L, ("p2", "professor"))
)
)
第二步
// 创建一个 RDD 用于表示边
val relationships: RDD[Edge[String]] = sc.parallelize(
Array(
// srcId:源顶点,dstId:目的顶点,attr:边的属性(边数据)
Edge(3L, 7L, "collaborator"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "PI") // Principal Investigator
)
)
val relationships: RDD[Edge[String]] = sc.parallelize(
Array(
// srcId:源顶点,dstId:目的顶点,attr:边的属性(边数据)
Edge(3L, 7L, "collaborator"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "PI") // Principal Investigator
)
)
第三步
// 连通性(连通子图):可以将每个顶点都关联到连通图里的最小顶点 如何生成图
val value = graph.connectedComponents()
value.vertices
.map(tp => (tp._2, tp._1))
.groupByKey()
.collect()
.foreach(println)
// 如果多个点可以连通为子图,连通子图算法最终取最小的点,例如 3, 7, 5, 2,则取 2
// 返回结果:(2,CompactBuffer(3, 7, 5, 2))
val value = graph.connectedComponents()
value.vertices
.map(tp => (tp._2, tp._1))
.groupByKey()
.collect()
.foreach(println)
// 如果多个点可以连通为子图,连通子图算法最终取最小的点,例如 3, 7, 5, 2,则取 2
// 返回结果:(2,CompactBuffer(3, 7, 5, 2))
第一次的数据处理步骤
准备数据
username,phone,mac,imei,open_id,android_id,idfv
zhangsan,13800000000,00:1A:2B:3C:4D:5E,356938035643809,openid_1234567890abcdef,,idfv_1234567890abcdef
,13800000000,11:22:33:44:55:66,356938035643810,openid_1234567890abcdef,androididabcdef1234567890,
,,AA:BB:CC:DD:EE:FF,356938035643811,,androididabcdef1234567890,
lisi,13811111111,33:22:11:44:55:66,356938035643814,openid_567890abcdef1234,androidid567890abcdef1234,
wangwu,13822222222,44:33:22:11:66:55,356938035643815,openid_67890abcdef12345,,idfv_67890abcdef12345
zhangsan,13800000000,00:1A:2B:3C:4D:5E,356938035643809,openid_1234567890abcdef,,idfv_1234567890abcdef
,13800000000,11:22:33:44:55:66,356938035643810,openid_1234567890abcdef,androididabcdef1234567890,
,,AA:BB:CC:DD:EE:FF,356938035643811,,androididabcdef1234567890,
lisi,13811111111,33:22:11:44:55:66,356938035643814,openid_567890abcdef1234,androidid567890abcdef1234,
wangwu,13822222222,44:33:22:11:66:55,356938035643815,openid_67890abcdef12345,,idfv_67890abcdef12345
第一步
创建样例类
样例类根据数据的头文件进行生成
case class UserEventLog(username: String,
phone: String,
mac: String,
imei: String,
open_id: String,
android_id: String,
idfv: String)
phone: String,
mac: String,
imei: String,
open_id: String,
android_id: String,
idfv: String)
创建Spark对象
第二步
读取数据
df_20240706
Spark.read
第一行表头
.option("header", "true")
// ,分割
.option("sep", ",")
// 类型
.option("inferSchema", "true")
.csv("file:///D:\\YJX\\第二阶段\\太一用户画像\\002_code\\数仓项目代码\\tydt-offline\\tydt-offline-etl\\src\\test\\resources\\data\\day01.csv")
.option("header", "true")
// ,分割
.option("sep", ",")
// 类型
.option("inferSchema", "true")
.csv("file:///D:\\YJX\\第二阶段\\太一用户画像\\002_code\\数仓项目代码\\tydt-offline\\tydt-offline-etl\\src\\test\\resources\\data\\day01.csv")
val user_event_log_20240706: Dataset[UserEventLog] = df_20240706.as[UserEventLog]
段代码的核心功能就是把无类型的 DataFrame 转换为类型安全的 Dataset
第三步
数据过滤
按照分区在处理每一条
过滤null
缓存
val idsCache_20240706 = user_event_log_20240706.rdd
//按照分区处理再处理每一条
.mapPartitions(partIter => {
partIter.map(event => {
Array(event.username, event.phone, event.mac, event.imei, event.open_id, event.android_id, event.idfv)
//过滤null
.filter(StringUtils.isNotBlank)
})
//缓存
}).persist(StorageLevel.MEMORY_AND_DISK)
//按照分区处理再处理每一条
.mapPartitions(partIter => {
partIter.map(event => {
Array(event.username, event.phone, event.mac, event.imei, event.open_id, event.android_id, event.idfv)
//过滤null
.filter(StringUtils.isNotBlank)
})
//缓存
}).persist(StorageLevel.MEMORY_AND_DISK)
第四步
创建顶点集合
把顶点属性进行哈希 得到顶点值
循环遍历所有顶点
yield 集合生成器
去重
// 对每个 Array 进行处理并创建顶点集合
// 构建点集合 哈希 zhangsan 获取 顶点 顶点的属性就是zhangsan
val vertices_20240706: RDD[(VertexId, String)] = idsCache_20240706
.mapPartitions(partIter => {
// 建议采用碰撞率更低效率更高的 Hashing.murmur3_128()
val hashFunction = Hashing.murmur3_128()
partIter.flatMap(arr => {
// (点的唯一标识, 点的数据) 传进来的值是String 所以用hashString i是属性zhangsan (hash,zhangsan)
// yield 集合生成器
for (i <- arr) yield ((hashFunction.hashString(i, StandardCharsets.UTF_8).asLong.abs), i)
})
})
// 去重
.distinct()
// 构建点集合 哈希 zhangsan 获取 顶点 顶点的属性就是zhangsan
val vertices_20240706: RDD[(VertexId, String)] = idsCache_20240706
.mapPartitions(partIter => {
// 建议采用碰撞率更低效率更高的 Hashing.murmur3_128()
val hashFunction = Hashing.murmur3_128()
partIter.flatMap(arr => {
// (点的唯一标识, 点的数据) 传进来的值是String 所以用hashString i是属性zhangsan (hash,zhangsan)
// yield 集合生成器
for (i <- arr) yield ((hashFunction.hashString(i, StandardCharsets.UTF_8).asLong.abs), i)
})
})
// 去重
.distinct()
创建临时视图
第五步
创建边集合
源顶点
目标顶点
边数据
去重
// 对每个 Array 进行处理并创建边集合
val edges_20240706: RDD[Edge[String]] = idsCache_20240706
.mapPartitions(partIter => {
// 建议采用碰撞率更低效率更高的 Hashing.murmur3_128()
val hashFunction = Hashing.murmur3_128()
partIter.flatMap(arr => {
// Edge(srcId:源顶点,dstId:目的顶点,attr:边数据)
for (i <- 0 to arr.length - 2; j <- i + 1 to arr.length - 1)
yield Edge(
hashFunction.hashString(arr(i), StandardCharsets.UTF_8).asLong.abs,
hashFunction.hashString(arr(j), StandardCharsets.UTF_8).asLong.abs,
""
)
})
})
// 重复去重
.distinct()
val edges_20240706: RDD[Edge[String]] = idsCache_20240706
.mapPartitions(partIter => {
// 建议采用碰撞率更低效率更高的 Hashing.murmur3_128()
val hashFunction = Hashing.murmur3_128()
partIter.flatMap(arr => {
// Edge(srcId:源顶点,dstId:目的顶点,attr:边数据)
for (i <- 0 to arr.length - 2; j <- i + 1 to arr.length - 1)
yield Edge(
hashFunction.hashString(arr(i), StandardCharsets.UTF_8).asLong.abs,
hashFunction.hashString(arr(j), StandardCharsets.UTF_8).asLong.abs,
""
)
})
})
// 重复去重
.distinct()
第六步
构建图对象
顶点集合对象和边集合对象
val graph_20240706 = Graph(vertices_20240706, edges_20240706)
调用连通子图算法
val connectedSubgraph_20240706 = graph_20240706.connectedComponents()
如果多个点可以连通为子图,连通子图算法最终取最小的点,例如 3, 7, 5, 2,则取 2
第七步
重新整合表 提高可读性
val newIdMapping = spark.sql(
"""
|SELECT t1.id_hashcode AS id_hashcode, t2.id AS id, t1.guid AS guid
|FROM subgraph t1
|LEFT JOIN vertex_20240706 t2 ON t1.id_hashcode = t2.id_hashcode
|GROUP BY t1.id_hashcode, t2.id, t1.guid;
|""".stripMargin)
// 将结果按日期分区,写入 GUID 维度字典表 dim.dim_guid_dict
"""
|SELECT t1.id_hashcode AS id_hashcode, t2.id AS id, t1.guid AS guid
|FROM subgraph t1
|LEFT JOIN vertex_20240706 t2 ON t1.id_hashcode = t2.id_hashcode
|GROUP BY t1.id_hashcode, t2.id, t1.guid;
|""".stripMargin)
// 将结果按日期分区,写入 GUID 维度字典表 dim.dim_guid_dict
重新生成guid
//之前用的 是子图的最小hashcode 从新生成一个guid
// GUID 也可以采用 MD5/UUID 的方式
spark.sql(
"""
|SELECT t1.id_hashcode AS id_hashcode, t2.id AS id, MD5(CAST(t1.guid AS STRING)) AS guid
|FROM subgraph t1
|LEFT JOIN vertex_20240706 t2 ON t1.id_hashcode = t2.id_hashcode
|GROUP BY t1.id_hashcode, t2.id, t1.guid;
|""".stripMargin)
//.show(100, truncate = false)
// GUID 也可以采用 MD5/UUID 的方式
spark.sql(
"""
|SELECT t1.id_hashcode AS id_hashcode, t2.id AS id, MD5(CAST(t1.guid AS STRING)) AS guid
|FROM subgraph t1
|LEFT JOIN vertex_20240706 t2 ON t1.id_hashcode = t2.id_hashcode
|GROUP BY t1.id_hashcode, t2.id, t1.guid;
|""".stripMargin)
//.show(100, truncate = false)
存到维表中
使用行为数据中的ID标识去查这一日的GUID 维度字典表 找出对应的guid 回填至行为数据 并设置isNew=1
非第一次对数据的处理
第一步
创建样例类
创建spark对象
读取第二日的数据
处理数据 去null 和缓存
创建顶点集合
创建边集合
第二步
读取第一日的数据
正常是维表用文件代替
val dim_guid_dict: DataFrame = spark.read
.option("sep", ",")
.option("inferSchema", "true")
.csv("file:///E:\\Projects\\IdeaProjects\\yjxxt\\tydt-offline\\tydt-offline-etl\\src\\test\\resources\\data\\day01_idmapping")
// 三列
.toDF("id_hashcode", "id", "guid")
.option("sep", ",")
.option("inferSchema", "true")
.csv("file:///E:\\Projects\\IdeaProjects\\yjxxt\\tydt-offline\\tydt-offline-etl\\src\\test\\resources\\data\\day01_idmapping")
// 三列
.toDF("id_hashcode", "id", "guid")
将上一日的数据构建点集合
val vertices_20240706: RDD[(VertexId, String)] = dim_guid_dict.rdd
.mapPartitions(partIter => {
partIter.map(row => (row.getAs[VertexId]("id_hashcode"), row.getAs[String]("id")))
})
.mapPartitions(partIter => {
partIter.map(row => (row.getAs[VertexId]("id_hashcode"), row.getAs[String]("id")))
})
还是分区按行读取
上一日的维表的id_hashcode,id 组合起来就是顶点集合
将上一日的数据构建边集合
第一步:提取(guid,id_hashcode)键值对
输入:第一日维表dim_guid_dict的每一行(包含id_hashcode、id、guid)。
输出:键值对(guid, id_hashcode),其中guid是第一日为用户分配的唯一标识(连通子图的最小哈希值),id_hashcode是该用户的某个 ID 的哈希值。
目的:按guid分组,将同一用户(同一guid)的所有 ID 哈希值归类到一起
输出:键值对(guid, id_hashcode),其中guid是第一日为用户分配的唯一标识(连通子图的最小哈希值),id_hashcode是该用户的某个 ID 的哈希值。
目的:按guid分组,将同一用户(同一guid)的所有 ID 哈希值归类到一起
第二步:按 guid 分组,聚合同一用户的所有 id_hashcode
作用:将第一步的键值对按guid分组,得到(guid, Iterable[id_hashcode])的结构。
示例:假设第一日中guid=A对应的id_hashcode有x、y、z,则分组后为(A, [x, y, z])。
目的:获取同一用户的所有 ID 哈希值的集合,后续需要让这些 ID 在图中保持连通。
第三步:为同一 guid 下的所有 id_hashcode 生成两两相连的边
逻辑:对每个guid对应的id_hashcode数组,生成所有 “两两组合” 的边。例如,数组[x, y, z]会生成:
Edge(x, y, "")
Edge(x, z, "")
Edge(y, z, "")
Edge(x, y, "")
Edge(x, z, "")
Edge(y, z, "")
目的:通过 “两两连边” 确保同一guid下的所有id_hashcode在图中属于同一连通子图(因为任意两个 ID 之间都有直接边连接)。
按第一日guid分组→同一组内 ID 两两连边” 的逻辑,复现了第一日的用户关联关系,为第二日的图计算提供了历史关联基础,最终保证跨日的用户唯一标识(guid)的一致性。
// 将上一日 dim_guid_dict 的数据构建点集合
val vertices_20240706: RDD[(VertexId, String)] = dim_guid_dict.rdd
.mapPartitions(partIter => {
partIter.map(row => (row.getAs[VertexId]("id_hashcode"), row.getAs[String]("id")))
})
// 将上一日 dim_guid_dict 的数据构建边集合
val edges_20240706: RDD[Edge[String]] = dim_guid_dict.rdd
.mapPartitions(partIter => {
partIter.map(row => (row.getAs[VertexId]("guid"), row.getAs[VertexId]("id_hashcode")))
})
// 分组
.groupByKey()
.mapPartitions(partIter => {
partIter.flatMap(kv => {
val arr = kv._2.toArray
for (i <- 0 to arr.length - 2; j <- i + 1 to arr.length - 1) yield Edge(arr(i), arr(j), "")
})
})
val vertices_20240706: RDD[(VertexId, String)] = dim_guid_dict.rdd
.mapPartitions(partIter => {
partIter.map(row => (row.getAs[VertexId]("id_hashcode"), row.getAs[String]("id")))
})
// 将上一日 dim_guid_dict 的数据构建边集合
val edges_20240706: RDD[Edge[String]] = dim_guid_dict.rdd
.mapPartitions(partIter => {
partIter.map(row => (row.getAs[VertexId]("guid"), row.getAs[VertexId]("id_hashcode")))
})
// 分组
.groupByKey()
.mapPartitions(partIter => {
partIter.flatMap(kv => {
val arr = kv._2.toArray
for (i <- 0 to arr.length - 2; j <- i + 1 to arr.length - 1) yield Edge(arr(i), arr(j), "")
})
})
这一日点union上一日点
val allEdges = edges_20240707.union(edges_20240706)
这一日边union上一日的边
val graph = Graph(allVertices, allEdges)
调用连通子图算法
val connectedSubgraph = graph.connectedComponents()
关键问题
更新后的图的guid 跟上一日已经存到维表里面的guid 不同 因为:如果多个点可以连通为子图,连通子图算法最终取最小的点,例如 3, 7, 5, 2,则取 2 可能导致跨日用户标识不一致
连通子图结果提取
提取关联之后的数据
val connectedSubgraphVertices_20240707 = connectedSubgraph.vertices.toDF("id_hashcode", "guid")
历史映射关系广播
将上一日 dim_guid_dict 的数据,收集到 Driver 端,然后作为变量广播出去
如果广播变量非常大,广播的时候可能会存在 OOM 的风险
从第一日的 GUID 维度字典表dim_guid_dict中提取(id_hashcode, guid)映射关系,转换为 Map 集合
通过广播变量bcIdMappingMap将该映射关系分发至所有 Executor,避免分布式计算中频繁传输大表数据,提升查询效率。
// 遍历 ID,挨个映射查找 把第二天的进行guid分组 然后拿着数组里面的数据去第一天找guid 用第一天的guid 去覆盖第二天的guid
分组:将第二日连通子图结果按临时guid分组,得到(tempGuid, Iterable[id_hashcode]),即每个临时子图包含的所有 ID 哈希值。
修正逻辑:对每个临时子图,遍历其包含的id_hashcode,若存在于第一日的映射关系中,则将该子图的guid更新为第一日对应的guid(确保跨日用户标识一致性);若不存在,则保留临时guid(新增用户)。
扁平化:将(finalGuid, Iterable[id_hashcode])转换为(id_hashcode, finalGuid)的扁平结构,得到修正后的id_hashcode与guid映射。
// 将这一日结果跟上一日的 GUID 维度字典表数据做对比,更新 GUID
val newIdMapping = connectedSubgraphVertices_20240707.rdd
.map(row => (row.getAs[VertexId]("guid"), (row.getAs[VertexId]("id_hashcode"))))
.groupByKey() // 按 guid 分组
.mapPartitions(partIter => {
partIter.map(kv => {
// 获取广播变量中上一日 dim_guid_dict 的数据 优化点
val idMappingMapForExecutor = bcIdMappingMap.value
var newGuid = kv._1
val ids_hashcode = kv._2
// 遍历 ID,挨个映射查找 把第二天的进行guid分组 然后拿着数组里面的数据去第一天找guid 用第一天的guid 去覆盖第二天的guid
breakable {
for (id_hashcode <- ids_hashcode if idMappingMapForExecutor.contains(id_hashcode)) {
newGuid = idMappingMapForExecutor(id_hashcode)
break()
}
}
// 还是返回 groupByKey 后的数据格式,只是 guid 已被更新
(newGuid, ids_hashcode)
})
})
val newIdMapping = connectedSubgraphVertices_20240707.rdd
.map(row => (row.getAs[VertexId]("guid"), (row.getAs[VertexId]("id_hashcode"))))
.groupByKey() // 按 guid 分组
.mapPartitions(partIter => {
partIter.map(kv => {
// 获取广播变量中上一日 dim_guid_dict 的数据 优化点
val idMappingMapForExecutor = bcIdMappingMap.value
var newGuid = kv._1
val ids_hashcode = kv._2
// 遍历 ID,挨个映射查找 把第二天的进行guid分组 然后拿着数组里面的数据去第一天找guid 用第一天的guid 去覆盖第二天的guid
breakable {
for (id_hashcode <- ids_hashcode if idMappingMapForExecutor.contains(id_hashcode)) {
newGuid = idMappingMapForExecutor(id_hashcode)
break()
}
}
// 还是返回 groupByKey 后的数据格式,只是 guid 已被更新
(newGuid, ids_hashcode)
})
})
新老用户标识
更新完的第二日维表之后 使用行为数据的ID标识去查这一日的GUID 维度字典表 找出对应的guid 回填到行为数据
再将第二日的行为数据中的guid 和第一次的GUID维表字典表使用guid 进行LEET JOIN JOIN上的就是老用户 JOIN不上的就是新用户
作业优化
0 条评论
下一页