Spark基础
2023-06-04 11:25:59 2 举报
AI智能生成
详细总结了spark的基础知识点,包含spark的RDD,SparkSQL,SparkStreaming等等
作者其他创作
大纲/内容
Spark用来处理结构化数据的一个模块,它提供了2个编程抽象:DataFrame和DataSet,将Spark SQL转换成RDD,然后提交到集群执行
定义
易整合
统一的数据访问方式
兼容Hive
标准的数据连接
特点
DataFrame是一个分布式数据容器,除了数据以外,还记录数据的结构信息,即schema,也支持嵌套数据类型(struct、array和map)
DataFrame是为数据提供了Schema的视图。可以把它当做数据库中的一张表来对待
逻辑查询计划优化就是一个利用基于关系代数的等价变换,将高成本的操作替换为低成本操作的过程
优化的执行计划:查询计划通过Spark catalyst optimiser进行优化
DataFrame也是懒执行的。性能上比RDD要高,主要原因:
分支主题
与RDD的关系图
DataFrame
DataSet比DataFrame多展示了数据的类型
DataSet
Spark SQL概述
SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合
SparkSession
// RDD转换为DF,DS时,需要增加隐式转换,需要引入spark环境对象的隐式转换规则
import spark.implicits._
RDD -> DataFrame
val rdd1 = df.rdd
DataFrame -> RDD
RDD - DataFrame
val userRDD = rdd.map {
}
}
val userDS = userRDD.toDS()
RDD -> DataSet
val rdd3 = userDS.rdd
DataSet -> RDD
RDD DataSet
val ds = df.as[User]
DataFrame -> DataSet
val df1 = ds.toDF()
DataSet -> DataFrame
DataFrame - DataSet
图示
三者均是Spark平台下的分布式弹性数据集
都有惰性机制,Action时才运算
根据内存,自动缓存运算
均有partition概念
import spark.implicits._,需要导入此包支持操作
共性
RDD不支持SparkSQL
与RDD和Dataset不同,DataFrame每一行的类型固定为Row,每一列的值没法直接访问,只有通过解析才能获取各个字段的值
Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同
差异
共性与差异
RDD、DataFrame、DataSet
// 1. 继承UserDefinedAggregateFunction// 2. 重写方法
// 主方法中注册函数
用户自定义函数
SparkSQL编程
修改配置项spark.sql.sources.default,可修改默认数据源格式
当数据源格式不是parquet格式文件时,需要手动指定数据源的格式。数据源格式需要指定全名(例如:org.apache.spark.sql.parquet)
Spark SQL的默认数据源为Parquet格式
spark.read.format("json").load
peopleDF.write.format("parquet").save
手动指定选项
SaveMode.ErrorIfExists(default)
SaveMode.Append
SaveMode.Overwrite
SaveMode.Ignore
文件保存选项
通用加载/保存方法
Spark SQL 能够自动推测 JSON数据集的结构,并将它加载为一个Dataset[Row].
可以通过SparkSession.read.json()去加载一个 一个JSON 文件
spark.read.json
JSON文件
Parquet是一种流行的列式存储格式,可以高效地存储具有嵌套字段的记录
Spark SQL 提供了直接读取和存储 Parquet 格式文件的方法
spark.read.parquet
Parquet文件
Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame
通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中
需要将相关的数据库驱动放到spark的类路径下
JDBC
内嵌Hive使用
外部Hive应用
Hive数据库
SparkSQL数据源
SparkSQL实战
Spark SQL
Spark Streaming使用离散化流(discretized stream)作为抽象表示,叫作DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而DStream是由这些RDD所组成的序列
易用
容错
架构
Spark Streaming概述
Discretized Stream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark原语操作后的结果数据流。在内部实现上,DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据。
DStream入门
文件数据流:能够读取所有HDFS API兼容的文件系统文件,通过fileStream方法进行读取
文件需要有相同的数据格式;
文件进入 dataDirectory的方式需要通过移动或者重命名来实现;
一旦文件移动进目录,则不能再修改,即便修改了也不会读取新数据;
注意事项
文件数据源
ssc.queueStream(queueOfRDDs)
//3.创建RDD队列
val rddQueue = new mutable.Queue[RDD[Int]]()
//4.创建QueueInputDStream
//8.循环创建并向RDD队列中放入RDD
for (i <- 1 to 5) {
Thread.sleep(2000)
}
用法
RDD队列
//3.创建自定义receiver的Streaming
//最初启动的时候,调用该方法,作用为:读数据并将数据发送给Spark
override def onStart(): Unit = {
new Thread("Socket Receiver") {
override def run() {
receive()
}
}.start()
}
//读数据并将数据发送给Spark
def receive(): Unit = {
//创建一个Socket
//定义一个变量,用来接收端口传过来的数据
var input: String = null
//创建一个BufferedReader用于读取端口传来的数据
//读取数据
input = reader.readLine()
//当receiver没有关闭并且输入数据不为空,则循环发送数据给Spark
while (!isStopped() && input != null) {
store(input)
input = reader.readLine()
//跳出循环则关闭资源
reader.close()
socket.close()
//重启任务
restart("restart")
override def onStop(): Unit = {}
}
CustomerReceiver
自定义数据源
在工程中需要引入 Maven 工件 spark- streaming-kafka_2.10 来使用它
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.2</version>
导入依赖
Kafka数据源
DStream创建
DStream转换
DStream输出
SparkStreaming
Hadoop1.x版本的问题
Hadoop2.x版本
Hadoop小剧场
为什么使用函数式编程
Spark小剧场
Spark是基于内存的快速、通用。可扩展的大数据分析引擎
什么是Spark
Spark SQL 结构化数据 | Spark Streaming 实时计算
Spark Core
独立调度器 | Yarn | Mesos
模块分区
实现了Spark的基本功能,包含任务调度、内存管理、错误恢复、与存储系统交互等模块。Spark Core中还包含了对弹性分布式数据集(Resilient Distributed DataSet,简称RDD)的API定义
是Spark用来操作结构化数据的程序包。通过Spark SQL,我们可以使用 SQL或者Apache Hive版本的SQL方言(HQL)来查询数据。Spark SQL支持多种数据源,比如Hive表、Parquet以及JSON等
是Spark提供的对实时数据进行流式计算的组件。提供了用来操作数据流的API,并且与Spark Core中的 RDD API高度对应
Spark Streaming
提供常见的机器学习(ML)功能的程序库。包括分类、回归、聚类、协同过滤等,还提供了模型评估、数据 导入等额外的支持功能
Spark MLlib
Spark设计为可以高效地在一个计算节点到数千个计算节点之间伸缩计算。为了实现这样的要求,同时获得最大灵活性,Spark支持在各种集群管理器(Cluster Manager)上运行,包括Hadoop YARN、Apache Mesos,以及Spark自带的一个简易调度器,叫作独立调度器。
集群管理器
模块解释
Spark内置模块
Spark概述
Spark的驱动器是执行开发程序中的main方法的进程。它负责开发人员编写的用来创建SparkContext、创建RDD,以及进行RDD的转化操作和行动操作代码的执行
Driver:任务的调度和切分
概述
把用户程序转为作业(JOB)
跟踪Executor的运行状况
为执行器节点调度任务
UI展示应用运行状况
功用
Driver(驱动器)
Spark Executor是一个工作进程,负责在 Spark 作业中运行任务,任务间相互独立。Spark 应用启动时,Executor节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。
Executor:任务的执行
负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程;
通过自身的块管理器(Block Manager)为用户程序中要求缓存的RDD提供内存式存储。RDD是直接缓存在Executor进程内的,因此任务可以在运行时充分利用缓存数据加速运算。
Executor(执行器)
重要角色
所有计算运行在一个线程中
local
指定使用几个线程来运行计算,通常CPU有几核,就指定几个线程
local[K]
按CPU内部最多的Cores设置线程数
local[*]
模式
bin/spark-submit \\
--class <main-class>
--master <master-url> \\
--deploy-mode <deploy-mode> \\
--conf <key>=<value> \\
... # other options
<application-jar> \\
[application-arguments]
基本语法
--master 指定Master的地址,默认为Local
--class: 你的应用的启动类 (如 org.apache.spark.examples.SparkPi)
--deploy-mode: 是否发布你的驱动到worker节点(cluster) 或者作为一个本地客户端 (client) (default: client)*
--conf: 任意的Spark配置属性, 格式key=value. 如果值包含空格,可以加引号“key=value”
application-arguments: 传给main()方法的参数
--executor-memory 1G 指定每个executor可用内存为1G
--total-executor-cores 2 指定每个executor使用的cup核数为2个
参数说明
Local模式
Standalone模式
Yarn模式
运行模式
Spark运行模式
Spark是一个分布式数据集的分析框架,将计算单元缩小为更适合分布式计算和并行计算的模型,称之为RDD
自动进行内存和磁盘数据存储的切换
基于Lineage(血统)的高效容错机制
Task如果失败,自动进行特定次数的重试
Stage如果失败,自动进行特定次数的重试
checkpoint和persist,可主动或被动触发
数据调度弹性:DAGScheduler、TaskScheduler与资源管理无关
数据分片的高度弹性(coalesce)
弹性
数据的来源
分布式
数据的类型 & 计算逻辑的封装 (数据模型)
数据集
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据(计算)抽象。
计算逻辑不可变
不可变
提高数据处理能力
可分区
多任务同时执行
并行计算
代码中是一个抽象类,它代表一个不可变、可分区、里面的元素可并行计算的集合。
什么是RDD
一组分区(Partition)即数据集的基本组成单位
一个计算各个分区间的函数
一个有关于各个RDD间依赖关系的列表
优先位置是为了利于计算
一个存储存取每个Partition的优先位置(preferred location)的列表
一个关于键值key-value分片的Partitioner
RDD的属性
RDD逻辑上是分区的,每个分区的数据是抽象存在的
如果RDD是通过已有的文件系统构建,则compute函数是读取指定文件系统中的数据
如果RDD是通过其他RDD转换而来,则compute函数是执行转换逻辑将其他RDD的数据进行转换
计算的时候会通过一个compute函数得到每个分区的数据
分区
RDD的操作算子包括两类,一类叫做transformations,它是用来将RDD进行转化,构建RDD的血缘关系
一类叫做actions,它是用来触发RDD的计算,得到RDD的相关计算结果或者将RDD保存的文件系统中
要想改变RDD中的数据,只能在现有的RDD基础上创建新的RDD
只读
RDDs维护着操作算子转换的血缘关系,即依赖
一对一
窄依赖
多对多
宽依赖
依赖的分类
依赖
如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,计算一次后进入缓存,就不再根据血缘关系计算
缓存
迭代会使RDDs之间的血缘关系变长,因此RDD引入checkpoint,将数据持久化存储,从而减轻血缘关系的依赖
Checkpoint
RDD的特点
RDD概述
从集合中创建RDD(内存)
val rdd2= sc.textFile("hdfs://hadoop131:9000/RELEASE")
从外部的存储创建RDD(硬盘)
从其他RDD创建(转换)
RDD的创建
返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
map(fuc)
类似于map,但独立地在RDD的每一个分片(分区)上运行.
mapPartitions(fuc)
map():每次处理一条数据
mapPartitions():每次处理一个分区的数据,这个分区的数据处理完后,原RDD中分区的数据才能释放,可能导致OOM
当内存空间较大的时候建议使用mapPartitions(),以提高处理效率
map()和mapPartitions()的区别
mapPartitionsWithIndex(fuc)
类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
flatMap(fuc)
将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]
glom(配合flatmap可以合并分区)(fuc)
分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器
groupBy(fuc)
过滤。返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成。
filter(fuc)
以指定的随机种子随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样,seed用于指定随机数生成器种子。
对源RDD进行去重后返回一个新的RDD。默认情况下,只有8个并行任务来操作,但是可以传入一个可选的numTasks参数改变它
distinct([numTasks])
缩减分区数,用于大数据集过滤后,提高小数据集的执行效率
coalesce(numPartitions)
根据分区数,重新通过网络随机洗牌所有数据
repartition(numPartitions)
coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定
repartition实际上是调用的coalesce,默认是进行shuffle的
coalesce和repartition的区别
使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为正序
Value类型
并集
union(otherDataset)
差集
subtract (otherDataset)
交集
intersection(otherDataset)
笛卡尔积
cartesian(otherDataset)
zip(otherDataset)
双Value类型交互
对pairRDD进行分区操作,如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区, 否则会生成ShuffleRDD,即会产生shuffle过程
partitionBy
groupByKey也是对每个key进行操作,但只生成一个sequence
groupByKey
groupByKey:按照key进行分组,直接进行shuffle
一般情况下reduceByKey比groupByKey更好,但要注意combine的使用条件必须是不影响最终的业务逻辑
reduceByKey和groupByKey的区别
在kv对的RDD中,,按key将value进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。
即计算两次,一次分区内,一次分区外
aggregateByKey
aggregateByKey分区内和分区外计算规则相同可用foldByKey
foldByKey
createCombiner
mergeValue
mergeCombiners
将第一个key出现的v转换结构计算规则,第二个参数表示分区内计算规则,第三个参数表示分区间计算规则
combineByKey[C]
mapValues
Key-Value类型
RDD的转换
通过func函数聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据
reduce(func)
在驱动程序中,以数组的形式返回数据集的所有元素。
collect()
返回RDD中元素的个数
count()
返回RDD中的第一个元素
first()
返回一个由RDD的前n个元素组成的数组
take(n)
返回该RDD排序后的前n个元素组成的数组
takeOrdered(n)
aggregate
折叠操作,aggregate的简化操作,seqop和combop一样
fold(num)(func)
saveAsTextFile(path)
saveAsSequenceFile(path)
saveAsObjectFile(path)
countByKey()
在数据集的每一个元素上,运行函数func进行更新
foreach(func)
Action
在实际开发中我们往往需要自己定义一些对于RDD的操作,那么此时需要主要的是,初始化工作是在Driver端进行的,而实际运行程序是在Executor端进行的,这就涉及到了跨进程通信,是需要序列化的
函数的序列化 extends Serializable
传递方法
传递属性
RDD中的函数传递
在大量记录上执行的单个操作,将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区
RDD的Lineage会记录RDD的元数据信息和转换行为
.toDebugString
如何查看Lineage
.dependencies
如何查看依赖类型
Lineage
窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用
一对一关系
宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition,会引起shuffle
多对多关系
原始的RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据
DAG(Directed Acyclic Graph)
初始化一个SparkContext即生成一个Application,Driver也可理解为Application
Application
一个Action算子就会生成一个Job,只针对于Action算子
Job
根据RDD之间的依赖关系的不同将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage
Stage
Stage是一个TaskSet,将Stage划分的结果发送到不同的Executor执行即为一个Task
Task
名词解释
Application->Job->Stage-> Task每一层都是1对n的关系
任务划分
RDD依赖关系
RDD通过persist方法或cache方法可以将前面的计算结果缓存,默认情况下 persist() 会把数据以序列化的形式缓存在 JVM 的堆空间中
RDD缓存
本质是通过将RDD写入Disk做检查点
检查点通过将数据写入到HDFS文件系统实现了RDD的检查点功能
在checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移除
对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发
RDD CheckPoint
RDD编程
Hash分区为当前的默认分区
Hash
Range
Spark中分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle过程属于哪个分区和Reduce的个数
只有Key-Value类型的RDD才有分区器的,非Key-Value类型的RDD分区器的值是None
partitioner属性
获取RDD分区
计算key的hashcode,与分区个数取余,小于0则加分区个数,大于0则加0
弊端:导致每个分区中数据量的不均匀,极端情况下会导致某些分区拥有RDD的全部数据
Hash分区
将一定范围内的数映射到某一个分区内,尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,但是分区内的元素是不能保证顺序的。
从整个RDD中抽取样本并排序,计算每个分区最大key,形成数组变量
判断key在rangeBounds中的范围,赋予id下标
实现步骤
Ranger分区
继承Partitioner
numPartitions: Int:返回创建出来的分区数。
getPartition(key: Any): Int:返回给定键的分区编号(0到numPartitions-1)。
判断相等性的标准方法
equals():Java
实现方法
自定义分区
键值对RDD数据分区器
读取textFile
保存saveAsTextFile
Text文件
使用RDD读取JSON文件处理很复杂,同时SparkSQL集成了很好的处理JSON文件的方式,所以应用中多是采用SparkSQL处理JSON文件
import scala.util.parsing.json.JSON
解析map
Json文件
Csv文件
Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)
SequenceFile文件只针对PairRDD
读取sequenceFile()
保存saveAsSequenceFile()
Sequence文件
对象文件是将对象序列化后保存的文件,采用Java的序列化机制
读取objectFile
保存saveAsObjectFile
Object文件
文件类数据读取与保存
输入格式(InputFormat)
键类型
值类型
分区值
hadoopRDD
newHadoopRDD
最抽象的两个函数接口
Hadoop本身有一个解压器会根据压缩文件的后缀推断解压算法进行解压.
map-reduce如何读取某一类型数据,将该对应读取方式改写为上述的两种接口
备注
HDFS
jdbcRDD
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
添加依赖
// 定义连接mysql的参数
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://hadoop102:3306/rdd"
val userName = "root"
val passWd = "000000"
//创建JdbcRDD
Class.forName(driver)
)
MySQL读取
def insertData(iterator: Iterator[String]): Unit = {
Class.forName ("com.mysql.jdbc.Driver").newInstance()
iterator.foreach(data => {
val ps = conn.prepareStatement("insert into rddtable(name) values (?)")
ps.executeUpdate()
})
MySQL写入
代码详解
MySQL数据库
Spark 可以通过Hadoop输入格式访问HBase
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
<artifactId>hbase-client</artifactId>
//构建HBase配置信息
val conf: Configuration = HBaseConfiguration.create()
//从HBase读取数据形成RDD
classOf[Result])
HBase读取
//创建HBaseConf
val conf = HBaseConfiguration.create()
val jobConf = new JobConf(conf)
jobConf.setOutputFormat(classOf[TableOutputFormat])
//构建Hbase表描述器
val fruitTable = TableName.valueOf("fruit_spark")
val tableDescr = new HTableDescriptor(fruitTable)
tableDescr.addFamily(new HColumnDescriptor("info".getBytes))
//创建Hbase表
val admin = new HBaseAdmin(conf)
if (admin.tableExists(fruitTable)) {
admin.disableTable(fruitTable)
admin.deleteTable(fruitTable)
admin.createTable(tableDescr)
//定义往Hbase插入数据的方法
val put = new Put(Bytes.toBytes(triple._1))
//创建一个RDD
//将RDD内容写到HBase
val localData = initialRDD.map(convert)
localData.saveAsHadoopDataset(jobConf)
HBase写入
HBase数据库
文件系统类数据读取与保存
数据读取与保存
弹性分布数据集
RDD
分布式共享只写数据
累加器
分布式共享只读数据
广播变量
Spark三大数据结构
所有分片处理时更新共享变量
系统累加器
extends org.apache.spark.util.AccumulatorV2
自定义累加器
全局可读
广播变量(调优策略)
RDD编程进阶
扩展
SparkCore
Spark笔记
0 条评论
回复 删除
下一页