基本
Spark 的 shell 提供了一种学习 API 的简单方法,以及交互式分析数据的强大工具。它可以在 Scala(在 Java VM 上运行,因此是使用现有 Java 库的好方法)或 Python 中使用。通过在 Spark 目录中运行以下命令来启动它:<br>./bin/spark-shell<br>Spark 的主要抽象是称为数据集的分布式项目集合。可以从 Hadoop 输入格式(例如 HDFS 文件)或通过转换其他数据集来创建数据集。让我们根据 Spark 源目录中的 README 文件的文本创建一个新的数据集:<br><br>scala> val textFile = spark.read.textFile("README.md")<br>textFile: org.apache.spark.sql.Dataset[String] = [value: string]<br>您可以通过调用某些操作直接从数据集中获取值,或者转换数据集以获取新的数据集。欲了解更多详细信息,请阅读API 文档。<br><br>scala> textFile.count() // Number of items in this Dataset<br>res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs<br><br>scala> textFile.first() // First item in this Dataset<br>res1: String = # Apache Spark<br>现在让我们将此数据集转换为新的数据集。我们调用filter返回一个新的数据集,其中包含文件中项目的子集。<br><br>scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))<br>linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]<br>我们可以将转换和行动链接在一起:<br><br>scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?<br>res3: Long = 15<br>
有关Dataset操作的更多信息
数据集操作和转换可用于更复杂的计算。假设我们想要找到单词最多的行:<br>scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)<br>res4: Long = 15<br>首先将一行映射到一个整数值,创建一个新的数据集。reduce调用该数据集来查找最大字数。map和 的参数reduce是 Scala 函数文字(闭包),并且可以使用任何语言功能或 Scala/Java 库。例如,我们可以轻松调用其他地方声明的函数。我们将使用Math.max()函数来使代码更容易理解:<br><br>scala> import java.lang.Math<br>import java.lang.Math<br><br>scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))<br>res5: Int = 15<br>一种常见的数据流模式是由 Hadoop 推广的 MapReduce。Spark 可以轻松实现 MapReduce 流程:<br><br>scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()<br>wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]<br>在这里,我们调用将行flatMap数据集转换为单词数据集,然后组合groupByKey并count计算文件中每个单词的计数作为(字符串,长整型)对的数据集。要在 shell 中收集字数统计,我们可以调用collect:<br><br>scala> wordCounts.collect()<br>res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)<br>
说明, identity 是匿名函数, 函数作用是对传入的值原样返回,此处表示key不变。
缓存
Spark 还支持将数据集拉入集群范围的内存缓存中。当重复访问数据时(例如查询小型“热”数据集或运行 PageRank 等迭代算法时),这非常有用。作为一个简单的示例,让我们将linesWithSpark数据集标记为要缓存:<br>scala> linesWithSpark.cache()<br>res7: linesWithSpark.type = [value: string]<br><br>scala> linesWithSpark.count()<br>res8: Long = 15<br><br>scala> linesWithSpark.count()<br>res9: Long = 15<br><br>使用 Spark 来探索和缓存 100 行文本文件似乎很愚蠢。有趣的是,这些相同的函数可以用于非常大的数据集,即使它们分布在数十或数百个节点上。您还可以通过连接bin/spark-shell到集群以交互方式执行此操作,如RDD 编程指南中所述。<br>