大数据知识图谱
2021-11-18 09:27:51 2 举报
AI智能生成
大数据常用组件知识点
作者其他创作
大纲/内容
1.离线
核心诉求
处理时间要求不高;
处理数据量巨大;
处理数据格式多样;
支持SQL类作业和自定义作业;
数据处理大致可以分成两大类
OLAP
联机分析处理
OLTP
联机事务处理
常用组件
Hadoop
分布式的计算存储框架
分布式的计算存储框架
1/2/3区别
1->2
1、联邦机制,两个nameNode,防止出现单点故障
2、yarn
2->3
java的运行环境最低1.8
HDFS支持纠删码,节省了空间,只需要1.4倍的空间就能保证数据的完整性
Yarn的时间线服务
支持对于两个以上的namenode(一个active,多个standby状态)
MapReduce本地优化,性能提升30%,但是不稳定
三个主要部分
HDFS
Yarn
MapReduce
HDFS:
分布式文件系统,为各种批处理引擎提供数据存储,可以存储各种文件格式数据。
分布式文件系统,为各种批处理引擎提供数据存储,可以存储各种文件格式数据。
主从架构
Master
namenode
元数据管理
一条元数据大概150个字节
一条元数据大概150个字节
fsimage
元数据的镜像文件
目录(创建时间,访问权限)和文件(创建时间)以及相关属性的序列化信息
editslog
编辑日志文件
所有的更新操作,写操作
通过配置文件设置fsimage和editslog,设置副本
Slave
datanode
存储策略:机架感知
架构稳定性
心跳机制和重新复制
数据的完整性checkSum
元数据的磁盘故障
HDFS支持快照
HDFS文件读写原理
Write写
read读
YARN:
资源调度引擎,为各种批处理引擎提供资源调度能力。
资源调度引擎,为各种批处理引擎提供资源调度能力。
架构
ResourceManager
(RM)作为集群的全局资源管理器,一个集群一个,负责整个集群的资源管理和分配,监控ApplicationMaster的运行情况以及NodeManager的资源情况,想当于Master节点。
RM主要对资源进行管理以及分配,不负责task,但是会监控AM的运行情况,如果AM运行失败,会重新分配container,重启AM。
RM主要对资源进行管理以及分配,不负责task,但是会监控AM的运行情况,如果AM运行失败,会重新分配container,重启AM。
NodeManager
(NM)负责集群中每个节点的资源以及使用,运行任务,相当于Slave节点。
NM负责该节点容器的资源使用情况,不监控task。
NM负责该节点容器的资源使用情况,不监控task。
ApplicationMaster
(AM)对集群中的某个应用程序进行管理,向RM(ResourceManager)申请资源以及分配监控任务。
AM主要申请资源,然后分配任务给相应的NM,同时监控task的运行情况,如果任务失败,那么重新申请资源,在新的container中运行task
AM主要申请资源,然后分配任务给相应的NM,同时监控task的运行情况,如果任务失败,那么重新申请资源,在新的container中运行task
container
资源单位,包括cpu,内存等,不同于mrv1中slot,它是动态资源,而原来是事先程序员定好的,比如某个节点的map slot和reduce slot都是固定的,影响了集群的使用。
RPC
Remote Procedure Call,yarn中各个组件间的通信采用的是rpc机制
application
我们提交给yarn需要运行的应用程序
task
为了完成application,需要完成多个任务,任务包括map task和reduce task
executor
MapReduce:
分布式的计算框架;大数据批处理引擎,用于处理海量数据,但是处理速度较慢。
分布式的计算框架;大数据批处理引擎,用于处理海量数据,但是处理速度较慢。
原语:相同的Key为一组,调用一次reduce方法,方法内迭代这一组数据进行计算
并行度
集群规模决定Reduce的并行度
切块大小(块)决定Map阶段的并行度
流程
第一阶段map
1.map task读取HDFS文件。每个block,启动一个map task。每个map task按照行读取一个block中的内容,对每一行执行map函数
2.map函数对输入的数据进行拆分split,得到一个数组,组成一个键值对<word, 1>
3.分区,每个分区对应1个reduce task
4.对每个分区中的数据,按照key进行分组并排序
5.在map段执行小reduce,输出<key,times>
第二阶段reduce
1.每个分区对应一个reduce task,这个reduce task会读取相同分区的map输出;reduce task对接收到的所有map输出,进行排序分组<hello,{1,1}><me,{1}><you,{1}>
2.执行reduce 操作,对一个分组中的value进行累加<hello,2><me,1><you,1>
3.每个分区输出到一个HDFS文件中
1.map task读取HDFS文件。每个block,启动一个map task。每个map task按照行读取一个block中的内容,对每一行执行map函数
2.map函数对输入的数据进行拆分split,得到一个数组,组成一个键值对<word, 1>
3.分区,每个分区对应1个reduce task
4.对每个分区中的数据,按照key进行分组并排序
5.在map段执行小reduce,输出<key,times>
第二阶段reduce
1.每个分区对应一个reduce task,这个reduce task会读取相同分区的map输出;reduce task对接收到的所有map输出,进行排序分组<hello,{1,1}><me,{1}><you,{1}>
2.执行reduce 操作,对一个分组中的value进行累加<hello,2><me,1><you,1>
3.每个分区输出到一个HDFS文件中
1)首先客户端提交运用程序给yarn的RM
2)RM接受到应用程序后,为其在一个NM node中分配第一个container,并于对应的NM通信,令其在container中创建该应用的AM。
3)AM创建后,马上在RM中进行注册,注册后使得RM能够监控AM的运行情况。AM开始运行应用程序,为其对应的各个task(可以理解成应用程序拆解成各个task运行)向RM申请资源container,其中map task的优先级高于reduce task。
4)申请到资源后,AM与NM通信。
5)AM让NM在container中启动对应的task,并且监控task的运行情况。
以上便是MapReduce在yarn中运行过程,当程序运行结束后,AM向RM注销自己,停止应用,同时container中task和AM都清空。
2)RM接受到应用程序后,为其在一个NM node中分配第一个container,并于对应的NM通信,令其在container中创建该应用的AM。
3)AM创建后,马上在RM中进行注册,注册后使得RM能够监控AM的运行情况。AM开始运行应用程序,为其对应的各个task(可以理解成应用程序拆解成各个task运行)向RM申请资源container,其中map task的优先级高于reduce task。
4)申请到资源后,AM与NM通信。
5)AM让NM在container中启动对应的task,并且监控task的运行情况。
以上便是MapReduce在yarn中运行过程,当程序运行结束后,AM向RM注销自己,停止应用,同时container中task和AM都清空。
map
数据压缩
mapred.compress.map.out=true
reduce
shuffle
分区
将数据标记好分区(对key进行hash,值对reducetask的个数取余,余几就放到哪个分区)->
发送到环形缓冲区中(默认100MB,达到80%会溢写到磁盘中,溢写之前会对数据进行排序)->
溢写的文件达到十个,会对文件进行Merge(合并,归并排序)
发送到环形缓冲区中(默认100MB,达到80%会溢写到磁盘中,溢写之前会对数据进行排序)->
溢写的文件达到十个,会对文件进行Merge(合并,归并排序)
排序
字典排序
快排
combiner
局部聚合,不能对最终结果产生影响
集成Reduce,重写reduce方法
减少Map->reduce数据量
集成Reduce,重写reduce方法
减少Map->reduce数据量
分组
等到所有的MapTask运行完毕,启动一定数量的ReduceTask,告知reduceTask读取数据的范围(分区)
每个reduce会拉去Map端的数据,先存储到内存中,再存入磁盘
拉取完所有数据后,采用归并排序,将内存和磁盘中的数据都进行排序,
在进入reduce之前,会对数据进行分组操作,最终将数据以组为单位发送到reduce方法中去
每个reduce会拉去Map端的数据,先存储到内存中,再存入磁盘
拉取完所有数据后,采用归并排序,将内存和磁盘中的数据都进行排序,
在进入reduce之前,会对数据进行分组操作,最终将数据以组为单位发送到reduce方法中去
调优
小文件
小文件合并归档
读取小文件的时候实现CombineFileInputFormat,会合并小文件
数据倾斜
推测执行
JVM重用
Map优化
调大环形缓冲区大小
对Map输出的数据进行压缩(消耗CPU)
Reduce
集群调优核心思路
HBase
分布式、可扩展、支持海量数据存储的NoSQL数据库
分布式、可扩展、支持海量数据存储的NoSQL数据库
逻辑结构
列族
列
Rowkey
store
最终保存到storeFile中
Region
横向切分
物理结构
底层物理存储结构K-V
底层物理存储结构K-V
版本控制,根据TimeStamp,返回最新的值
数据模型
NameSpace:类似于数据库名
hbase
存放HBase内置的表
default
用户默认的命名空间
Region
类似于数据库表的概念
Row
每行数据由一个RowKey和多个Column组成,数据是按照RowKey的字典顺序存储的
Column
每个列由Column Family(列族)和Column Qualifier(列限定符)进行限定,建表时只需要指明列族,列限定符无需预先定义
Time Stamp
用于标识数据的不同版本
Cell
由{rowkey,column family:column qualifier,time stamp}唯一确定的单元,cell中的数据没有类型,全部是字节码形式存储
基础架构
Master
作用
Table:create,delete,alter
RegionSever:分配regions到每个RegionSever,监控每个RegionSever的状态
Table:create,delete,alter
RegionSever:分配regions到每个RegionSever,监控每个RegionSever的状态
Zookeeper
存储RegionSever的路由信息,通过Zookeeper来做master的高可用、RegionSever的监控、元数据的入口及集群配置的维护等工作
Region Sever
作用
Data:get,put,delete
Region:splitRegion,compactRegion
Data:get,put,delete
Region:splitRegion,compactRegion
Zookeeper分布式消息发布订阅服务框架
是一个开源的分布式协调服务。它是一个为分布式应用提供一致性服务的软件,分布式应用程序可以基于 Zookeeper 实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master 选举、分布式锁和分布式队列等功能。
是一个开源的分布式协调服务。它是一个为分布式应用提供一致性服务的软件,分布式应用程序可以基于 Zookeeper 实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master 选举、分布式锁和分布式队列等功能。
Zookeeper
分布式服务框架
分布式服务框架
拥有文件系统的数据库
解决了数据一致性问的的分布式数据库
具有发布和订阅功能的分布式数据库
架构
leader
flower
Hive:
大数据SQL批处理引擎,用于处理SQL类批处理作业,但是处理速度较慢。
大数据SQL批处理引擎,用于处理SQL类批处理作业,但是处理速度较慢。
架构
元数据
Client
driver
jdbc
解析器
解析器
优化器
执行器
MR/Tez/Spark
HDFS
内部表、外部表
创建
内部表会将数据移动到指定的目录
删除
内部表的元数据和原始数据都会被删除,外部表只会删除元数据
external关键字修饰外部表
Hive自定义UDF
1、继承“org.apache.hadoop.hive.ql.exec.UDF"。
2、实现一个evaluate()方法,编写要实现的逻辑。
3、打包并上传到HDFS里。
4、Hive创建临时函数。
5、调用该函数。
2、实现一个evaluate()方法,编写要实现的逻辑。
3、打包并上传到HDFS里。
4、Hive创建临时函数。
5、调用该函数。
Hive调优
1、数据倾斜
容易造成数据倾斜的原因:
group by
distinct count
join
容易造成数据倾斜的原因:
group by
distinct count
join
2、参数调优
set hive.map.aggr=true;
在map中会做部分聚集操作,效率更高但需要更多的内存。
set hive.groupby.skewindata=true;
此时生成的查询计划会有两个MRJob,可实现数据倾斜时负载均衡。
set hive.map.aggr=true;
在map中会做部分聚集操作,效率更高但需要更多的内存。
set hive.groupby.skewindata=true;
此时生成的查询计划会有两个MRJob,可实现数据倾斜时负载均衡。
3、map side join
set hive.auto.convert.join=true;
当连接一个较小和较大表的时候,把较小的表直接放到内存中去,然后再对较大的表进行map操作。
set hive.auto.convert.join=true;
当连接一个较小和较大表的时候,把较小的表直接放到内存中去,然后再对较大的表进行map操作。
4、并行化执行
每个查询会被Hive转化为多个阶段,当有些阶段关联性不大时,可以并行化执行,减少整个任务的执行时间。
开启任务并行执行:
set hive.exec.parallel=true;
设置同一个sql允许并行任务的最大线程数(例如设置为8个):
set hive.exec.parallel.thread.number=8;
每个查询会被Hive转化为多个阶段,当有些阶段关联性不大时,可以并行化执行,减少整个任务的执行时间。
开启任务并行执行:
set hive.exec.parallel=true;
设置同一个sql允许并行任务的最大线程数(例如设置为8个):
set hive.exec.parallel.thread.number=8;
Hive数据仓库
数据仓库分层
ODS层:原始数据层。
DWD层:结构和粒度与原始表保持一致,简单清洗。
DWS层:以DWD为基础,进行轻度汇总。
ADS层:为各种统计报表提供数据。
DWD层:结构和粒度与原始表保持一致,简单清洗。
DWS层:以DWD为基础,进行轻度汇总。
ADS层:为各种统计报表提供数据。
分层优点
复杂问题简单化
将任务分解成多个步骤完成,每一层只处理单一的步骤,比较简单,并且方便定位问题。
减少重复开发
规范数据分层,通过中间层数据,减少最大的重复计算,增加一次计算结果的复用性。
隔离原始数据
避免数据异常或者数据敏感,使真实数据与统计数据解耦。
将任务分解成多个步骤完成,每一层只处理单一的步骤,比较简单,并且方便定位问题。
减少重复开发
规范数据分层,通过中间层数据,减少最大的重复计算,增加一次计算结果的复用性。
隔离原始数据
避免数据异常或者数据敏感,使真实数据与统计数据解耦。
Spark:
基于内存的数据处理引擎,适合海量数据,处理速度高效。
基于内存的数据处理引擎,适合海量数据,处理速度高效。
应用场景
数据处理(DataProcessing):可以用来快速处理数据,兼具容错性和可扩展性。
迭代计算(IterativeComputation):支持迭代计算,有效应对复杂的数据处理逻辑。
数据挖掘(DataMining):在海量数据基础上进行复杂的挖掘分析,可支持多种数据挖掘和机器学习算法。
流式处理(StreamingProcessing):支持秒级延迟的流处理,可支持多种外部数据源。
查询分析(QueryAnalysis):支持SQL的查询分析,同时提供领域特定语言(DSL)以方便操作结构化数据,并支持多种外部数据源。
迭代计算(IterativeComputation):支持迭代计算,有效应对复杂的数据处理逻辑。
数据挖掘(DataMining):在海量数据基础上进行复杂的挖掘分析,可支持多种数据挖掘和机器学习算法。
流式处理(StreamingProcessing):支持秒级延迟的流处理,可支持多种外部数据源。
查询分析(QueryAnalysis):支持SQL的查询分析,同时提供领域特定语言(DSL)以方便操作结构化数据,并支持多种外部数据源。
组件
(1)Application:表示你的应用程序
(2)Driver:表示main()函数,创建SparkContext。由SparkContext负责与ClusterManager通信,进行资源的申请,任务的分配和监控等。程序执行完毕后关闭SparkContext
(3)Executor:某个Application运行在Worker节点上的一个进程,该进程负责运行某些task,并且负责将数据存在内存或者磁盘上。在Spark on Yarn模式下,其进程名称为 CoarseGrainedExecutor Backend,一个CoarseGrainedExecutor Backend进程有且仅有一个executor对象,它负责将Task包装成taskRunner,并从线程池中抽取出一个空闲线程运行Task,这样,每个CoarseGrainedExecutorBackend能并行运行Task的数据就取决于分配给它的CPU的个数。
(4)Worker:集群中可以运行Application代码的节点。在Standalone模式中指的是通过slave文件配置的worker节点,在Spark on Yarn模式中指的就是NodeManager节点。
(5)Task:在Executor进程中执行任务的工作单元,多个Task组成一个Stage
(6)Job:包含多个Task组成的并行计算,是由Action行为触发的
(7)Stage:每个Job会被拆分很多组Task,作为一个TaskSet,其名称为Stage
(8)DAGScheduler:根据Job构建基于Stage的DAG,并提交Stage给TaskScheduler,其划分Stage的依据是RDD之间的依赖关系
(9)TaskScheduler:将TaskSet提交给Worker(集群)运行,每个Executor运行什么Task就是在此处分配的。
(2)Driver:表示main()函数,创建SparkContext。由SparkContext负责与ClusterManager通信,进行资源的申请,任务的分配和监控等。程序执行完毕后关闭SparkContext
(3)Executor:某个Application运行在Worker节点上的一个进程,该进程负责运行某些task,并且负责将数据存在内存或者磁盘上。在Spark on Yarn模式下,其进程名称为 CoarseGrainedExecutor Backend,一个CoarseGrainedExecutor Backend进程有且仅有一个executor对象,它负责将Task包装成taskRunner,并从线程池中抽取出一个空闲线程运行Task,这样,每个CoarseGrainedExecutorBackend能并行运行Task的数据就取决于分配给它的CPU的个数。
(4)Worker:集群中可以运行Application代码的节点。在Standalone模式中指的是通过slave文件配置的worker节点,在Spark on Yarn模式中指的就是NodeManager节点。
(5)Task:在Executor进程中执行任务的工作单元,多个Task组成一个Stage
(6)Job:包含多个Task组成的并行计算,是由Action行为触发的
(7)Stage:每个Job会被拆分很多组Task,作为一个TaskSet,其名称为Stage
(8)DAGScheduler:根据Job构建基于Stage的DAG,并提交Stage给TaskScheduler,其划分Stage的依据是RDD之间的依赖关系
(9)TaskScheduler:将TaskSet提交给Worker(集群)运行,每个Executor运行什么Task就是在此处分配的。
spark任务运行过程
概念
RDD
Resilient Distributed Dataset 分布式数据集
Shuffle
Shuffle是划分DAG中stage的标识,同时影响Spark执行速度的关键步骤
RDD 的Transformation 函数中,分为窄依赖(narrow dependency)和宽依赖(widedependency)的操作.
窄依赖跟宽依赖的区别是是否发生Shuffle(洗牌) 操作。
RDD 的Transformation 函数中,分为窄依赖(narrow dependency)和宽依赖(widedependency)的操作.
窄依赖跟宽依赖的区别是是否发生Shuffle(洗牌) 操作。
宽依赖/窄依赖
窄依赖是指父RDD的每个分区只被子RDD的一个分区使用。
宽依赖是指父RDD的每个分区都可能被多个子RDD分区所使用。
Stage
Transformation
Transformation是RDD的算子类型,它的返回值还是一个RDD。
lazy特性
lazy特性
1、map (func)
对调用map的RDD数据集中的每个element都使用func,然后返回一个新的分布式数据集RDD。
对调用map的RDD数据集中的每个element都使用func,然后返回一个新的分布式数据集RDD。
2、filter(func)
对调用filter的RDD数据集中的每个element都使用func,然后返回一个包含使func为true的元素构成的RDD。
对调用filter的RDD数据集中的每个element都使用func,然后返回一个包含使func为true的元素构成的RDD。
3、flatMap(func)
和map算子差不多,但是flatMap生成的是多个结果。
和map算子差不多,但是flatMap生成的是多个结果。
4、mapPartitions(func)
和map算子差不多,但是map针对的是每个element,而mapPartitions针对的是每个partition(block)。例如,有一个数据集,有100个partiton,每个partition有10000条数据。我们要将这100w的数据保存到数据库中。如果我们用map实现,则需要链接数据库100w次,而如果用mapPartitions,则只需要链接100次。大大减少了数据库的压力。但是使用mapPartitions要考虑到内存的问题,因为它要把一个partition的数据处理完后才会释放资源,如果内存不足可能会报OOM。
和map算子差不多,但是map针对的是每个element,而mapPartitions针对的是每个partition(block)。例如,有一个数据集,有100个partiton,每个partition有10000条数据。我们要将这100w的数据保存到数据库中。如果我们用map实现,则需要链接数据库100w次,而如果用mapPartitions,则只需要链接100次。大大减少了数据库的压力。但是使用mapPartitions要考虑到内存的问题,因为它要把一个partition的数据处理完后才会释放资源,如果内存不足可能会报OOM。
11、intersection(otherDataset)
返回两个dataset的一个交集dataset。
返回两个dataset的一个交集dataset。
12、sortByKey([ascending], [numPartitions])
按照Key进行排序,ascending的值默认为True,True/False表示升序/降序 。
按照Key进行排序,ascending的值默认为True,True/False表示升序/降序 。
13、join(otherDataset, [numPartitions])
类似于SQL中的连接操作,即作用于键值对(K, V)和(K, W)上,返回元组 (K, (V, W))。
类似于SQL中的连接操作,即作用于键值对(K, V)和(K, W)上,返回元组 (K, (V, W))。
14、cogroup(otherDataset, [numPartitions])
作用于键值对(K, V)和(K, W)上,返回元组 (K, (Iterable, Iterable))。这一操作可叫做groupWith。
作用于键值对(K, V)和(K, W)上,返回元组 (K, (Iterable, Iterable))。这一操作可叫做groupWith。
15、cartesian(otherDataset)
笛卡尔乘积,作用于数据集T和U上,返回(T, U),即数据集中每个元素的两两组合
笛卡尔乘积,作用于数据集T和U上,返回(T, U),即数据集中每个元素的两两组合
16、coalesce(numPartitions)/ˌkoʊəˈles/
将RDD的分区数减小到numPartitions个。当数据集通过过滤规模减小时,使用这个操作可以提升性能。一般作用在filter之后例如一个RDD有100个partition,意味着有100个task。但是经过一系列的filter过滤之后,可能每个partition里面包含的数据量已经很少了,这时再启动100个task会产生很多的小文件。这个时候就可以使用coalesce减少partition的数量。
将RDD的分区数减小到numPartitions个。当数据集通过过滤规模减小时,使用这个操作可以提升性能。一般作用在filter之后例如一个RDD有100个partition,意味着有100个task。但是经过一系列的filter过滤之后,可能每个partition里面包含的数据量已经很少了,这时再启动100个task会产生很多的小文件。这个时候就可以使用coalesce减少partition的数量。
17、repartition(numPartitions)
重组数据,数据被重新随机分区为numPartitions个,numPartitions可以比原来大,也可以比原来小,平衡各个分区。这一操作会将整个数据集在网络中重新洗牌。
重组数据,数据被重新随机分区为numPartitions个,numPartitions可以比原来大,也可以比原来小,平衡各个分区。这一操作会将整个数据集在网络中重新洗牌。
注意(16,17):repartition算子底层调用的是coalesce,coalesce有个默认参数shuffle: Boolean = false,默认是不走shuffle操作的,repartition算子调用coalesce的时候–coalesce(numPartitions, shuffle = true)传入的是true,走shuffle,可以增大partition的数量。
repartition一定会发生shuffle,coalesce 根据传入的参数来判断是否发生shuffle。
一般情况下增大rdd的partition数量使用repartition,减少partition数量时使用coalesce。
一般情况下增大rdd的partition数量使用repartition,减少partition数量时使用coalesce。
Action
Action是RDD的算子,它的返回值不是一个RDD。Action操作是返回结果或者将结果写入存储的操作。Action是Spark应用启动执行的触发动作,得到RDD的相关计算结果或将RDD保存到文件系统中。
1、reduce(func)
使用函数func(两个输入参数,返回一个值)对数据集中的元素做聚集操作
使用函数func(两个输入参数,返回一个值)对数据集中的元素做聚集操作
2、collect()
在driver程序中以数组形式返回数据集中所有的元素。这以action通常在执行过filter或者其他操作后返回一个较小的子数据集时非常有用。
在driver程序中以数组形式返回数据集中所有的元素。这以action通常在执行过filter或者其他操作后返回一个较小的子数据集时非常有用。
3、count()
返回数据集中元素的个数
返回数据集中元素的个数
4、first()
返回数据集中的第一个元素,底层调用的是take(1)
返回数据集中的第一个元素,底层调用的是take(1)
5、take(n)
以数组形式返回数据集中前n个元素。需要注意的是,这一action并不是在多个node上并行执行,而是在driver程序所在的机器上单机执行,会增大内存的压力,使用需谨慎。
以数组形式返回数据集中前n个元素。需要注意的是,这一action并不是在多个node上并行执行,而是在driver程序所在的机器上单机执行,会增大内存的压力,使用需谨慎。
6、takeSample(withReplacement, num, [seed])
以数组形式返回从数据集中抽取的样本数量为num的随机样本,有替换或者无替换的进行采样。可选参数[seed]可以允许用户自己预定义随机数生成器的种子。注意:该方法仅在预期结果数组很小的情况下使用,因为所有数据都被加载到driver的内存中。
以数组形式返回从数据集中抽取的样本数量为num的随机样本,有替换或者无替换的进行采样。可选参数[seed]可以允许用户自己预定义随机数生成器的种子。注意:该方法仅在预期结果数组很小的情况下使用,因为所有数据都被加载到driver的内存中。
7、takeOrdered(n, [ordering])
返回RDD的前n个元素,可以利用自然顺序或者由用户执行排序的comparator。
返回RDD的前n个元素,可以利用自然顺序或者由用户执行排序的comparator。
8、saveAsTextFile(path)
将数据集中的元素以文本文件(或者文本文件的一个集合)的形式写入本地文件系统,或者HDFS,或者其他Hadoop支持的文件系统的指定路径path下。Spark会调用每个元素的toString方法,将其转换为文本文件中的一行。
将数据集中的元素以文本文件(或者文本文件的一个集合)的形式写入本地文件系统,或者HDFS,或者其他Hadoop支持的文件系统的指定路径path下。Spark会调用每个元素的toString方法,将其转换为文本文件中的一行。
9、saveAsSequenceFile(path)
将数据集中的元素以Hadoop SequenceFile的形式写入本地文件系统,或者HDFS,或者其他Hadoop支持的文件系统的指定路径path下。RDD的元素必须由实现了Hadoop的Writable接口的key-value键值对组成。在Scala中,也可以是隐式可以转换为Writable的键值对(Spark包括了基本类型的转换,例如Int,Double,String等等)
将数据集中的元素以Hadoop SequenceFile的形式写入本地文件系统,或者HDFS,或者其他Hadoop支持的文件系统的指定路径path下。RDD的元素必须由实现了Hadoop的Writable接口的key-value键值对组成。在Scala中,也可以是隐式可以转换为Writable的键值对(Spark包括了基本类型的转换,例如Int,Double,String等等)
10、saveAsObjectFile(path)
利用Java序列化,将数据集中的元素以一种简单的形式进行写操作,并能够利用SparkContext.objectFile()加载数据。(适用于Java和Scala)
利用Java序列化,将数据集中的元素以一种简单的形式进行写操作,并能够利用SparkContext.objectFile()加载数据。(适用于Java和Scala)
11、countByKey()
只能作用于键值对(K, V)形式的RDDs上。按照Key进行计数,返回键值对(K, int)的哈希表。
只能作用于键值对(K, V)形式的RDDs上。按照Key进行计数,返回键值对(K, int)的哈希表。
12、foreach(func)
在数据集的每个元素上调用函数func。这一操作通常是为了实现一些副作用,比如更新累加器或者与外部存储系统进行交互。注意:在foreach()之外修改除了累加器以外的变量可能造成一些未定义的行为。更多内容请参阅闭包进行理解。
在数据集的每个元素上调用函数func。这一操作通常是为了实现一些副作用,比如更新累加器或者与外部存储系统进行交互。注意:在foreach()之外修改除了累加器以外的变量可能造成一些未定义的行为。更多内容请参阅闭包进行理解。
SparkConf
SparkContext
SparkContext是Spark的入口,相当于应用程序的main函数。
SparkContext表示与Spark集群的连接,可用于在该集群上创建RDD,记录计算结果和环境配置等信息。
SparkContext表示与Spark集群的连接,可用于在该集群上创建RDD,记录计算结果和环境配置等信息。
SparkSession
Spark2.0中引入了SparkSession的概念,为用户提供了一个统一的切入点来使用Spark的各项功能。
封装了SparkConf和SparkContext对象,方便用户使用Spark的各种API。
封装了SparkConf和SparkContext对象,方便用户使用Spark的各种API。
SparkSQL
SparkSQL是Spark用来处理结构化数据的一个模块,可以在Spark应用中直接使用SQL语句对数据进行操作。
SQL语句通过SparkSQL模块解析为RDD执行计划,交给SparkCore执行。
SQL语句通过SparkSQL模块解析为RDD执行计划,交给SparkCore执行。
使用方式
SparkSession提交SQL语句
JDBC
SparkSQL关键概念DataSet
DataSet是一个由特定域的对象组成的强类型集合,可通过功能或关系操作并行转换其中的对象
DataSet以Catalyst逻辑执行计划表示,并且数据以编码的二进制形式存储,不需要反序列化就可以执行sort、filter、shuffle等操作。
Dataset是“懒惰”的,只在执行action操作时触发计算。当执行action操作时,Spark用查询优化程序来优化逻辑计划,并生成一个高效的并行分布式的物理计
DataSet以Catalyst逻辑执行计划表示,并且数据以编码的二进制形式存储,不需要反序列化就可以执行sort、filter、shuffle等操作。
Dataset是“懒惰”的,只在执行action操作时触发计算。当执行action操作时,Spark用查询优化程序来优化逻辑计划,并生成一个高效的并行分布式的物理计
使用场景
适合:
结构化数据处理。
对数据处理的实时性要求不高的场景
需要处理PB级的大容量数据。
不适合:
实时数据查询。
结构化数据处理。
对数据处理的实时性要求不高的场景
需要处理PB级的大容量数据。
不适合:
实时数据查询。
调优
资源参数调优
num-executors:设置Spark作业总共要用多少个Executor进程来执行
executor-memory:设置每个Executor进程的内存
executor-cores:设置每个Executor进程的CPU core数量
driver-memory:设置Driver进程的内存
spark.default.parallelism:设置每个stage的默认task数量
num-executors:设置Spark作业总共要用多少个Executor进程来执行
executor-memory:设置每个Executor进程的内存
executor-cores:设置每个Executor进程的CPU core数量
driver-memory:设置Driver进程的内存
spark.default.parallelism:设置每个stage的默认task数量
开发调优
避免创建重复的RDD
尽可能复用同一个RDD
对多次使用的RDD进行持久化
尽量避免使用shuffle类算子
使用map-side预聚合的shuffle操作
使用高性能的算子
避免创建重复的RDD
尽可能复用同一个RDD
对多次使用的RDD进行持久化
尽量避免使用shuffle类算子
使用map-side预聚合的shuffle操作
使用高性能的算子
其他
防止内存溢出的方法
driver端的内存溢出
可以增大driver的内存参数:spark.driver.memory (default 1g)
这个参数用来设置Driver的内存。在Spark程序中,SparkContext,DAGScheduler都是运行在Driver端的。对应rdd的Stage切分也是在Driver端运行,如果用户自己写的程序有过多的步骤,切分出过多的Stage,这部分信息消耗的是Driver的内存,这个时候就需要调大Driver的内存。
这个参数用来设置Driver的内存。在Spark程序中,SparkContext,DAGScheduler都是运行在Driver端的。对应rdd的Stage切分也是在Driver端运行,如果用户自己写的程序有过多的步骤,切分出过多的Stage,这部分信息消耗的是Driver的内存,这个时候就需要调大Driver的内存。
map过程产生大量对象导致内存溢出
在会产生大量对象的map操作之前调用repartition方法,分区成更小的块传入map。例如:rdd.repartition(10000).map(x=>for(i <- 1 to 10000) yield i.toString)。
面对这种问题注意,不能使用rdd.coalesce方法,这个方法只能减少分区,不能增加分区, 不会有shuffle的过程。
数据不平衡导致内存溢出
数据不平衡除了有可能导致内存溢出外,也有可能导致性能的问题,解决方法和上面说的类似,就是调用repartition重新分区。这里就不再累赘了。
shuffle后内存溢出
shuffle内存溢出的情况可以说都是shuffle后,单个文件过大导致的。在Spark中,join,reduceByKey这一类型的过程,都会有shuffle的过程,在shuffle的使用,需要传入一个partitioner,大部分Spark中的shuffle操作,默认的partitioner都是HashPatitioner,默认值是父RDD中最大的分区数,这个参数通过spark.default.parallelism控制(在spark-sql中用spark.sql.shuffle.partitions) , spark.default.parallelism参数只对HashPartitioner有效,所以如果是别的Partitioner或者自己实现的Partitioner就不能使用spark.default.parallelism这个参数来控制shuffle的并发量了。如果是别的partitioner导致的shuffle内存溢出,就需要从partitioner的代码增加partitions的数量。
standalone模式下资源分配不均匀导致内存溢出
配置–executor-cores或者spark.executor.cores参数,确保Executor资源分配均匀。
使用rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)代替rdd.cache()
rdd.cache()和rdd.persist(Storage.MEMORY_ONLY)是等价的,在内存不足的时候rdd.cache()的数据会丢失,再次使用的时候会重算,而rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)在内存不足的时候会存储在磁盘,避免重算,只是消耗点IO时间。
spark中cache和persist的区别:
cache:缓存数据,默认是缓存在内存中,其本质还是调用persist
persist:缓存数据,有丰富的数据缓存策略。数据可以保存在内存也可以保存在磁盘中,使用的时候指定对应的缓存级别就可以了。
常用采集工具
Sqoop
主要用于在Hadoop(Hive)与传统的数据库(MySQL、PostgreSQL...)间 进 行 数 据 的 传 递,可以将一个关系型数据库(例 如:MySQL,Oracle,PostgreSQL等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。
原理
SqoopImport原理:(RDBMS->Hadoop)
Sqoop在import时,需要指定split-by参数。Sqoop根据不同的split-by参数值来进行切分,然后将切分出来的区域分配到不同map中。
每个map中再处理数据库中获取的一行一行的值,写入到HDFS中。
同时split-by根据不同的参数类型有不同的切分方法,如比较简单的int型,Sqoop会取最大和最小split-by字段值,然后根据传入的num-mappers来确定划分几个区域。
Sqoop在import时,需要指定split-by参数。Sqoop根据不同的split-by参数值来进行切分,然后将切分出来的区域分配到不同map中。
每个map中再处理数据库中获取的一行一行的值,写入到HDFS中。
同时split-by根据不同的参数类型有不同的切分方法,如比较简单的int型,Sqoop会取最大和最小split-by字段值,然后根据传入的num-mappers来确定划分几个区域。
Sqoop export原理:(Hadoop->RDBMS)
获取导出表的schema、meta信息,和Hadoop中的字段match;
并行导入数据:
将Hadoop 上文件划分成若干个分片,每个分片由一个Map Task进行数据导入。
获取导出表的schema、meta信息,和Hadoop中的字段match;
并行导入数据:
将Hadoop 上文件划分成若干个分片,每个分片由一个Map Task进行数据导入。
Loader
Loader是实现FusionInsight HD与关系型数据库、文件系统之间交换数据和文件的数据加载工具。
提供可视化向导式的作业配置管理界面;
提供定时调度任务,周期性执行Loader作业;
在界面中可指定多种不同的数据源、配置数据的清洗和转换步骤、配置集群存储系统等。
基于开源Sqoop研发,做了大量优化和扩展。
提供可视化向导式的作业配置管理界面;
提供定时调度任务,周期性执行Loader作业;
在界面中可指定多种不同的数据源、配置数据的清洗和转换步骤、配置集群存储系统等。
基于开源Sqoop研发,做了大量优化和扩展。
特点
图形化
提供图像化配置、监控界面,操作简便。
提供图像化配置、监控界面,操作简便。
高性能
利用MapReduce并行处理数据
利用MapReduce并行处理数据
高可靠
Loader Sever采用主备双机。
作业通过MapReduce执行,支持失败重试。
作业失败后,不会残留数据。
Loader Sever采用主备双机。
作业通过MapReduce执行,支持失败重试。
作业失败后,不会残留数据。
安全
Kerberos认证作业权限管理。
Kerberos认证作业权限管理。
Doris
数据集市和数据仓库
数据集市
数据集市(Data Mart) ,也叫数据市场,数据集市就是满足特定的部门或者用户的需求,按照多维的方式进行存储,
包括定义维度、需要计算的指标、维度的层次等,生成面向决策分析需求的数据立方体。
数据集市(Data Mart) ,也叫数据市场,数据集市就是满足特定的部门或者用户的需求,按照多维的方式进行存储,
包括定义维度、需要计算的指标、维度的层次等,生成面向决策分析需求的数据立方体。
数据仓库
为满足各类零散分析的需求,通过数据分层和数据模型的方式,并以基于业务和应用的角度将数据进行模块化的存储。
为满足各类零散分析的需求,通过数据分层和数据模型的方式,并以基于业务和应用的角度将数据进行模块化的存储。
2.实时
Flume
Kafka:
分布式、分区、多副本、多订阅者的日志系统
分布式、分区、多副本、多订阅者的日志系统
角色
Producer
Consumer
如何实现高吞吐
顺序读写
零拷贝
文件分段
批量发送
数据压缩
零拷贝
文件分段
批量发送
数据压缩
Flink
主要特点
事件驱动(Event-driven)
基于流(有界无界)
分层API
SQL/Table API
DataStream API
ProcessFunction
部署方式
Standalone
源码
任务提交流程
yarn-per-job(三大进程)
CliFrontend
参数解析
封装CommandLine:三个,依次添加
配置的封装
执行用户代码: execute()
生成StreamGraph
Executor:生成JobGraph
集群描述器:上传jar包、配置, 封装提交给yarn的命令
yarnclient提交应用
封装CommandLine:三个,依次添加
配置的封装
执行用户代码: execute()
生成StreamGraph
Executor:生成JobGraph
集群描述器:上传jar包、配置, 封装提交给yarn的命令
yarnclient提交应用
YarnJobClusterEntryPonit
1、Dispatcher的创建和启动
2、ResourceManager的创建、启动:里面有一个 slotmanager(真正管理资源的、向yarn申请资源)
3、Dispatcher启动JobMaster:生成ExecutionGraph(里面有一个slotpool,真正去发送请求的)
4、slotpool向slotmanager申请资源, slotmanager向yarn申请资源(启动新节点)
2、ResourceManager的创建、启动:里面有一个 slotmanager(真正管理资源的、向yarn申请资源)
3、Dispatcher启动JobMaster:生成ExecutionGraph(里面有一个slotpool,真正去发送请求的)
4、slotpool向slotmanager申请资源, slotmanager向yarn申请资源(启动新节点)
YarnTaskExecutorRunner
1、启动 TaskExecutor
2、向ResourceManager注册slot
3、ResourceManager分配slot
4、TaskExecutor接收到分配的指令,提供offset给JobMaster(slotpool)
5、JobMaster提交任务给TaskExecutor去执行
2、向ResourceManager注册slot
3、ResourceManager分配slot
4、TaskExecutor接收到分配的指令,提供offset给JobMaster(slotpool)
5、JobMaster提交任务给TaskExecutor去执行
子主题
yarn-per-job提交命令新老版本
老版本(<=1.10)
flink run -m yarn-cluster -c xxx xxx.jar
新版本(>=1.11)
flink run -t yarn-per-job -c xxx xxx.jar
组件通信
任务调度
内存管理
Structured Streaming
Spark Streaming
Storm
分布式实时大数据处理框架
核心组件
Nimbus
Storm集群的Master节点,负责分发用户代码,指派给具体的Supervisor节点上的Worker节点,去运行Topology对应的组件(Spout/Bolt)的Task。
Supervisor
Storm集群的从节点,负责管理运行在Supervisor节点上的每一个Worker进程的启动和终止。通过Storm的配置文件中的supervisor.slots.ports配置项,可以指定在一个Supervisor上最大允许多少个Slot,每个Slot通过端口号来唯一标识,一个端口号对应一个Worker进程(如果该Worker进程被启动)。
Zookeeper
用来协调Nimbus和Supervisor,如果Supervisor因故障出现问题而无法运行Topology,Nimbus会第一时间感知到,并重新分配Topology到其它可用的Supervisor上运行。
抽象静态组件
Topology
Storm对一个分布式计算应用程序的抽象,目的是通过一个实现Topology能够完整地完成一件事情(从业务角度来看)。一个Topology是由一组静态程序组件(Spout/Bolt)、组件关系Streaming Groups这两部分组成。
Spout
描述了数据是如何从外部系统(或者组件内部直接产生)进入到Storm集群,并由该Spout所属的Topology来处理,通常是从一个数据源读取数据,也可以做一些简单的处理(为了不影响数据连续地、实时地、快速地进入到系统,通常不建议把复杂处理逻辑放在这里去做)。
Bolt
描述了与业务相关的处理逻辑。
抽象动态组件
Task
Spout/Bolt在运行时所表现出来的实体,都称为Task,一个Spout/Bolt在运行时可能对应一个或多个Spout Task/Bolt Task,与实际在编写Topology时进行配置有关
Worker
运行时Task所在的一级容器,Executor运行于Worker中,一个Worker对应于- - Supervisor上创建的一个JVM实例
Executor
运行时Task所在的直接容器,在Executor中执行Task的处理逻辑;一个或多个Executor实例可以运行在同一个Worker进程中,一个或多个Task可以运行于同一个Executor中;在Worker进程并行的基础上,Executor可以并行,进而Task也能够基于Executor实现并行计算。
相关概念
Stream Grouping
Shuffle Grouping: 随机分组, 随机派发stream里面的tuple,保证每个bolt接收到的tuple数目大致相同。
Fields Grouping:按字段分组,比如按userid来分组,具有同样userid的tuple会被分到相同的Bolts里的一个task,而不同的userid则会被分配到不同的bolts里的task。
All Grouping:广播发送,对于每一个tuple,所有的bolts都会收到。
Global Grouping:全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。
Non Grouping:不分组,这stream grouping个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果, 有一点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程里面去执行。
Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)。
Local or shuffle grouping:如果目标bolt有一个或者多个task在同一个工作进程中,tuple将会被随机发生给这些tasks。否则,和普通的Shuffle Grouping行为一致。
Fields Grouping:按字段分组,比如按userid来分组,具有同样userid的tuple会被分到相同的Bolts里的一个task,而不同的userid则会被分配到不同的bolts里的task。
All Grouping:广播发送,对于每一个tuple,所有的bolts都会收到。
Global Grouping:全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。
Non Grouping:不分组,这stream grouping个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果, 有一点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程里面去执行。
Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)。
Local or shuffle grouping:如果目标bolt有一个或者多个task在同一个工作进程中,tuple将会被随机发生给这些tasks。否则,和普通的Shuffle Grouping行为一致。
Reliability
消息可靠性保障机制
每处理完一个tuple,调用OutputColletor的ack方法,通知acker消息是否投递成功,若没有成功,需要重新投递。
Redis
ElasticSearch
任务调度
airflow
设计理念
动态
python编写
可扩展
轻松定义自己的运算符(operators),执行器(executors)和扩展库
优雅
管道简洁明了,使用功能强大的Jinja模板引擎,将脚本参数化内置到Airflow的核心中
可伸缩
Airflow具有模块化架构,使用消息队列来协调任务数量的Worker。Airflow可扩展到无穷大。
3.数据治理
Apache Atlas
概述
架构
0 条评论
下一页