准备数据
username,phone,mac,imei,open_id,android_id,idfv<br>zhangsan,13800000000,00:1A:2B:3C:4D:5E,356938035643809,openid_1234567890abcdef,,idfv_1234567890abcdef<br>,13800000000,11:22:33:44:55:66,356938035643810,openid_1234567890abcdef,androididabcdef1234567890,<br>,,AA:BB:CC:DD:EE:FF,356938035643811,,androididabcdef1234567890,<br>lisi,13811111111,33:22:11:44:55:66,356938035643814,openid_567890abcdef1234,androidid567890abcdef1234,<br>wangwu,13822222222,44:33:22:11:66:55,356938035643815,openid_67890abcdef12345,,idfv_67890abcdef12345
第一步
创建样例类
样例类根据数据的头文件进行生成
case class UserEventLog(username: String,<br> phone: String,<br> mac: String,<br> imei: String,<br> open_id: String,<br> android_id: String,<br> idfv: String)<br>
创建Spark对象<br>
第三步
数据过滤
按照分区在处理每一条
过滤null<br>
缓存
val idsCache_20240706 = user_event_log_20240706.rdd<br> //按照分区处理再处理每一条<br> .mapPartitions(partIter => {<br> partIter.map(event => {<br> Array(event.username, event.phone, event.mac, event.imei, event.open_id, event.android_id, event.idfv)<br> //过滤null<br> .filter(StringUtils.isNotBlank)<br> })<br> //缓存<br> }).persist(StorageLevel.MEMORY_AND_DISK)<br>
第四步
创建顶点集合
把顶点属性进行哈希 得到顶点值
循环遍历所有顶点
yield 集合生成器<br>
去重
// 对每个 Array 进行处理并创建顶点集合<br> // 构建点集合 哈希 zhangsan 获取 顶点 顶点的属性就是zhangsan<br> val vertices_20240706: RDD[(VertexId, String)] = idsCache_20240706<br> .mapPartitions(partIter => {<br> // 建议采用碰撞率更低效率更高的 Hashing.murmur3_128()<br> val hashFunction = Hashing.murmur3_128()<br> partIter.flatMap(arr => {<br> // (点的唯一标识, 点的数据) 传进来的值是String 所以用hashString i是属性zhangsan (hash,zhangsan)<br>// yield 集合生成器<br> for (i <- arr) yield ((hashFunction.hashString(i, StandardCharsets.UTF_8).asLong.abs), i)<br> })<br> })<br>// 去重<br> .distinct()
创建临时视图
第五步
创建边集合
源顶点
目标顶点
边数据
去重
// 对每个 Array 进行处理并创建边集合<br> val edges_20240706: RDD[Edge[String]] = idsCache_20240706<br> .mapPartitions(partIter => {<br> // 建议采用碰撞率更低效率更高的 Hashing.murmur3_128()<br> val hashFunction = Hashing.murmur3_128()<br> partIter.flatMap(arr => {<br> // Edge(srcId:源顶点,dstId:目的顶点,attr:边数据)<br> for (i <- 0 to arr.length - 2; j <- i + 1 to arr.length - 1)<br> yield Edge(<br> hashFunction.hashString(arr(i), StandardCharsets.UTF_8).asLong.abs,<br> hashFunction.hashString(arr(j), StandardCharsets.UTF_8).asLong.abs,<br> ""<br> )<br> })<br> })<br>// 重复去重<br> .distinct()
第六步
构建图对象
顶点集合对象和边集合对象
val graph_20240706 = Graph(vertices_20240706, edges_20240706)
调用连通子图算法
val connectedSubgraph_20240706 = graph_20240706.connectedComponents()
如果多个点可以连通为子图,连通子图算法最终取最小的点,例如 3, 7, 5, 2,则取 2
第七步
重新整合表 提高可读性
val newIdMapping = spark.sql(<br> """<br> |SELECT t1.id_hashcode AS id_hashcode, t2.id AS id, t1.guid AS guid<br> |FROM subgraph t1<br> |LEFT JOIN vertex_20240706 t2 ON t1.id_hashcode = t2.id_hashcode<br> |GROUP BY t1.id_hashcode, t2.id, t1.guid;<br> |""".stripMargin)<br> // 将结果按日期分区,写入 GUID 维度字典表 dim.dim_guid_dict
重新生成guid<br>
//之前用的 是子图的最小hashcode 从新生成一个guid<br> // GUID 也可以采用 MD5/UUID 的方式<br> spark.sql(<br> """<br> |SELECT t1.id_hashcode AS id_hashcode, t2.id AS id, MD5(CAST(t1.guid AS STRING)) AS guid<br> |FROM subgraph t1<br> |LEFT JOIN vertex_20240706 t2 ON t1.id_hashcode = t2.id_hashcode<br> |GROUP BY t1.id_hashcode, t2.id, t1.guid;<br> |""".stripMargin)<br> //.show(100, truncate = false)
存到维表中
使用行为数据中的ID标识去查这一日的GUID 维度字典表 找出对应的guid 回填至行为数据 并设置isNew=1<br>