Spark_总体
如果会简单的 Scala 语言,那么学习 Spark 的时候会如虎添翼
Spark用来做什么?
取代MapReduce做批处理计算。
为什么 Spark 比 MapReduce 快?
- Spark 是基于内存的,而 MapReduce 是基于磁盘的。
- Spark 是基于线程方式运行在进程中的,MapReduce 是基于进程运行的。
Spark集群架构。
Spark的数据抽象。
spark的数据抽象是RDD,它是只读的、分区记录的集合,支持并行操作,可以由外部数据集或其他 RDD 转换而来,它具有以下特性:
创建RDD
- 有现有集合创建
- 引用外部数据集
使用外部数据集需要注意:
如果在集群环境下从本地文件系统读取数据,则要求该文件必须在集群中所有机器上都存在,且路径相同。
支持目录路径,支持压缩文件,支持使用通配符。
操作RDD
RDD支持两种类型的操作:transformations(转换,从现有数据集创建新数据集)和 actions(在数据集上运行计算后将值返回到驱动程序)。RDD中的所有转换操作都是惰性的,它们只是记住这些转换操作,但不会立即执行,只有遇到 action 操作后才会真正的进行计算,这类似于函数式编程中的惰性求值。
- 理解Shuffle
由于Shuffle操作对性能的影响比较大,所以需要特别注意使用,以下操作都会导致Shuffle:
涉及到重新分区操作: 如repartition 和 coalesce;
所有涉及到ByKey的操作:如groupByKey和reduceByKey,但countByKey除外;
联结操作:如cogroup和join。
- 宽依赖和窄依赖
RDD 和它的父 RDD(s) 之间的依赖关系分为两种不同的类型:
窄依赖 (narrow dependency):父 RDDs 的一个分区最多被子 RDDs 一个分区所依赖;
宽依赖 (wide dependency):父 RDDs 的一个分区可以被子 RDDs 的多个子分区所依赖。
- 区分这两种依赖是非常有用的:
首先,窄依赖允许在一个集群节点上以流水线的方式(pipeline)对父分区数据进行计算,例如先执行 map 操作,然后执行 filter 操作。而宽依赖则需要计算好所有父分区的数据,然后再在节点之间进行 Shuffle,这与 MapReduce 类似。
窄依赖能够更有效地进行数据恢复,因为只需重新对丢失分区的父分区进行计算,且不同节点之间可以并行计算;而对于宽依赖而言,如果数据丢失,则需要对所有父分区数据进行计算并再次 Shuffle。
外部数据集
Spark可以从Hadoop支持的任何存储源(包括本地文件系统,HDFS,Cassandra,HBase,Amazon S3等)创建分布式数据集。Spark支持文本文件,SequenceFiles和任何其他Hadoop InputFormat。
可以使用SparkContext的textFile方法创建文本文件RDD 。此方法需要一个URI的文件(本地路径的机器上,或一个hdfs://,s3a://等URI),并读取其作为行的集合。
- Java 代码
public class Spark_Text_Demo {
private static String appName = "spaer.text.demo";
private static String master = "local[2]";
public static void main(String[] args) {
JavaSparkContext context = null;
try {
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
context = new JavaSparkContext(conf);
JavaRDD<String> rdd = context.textFile("F:\\SparkText");
// JavaRDD<String> rdd = context.textFile("hdfs://master01:9000/Demo_27");
rdd.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String s) throws Exception {
return s.contains("spark");
}
});
Integer reduce = rdd.map(new Function<String, Integer>() {
@Override
public Integer call(String s) throws Exception {
return s.length();
}
}).reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
});
System.out.println("执行结果:" + reduce);
} catch (Exception e) {
e.printStackTrace();
}finally {
if(context != null) {
context.close();
}
}
}
}
- Scala 代码
object WordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("1208").setMaster("local[2]")
val sc = new SparkContext(conf)
sc.textFile("F:\\SparkText").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).foreach(println)
//sc.textFile("hdfs://master01:9000/Demo_27").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).foreach(println)
}
}
注意事项
- 如果在本地文件系统上使用路径,则还必须在工作节点上的相同路径上访问该文件。将文件复制给所有工作人员,或使用网络安装的共享文件系统。
- Spark的所有基于文件的输入法(包括textFile)都支持在目录,压缩文件和通配符上运行。例如,你可以使用textFile(“/my/directory”),textFile(“/my/directory/.txt”)和textFile(“/my/directory/.gz”)。
- 该textFile方法还采用一个可选的第二个参数来控制文件的分区数。默认情况下,Spark为文件的每个块创建一个分区(HDFS中的块默认为128MB),但是您也可以通过传递更大的值来请求更大数量的分区。请注意,分区不能少于块。
Stage 划分
Stage 的划分依据是:宽依赖。
Stage 与 Stage 之间的过程就是 Shuffle 。负责这个过程的是 ShuffleMapStage 和 ResultStage。
DAGScheduler 会将 Job 的 RDD 划分到不同的 Stage 中,并构建一个 Stage 的依赖关系,即DAG。这样划分的目的是既可以保障没有依赖关系的 Stage 可以并行执行,又可以保证存在依赖关系的 Stage 顺序执行。Stage 主要分为两种类型,一种是 ShuffleMapStage,另一种是 ResultStage。其中 ShuffleMapStage 是属于上游的 Stage,而 ResulStage 属于最下游的 Stage,这意味着上游的 Stage 先执行,最后执行 ResultStage。
可以这样理解:在 DAG 中进行反向解析,遇到宽依赖就断开、遇到窄依赖就把当前的 RDD 加入到当前的 Stage 中。即将窄依赖划分到同一个s Stage中,从而形成一个 Pipeline,提升计算效率。所以一个 DAG 图可以划分为多个 Stage,每个 Stage都代表了一组关联的,相互之间没有Shuffle依赖关系的任务组成的 Task 集合,每个 Task 集合会被提交到 TaskScheduler 进行调度处理,最终将任务分发到 Executor 中进行执行。
其实就是一句话:每一个独立的 Stage 中,只存在一种依赖,那就是窄依赖。
Shuffle
Shuffle 可以理解为对数据的重组。
先看下 Hadoop 的 Shuffle 流程:
在 DAG 调度的过程中,Stage 阶段的划分是根据是否有 Shuffle 过程,也就是存在宽依赖的时候,需要进行 Shuffle,这时候会将 Job 划分成多个Stage,每一个 Stage 内部有很多可以并行运行的 Task。
Stage 与 Stage 之间的过程就是 Shuffle 阶段,在 Spark 中,负责 Shuffle 过程的执行、计算和处理的组件主要就是 ShuffleManager 。ShuffleManager 随着 Spark 的发展有两种实现的方式,分别为 HashShuffleManager 和 SortShuffleManager ,因此 Spark的Shuffle有 Hash Shuffle 和 Sort Shuffle 两种。
- HashShuffleManager
HashShuffleManager 有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘 IO 操作影响了性能。
- SortShuffleManager
SortShuffleManager 相较于 HashShuffleManager 来说,主要就在于每个 Task 在进行 Shuffle 操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(Merge)成一个磁盘文件,因此每个 Task 就只有一个磁盘文件。在下一个 Stage 的 Shuffle Read Task 拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。
- SortShuffleManager 的运行机制
普通机制
运行流程
- 数据先写入数据结构,聚合算子写入 Map ,一边通过 Map 局部聚合,一边写入内存;Join 直接写入内存。
- 判断内存是否达到阈值(5M)。如果达到就将数据写入磁盘,清空内存。
- 数据在写入磁盘前会先排序,排序好的数据通过缓冲区溢写方式在分批写入磁盘文件。默认批次一万条。每次溢写都会产生一个磁盘文件。
- 在每个 Task 中,将所有临时文件合并。这个过程会将所有临时文件读取出来,一次写入到最终文件。这就意味着一个 Task 的所有数据都在这个文件中。同时单独写一份索引文件,标识下游各个 Task 的数据在文件中的 Start Offset 和 End Offset。
好处
- 小文件明显变少了,一个 Task 只生成一个 File 文件。
- File 文件整体有序,加上索引文件的辅助,查找变快,虽然排序浪费一些性能,但是查找变快很多。
广播变量
广播变量是一个只读的变量,并且在每个节点都保存一份副本,而不需要在集群中发送数据。
使用场景:当我们的数据有一部分在整个数据处理的流程中,是只读的时候,那么推荐使用广播变量。
- 代码
object SparkBroadcast {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("SparkBroadcast")
val ssc = new SparkContext(conf)
val mapData = Map("Spark" -> 10, "Flink" -> 20,"Hadoop" -> 15, "Hive" -> 9)
//定义广播变量
val broadRDD = ssc.broadcast(mapData)
println(broadRDD.value)
ssc.stop()
}
}
注意事项
- 不能将一个 RDD 使用广播变量广播出去。
- 广播变量只能在 Driver 端定义,不能在 Executor 端定义。
- 在 Driver 端可以修改广播变量的值,在 Executor 端无法修改广播变量的值。
- 如果 Executor 端用到了 Driver 的变量,如果不使用广播变量在 Executor 有多少 Task 就有多少 Driver 端的变量副本。
- 如果 Executor 端用到了 Driver 的变量,如果使用广播变量在每个 Executo r中只有一份Driver 端的变量副本
累加器
累加器是 Spark 提供的另外一个共享变量,与广播变量不同,累加器是可以被修改的,是可变的。
使用场景:调用 Foreach 访问 RDD 中的每个元素的时候,Foreach 内部不可更新其它变量,否则与预期结果不符。原因是: Spark 是以不同节点的一组任务并行一个函数,会把函数内的每个变量的副本发送到每个任务。
- 代码
object SparkAccumulator {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("SparkAccumulator")
val ssc = new SparkContext(conf)
val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val listRDD = ssc.parallelize(list)
//定义外部变量
var counter = 0
//定义累加器
val countAcc = ssc.longAccumulator("accumulator")
val mapRDD = listRDD.map(num => {
//外部变量的值并不会改变
counter += 1
//满足条件使用累加器
if(num % 3 == 0) {
countAcc.add(1)
}
num % 2
})
mapRDD.foreach(println)
println("counter = " + counter)
println("countAcc = " + countAcc)
ssc.stop()
}
}
我们在dirver中声明的一些局部变量或者成员变量,可以直接在transformation中使用,但是经过transformation操作之后,是不会将最终的结果重新赋值给dirver中的对应的变量。因为通过action触发transformation操作之后,transformation的操作都是通过DAGScheduler将代码打包,然后序列化,最后交由TaskScheduler传送到各个Worker节点中的Executor去执行,在transformation中执行的这些变量,是自己节点上的变量,不是dirver上最初的变量,只不过是将driver上的对应的变量拷贝了一份而已。
持久化
- persist() 可以把数据缓存到内存和磁盘。
- cache() 把数据缓存到内存中,本质上还是调用 persist() 方法。
- 持久化机制
Spark 的任务调度
- 初始化 Driver,Driver端运行客户端的main方法,构建SparkContext对象,在SparkContext对象内部依次构建DAGScheduler和TaskScheduler。
- TaskScheduler 去注册中心 Master 注册,寻找 Work 启动 Executor 。
- Executor 把注册的信息发送给 Driver 。
- 按照 RDD 的操作,生成 DAG 有向无环图。
- DAGScheduler 拿到 DAG 有向无环图之后,按照宽依赖进行 Stage 的划分。每一个 Stage 内部有很多可以并行运行的 Task,最后封装在一个一个的 TaskSet 集合中,然后把 TaskSet 发送给 TaskScheduler。
- TaskScheduler 得到 TaskSet 集合之后,依次遍历取出每一个 Task 提交到 Worker 节点上的 Executor 进程中运行。
- Task 运行完,任务结束。
解释说明
- Application 在遇到不同的算子时会划分不同的 Job。
- Job 会根据是否有宽依赖划分多个 Stage。
- Stage 里又分多个 Task (任务逻辑相同,数据不同)。
- 每个 Task 对应一个分区 Partition 。
Spark 分区
- HashPartitioner (Spark 默认)
- RangePartitioner
- 自定义分区
- 自定义分区代码
//自定义分区只要继承 Partitioner 类,重写其中的方法即可
class CustomerPartition(partitions: Int) extends Partitioner{
override def numPartitions: Int = partitions
override def getPartition(key: Any): Int = {
(key.toString.charAt(0) + scala.util.Random.nextInt(10)) % partitions
}
}
object DefaultPartition {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[2]").getOrCreate()
val bigData = util.Arrays.asList("Hadoop", "Spark", "Flink", "Hive", "Impala", "Hbase", "Kafka", "ClickHouse", "KUDU", "zookeeper")
import spark.implicits._
val stringDataset = spark.createDataset(bigData)
println("当前分区:" + stringDataset.rdd.partitions.length)
val reparationDS = stringDataset.repartition(4)
println("第一次分区:" + reparationDS.rdd.partitions.length)
val stringRDD = stringDataset.rdd.map((word => (word,word.length)))
val partitionRDD = stringRDD.partitionBy(new CustomerPartition(8))
println("第二次分区:" + partitionRDD.partitions.length)
}
}
- Spark 内自己的方法
- coalesce() 方法,避免 Shuffle ,只能减少分区。
- repartition() 方法,会有 Shuffle,可以减少分区和增加分区。