sparkAPI、RDD总结
2019-11-27 10:08:12 0 举报
AI智能生成
spark结构化apijiRDD总结
作者其他创作
大纲/内容
Spark RDD小结
1. 什么是RDD
介绍
弹性分布式数据集,一种容错的并行数据结构<br>
一种数据抽象,只读的、分区记录集合<br>——在此之上,提供了丰富的操作用来处理RDD<br>
Spark的基石,也是Spark的灵魂。<br>——RDD是Spark最核心最精髓的部分,Spark将所有数据都抽象成RDD。<br>
5个特性
分区信息(Partition):<br>-- 数据集的基本组成单位<br>
一系列的分区信息。<br>每一个分区都会被一个任务处理。 ---决定了并行度。<br> 创建RDD时,可以指定RDD的分区数,如果没有指定,采用默认值。<br>
RDD是一组分区,RDD由分区组成<br>
分区个数默认与CPU核数个数有关<br>
计算的函数 : <br>-- 对于给定的数据集,需要做哪些计算<br>
由一个函数计算每一个分片。RDD的计算以分片为单位。<br>
依赖关系 : <br>-- RDD的依赖关系,描述了RDD之间的Lineage<br>
RDD每一次转换都生成一个新的RDD,多个RDD之间有前后依赖关系。<br> 在某个分区数据丢失时,Spark可以通过这层依赖关系重新计算丢失的分区数据,<br>而不是重头对RDD的所有分区数据进行计算。→容错性<br>
Partitioner<br>函数: <br>-- 对于计算出来的数据结果如何分发<br>
Partitioner是RDD中的分区函数,数据按一定规则分配到指定的Reducer上去处理。<br>两种分区;Hash Partitioner、RangePartitioner<br> <font color="#c41230">key-value的数据才有Partitioner</font>,普通的数据Partitioner为None<br>
优先位置列表 :<br>-- 对于data partition的位置偏好<br>
HDFS -> Partitioner所在的Block的位置。<br><br> 分配任务时,会尽量将任务分配给处理数据块的位置。<br>
2. 创建RDD<br>
基于parallelize创建<br>
myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple"\<br> .split(" ")<br><br>words = spark.sparkContext.parallelize(myCollection, 2)<br>
parallelize函数可以传入分片个数参数,否则采用defaultParallelism。<br>
基于外部数据源创建<br>
distFile = sc.textFile("file:///home/camel/Repos/spark/README.md")<br>distFile.count()<br>
textFile函数支持从多种源创建RDD,如hdfs://,s3n://
基于父RDD转换得来<br>
rdd2 = rdd1.xxx()<br>
入口:<br>spark.sparkContext # 或者直接调用 sc<br>
3. RDD常用算子<br>
转换(transformantion)<br>
在一个已存在的 RDD上创建一个新的 RDD,<br>但实际的计算并没有执行,仅仅记录操作过程<br>
在RDD上调用distinct方法,删除重复数据:<br>words.distinct().count() #9<br>
对RDD进行过滤,保留以字母“ S”开头的单词:<br>def startsWithS(individual):<br> return individual.startswith("S")<br>words.filter(lambda word: startsWithS(word)).collect()<br>
map: 将函数作用到数据集的每一个元素上,生成一个新的分布式的数据集(RDD)返回<br>words2 = words.map(lambda word: (word, word[0], word.startswith("S")))<br>words2.filter(lambda record: record[2]).take(5) <br>
flatMap操作也是对RDD中每个元素进行操作的,但是它的操作结果是一对一或者是一对多的<br>words.flatMap(lambda word: list(word)).take(5)<br>
按单词长度从最长到最短排序<br>words.sortBy(lambda word: len(word) * -1).take(2) <br>
动作(action)<br>
执行 RDD记录的所有运行transformations操作,<br>并计算结果,结果可返回到 driver程序<br>
指定一函数将RDD中数据任意个数的数据值合并为一个值<br>spark.sparkContext.parallelize(range(1, 21)).reduce(lambda x, y: x + y) # 210<br>
count<br>使用它计算RDD中的行数<br>words.count() #9
first<br>返回结果集的第一个值<br>words.first()
max and min<br>分别返回结果中最大、最小值<br>sc.parallelize(range(1, 20)).max()<br>sc.parallelize(range(1, 20)).min()
保存结果数据到文件<br>saveAsTextFile<br>words.saveAsTextFile(“file:/tmp/bookTitle”)
Spark结构化API总结
1.创建DataFrame和SQL临时表
创建Dataframe
从数据源创建
df = spark.read.format("json").load("/data/flight-data/json/2015-summary.json")
通过row转换
导包:<br>from pyspark.sql import Row<br>from pyspark.sql.types <br>import StructField, StructType, StringType, LongType<br>
myManualSchema = StructType([ StructField("some", StringType(), True), <br>StructField("col", StringType(), True), <br>StructField("names", LongType(), False) ]) <br><br>myRow = Row("Hello", None, 1) <br>myDf = spark.createDataFrame([myRow], myManualSchema)
从RDD
创建Sql临时表
df.createOrReplaceTempView("dfTable")
Schema
查看:df.printSchema()<br>
创建:<br><font color="#f1753f">myManualSchema = StructType([<br>StructField("DEST_COUNTRY_NAME", StringType(), True),<br>StructField("ORIGIN_COUNTRY_NAME", StringType(), True),<br>StructField("count", LongType(), False, metadata={"hello":"world"})<br>])</font>
应用:df = spark.read.format("json").schema(<font color="#c41230">myManualSchema</font>)\<br>.load("/data/flight-data/json/2015-summary.json")<br>
2.数据源对接<br>
读模式
读取数据的核心结构:<br>DataFrameReader.format(...).option("key", "value").schema(...).load()<br>
spark.read.format("csv")<br>.option("mode", "FAILFAST")<br>.option("inferSchema", "true")<br>.option("path", "path/to/file(s)")<br>.schema(someSchema)<br>.load()
写模式
写入数据的核心结构:<br>DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(<br>...).save() <br>
dataframe.write.format("csv")<br>.option("mode", "OVERWRITE")<br>.option("dateFormat", "yyyy-MM-dd")<br>.option("path", "path/to/file(s)")<br>.save()
可对接的六大核心数据源
CSV<br>JSON<br>Parquet<br>ORC<br>JDBC/ODBC connections<br>Plain-text files
3.dataframe基本操作<br>
转换
字段选择
<b>Python: <br></b>df.select("DEST_COUNTRY_NAME").show(2)<br>
<b>Spark SQL:<br></b>SELECT columnName * 10, otherColumn, someOtherCol as c FROM dataFrameTable<b><br></b>
增加列
<b>Python:<br></b>df.withColumn("numberOne", lit(1)).show(2) <br>
<b>Spark SQL:<br></b>SELECT *, 1 as numberOne FROM dfTable LIMIT 2<b><br></b>
修改列
<b>Python:</b><br>df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns<br>
删除列
<b>Python: </b><br>df.drop("ORIGIN_COUNTRY_NAME").columns<br>
修改类型
<b>Python : </b><br>df.withColumn("count2", col("count").cast("long"))<br>
条件过滤
<b>Python :</b><br>df.filter(col("count") < 2).show(2) <br>df.where("count < 2").show(2)<br>
<b>Spark SQL:<br></b>SELECT * FROM dfTable WHERE count < 2 LIMIT 2<b><br></b>
去重
<b>Python:</b><br>df.select("id","sno").distinct().count()<br>
<b>Spark SQL:<br></b>SELECT COUNT(DISTINCT(id, sno)) FROM dfTable<b><br></b>
排序
<b>Python:</b><br>df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)<br>
<b>Spark SQL:<br></b>SELECT * FROM dfTable ORDER BY count DESC, DEST_COUNTRY_NAME ASC LIMIT 2<b><br></b>
有限选择
<b>Python:</b><br>df.orderBy(expr("count desc")).limit(6).show()<br>
<b>Spark SQL:<br></b>SELECT * FROM dfTable ORDER BY count desc LIMIT 6<b><br></b>
4.dataframe聚合操作<br>
聚合算法
计数
<b>Python:</b><br>df.select(count("StockCode")).show()<br>
<b>Spark SQL:<br></b>SELECT COUNT(*) FROM dfTable<b><br></b>
去重计数
<b>Python:</b><br>df.select(countDistinct("StockCode")).show()<br>
<b>Spark SQL:<br></b>SELECT COUNT(DISTINCT *) FROM DFTABLE<b><br></b>
规定有效数字计数
<b>Python:</b><br>df.select(approx_count_distinct("StockCode", 0.1)).show()<br>
<b>Spark SQL:<br></b>SELECT approx_count_distinct(StockCode, 0.1) FROM DFTABLE<b><br></b>
第一个和最后一个
<b>Python:</b><br>df.select(first("StockCode"), last("StockCode")).show()<br>
<b>Spark SQL:<br></b>SELECT first(StockCode), last(StockCode) FROM dfTable<b><br></b>
最大最小值
<b>Python:</b><br>df.select(min("Quantity"), max("Quantity")).show()<br>
<b>Spark SQL:<br></b>SELECT min(Quantity), max(Quantity) FROM dfTable<b><br></b>
求和
<b>Python:</b><br>df.select(sum("Quantity")).show()<br>
<b>Spark SQL:<br></b>SELECT sum(Quantity) FROM dfTable<b><br></b>
区别求和
<b>Python:</b><br>df.select(sumDistinct("Quantity")).show()<br>
<b>Spark SQL:<br></b>SELECT SUM(Quantity) FROM dfTable<b><br></b>
......
分组聚合
from pyspark.sql.functions import count<br><br>df.groupBy("InvoiceNo").agg(<br>count("Quantity").alias("quan"),<br>expr("count(Quantity)")).show()<br>
实现K-V映射
<b>Python:</b><br>df.groupBy("InvoiceNo").agg(expr("avg(Quantity)"),expr("stddev_pop(Quantity)")).show() <br>
<b>Spark SQL:<br></b>SELECT avg(Quantity), stddev_pop(Quantity), InvoiceNo FROM dfTable<br>GROUP BY InvoiceNo<br>
5. SparkSQL<br>
创建表
CREATE TABLE flights (<br>DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count LONG)<br>USING JSON OPTIONS (path '/data/flight-data/json/2015-summary.json')
删除表
DROP TABLE flights_csv;
DROP TABLE IF EXISTS flights_csv;<br>
插入数据
INSERT INTO flights_from_select<br>SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count FROM flights LIMIT 20
收藏
收藏
0 条评论
下一页