大数据知识图谱
2021-11-18 09:27:51 2 举报
AI智能生成
大数据常用组件知识点
作者其他创作
大纲/内容
1.离线
核心诉求
处理时间要求不高;
处理数据量巨大;
处理数据格式多样;
支持SQL类作业和自定义作业;
数据处理大致可以分成两大类
OLAP
联机分析处理
OLTP
联机事务处理
常用组件
Hadoop<br>分布式的计算存储框架<br>
1/2/3区别
1->2
1、联邦机制,两个nameNode,防止出现单点故障
2、yarn
2->3
java的运行环境最低1.8
HDFS支持纠删码,节省了空间,只需要1.4倍的空间就能保证数据的完整性
Yarn的时间线服务
支持对于两个以上的namenode(一个active,多个standby状态)
MapReduce本地优化,性能提升30%,但是不稳定<br>
三个主要部分
HDFS
Yarn
MapReduce
HDFS:<br>分布式文件系统,为各种批处理引擎提供数据存储,可以存储各种文件格式数据。
主从架构
Master
namenode
元数据管理<br>一条元数据大概150个字节<br>
fsimage
元数据的镜像文件
目录(创建时间,访问权限)和文件(创建时间)以及相关属性的序列化信息
editslog<br>
编辑日志文件<br>
所有的更新操作,写操作<br>
通过配置文件设置fsimage和editslog,设置副本
Slave
datanode
存储策略:机架感知
架构稳定性<br>
心跳机制和重新复制
数据的完整性checkSum
元数据的磁盘故障<br>
HDFS支持快照
HDFS文件读写原理
Write写
<br>
read读
<br>
YARN:<br>资源调度引擎,为各种批处理引擎提供资源调度能力。
架构
ResourceManager
(RM)作为集群的全局资源管理器,一个集群一个,负责整个集群的资源管理和分配,监控ApplicationMaster的运行情况以及NodeManager的资源情况,想当于Master节点。<br>RM主要对资源进行管理以及分配,不负责task,但是会监控AM的运行情况,如果AM运行失败,会重新分配container,重启AM。
NodeManager
(NM)负责集群中每个节点的资源以及使用,运行任务,相当于Slave节点。<br>NM负责该节点容器的资源使用情况,不监控task。
ApplicationMaster
(AM)对集群中的某个应用程序进行管理,向RM(ResourceManager)申请资源以及分配监控任务。<br>AM主要申请资源,然后分配任务给相应的NM,同时监控task的运行情况,如果任务失败,那么重新申请资源,在新的container中运行task
container
资源单位,包括cpu,内存等,不同于mrv1中slot,它是动态资源,而原来是事先程序员定好的,比如某个节点的map slot和reduce slot都是固定的,影响了集群的使用。
RPC
Remote Procedure Call,yarn中各个组件间的通信采用的是rpc机制
application
我们提交给yarn需要运行的应用程序
task
为了完成application,需要完成多个任务,任务包括map task和reduce task
executor
MapReduce:<br>分布式的计算框架;大数据批处理引擎,用于处理海量数据,但是处理速度较慢。
原语:相同的Key为一组,调用一次reduce方法,方法内迭代这一组数据进行计算<br>
并行度<br>
集群规模决定Reduce的并行度
切块大小(块)决定Map阶段的并行度<br>
流程
第一阶段map<br>1.map task读取HDFS文件。每个block,启动一个map task。每个map task按照行读取一个block中的内容,对每一行执行map函数<br>2.map函数对输入的数据进行拆分split,得到一个数组,组成一个键值对<word, 1><br>3.分区,每个分区对应1个reduce task<br>4.对每个分区中的数据,按照key进行分组并排序<br>5.在map段执行小reduce,输出<key,times><br><br>第二阶段reduce<br>1.每个分区对应一个reduce task,这个reduce task会读取相同分区的map输出;reduce task对接收到的所有map输出,进行排序分组<hello,{1,1}><me,{1}><you,{1}><br>2.执行reduce 操作,对一个分组中的value进行累加<hello,2><me,1><you,1><br>3.每个分区输出到一个HDFS文件中<br>
<br>
1)首先客户端提交运用程序给yarn的RM<br><br>2)RM接受到应用程序后,为其在一个NM node中分配第一个container,并于对应的NM通信,令其在container中创建该应用的AM。<br><br>3)AM创建后,马上在RM中进行注册,注册后使得RM能够监控AM的运行情况。AM开始运行应用程序,为其对应的各个task(可以理解成应用程序拆解成各个task运行)向RM申请资源container,其中map task的优先级高于reduce task。<br><br>4)申请到资源后,AM与NM通信。<br><br>5)AM让NM在container中启动对应的task,并且监控task的运行情况。<br><br>以上便是MapReduce在yarn中运行过程,当程序运行结束后,AM向RM注销自己,停止应用,同时container中task和AM都清空。
map<br>
数据压缩<br>
mapred.compress.map.out=true
reduce
shuffle
分区<br>
将数据标记好分区(对key进行hash,值对reducetask的个数取余,余几就放到哪个分区)-><br>发送到环形缓冲区中(默认100MB,达到80%会溢写到磁盘中,溢写之前会对数据进行排序)-><br>溢写的文件达到十个,会对文件进行Merge(合并,归并排序)<br>
排序
字典排序
快排
combiner
局部聚合,不能对最终结果产生影响<br>集成Reduce,重写reduce方法<br>减少Map->reduce数据量<br>
分组
等到所有的MapTask运行完毕,启动一定数量的ReduceTask,告知reduceTask读取数据的范围(分区)<br>每个reduce会拉去Map端的数据,先存储到内存中,再存入磁盘<br>拉取完所有数据后,采用归并排序,将内存和磁盘中的数据都进行排序,<br>在进入reduce之前,会对数据进行分组操作,最终将数据以组为单位发送到reduce方法中去<br>
调优
小文件
小文件合并归档
读取小文件的时候实现CombineFileInputFormat,会合并小文件
数据倾斜
推测执行
JVM重用
Map优化
调大环形缓冲区大小
对Map输出的数据进行压缩(消耗CPU)
Reduce <br>
集群调优核心思路<br>
HBase<br>分布式、可扩展、支持海量数据存储的NoSQL数据库<br>
逻辑结构
列族<br>
列
Rowkey
store
最终保存到storeFile中<br>
Region
横向切分<br>
物理结构<br>底层物理存储结构K-V<br>
版本控制,根据TimeStamp,返回最新的值
数据模型
NameSpace:类似于数据库名
hbase<br>
存放HBase内置的表<br>
default
用户默认的命名空间
Region
类似于数据库表的概念<br>
Row
每行数据由一个RowKey和多个Column组成,数据是按照RowKey的字典顺序存储的
Column
每个列由Column Family(列族)和Column Qualifier(列限定符)进行限定,建表时只需要指明列族,列限定符无需预先定义<br>
Time Stamp<br>
用于标识数据的不同版本<br>
Cell
由{rowkey,column family:column qualifier,time stamp}唯一确定的单元,cell中的数据没有类型,全部是字节码形式存储
基础架构
Master
作用<br>Table:create,delete,alter<br>RegionSever:分配regions到每个RegionSever,监控每个RegionSever的状态<br>
Zookeeper
存储RegionSever的路由信息,通过Zookeeper来做master的高可用、RegionSever的监控、元数据的入口及集群配置的维护等工作<br>
Region Sever<br>
作用<br>Data:get,put,delete<br>Region:splitRegion,compactRegion<br>
Zookeeper分布式消息发布订阅服务框架<br>是一个开源的分布式协调服务。它是一个为分布式应用提供一致性服务的软件,分布式应用程序可以基于 Zookeeper 实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master 选举、分布式锁和分布式队列等功能。
Zookeeper<br>分布式服务框架
拥有文件系统的数据库
解决了数据一致性问的的分布式数据库
具有发布和订阅功能的分布式数据库
架构
leader
flower
Hive:<br>大数据SQL批处理引擎,用于处理SQL类批处理作业,但是处理速度较慢。
架构
元数据
Client
driver
jdbc<br>
解析器
解析器
优化器
执行器
MR/Tez/Spark
HDFS
内部表、外部表<br>
创建
内部表会将数据移动到指定的目录
删除
内部表的元数据和原始数据都会被删除,外部表只会删除元数据<br>
external关键字修饰外部表
Hive自定义UDF<br>
1、继承“org.apache.hadoop.hive.ql.exec.UDF"。<br>2、实现一个evaluate()方法,编写要实现的逻辑。<br>3、打包并上传到HDFS里。<br>4、Hive创建临时函数。<br>5、调用该函数。<br>
Hive调优
1、数据倾斜<br>容易造成数据倾斜的原因:<br>group by<br>distinct count<br>join<br>
2、参数调优<br>set hive.map.aggr=true;<br>在map中会做部分聚集操作,效率更高但需要更多的内存。<br>set hive.groupby.skewindata=true;<br>此时生成的查询计划会有两个MRJob,可实现数据倾斜时负载均衡。
3、map side join<br>set hive.auto.convert.join=true;<br>当连接一个较小和较大表的时候,把较小的表直接放到内存中去,然后再对较大的表进行map操作。
4、并行化执行<br>每个查询会被Hive转化为多个阶段,当有些阶段关联性不大时,可以并行化执行,减少整个任务的执行时间。<br>开启任务并行执行:<br>set hive.exec.parallel=true;<br>设置同一个sql允许并行任务的最大线程数(例如设置为8个):<br>set hive.exec.parallel.thread.number=8;
Hive数据仓库<br>
数据仓库分层
ODS层:原始数据层。<br>DWD层:结构和粒度与原始表保持一致,简单清洗。<br>DWS层:以DWD为基础,进行轻度汇总。<br>ADS层:为各种统计报表提供数据。
分层优点
复杂问题简单化<br>将任务分解成多个步骤完成,每一层只处理单一的步骤,比较简单,并且方便定位问题。<br>减少重复开发<br>规范数据分层,通过中间层数据,减少最大的重复计算,增加一次计算结果的复用性。<br>隔离原始数据<br>避免数据异常或者数据敏感,使真实数据与统计数据解耦。
Spark:<br>基于内存的数据处理引擎,适合海量数据,处理速度高效。
应用场景
数据处理(DataProcessing):可以用来快速处理数据,兼具容错性和可扩展性。<br>迭代计算(IterativeComputation):支持迭代计算,有效应对复杂的数据处理逻辑。<br>数据挖掘(DataMining):在海量数据基础上进行复杂的挖掘分析,可支持多种数据挖掘和机器学习算法。<br>流式处理(StreamingProcessing):支持秒级延迟的流处理,可支持多种外部数据源。<br>查询分析(QueryAnalysis):支持SQL的查询分析,同时提供领域特定语言(DSL)以方便操作结构化数据,并支持多种外部数据源。
组件<br>
(1)Application:表示你的应用程序<br><br>(2)Driver:表示main()函数,创建SparkContext。由SparkContext负责与ClusterManager通信,进行资源的申请,任务的分配和监控等。程序执行完毕后关闭SparkContext<br><br>(3)Executor:某个Application运行在Worker节点上的一个进程,该进程负责运行某些task,并且负责将数据存在内存或者磁盘上。在Spark on Yarn模式下,其进程名称为 CoarseGrainedExecutor Backend,一个CoarseGrainedExecutor Backend进程有且仅有一个executor对象,它负责将Task包装成taskRunner,并从线程池中抽取出一个空闲线程运行Task,这样,每个CoarseGrainedExecutorBackend能并行运行Task的数据就取决于分配给它的CPU的个数。<br><br>(4)Worker:集群中可以运行Application代码的节点。在Standalone模式中指的是通过slave文件配置的worker节点,在Spark on Yarn模式中指的就是NodeManager节点。<br><br>(5)Task:在Executor进程中执行任务的工作单元,多个Task组成一个Stage<br><br>(6)Job:包含多个Task组成的并行计算,是由Action行为触发的<br><br>(7)Stage:每个Job会被拆分很多组Task,作为一个TaskSet,其名称为Stage<br><br>(8)DAGScheduler:根据Job构建基于Stage的DAG,并提交Stage给TaskScheduler,其划分Stage的依据是RDD之间的依赖关系<br><br>(9)TaskScheduler:将TaskSet提交给Worker(集群)运行,每个Executor运行什么Task就是在此处分配的。
spark任务运行过程
概念
RDD
Resilient Distributed Dataset 分布式数据集
Shuffle
Shuffle是划分DAG中stage的标识,同时影响Spark执行速度的关键步骤<br>RDD 的Transformation 函数中,分为窄依赖(narrow dependency)和宽依赖(widedependency)的操作.<br>窄依赖跟宽依赖的区别是是否发生Shuffle(洗牌) 操作。
宽依赖/窄依赖<br>
窄依赖是指父RDD的每个分区只被子RDD的一个分区使用。
宽依赖是指父RDD的每个分区都可能被多个子RDD分区所使用。
Stage
Transformation
Transformation是RDD的算子类型,它的返回值还是一个RDD。<br>lazy特性
1、map (func)<br> 对调用map的RDD数据集中的每个element都使用func,然后返回一个新的分布式数据集RDD。
2、filter(func)<br> 对调用filter的RDD数据集中的每个element都使用func,然后返回一个包含使func为true的元素构成的RDD。
3、flatMap(func)<br> 和map算子差不多,但是flatMap生成的是多个结果。
4、mapPartitions(func)<br> <font color="#FF0000">和map算子差不多,但是map针对的是每个element,而mapPartitions针对的是每个partition(block)</font>。例如,有一个数据集,有100个partiton,每个partition有10000条数据。我们要将这100w的数据保存到数据库中。如果我们用map实现,则需要链接数据库100w次,而如果用mapPartitions,则只需要链接100次。大大减少了数据库的压力。但是使用mapPartitions要考虑到内存的问题,因为它要把一个partition的数据处理完后才会释放资源,如果内存不足可能会报OOM。
11、intersection(otherDataset)<br> 返回两个dataset的一个交集dataset。
12、sortByKey([ascending], [numPartitions])<br>按照Key进行排序,ascending的值默认为True,True/False表示升序/降序 。<br>
13、join(otherDataset, [numPartitions])<br> 类似于SQL中的连接操作,即作用于键值对(K, V)和(K, W)上,返回元组 (K, (V, W))。
14、cogroup(otherDataset, [numPartitions])<br>作用于键值对(K, V)和(K, W)上,返回元组 (K, (Iterable, Iterable))。这一操作可叫做groupWith。<br>
15、cartesian(otherDataset)<br> 笛卡尔乘积,作用于数据集T和U上,返回(T, U),即数据集中每个元素的两两组合<br>
16、coalesce(numPartitions)/ˌkoʊəˈles/<br>将RDD的分区数减小到numPartitions个。当数据集通过过滤规模减小时,使用这个操作可以提升性能。<font color="#FF0000">一般作用在filter之后例如一个RDD有100个partition,意味着有100个task。但是经过一系列的filter过滤之后,可能每个partition里面包含的数据量已经很少了,这时再启动100个task会产生很多的小文件。这个时候就可以使用coalesce减少partition的数量。</font><br>
17、repartition(numPartitions)<br>重组数据,数据被重新随机分区为numPartitions个,numPartitions可以比原来大,也可以比原来小,平衡各个分区。这一操作会将整个数据集在网络中重新洗牌。<br>
注意(16,17):repartition算子底层调用的是coalesce,coalesce有个默认参数shuffle: Boolean = false,默认是不走shuffle操作的,repartition算子调用coalesce的时候–coalesce(numPartitions, shuffle = true)传入的是true,走shuffle,可以增大partition的数量。
<font color="#FF0000">repartition一定会发生shuffle,coalesce 根据传入的参数来判断是否发生shuffle。<br>一般情况下增大rdd的partition数量使用repartition,减少partition数量时使用coalesce。</font><br>
Action
Action是RDD的算子,它的返回值不是一个RDD。Action操作是返回结果或者将结果写入存储的操作。Action是Spark应用启动执行的触发动作,得到RDD的相关计算结果或将RDD保存到文件系统中。
1、reduce(func)<br> 使用函数func(两个输入参数,返回一个值)对数据集中的元素做聚集操作
2、collect()<br>在driver程序中以数组形式返回数据集中所有的元素。这以action通常在执行过filter或者其他操作后返回一个较小的子数据集时非常有用。
3、count()<br>返回数据集中元素的个数
4、first()<br>返回数据集中的第一个元素,底层调用的是take(1)
5、take(n)<br>以数组形式返回数据集中前n个元素。需要注意的是,这一action并不是在多个node上并行执行,而是在driver程序所在的机器上单机执行,会增大内存的压力,使用需谨慎。
6、takeSample(withReplacement, num, [seed])<br> 以数组形式返回从数据集中抽取的样本数量为num的随机样本,有替换或者无替换的进行采样。可选参数[seed]可以允许用户自己预定义随机数生成器的种子。注意:该方法仅在预期结果数组很小的情况下使用,因为所有数据都被加载到driver的内存中。
7、takeOrdered(n, [ordering])<br> 返回RDD的前n个元素,可以利用自然顺序或者由用户执行排序的comparator。
8、saveAsTextFile(path)<br> 将数据集中的元素以文本文件(或者文本文件的一个集合)的形式写入本地文件系统,或者HDFS,或者其他Hadoop支持的文件系统的指定路径path下。Spark会调用每个元素的toString方法,将其转换为文本文件中的一行。
9、saveAsSequenceFile(path)<br>将数据集中的元素以Hadoop SequenceFile的形式写入本地文件系统,或者HDFS,或者其他Hadoop支持的文件系统的指定路径path下。RDD的元素必须由实现了Hadoop的Writable接口的key-value键值对组成。在Scala中,也可以是隐式可以转换为Writable的键值对(Spark包括了基本类型的转换,例如Int,Double,String等等)
10、saveAsObjectFile(path)<br> 利用Java序列化,将数据集中的元素以一种简单的形式进行写操作,并能够利用SparkContext.objectFile()加载数据。(适用于Java和Scala)
11、countByKey()<br>只能作用于键值对(K, V)形式的RDDs上。按照Key进行计数,返回键值对(K, int)的哈希表。
12、foreach(func)<br>在数据集的每个元素上调用函数func。这一操作通常是为了实现一些副作用,比如更新累加器或者与外部存储系统进行交互。注意:在foreach()之外修改除了累加器以外的变量可能造成一些未定义的行为。更多内容请参阅闭包进行理解。
SparkConf
SparkContext<br>
SparkContext是Spark的入口,相当于应用程序的main函数。<br>SparkContext表示与Spark集群的连接,可用于在该集群上创建RDD,记录计算结果和环境配置等信息。
SparkSession
Spark2.0中引入了SparkSession的概念,为用户提供了一个统一的切入点来使用Spark的各项功能。<br>封装了SparkConf和SparkContext对象,方便用户使用Spark的各种API。
SparkSQL<br>
SparkSQL是Spark用来处理结构化数据的一个模块,可以在Spark应用中直接使用SQL语句对数据进行操作。<br>SQL语句通过SparkSQL模块解析为RDD执行计划,交给SparkCore执行。
使用方式<br>
SparkSession提交SQL语句
JDBC
SparkSQL关键概念DataSet
DataSet是一个由特定域的对象组成的强类型集合,可通过功能或关系操作并行转换其中的对象<br>DataSet以Catalyst逻辑执行计划表示,并且数据以编码的二进制形式存储,不需要反序列化就可以执行sort、filter、shuffle等操作。<br>Dataset是“懒惰”的,只在执行action操作时触发计算。当执行action操作时,Spark用查询优化程序来优化逻辑计划,并生成一个高效的并行分布式的物理计
使用场景
适合:<br>结构化数据处理。<br>对数据处理的实时性要求不高的场景<br>需要处理PB级的大容量数据。<br>不适合:<br>实时数据查询。
调优<br>
资源参数调优<br>num-executors:设置Spark作业总共要用多少个Executor进程来执行<br>executor-memory:设置每个Executor进程的内存<br>executor-cores:设置每个Executor进程的CPU core数量<br>driver-memory:设置Driver进程的内存<br>spark.default.parallelism:设置每个stage的默认task数量
开发调优<br>避免创建重复的RDD<br>尽可能复用同一个RDD<br>对多次使用的RDD进行持久化<br>尽量避免使用shuffle类算子<br>使用map-side预聚合的shuffle操作<br>使用高性能的算子
其他<br>
防止内存溢出的方法<br>
driver端的内存溢出
可以增大driver的内存参数:spark.driver.memory (default 1g)<br>这个参数用来设置Driver的内存。在Spark程序中,SparkContext,DAGScheduler都是运行在Driver端的。对应rdd的Stage切分也是在Driver端运行,如果用户自己写的程序有过多的步骤,切分出过多的Stage,这部分信息消耗的是Driver的内存,这个时候就需要调大Driver的内存。
map过程产生大量对象导致内存溢出
在会产生大量对象的map操作之前调用repartition方法,分区成更小的块传入map。例如:rdd.repartition(10000).map(x=>for(i <- 1 to 10000) yield i.toString)。
面对这种问题注意,不能使用rdd.coalesce方法,这个方法只能减少分区,不能增加分区, 不会有shuffle的过程。
数据不平衡导致内存溢出
数据不平衡除了有可能导致内存溢出外,也有可能导致性能的问题,解决方法和上面说的类似,就是调用repartition重新分区。这里就不再累赘了。
shuffle后内存溢出
shuffle内存溢出的情况可以说都是shuffle后,单个文件过大导致的。在Spark中,join,reduceByKey这一类型的过程,都会有shuffle的过程,在shuffle的使用,需要传入一个partitioner,大部分Spark中的shuffle操作,默认的partitioner都是HashPatitioner,默认值是父RDD中最大的分区数,这个参数通过spark.default.parallelism控制(在spark-sql中用spark.sql.shuffle.partitions) , spark.default.parallelism参数只对HashPartitioner有效,所以如果是别的Partitioner或者自己实现的Partitioner就不能使用spark.default.parallelism这个参数来控制shuffle的并发量了。如果是别的partitioner导致的shuffle内存溢出,就需要从partitioner的代码增加partitions的数量。
standalone模式下资源分配不均匀导致内存溢出
配置–executor-cores或者spark.executor.cores参数,确保Executor资源分配均匀。
使用rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)代替rdd.cache()
rdd.cache()和rdd.persist(Storage.MEMORY_ONLY)是等价的,在内存不足的时候rdd.cache()的数据会丢失,再次使用的时候会重算,而rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)在内存不足的时候会存储在磁盘,避免重算,只是消耗点IO时间。
spark中cache和persist的区别:
cache:缓存数据,默认是缓存在内存中,其本质还是调用persist
persist:缓存数据,有丰富的数据缓存策略。数据可以保存在内存也可以保存在磁盘中,使用的时候指定对应的缓存级别就可以了。
常用采集工具
Sqoop
主要用于在Hadoop(Hive)与传统的数据库(MySQL、PostgreSQL...)间 进 行 数 据 的 传 递,可以将一个关系型数据库(例 如:MySQL,Oracle,PostgreSQL等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。
原理
SqoopImport原理:(RDBMS->Hadoop)<br>Sqoop在import时,需要指定split-by参数。Sqoop根据不同的split-by参数值来进行切分,然后将切分出来的区域分配到不同map中。<br>每个map中再处理数据库中获取的一行一行的值,写入到HDFS中。<br>同时split-by根据不同的参数类型有不同的切分方法,如比较简单的int型,Sqoop会取最大和最小split-by字段值,然后根据传入的num-mappers来确定划分几个区域。
Sqoop export原理:(Hadoop->RDBMS)<br>获取导出表的schema、meta信息,和Hadoop中的字段match;<br>并行导入数据:<br>将Hadoop 上文件划分成若干个分片,每个分片由一个Map Task进行数据导入。
Loader
Loader是实现FusionInsight HD与关系型数据库、文件系统之间交换数据和文件的数据加载工具。<br>提供可视化向导式的作业配置管理界面;<br>提供定时调度任务,周期性执行Loader作业;<br>在界面中可指定多种不同的数据源、配置数据的清洗和转换步骤、配置集群存储系统等。<br>基于开源Sqoop研发,做了大量优化和扩展。
特点<br>
图形化<br>提供图像化配置、监控界面,操作简便。<br>
高性能<br>利用MapReduce并行处理数据<br>
高可靠<br>Loader Sever采用主备双机。<br>作业通过MapReduce执行,支持失败重试。<br>作业失败后,不会残留数据。<br>
安全<br>Kerberos认证作业权限管理。<br>
Doris
数据集市和数据仓库
数据集市<br>数据集市(Data Mart) ,也叫数据市场,数据集市就是满足特定的部门或者用户的需求,按照多维的方式进行存储,<br>包括定义维度、需要计算的指标、维度的层次等,生成面向决策分析需求的数据立方体。
数据仓库<br>为满足各类零散分析的需求,通过数据分层和数据模型的方式,并以基于业务和应用的角度将数据进行模块化的存储。
2.实时
Flume
Kafka:<br>分布式、分区、多副本、多订阅者的日志系统<br>
角色
Producer
Consumer
如何实现高吞吐<br>
顺序读写<br><br>零拷贝<br><br>文件分段<br><br>批量发送<br><br>数据压缩
Flink
主要特点<br>
事件驱动(Event-driven)<br>
基于流(有界无界)
分层API
SQL/Table API<br>
DataStream API<br>
ProcessFunction
部署方式
Standalone
源码
任务提交流程
yarn-per-job(三大进程)
CliFrontend
参数解析<br> 封装CommandLine:三个,依次添加<br> 配置的封装<br> 执行用户代码: execute()<br> 生成StreamGraph<br> Executor:生成JobGraph<br> 集群描述器:上传jar包、配置, 封装提交给yarn的命令<br> yarnclient提交应用
YarnJobClusterEntryPonit
1、Dispatcher的创建和启动<br> 2、ResourceManager的创建、启动:里面有一个 slotmanager(真正管理资源的、向yarn申请资源)<br> 3、Dispatcher启动JobMaster:生成ExecutionGraph(里面有一个slotpool,真正去发送请求的)<br> 4、slotpool向slotmanager申请资源, slotmanager向yarn申请资源(启动新节点)
YarnTaskExecutorRunner
1、启动 TaskExecutor<br> 2、向ResourceManager注册slot<br> 3、ResourceManager分配slot<br> 4、TaskExecutor接收到分配的指令,提供offset给JobMaster(slotpool)<br> 5、JobMaster提交任务给TaskExecutor去执行
子主题
yarn-per-job提交命令新老版本
老版本(<=1.10)
flink run -m yarn-cluster -c xxx xxx.jar
新版本(>=1.11)
flink run -t yarn-per-job -c xxx xxx.jar
组件通信
任务调度
内存管理
Structured Streaming<br>
Spark Streaming<br>
Storm
分布式实时大数据处理框架
核心组件
Nimbus
Storm集群的Master节点,负责分发用户代码,指派给具体的Supervisor节点上的Worker节点,去运行Topology对应的组件(Spout/Bolt)的Task。
Supervisor
Storm集群的从节点,负责管理运行在Supervisor节点上的每一个Worker进程的启动和终止。通过Storm的配置文件中的supervisor.slots.ports配置项,可以指定在一个Supervisor上最大允许多少个Slot,每个Slot通过端口号来唯一标识,一个端口号对应一个Worker进程(如果该Worker进程被启动)。
Zookeeper<br>
用来协调Nimbus和Supervisor,如果Supervisor因故障出现问题而无法运行Topology,Nimbus会第一时间感知到,并重新分配Topology到其它可用的Supervisor上运行。
抽象静态组件<br>
Topology
Storm对一个分布式计算应用程序的抽象,目的是通过一个实现Topology能够完整地完成一件事情(从业务角度来看)。一个Topology是由一组静态程序组件(Spout/Bolt)、组件关系Streaming Groups这两部分组成。
Spout
描述了数据是如何从外部系统(或者组件内部直接产生)进入到Storm集群,并由该Spout所属的Topology来处理,通常是从一个数据源读取数据,也可以做一些简单的处理(为了不影响数据连续地、实时地、快速地进入到系统,通常不建议把复杂处理逻辑放在这里去做)。
Bolt
描述了与业务相关的处理逻辑。
抽象动态组件<br>
Task
Spout/Bolt在运行时所表现出来的实体,都称为Task,一个Spout/Bolt在运行时可能对应一个或多个Spout Task/Bolt Task,与实际在编写Topology时进行配置有关
Worker
运行时Task所在的一级容器,Executor运行于Worker中,一个Worker对应于- - Supervisor上创建的一个JVM实例
Executor
运行时Task所在的直接容器,在Executor中执行Task的处理逻辑;一个或多个Executor实例可以运行在同一个Worker进程中,一个或多个Task可以运行于同一个Executor中;在Worker进程并行的基础上,Executor可以并行,进而Task也能够基于Executor实现并行计算。
相关概念
Stream Grouping
Shuffle Grouping: 随机分组, 随机派发stream里面的tuple,保证每个bolt接收到的tuple数目大致相同。<br>Fields Grouping:按字段分组,比如按userid来分组,具有同样userid的tuple会被分到相同的Bolts里的一个task,而不同的userid则会被分配到不同的bolts里的task。<br>All Grouping:广播发送,对于每一个tuple,所有的bolts都会收到。<br>Global Grouping:全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。<br>Non Grouping:不分组,这stream grouping个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果, 有一点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程里面去执行。<br>Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)。<br>Local or shuffle grouping:如果目标bolt有一个或者多个task在同一个工作进程中,tuple将会被随机发生给这些tasks。否则,和普通的Shuffle Grouping行为一致。
Reliability
消息可靠性保障机制
每处理完一个tuple,调用OutputColletor的ack方法,通知acker消息是否投递成功,若没有成功,需要重新投递。
Redis
ElasticSearch
任务调度
airflow
设计理念
动态
python编写
可扩展
轻松定义自己的运算符(operators),执行器(executors)和扩展库
优雅
管道简洁明了,使用功能强大的Jinja模板引擎,将脚本参数化内置到Airflow的核心中
可伸缩
Airflow具有模块化架构,使用消息队列来协调任务数量的Worker。Airflow可扩展到无穷大。
3.数据治理
Apache Atlas
概述
架构
0 条评论
下一页