Spark Streaming 编程指南
2017-09-01 15:34:41 0 举报
AI智能生成
Spark Streaming 编程指南
作者其他创作
大纲/内容
基本概念
代码库依赖
以Maven方式管理依赖
<pre><code><dependency><br> <groupId>org.apache.spark</groupId><br> <artifactId>spark-streaming_2.10</artifactId><br> <version>1.6.1</version><br></dependency></code></pre>
以SBT方式管理依赖
<h2>初始化StreamingContext</h2>
根据conf创建
<pre><code>val ssc = new StreamingContext(conf, Seconds(1))</code></pre>
根据已有的SparkContext创建
<pre><code>val ssc = new StreamingContext(sc, Seconds(1))</code></pre>
<h2>离散数据流 (DStreams)</h2>
<span>离散数据流(DStream)是Spark Streaming最基本的抽象,代表了一种连续数据流</span>
<h2>输入DStream和接收器(二者相关联)</h2>
输入DStream
基础数据源
文件数据流(不依赖接收器)
<pre>streamingContext.fileStream(dataDirectory)</pre>
<span>streamingContext.textFileStream(dataDirectory)</span>
套接字
<pre><code>streamingContext.socketTextStream(host, port)</code></pre>
Akka actor
<span>streamingContext.actorStream(actorProps, actor-name)</span>
高级数据源
Kafka
Flume
<span>Kinesis</span>
自定义数据源
只需实现一个接收器(R<span>eceiver</span>)
接收器
接收器可靠性
不可靠接收器
<span>不可靠接收器不会发送任何确认信息。不过这种接收器常用于不支持确认的数据源,或者不想引入数据确认的复杂性的数据源</span>
可靠接收器
<span>可靠接收器会在成功接收并保存好Spark数据副本后,向可靠数据源发送确认信息</span>
<h2>DStream算子</h2>
<h2>transformation算子</h2>
基于窗口的算子
window(<i>windowLength, slideInterval</i>)
countByWindow(<i>windowLength, slideInterval</i>)
reduceByWindow(<i>func, windowLength, slideInterval</i>)
reduceByKeyAndWindow(<i style="">func</i>,<i style="">windowLength</i>, <i style="">slideInterval</i>, [<i style="">numTasks</i>])
reduceByKeyAndWindow(<i style="">func</i>, <i style="">invFunc</i>,<i style="">windowLength</i>,<i style="">slideInterval</i>, [<i style="">numTasks</i>])
与上一个的区别:<span>会用之前滑动窗口计算结果,递增地计算每个窗口的归约结果</span>
countByValueAndWindow(<i style="">windowLength</i>,<i style="">slideInterval</i>, [<i style="">numTasks</i>])
Join相关的算子
流-流(Stream-Stream)关联
可以调用window函数做基于窗口关联
<h4>流-数据集(stream-dataset)关联</h4>
不触发shuffle
map(func)
filter(func)
flatMap(func)
union(otherStream)
触发shuffle
repartition(numPartitions)
count()
reduce(func)
countByValue()
recudeByKey(func,[numTasks])
cogroup(otherStream,[numTasks])
transform(func)
<span>transform算子(及其变体transformWith)可以支持任意的RDD到RDD的映射操作</span>
<pre><code>val cleanedDStream = wordCounts.transform(rdd => {<br> rdd.join(spamInfoRDD).filter(...) // 将DStream中的RDD和spamInfoRDD关联,并实时过滤垃圾数据<br> ...<br>})</code></pre>
join(otherStream,[numTasks])
updateStateByKey(func)
按key更新状态
DStream输出算子
print()
saveAsTextFiles(<i style="">prefix</i>, [<i style="">suffix</i>])
saveAsHadoopFiles(<i style="">prefix</i>, [<i style="">suffix</i>])
foreachRDD(<i style="">func</i>)
DataFrame和SQL相关的算子
MLlib算子
<h2>缓存/持久化</h2>
一般Dstream持久化
调用persist()
<span>该方法内部会自动调用DStream中每个RDD的persist方法进而将数据持久化到内存中</span>
滑动窗口算子持久化
<span>滑动窗口算子产生的DStream对象默认会自动持久化到内存中(不需要开发者调用persist)</span>
<span>网络接收数据的输入数据流</span>
<span>默认的持久化级别会将数据持久化到两个不同的节点上互为备份副本</span>
<h2>检查点</h2>
<span>检查点需要保存的两种数据</span>
元数据检查点(恢复驱动器节点上的故障)
<em>Configuration</em><span> – 创建Streaming应用程序的配置信息</span>
<em>DStream operations</em><span> – 定义流式处理逻辑的DStream操作信息</span>
<em>Incomplete batches</em><span> – 已经排队但未处理完的批次信息</span>
数据检查点(恢复有状态转换操作)
<span>将生成的RDD保存到可靠的存储中</span>
启用与配置
何时启用检查点
使用了有状态的转换算子时
需要支持驱动器故障中恢复时
如何配置检查点
启用有状态的转换算子检查点
<span>streamingContext.checkpoint(checkpointDirectory)</span>
驱动器故障后恢复
<span>如果程序是首次启动,就需要new一个新的StreamingContext,并定义好所有的数据流处理,然后调用StreamingContext.start()</span>
<span>如果程序是故障后重启,就需要从检查点目录中的数据中重新构建StreamingContext对象</span>
性能调优
减少批次处理时间三种方式
增加数据接收并发度三种方式
优化接收器阻塞间隔
数据块间隔配置参数<span>spark.streaming.blockInterval</span>
<span>增加数据流接收的并行度</span>
<pre><code>val numStreams = 5<br>val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }<br>val unifiedStream = streamingContext.union(kafkaStreams)<br>unifiedStream.print()</code></pre>
对输入DStream划分分区
<span>inputStream.repartition(<number of partitions>)</span>
增加数据处理并发度
修改并发任务数参数<span>spark.default.parallelism提高任务并发数量</span>
数据序列化(<span>Kryo序列化)</span>
序列化输入数据
<strong>Streaming算子所生产的持久化的RDDs</strong>
减少任务启动开销两种方式
<span>使用Kryo来序列化任务</span>
配置参数<span>spark.closure.serializer</span>
执行模式
<span> Spark独立部署或者Mesos粗粒度模式下任务的启动时间比Mesos细粒度模式下的任务启动时间要短</span>
设置合适的批次间隔
<span>批次数据的处理速度应该和其生成速度一样快</span>
内存调优
DStream持久化
Kryo序列化
数据压缩
配置参数<span>spark.rdd.compress</span>
清除老数据
<span>默认情况下,所有的输入数据以及DStream的transformation算子产生的持久化RDD都是自动清理的</span>
CMS垃圾回收器
<span>设置驱动器CMS垃圾回收器(通过spark-submit中的–driver-java-options设置)</span>
设置执行器CMS垃圾回收器(使用spark.executor.extraJavaOptions配置参数)
其他提示
<span>配合Tachyon使用堆外内存来持久化RDD</span>
<span>使用更多但是更小的执行器进程。这样GC压力就会分散到更多的JVM堆中</span>
<span>每个批次中数据块的个数将会决定处理这批数据并行任务的个数</span>
输入数据默认存储级别:<a>StorageLevel.MEMORY_AND_DISK_SER_2</a>
在application中同时设置
容错语义
收藏
0 条评论
下一页