太一项目图计算SparkGraphX思维导图
2025-07-11 08:39:12 0 举报
AI智能生成
1
作者其他创作
大纲/内容
概念
点
边
点和边都可以设置值
截图
<br>
<br>
<br>
步骤
第一步
// 创建一个 RDD 用于表示顶点<br> val users: RDD[(VertexId, (String, String))] = sc.parallelize(<br> Array(<br> (3L, ("zhangsan", "student")), (7L, ("lisi", "postdoctoral")),<br> (5L, ("p1", "professor")), (2L, ("p2", "professor"))<br> )<br> )<br>
第二步
// 创建一个 RDD 用于表示边<br> val relationships: RDD[Edge[String]] = sc.parallelize(<br> Array(<br> // srcId:源顶点,dstId:目的顶点,attr:边的属性(边数据)<br> Edge(3L, 7L, "collaborator"), Edge(5L, 3L, "advisor"),<br> Edge(2L, 5L, "colleague"), Edge(5L, 7L, "PI") // Principal Investigator<br> )<br> )
第三步
// 连通性(连通子图):可以将每个顶点都关联到连通图里的最小顶点 如何生成图<br> val value = graph.connectedComponents()<br> value.vertices<br> .map(tp => (tp._2, tp._1))<br> .groupByKey()<br> .collect()<br> .foreach(println)<br> // 如果多个点可以连通为子图,连通子图算法最终取最小的点,例如 3, 7, 5, 2,则取 2<br> // 返回结果:(2,CompactBuffer(3, 7, 5, 2))<br>
第一次的数据处理步骤
准备数据
username,phone,mac,imei,open_id,android_id,idfv<br>zhangsan,13800000000,00:1A:2B:3C:4D:5E,356938035643809,openid_1234567890abcdef,,idfv_1234567890abcdef<br>,13800000000,11:22:33:44:55:66,356938035643810,openid_1234567890abcdef,androididabcdef1234567890,<br>,,AA:BB:CC:DD:EE:FF,356938035643811,,androididabcdef1234567890,<br>lisi,13811111111,33:22:11:44:55:66,356938035643814,openid_567890abcdef1234,androidid567890abcdef1234,<br>wangwu,13822222222,44:33:22:11:66:55,356938035643815,openid_67890abcdef12345,,idfv_67890abcdef12345
第一步
创建样例类
样例类根据数据的头文件进行生成
case class UserEventLog(username: String,<br> phone: String,<br> mac: String,<br> imei: String,<br> open_id: String,<br> android_id: String,<br> idfv: String)<br>
创建Spark对象<br>
第二步
读取数据
df_20240706
Spark.read<br>
第一行表头<br> .option("header", "true")<br>// ,分割<br> .option("sep", ",")<br>// 类型<br> .option("inferSchema", "true")<br> .csv("file:///D:\\YJX\\第二阶段\\太一用户画像\\002_code\\数仓项目代码\\tydt-offline\\tydt-offline-etl\\src\\test\\resources\\data\\day01.csv")
val user_event_log_20240706: Dataset[UserEventLog] = df_20240706.as[UserEventLog]
段代码的核心功能就是把无类型的 DataFrame 转换为类型安全的 Dataset
第三步
数据过滤
按照分区在处理每一条
过滤null<br>
缓存
val idsCache_20240706 = user_event_log_20240706.rdd<br> //按照分区处理再处理每一条<br> .mapPartitions(partIter => {<br> partIter.map(event => {<br> Array(event.username, event.phone, event.mac, event.imei, event.open_id, event.android_id, event.idfv)<br> //过滤null<br> .filter(StringUtils.isNotBlank)<br> })<br> //缓存<br> }).persist(StorageLevel.MEMORY_AND_DISK)<br>
第四步
创建顶点集合
把顶点属性进行哈希 得到顶点值
循环遍历所有顶点
yield 集合生成器<br>
去重
// 对每个 Array 进行处理并创建顶点集合<br> // 构建点集合 哈希 zhangsan 获取 顶点 顶点的属性就是zhangsan<br> val vertices_20240706: RDD[(VertexId, String)] = idsCache_20240706<br> .mapPartitions(partIter => {<br> // 建议采用碰撞率更低效率更高的 Hashing.murmur3_128()<br> val hashFunction = Hashing.murmur3_128()<br> partIter.flatMap(arr => {<br> // (点的唯一标识, 点的数据) 传进来的值是String 所以用hashString i是属性zhangsan (hash,zhangsan)<br>// yield 集合生成器<br> for (i <- arr) yield ((hashFunction.hashString(i, StandardCharsets.UTF_8).asLong.abs), i)<br> })<br> })<br>// 去重<br> .distinct()
创建临时视图
第五步
创建边集合
源顶点
目标顶点
边数据
去重
// 对每个 Array 进行处理并创建边集合<br> val edges_20240706: RDD[Edge[String]] = idsCache_20240706<br> .mapPartitions(partIter => {<br> // 建议采用碰撞率更低效率更高的 Hashing.murmur3_128()<br> val hashFunction = Hashing.murmur3_128()<br> partIter.flatMap(arr => {<br> // Edge(srcId:源顶点,dstId:目的顶点,attr:边数据)<br> for (i <- 0 to arr.length - 2; j <- i + 1 to arr.length - 1)<br> yield Edge(<br> hashFunction.hashString(arr(i), StandardCharsets.UTF_8).asLong.abs,<br> hashFunction.hashString(arr(j), StandardCharsets.UTF_8).asLong.abs,<br> ""<br> )<br> })<br> })<br>// 重复去重<br> .distinct()
第六步
构建图对象
顶点集合对象和边集合对象
val graph_20240706 = Graph(vertices_20240706, edges_20240706)
调用连通子图算法
val connectedSubgraph_20240706 = graph_20240706.connectedComponents()
如果多个点可以连通为子图,连通子图算法最终取最小的点,例如 3, 7, 5, 2,则取 2
第七步
重新整合表 提高可读性
val newIdMapping = spark.sql(<br> """<br> |SELECT t1.id_hashcode AS id_hashcode, t2.id AS id, t1.guid AS guid<br> |FROM subgraph t1<br> |LEFT JOIN vertex_20240706 t2 ON t1.id_hashcode = t2.id_hashcode<br> |GROUP BY t1.id_hashcode, t2.id, t1.guid;<br> |""".stripMargin)<br> // 将结果按日期分区,写入 GUID 维度字典表 dim.dim_guid_dict
重新生成guid<br>
//之前用的 是子图的最小hashcode 从新生成一个guid<br> // GUID 也可以采用 MD5/UUID 的方式<br> spark.sql(<br> """<br> |SELECT t1.id_hashcode AS id_hashcode, t2.id AS id, MD5(CAST(t1.guid AS STRING)) AS guid<br> |FROM subgraph t1<br> |LEFT JOIN vertex_20240706 t2 ON t1.id_hashcode = t2.id_hashcode<br> |GROUP BY t1.id_hashcode, t2.id, t1.guid;<br> |""".stripMargin)<br> //.show(100, truncate = false)
存到维表中
使用行为数据中的ID标识去查这一日的GUID 维度字典表 找出对应的guid 回填至行为数据 并设置isNew=1<br>
非第一次对数据的处理
<br>
第一步
创建样例类
创建spark对象<br>
读取第二日的数据
处理数据 去null 和缓存<br>
创建顶点集合
创建边集合
第二步
读取第一日的数据
正常是维表用文件代替
val dim_guid_dict: DataFrame = spark.read<br> .option("sep", ",")<br> .option("inferSchema", "true")<br> .csv("file:///E:\\Projects\\IdeaProjects\\yjxxt\\tydt-offline\\tydt-offline-etl\\src\\test\\resources\\data\\day01_idmapping")<br>// 三列<br> .toDF("id_hashcode", "id", "guid")<br>
将上一日的数据构建点集合
val vertices_20240706: RDD[(VertexId, String)] = dim_guid_dict.rdd<br> .mapPartitions(partIter => {<br> partIter.map(row => (row.getAs[VertexId]("id_hashcode"), row.getAs[String]("id")))<br> })
还是分区按行读取
上一日的维表的id_hashcode,id 组合起来就是顶点集合<br>
将上一日的数据构建边集合
第一步:提取(guid,id_hashcode)键值对
输入:第一日维表dim_guid_dict的每一行(包含id_hashcode、id、guid)。<br>输出:键值对(guid, id_hashcode),其中guid是第一日为用户分配的唯一标识(连通子图的最小哈希值),id_hashcode是该用户的某个 ID 的哈希值。<br>目的:按guid分组,将同一用户(同一guid)的所有 ID 哈希值归类到一起
第二步:按 guid 分组,聚合同一用户的所有 id_hashcode
作用:将第一步的键值对按guid分组,得到(guid, Iterable[id_hashcode])的结构。
示例:假设第一日中guid=A对应的id_hashcode有x、y、z,则分组后为(A, [x, y, z])。
目的:获取同一用户的所有 ID 哈希值的集合,后续需要让这些 ID 在图中保持连通。
第三步:为同一 guid 下的所有 id_hashcode 生成两两相连的边
逻辑:对每个guid对应的id_hashcode数组,生成所有 “两两组合” 的边。例如,数组[x, y, z]会生成:<br>Edge(x, y, "")<br>Edge(x, z, "")<br>Edge(y, z, "")
目的:通过 “两两连边” 确保同一guid下的所有id_hashcode在图中属于同一连通子图(因为任意两个 ID 之间都有直接边连接)。
按第一日guid分组→同一组内 ID 两两连边” 的逻辑,复现了第一日的用户关联关系,为第二日的图计算提供了历史关联基础,最终保证跨日的用户唯一标识(guid)的一致性。
// 将上一日 dim_guid_dict 的数据构建点集合<br> val vertices_20240706: RDD[(VertexId, String)] = dim_guid_dict.rdd<br> .mapPartitions(partIter => {<br> partIter.map(row => (row.getAs[VertexId]("id_hashcode"), row.getAs[String]("id")))<br> })<br><br> // 将上一日 dim_guid_dict 的数据构建边集合<br> val edges_20240706: RDD[Edge[String]] = dim_guid_dict.rdd<br> .mapPartitions(partIter => {<br> partIter.map(row => (row.getAs[VertexId]("guid"), row.getAs[VertexId]("id_hashcode")))<br> })<br>// 分组<br> .groupByKey()<br> .mapPartitions(partIter => {<br> partIter.flatMap(kv => {<br> val arr = kv._2.toArray<br> for (i <- 0 to arr.length - 2; j <- i + 1 to arr.length - 1) yield Edge(arr(i), arr(j), "")<br> })<br> })<br>
这一日点union上一日点<br>
val allEdges = edges_20240707.union(edges_20240706)
这一日边union上一日的边<br>
val graph = Graph(allVertices, allEdges)
调用连通子图算法
val connectedSubgraph = graph.connectedComponents()
关键问题
更新后的图的guid 跟上一日已经存到维表里面的guid 不同 因为:如果多个点可以连通为子图,连通子图算法最终取最小的点,例如 3, 7, 5, 2,则取 2 可能导致跨日用户标识不一致<br>
连通子图结果提取
提取关联之后的数据
val connectedSubgraphVertices_20240707 = connectedSubgraph.vertices.toDF("id_hashcode", "guid")
历史映射关系广播
将上一日 dim_guid_dict 的数据,收集到 Driver 端,然后作为变量广播出去
如果广播变量非常大,广播的时候可能会存在 OOM 的风险
从第一日的 GUID 维度字典表dim_guid_dict中提取(id_hashcode, guid)映射关系,转换为 Map 集合
通过广播变量bcIdMappingMap将该映射关系分发至所有 Executor,避免分布式计算中频繁传输大表数据,提升查询效率。
// 遍历 ID,挨个映射查找 把第二天的进行guid分组 然后拿着数组里面的数据去第一天找guid 用第一天的guid 去覆盖第二天的guid
分组:将第二日连通子图结果按临时guid分组,得到(tempGuid, Iterable[id_hashcode]),即每个临时子图包含的所有 ID 哈希值。<br><br>
修正逻辑:对每个临时子图,遍历其包含的id_hashcode,若存在于第一日的映射关系中,则将该子图的guid更新为第一日对应的guid(确保跨日用户标识一致性);若不存在,则保留临时guid(新增用户)。<br><br>
扁平化:将(finalGuid, Iterable[id_hashcode])转换为(id_hashcode, finalGuid)的扁平结构,得到修正后的id_hashcode与guid映射。
// 将这一日结果跟上一日的 GUID 维度字典表数据做对比,更新 GUID<br> val newIdMapping = connectedSubgraphVertices_20240707.rdd<br> .map(row => (row.getAs[VertexId]("guid"), (row.getAs[VertexId]("id_hashcode"))))<br> .groupByKey() // 按 guid 分组<br> .mapPartitions(partIter => {<br> partIter.map(kv => {<br> // 获取广播变量中上一日 dim_guid_dict 的数据 优化点<br> val idMappingMapForExecutor = bcIdMappingMap.value<br> var newGuid = kv._1<br> val ids_hashcode = kv._2<br> // 遍历 ID,挨个映射查找 把第二天的进行guid分组 然后拿着数组里面的数据去第一天找guid 用第一天的guid 去覆盖第二天的guid<br> breakable {<br> for (id_hashcode <- ids_hashcode if idMappingMapForExecutor.contains(id_hashcode)) {<br> newGuid = idMappingMapForExecutor(id_hashcode)<br> break()<br> }<br> }<br> // 还是返回 groupByKey 后的数据格式,只是 guid 已被更新<br> (newGuid, ids_hashcode)<br> })<br> })
新老用户标识
更新完的第二日维表之后 使用行为数据的ID标识去查这一日的GUID 维度字典表 找出对应的guid 回填到行为数据<br>
再将第二日的行为数据中的guid 和第一次的GUID维表字典表使用guid 进行LEET JOIN JOIN上的就是老用户 JOIN不上的就是新用户<br>
作业优化
0 条评论
下一页