Spark工作机制以及API详解
1. Spark基本API解读
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; import java.util.Arrays; import java.util.Iterator; import java.util.List; /* 使用java实现word count */ public class WCJava1 { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setMaster("local"); conf.setAppName("wcjava"); JavaSparkContext jsc = new JavaSparkContext(conf); //加载文件 JavaRDD<String> rdd1 = jsc.textFile("hdfs://mycluster/wc.txt"); //压扁 JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() { public Iterator<String> call(String s) throws Exception { return Arrays.asList(s.split(" ")).iterator(); } }); //map JavaPairRDD<String, Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() { public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<String, Integer>(s, 1); } }); JavaPairRDD<String, Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() { public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); List<Tuple2<String, Integer>> list = rdd4.collect(); for (Tuple2<String, Integer> stringIntegerTuple2 : list) { System.out.println(stringIntegerTuple2); } } }
1.1 Spark core模块核心概念介绍
RDD:弹性分布式数据集(resilient distributed dataset)
Internally, each RDD is characterized by five main properties:
- A list of partitions
- A function for computing each split
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for
an HDFS file)
1. 分区列表:指的是分区对象中包含的数据的范围
2. 计算每个切片的函数:指的是operator,也就是算子,或者称之为算法
3. 和其他RDD的依赖列表:主要有宽依赖以及窄依赖两种
4. (可选的)KV类型RDD的分区器
5. (可选的)计算每个切片的首选位置列表
1.2 RDD API基本特性总结
/**
 * Groups the values of every key into one collection, using `partitioner`
 * for the resulting RDD.
 */
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  // Map-side combine is switched off on purpose: it would not reduce the amount of
  // data shuffled, and it would force all map-side data into a hash table, leading
  // to more objects in the old gen.
  val grouped = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner = (first: V) => CompactBuffer(first),
    mergeValue = (buffer: CompactBuffer[V], next: V) => buffer += next,
    mergeCombiners = (left: CompactBuffer[V], right: CompactBuffer[V]) => left ++= right,
    partitioner = partitioner,
    mapSideCombine = false)
  grouped.asInstanceOf[RDD[(K, Iterable[V])]]
}
/**
 * Aggregates the values of each key starting from `zeroValue`.
 *
 * `seqOp` folds a value of type V into an accumulator of type U; `combOp`
 * merges two accumulators. Both are handed to `combineByKeyWithClassTag`
 * together with `partitioner`.
 */
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(
    seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
  // Serialize the zero value into a byte array once, so that every key can get its
  // own fresh clone of it by deserializing those bytes.
  val serializedZero = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
  val zeroBytes = new Array[Byte](serializedZero.limit)
  serializedZero.get(zeroBytes)

  lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
  val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroBytes))

  // Only seqOp is cleaned here; the combiner closure is cleaned later in `combineByKey`.
  val cleanedSeqOp = self.context.clean(seqOp)
  val createCombiner = (value: V) => cleanedSeqOp(createZero(), value)
  combineByKeyWithClassTag[U](createCombiner, cleanedSeqOp, combOp, partitioner)
}
/**
 * Combines the values of each key with the three supplied functions.
 *
 * When this RDD is already partitioned by `partitioner` the combination runs
 * inside each partition via `mapPartitions`; otherwise a `ShuffledRDD` is
 * built with the aggregator, serializer and map-side-combine flag.
 *
 * Throws SparkException for array keys when map-side combine is requested or
 * when the partitioner is a HashPartitioner.
 */
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0

  val keysAreArrays = keyClass.isArray
  if (keysAreArrays && mapSideCombine) {
    throw new SparkException("Cannot use map-side combining with array keys.")
  }
  if (keysAreArrays && partitioner.isInstanceOf[HashPartitioner]) {
    throw new SparkException("HashPartitioner cannot partition array keys.")
  }

  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))

  self.partitioner match {
    case Some(current) if current == partitioner =>
      // Already partitioned as requested: combine in place, no shuffle.
      self.mapPartitions(records => {
        val taskContext = TaskContext.get()
        new InterruptibleIterator(taskContext, aggregator.combineValuesByKey(records, taskContext))
      }, preservesPartitioning = true)
    case _ =>
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
  }
}
很明显,该函数使用的是一个柯里化过程,第一个括号内由用户指定一个zeroValue,即零值U,该值可以是任意的数据类型,在seqOp中使用的变换过程是 ( U , V ) => U,在同一个分区中,该零值与value进行某种聚合操作,因此能够将value的数据类型改变成U的数据类型,而combOp则是对分区之间各自计算出来的U值进行聚合操作,然后聚合成最终的U,即 ( U , U ) => U,该函数的优点有这么几个:
1. 使用了map端聚合,因此可以提高计算效率
2. 可以改变value的数据类型,并且可由用户自行指定聚合规则,非常的灵活
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Demo of `aggregateByKey` applied to word-count style (word, 1) pairs.
 *
 * The accumulator is a String rather than a number: `seqOp` appends each
 * value followed by "," and `combOp` joins two accumulators with ":", so the
 * printed result shows how values were folded and merged.
 */
object AggScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("aggscala")
    val sc = new SparkContext(conf)
    try {
      // Starting accumulator for every key.
      val zeroValue = ""

      // Folds one value into the accumulator.
      def seqOp(acc: String, value: Int): String = acc + value + ","

      // Merges two accumulators.
      def combOp(left: String, right: String): String = left + ":" + right

      sc.textFile("hdfs://mycluster/wc.txt", 2)
        .flatMap(line => line.split(" "))
        .map(word => (word, 1))
        .aggregateByKey(zeroValue)(seqOp, combOp)
        .collect()
        .foreach(println(_))
    } finally {
      // Release the context even if the job fails; the original never stopped it.
      sc.stop()
    }
  }
}
/**
 * Counts the elements per key and returns the counts as a local Map.
 * The result is collected, so every (key, count) pair is materialised locally.
 */
def countByKey(): Map[K, Long] = self.withScope {
  val onePerRecord = self.mapValues(_ => 1L)
  val countPerKey = onePerRecord.reduceByKey((left, right) => left + right)
  countPerKey.collect().toMap
}
2. Spark工作流程详解
2.1 Spark作业提交流程图
1. 客户端向主节点请求注册APP
2. 主节点APP注册成功,向客户端发送一条APP已经注册的回执
3. Master联系三个worker进程中的其中一个(上图中是worker-3)发送请求让该节点启动驱动
4. worker-3得到了从Master节点发送过来的启动驱动的请求,开始正式启动驱动,因此起来了一个叫做DriverWrapper的子进程
5. Master继续联系剩余的两个节点worker-1以及worker-2并发送启动执行器即executor的请求
6. 两个节点调用startProcess方法准备开始启动执行器
7. 执行器向驱动发送注册执行器的请求
8. 驱动完成注册告知执行器可以执行任务了,此时在worker-1以及worker-2节点上将会出现粗粒度执行器后台进程,即CoarseGrainedExecutorBackend进程
9. 执行器得到来自驱动的消息并对数据进行解码,翻译成任务描述符TaskDesc
10. 执行器正式开始执行任务
2.2 三级调度框架
DAGScheduler:directed acyclic graph,即有向无环图调度器,其源码注释如下:
The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a
minimal schedule to run the job. It then submits stages as TaskSets to an underlying
TaskScheduler implementation that runs them on the cluster. A TaskSet contains fully independent
tasks that can run right away based on the data that's already on the cluster (e.g. map output
files from previous stages), though it may fail if this data becomes unavailable.
Spark stages are created by breaking the RDD graph at shuffle boundaries. RDD operations with
"narrow" dependencies, like map() and filter(), are pipelined together into one set of tasks
in each stage, but operations with shuffle dependencies require multiple stages (one to write a
set of map output files, and another to read those files after a barrier). In the end, every
stage will have only shuffle dependencies on other stages, and may compute multiple operations
inside it. The actual pipelining of these operations happens in the RDD.compute() functions of
various RDDs
In addition to coming up with a DAG of stages, the DAGScheduler also determines the preferred
locations to run each task on, based on the current cache status, and passes these to the
low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being
lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are
not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task
a small number of times before cancelling the whole stage.
When looking through this code, there are several key concepts:
- Jobs (represented by [[ActiveJob]]) are the top-level work items submitted to the scheduler.
For example, when the user calls an action, like count(), a job will be submitted through
submitJob. Each Job may require the execution of multiple stages to build intermediate data.
- Stages ([[Stage]]) are sets of tasks that compute intermediate results in jobs, where each
task computes the same function on partitions of the same RDD. Stages are separated at shuffle
boundaries, which introduce a barrier (where we must wait for the previous stage to finish to
fetch outputs). There are two types of stages: [[ResultStage]], for the final stage that
executes an action, and [[ShuffleMapStage]], which writes map output files for a shuffle.
Stages are often shared across multiple jobs, if these jobs reuse the same RDDs.
- Tasks are individual units of work, each sent to one machine.
- Cache tracking: the DAGScheduler figures out which RDDs are cached to avoid recomputing them
and likewise remembers which shuffle map stages have already produced output files to avoid
redoing the map side of a shuffle.
- Preferred locations: the DAGScheduler also computes where to run each task in a stage based
on the preferred locations of its underlying RDDs, or the location of cached or shuffle data.
- Cleanup: all data structures are cleared when the running jobs that depend on them finish,
to prevent memory leaks in a long-running application.
- 作业(表示成[[ActiveJob]])是被提交到调度器上的最高级别的工作单元。比方说,当用户调用一个action的方法时,如,count(),就会通过submitJob提交一个作业。每个作业可能需要执行多个阶段来构建中间数据。
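- 阶段([[Stage]])是计算作业中间结果的任务集合,每个任务会在相同RDD的分区上计算相同的函数。阶段会在shuffle的边界处被分割,这将引入一个屏障(必须等待前一个阶段完成之后才能拉取它的输出)。阶段分为两种:[[ResultStage]],即执行action的最终阶段;以及[[ShuffleMapStage]],即为shuffle写出map输出文件的阶段。如果多个作业重用了相同的RDD,那么阶段通常会在这些作业之间共享。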
- 任务是独立的工作单位,一个任务会被发送至一台机器。
- 缓存追踪:DAG调度器会计算出哪些RDD已经被缓存,以避免重复计算它们;同样地,也会记住哪些shuffle map阶段已经输出了文件,从而避免重做shuffle的map端工作。
- 最佳位置:DAG调度器也会基于底层RDD的最佳位置,或是缓存数据、shuffle数据所在的位置,计算出一个阶段中的每个任务应该在哪儿运行。
- 收尾:所有的数据结构在当依赖于它们的作业执行完成之后都会被清空,这是为了在一个长时间运行的应用程序中防止内存泄漏
/**
 * Submits `stage`, first recursively submitting every missing parent stage.
 *
 * A stage that is already waiting, running or failed is left alone. When no
 * active job exists for the stage it is aborted instead.
 */
private def submitStage(stage: Stage): Unit = {
  activeJobForStage(stage) match {
    case Some(activeJobId) =>
      logDebug("submitStage(" + stage + ")")
      val alreadyTracked = waitingStages(stage) || runningStages(stage) || failedStages(stage)
      if (!alreadyTracked) {
        val missingParents = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missingParents)
        if (missingParents.nonEmpty) {
          // Parents go first; this stage waits until they have produced their output.
          missingParents.foreach(parent => submitStage(parent))
          waitingStages += stage
        } else {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, activeJobId)
        }
      }
    case None =>
      abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
首先第一行的注释中写了句话:在提交阶段之前,先要递归地提交任何丢失的父阶段,可以看到,在for(parent <- missing)循环中的确进行了阶段的递归提交,那么这个missing数组又是怎么被计算出来的呢?因此,我们就需要看一下getMissingParentStages的源码了,该方法源代码如下:
/**
 * Collects the parent shuffle-map stages of `stage` that are not yet available.
 *
 * Starting from the stage's RDD, narrow dependencies are followed through
 * every RDD that still has at least one uncached partition; each shuffle
 * dependency met on the way contributes its map stage when that stage is
 * not available.
 */
private def getMissingParentStages(stage: Stage): List[Stage] = {
  val missingStages = new HashSet[Stage]
  val seenRdds = new HashSet[RDD[_]]
  // An explicit stack replaces recursion here, to prevent the StackOverflowError
  // that recursively visiting the RDDs could cause.
  val pendingRdds = new ArrayStack[RDD[_]]

  def visit(rdd: RDD[_]): Unit = {
    // `add` is true only the first time an RDD is seen.
    if (seenRdds.add(rdd)) {
      val hasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
      if (hasUncachedPartitions) {
        rdd.dependencies.foreach {
          case shuffleDep: ShuffleDependency[_, _, _] =>
            val parentMapStage = getOrCreateShuffleMapStage(shuffleDep, stage.firstJobId)
            if (!parentMapStage.isAvailable) {
              missingStages += parentMapStage
            }
          case narrowDep: NarrowDependency[_] =>
            pendingRdds.push(narrowDep.rdd)
        }
      }
    }
  }

  pendingRdds.push(stage.rdd)
  while (pendingRdds.nonEmpty) {
    visit(pendingRdds.pop())
  }
  missingStages.toList
}
2.3 Spark的shuffle过程详解
Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on the driver
and on each executor, based on the spark.shuffle.manager setting. The driver registers shuffles
with it, and executors (or tasks running locally in the driver) can ask to read and write data.
NOTE: this will be instantiated by SparkEnv so its constructor can take a SparkConf and
boolean isDriver as parameters.
In sort-based shuffle, incoming records are sorted according to their target partition ids, then
written to a single map output file. Reducers fetch contiguous regions of this file in order to
read their portion of the map output. In cases where the map output data is too large to fit in
memory, sorted subsets of the output can be spilled to disk and those on-disk files are merged
to produce the final output file.
Sort-based shuffle has two different write paths for producing its map output files:
- Serialized sorting: used when all three of the following conditions hold:
1. The shuffle dependency specifies no aggregation or output ordering.
2. The shuffle serializer supports relocation of serialized values (this is currently
supported by KryoSerializer and Spark SQL's custom serializers).
3. The shuffle produces fewer than 16777216 output partitions.
- Deserialized sorting: used to handle all other cases.
- 串行化排序:在以下几个条件满足的情况下将会使用这种排序方式:
1. shuffle依赖没有指定聚合函数或输出排序。
2. shuffle串行化器支持串行化值的重定位(目前KryoSerializer串行化器以及Spark SQL的自定义串行化器支持该技术)
3. shuffle过程生成的输出分区数小于16777216个,即2^24(约16M)个分区
- 非串行化排序:被用来处理其他所有的情况
Serialized sorting mode
In the serialized sorting mode, incoming records are serialized as soon as they are passed to the
shuffle writer and are buffered in a serialized form during sorting. This write path implements
several optimizations:
在串行化排序模式下,进来的记录一旦被传递给shuffle writer就会被即刻串行化并且在排序过程中被缓存成一个串行化的形式。
- Its sort operates on serialized binary data rather than Java objects, which reduces memory
consumption and GC overheads. This optimization requires the record serializer to have certain
properties to allow serialized records to be re-ordered without requiring deserialization.
See SPARK-4550, where this optimization was first proposed and implemented, for more details.
- 该方式的排序是以串行化了的二进制数据而不是以Java对象的方式进行的,因此这种方式就会降低内存消耗以及GC(垃圾回收)的开销。这项优化要求记录串行化器具备某些特性,使得串行化后的记录无需反串行化就可以被重新排序。更多细节参见SPARK-4550,该优化最初就是在其中被提出并实现的。
- It uses a specialized cache-efficient sorter ([[ShuffleExternalSorter]]) that sorts
arrays of compressed record pointers and partition ids. By using only 8 bytes of space per
record in the sorting array, this fits more of the array into cache.
- 它使用了一种特殊的、缓存友好的排序器([[ShuffleExternalSorter]])来对由压缩后的记录指针以及分区id组成的数组进行排序。通过在排序数组中让每条记录只占用8个字节的空间,可以使该数组有更多的部分能够放进缓存。
- The spill merging procedure operates on blocks of serialized records that belong to the same
partition and does not need to deserialize records during the merge.
- 溢出合并的过程直接操作的就是串行化的数据而不需要在合并过程中对记录进行反串行化操作。
- When the spill compression codec supports concatenation of compressed data, the spill merge
simply concatenates the serialized and compressed spill partitions to produce the final output
partition. This allows efficient data copying methods, like NIO's `transferTo`, to be used
and avoids the need to allocate decompression or copying buffers during the merge.
- 当溢出压缩编解码器支持压缩数据的串联时,为了产生最终的输出分区,溢出合并只会把已经串行化并且压缩过的溢出分区直接串联起来。这样就可以
使用高效的数据拷贝技术,例如Java NIO中的"transferTo"方法,并且能够避免在合并过程中分配解压缩缓冲区或拷贝缓冲区。
/**
 * Picks the shuffle handle for a dependency. The checks run in this order:
 * bypass-merge-sort, then serialized shuffle, then the base handle.
 */
override def registerShuffle[K, V, C](
    shuffleId: Int,
    numMaps: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  // The two specialised handles take a ShuffleDependency[K, V, V]; the cast is only
  // evaluated on the branches that need it.
  def sameTypeDependency = dependency.asInstanceOf[ShuffleDependency[K, V, V]]

  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
    // Fewer than spark.shuffle.sort.bypassMergeThreshold partitions and no map-side
    // aggregation: write numPartitions files directly and just concatenate them at the
    // end. This avoids doing serialization and deserialization twice to merge the spilled
    // files, which would happen with the normal code path. The downside is having
    // multiple files open at a time and thus more memory allocated to buffers.
    new BypassMergeSortShuffleHandle[K, V](shuffleId, numMaps, sameTypeDependency)
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
    // Next best: buffer map outputs in serialized form, since this is more efficient.
    new SerializedShuffleHandle[K, V](shuffleId, numMaps, sameTypeDependency)
  } else {
    // Fallback: buffer map outputs in deserialized form.
    new BaseShuffleHandle(shuffleId, numMaps, dependency)
  }
}
除此之外,还能得到的信息是这三种处理方式的优先级为bypass > serialized > base
// If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't // need map-side aggregation, then write numPartitions files directly and just concatenate // them at the end. This avoids doing serialization and deserialization twice to merge // together the spilled files, which would happen with the normal code path. The downside is // having multiple files open at a time and thus more memory allocated to buffers.
/**
 * Base type of all shuffle handles; carries only the id of the shuffle.
 *
 * @param shuffleId id of the shuffle this handle refers to
 */
abstract class ShuffleHandle(val shuffleId: Int) extends Serializable
/**
 * Shuffle handle that keeps the number of map tasks and the shuffle dependency
 * alongside the shuffle id inherited from ShuffleHandle.
 */
private[spark] class BaseShuffleHandle[K, V, C](
    shuffleId: Int,
    val numMaps: Int,
    val dependency: ShuffleDependency[K, V, C])
  extends ShuffleHandle(shuffleId)
/**
 * Marker subclass of BaseShuffleHandle; it adds no members of its own and
 * exists so that the handle can be told apart by type.
 */
private[spark] class BypassMergeSortShuffleHandle[K, V](
    shuffleId: Int,
    numMaps: Int,
    dependency: ShuffleDependency[K, V, V])
  extends BaseShuffleHandle(shuffleId, numMaps, dependency)
/**
 * Marker subclass of BaseShuffleHandle; it adds no members of its own and
 * exists so that the handle can be told apart by type.
 */
private[spark] class SerializedShuffleHandle[K, V](
    shuffleId: Int,
    numMaps: Int,
    dependency: ShuffleDependency[K, V, V])
  extends BaseShuffleHandle(shuffleId, numMaps, dependency)
/**
 * Tells whether the bypass-merge-sort path may be used for `dep`.
 *
 * True only when the dependency needs no map-side combine and its number of
 * partitions is at most `spark.shuffle.sort.bypassMergeThreshold` (200 when
 * the setting is absent).
 */
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
  // Sorting cannot be bypassed when map-side aggregation is required; the
  // threshold is only read when that first check passes.
  !dep.mapSideCombine && {
    val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
    dep.partitioner.numPartitions <= bypassMergeThreshold
  }
}
/**
 * Tells whether the serialized shuffle path may be used for `dependency`,
 * logging the decision at debug level.
 *
 * The path is rejected when the serializer does not support relocation of
 * serialized objects, when map-side combine is needed, or when there are more
 * partitions than MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE. The
 * checks run in that order and the first failing one is reported.
 */
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
  val shufId = dependency.shuffleId
  val numPartitions = dependency.partitioner.numPartitions

  // First reason, if any, that rules the serialized path out.
  val rejection: Option[String] =
    if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
      Some(s"the serializer, ${dependency.serializer.getClass.getName}, " +
        "does not support object relocation")
    } else if (dependency.mapSideCombine) {
      Some("we need to do map-side aggregation")
    } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
      Some(s"it has more than $MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
    } else {
      None
    }

  rejection match {
    case Some(reason) =>
      log.debug(s"Can't use serialized shuffle for shuffle $shufId because $reason")
      false
    case None =>
      log.debug(s"Can use serialized shuffle for shuffle $shufId")
      true
  }
}
/**
 * Creates the shuffle writer matching the concrete type of `handle`:
 * UnsafeShuffleWriter for a SerializedShuffleHandle,
 * BypassMergeSortShuffleWriter for a BypassMergeSortShuffleHandle and
 * SortShuffleWriter for any other BaseShuffleHandle.
 *
 * The handle's number of maps is recorded in `numMapsForShuffle` first, unless
 * an entry for that shuffle id is already present.
 */
override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Int,
    context: TaskContext): ShuffleWriter[K, V] = {
  val declaredNumMaps = handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps
  numMapsForShuffle.putIfAbsent(handle.shuffleId, declaredNumMaps)

  val env = SparkEnv.get
  // Cast only on the branches that need the index-based resolver.
  def indexResolver = shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver]

  handle match {
    case serialized: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
      new UnsafeShuffleWriter(
        env.blockManager,
        indexResolver,
        context.taskMemoryManager(),
        serialized,
        mapId,
        context,
        env.conf)
    case bypass: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
      new BypassMergeSortShuffleWriter(
        env.blockManager,
        indexResolver,
        bypass,
        mapId,
        context,
        env.conf)
    case base: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      new SortShuffleWriter(shuffleBlockResolver, base, mapId, context)
  }
}
之前在分析registerShuffle方法的源码时已经说明了三种方式的优先级为bypass > serialized > base,那么接下来,我们就按照这个顺序来详细了解一下这三种不同的Writer吧!
/**
 * Writes all records of this map task: one temporary file per partition first,
 * then a single data file built by {@code writePartitionedFile}, committed
 * together with its index.
 *
 * With no records at all, only an index with all-zero partition lengths is
 * committed.
 */
public void write(Iterator<Product2<K, V>> records) throws IOException {
  assert (partitionWriters == null);

  if (!records.hasNext()) {
    // Empty input: commit zero lengths with no data file and publish the status.
    partitionLengths = new long[numPartitions];
    shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
    return;
  }

  final SerializerInstance serInstance = serializer.newInstance();
  final long openStartTime = System.nanoTime();
  partitionWriters = new DiskBlockObjectWriter[numPartitions];
  partitionWriterSegments = new FileSegment[numPartitions];
  for (int partition = 0; partition < numPartitions; partition++) {
    final Tuple2<TempShuffleBlockId, File> tempBlock =
      blockManager.diskBlockManager().createTempShuffleBlock();
    final File tempFile = tempBlock._2();
    final BlockId tempBlockId = tempBlock._1();
    partitionWriters[partition] =
      blockManager.getDiskWriter(tempBlockId, tempFile, serInstance, fileBufferSize, writeMetrics);
  }
  // Creating the file to write to and creating a disk writer both involve interacting with
  // the disk, and can take a long time in aggregate when we open many files, so should be
  // included in the shuffle write time.
  writeMetrics.incWriteTime(System.nanoTime() - openStartTime);

  // Route every record to the writer of the partition its key belongs to.
  while (records.hasNext()) {
    final Product2<K, V> next = records.next();
    final K recordKey = next._1();
    final int target = partitioner.getPartition(recordKey);
    partitionWriters[target].write(recordKey, next._2());
  }

  for (int partition = 0; partition < numPartitions; partition++) {
    final DiskBlockObjectWriter partitionWriter = partitionWriters[partition];
    partitionWriterSegments[partition] = partitionWriter.commitAndGet();
    partitionWriter.close();
  }

  final File dataFile = shuffleBlockResolver.getDataFile(shuffleId, mapId);
  final File tempDataFile = Utils.tempFileWith(dataFile);
  try {
    partitionLengths = writePartitionedFile(tempDataFile);
    shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tempDataFile);
  } finally {
    if (tempDataFile.exists() && !tempDataFile.delete()) {
      logger.error("Error while deleting temp file {}", tempDataFile.getAbsolutePath());
    }
  }
  mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}
private long[] writePartitionedFile(File outputFile) throws IOException { // Track location of the partition starts in the output file final long[] lengths = new long[numPartitions]; if (partitionWriters == null) { // We were passed an empty iterator return lengths; } final FileOutputStream out = new FileOutputStream(outputFile, true); final long writeStartTime = System.nanoTime(); boolean threwException = true; try { for (int i = 0; i < numPartitions; i++) { final File file = partitionWriterSegments[i].file(); if (file.exists()) { final FileInputStream in = new FileInputStream(file); boolean copyThrewException = true; try { lengths[i] = Utils.copyStream(in, out, false, transferToEnabled); copyThrewException = false; } finally { Closeables.close(in, copyThrewException); } if (!file.delete()) { logger.error("Unable to delete file for partition {}", i); } } } threwException = false; } finally { Closeables.close(out, threwException); writeMetrics.incWriteTime(System.nanoTime() - writeStartTime); } partitionWriters = null; return lengths; }
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException { // Keep track of success so we know if we encountered an exception // We do this rather than a standard try/catch/re-throw to handle // generic throwables. boolean success = false; try { while (records.hasNext()) { insertRecordIntoSorter(records.next()); } closeAndWriteOutput(); success = true; } finally { if (sorter != null) { try { sorter.cleanupResources(); } catch (Exception e) { // Only throw this error if we won't be masking another // error. if (success) { throw e; } else { logger.error("In addition to a failure during writing, we failed during " + "cleanup.", e); } } } } }
/**
 * Serializes one record into the reusable buffer and hands the resulting
 * bytes, together with the record's partition id, to the sorter.
 */
void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
  assert(sorter != null);
  final K recordKey = record._1();
  final int targetPartition = partitioner.getPartition(recordKey);

  // The buffer is reused across records, so clear it before serializing key and value.
  serBuffer.reset();
  serOutputStream.writeKey(recordKey, OBJECT_CLASS_TAG);
  serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
  serOutputStream.flush();

  final int recordBytes = serBuffer.size();
  assert (recordBytes > 0);

  sorter.insertRecord(
    serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, recordBytes, targetPartition);
}
An external sorter that is specialized for sort-based shuffle.
Incoming records are appended to data pages. When all records have been inserted (or when the
current thread's shuffle memory limit is reached), the in-memory records are sorted according to
their partition ids (using a {@link ShuffleInMemorySorter}). The sorted records are then
written to a single output file (or multiple files, if we've spilled). The format of the output
files is the same as the format of the final output file written by
{@link org.apache.spark.shuffle.sort.SortShuffleWriter}: each output partition's records are
written as a single serialized, compressed stream that can be read with a new decompression and
deserialization stream.
Unlike {@link org.apache.spark.util.collection.ExternalSorter}, this sorter does not merge its
spill files. Instead, this merging is performed in {@link UnsafeShuffleWriter}, which uses a
specialized merge procedure that avoids extra serialization/deserialization.
/** * Write a record to the shuffle sorter. */ public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId) throws IOException { // for tests assert(inMemSorter != null); if (inMemSorter.numRecords() >= numElementsForSpillThreshold) { logger.info("Spilling data because number of spilledRecords crossed the threshold " + numElementsForSpillThreshold); spill(); } growPointerArrayIfNecessary(); final int uaoSize = UnsafeAlignedOffset.getUaoSize(); // Need 4 or 8 bytes to store the record length. final int required = length + uaoSize; acquireNewPageIfNecessary(required); assert(currentPage != null); final Object base = currentPage.getBaseObject(); final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor); UnsafeAlignedOffset.putSize(base, pageCursor, length); pageCursor += uaoSize; Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length); pageCursor += length; inMemSorter.insertRecord(recordAddress, partitionId); }
1. 首先判断内存排序器中记录的数量是否大于等于门限值,一旦达到或超过该门限值,就会将数据溢出到磁盘
2. 然后使用到了一个重要技术,那就是内存拷贝:Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length)方法将串行化缓冲区recordBase中从recordOffset偏移量开始的length个字节,拷贝到当前内存页base的pageCursor位置处(注意这里仍然发生了一次真实的拷贝,并不是严格意义上的零拷贝),该方法直接按内存地址成块拷贝,比逐个元素拷贝更为高效
3. 内存拷贝结束后,最终才调用了inMemSorter的insertRecord方法将之前得到的记录指针和分区id打包成一个整体插入到内存排序器中去,所以,下一步,我们来看一下inMemSorter对象的insertRecord方法中做了什么
/**
 * Stores one record in the sort array as a single packed long built from the
 * record pointer and the partition id.
 *
 * @throws IllegalStateException when there is no room for another record
 */
public void insertRecord(long recordPointer, int partitionId) {
  if (!hasSpaceForAnotherRecord()) {
    throw new IllegalStateException("There is no space for new record");
  }
  final long packedEntry = PackedRecordPointer.packPointer(recordPointer, partitionId);
  array.set(pos, packedEntry);
  // Advance only after the entry has been stored.
  pos++;
}
public static long packPointer(long recordPointer, int partitionId) { assert (partitionId <= MAXIMUM_PARTITION_ID); // Note that without word alignment we can address 2^27 bytes = 128 megabytes per page. // Also note that this relies on some internals of how TaskMemoryManager encodes its addresses. final long pageNumber = (recordPointer & MASK_LONG_UPPER_13_BITS) >>> 24; final long compressedAddress = pageNumber | (recordPointer & MASK_LONG_LOWER_27_BITS); return (((long) partitionId) << 40) | compressedAddress; }
/**
 * Closes the sorter, merges its spill files into one temporary data file and
 * commits that file together with its index. Spill files and the temporary
 * file are deleted afterwards, whether or not the merge succeeded.
 */
@VisibleForTesting
void closeAndWriteOutput() throws IOException {
  assert(sorter != null);
  updatePeakMemoryUsed();

  // Drop the serialization buffers; they are not needed once all records are inserted.
  serBuffer = null;
  serOutputStream = null;

  final SpillInfo[] spillInfos = sorter.closeAndGetSpills();
  sorter = null;

  final long[] partitionLengths;
  final File dataFile = shuffleBlockResolver.getDataFile(shuffleId, mapId);
  final File tempDataFile = Utils.tempFileWith(dataFile);
  try {
    try {
      partitionLengths = mergeSpills(spillInfos, tempDataFile);
    } finally {
      for (SpillInfo spillInfo : spillInfos) {
        if (spillInfo.file.exists() && !spillInfo.file.delete()) {
          logger.error("Error while deleting spill file {}", spillInfo.file.getPath());
        }
      }
    }
    shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tempDataFile);
  } finally {
    if (tempDataFile.exists() && !tempDataFile.delete()) {
      logger.error("Error while deleting temp file {}", tempDataFile.getAbsolutePath());
    }
  }
  mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}
public SpillInfo[] closeAndGetSpills() throws IOException { if (inMemSorter != null) { // Do not count the final file towards the spill count. writeSortedFile(true); freeMemory(); inMemSorter.free(); inMemSorter = null; } return spills.toArray(new SpillInfo[spills.size()]); }
private void writeSortedFile(boolean isLastFile) { final ShuffleWriteMetrics writeMetricsToUse; if (isLastFile) { // We're writing the final non-spill file, so we _do_ want to count this as shuffle bytes. writeMetricsToUse = writeMetrics; } else { // We're spilling, so bytes written should be counted towards spill rather than write. // Create a dummy WriteMetrics object to absorb these metrics, since we don't want to count // them towards shuffle bytes written. writeMetricsToUse = new ShuffleWriteMetrics(); } // This call performs the actual sort. final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords = inMemSorter.getSortedIterator(); // Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to // be an API to directly transfer bytes from managed memory to the disk writer, we buffer // data through a byte array. This array does not need to be large enough to hold a single // record; final byte[] writeBuffer = new byte[diskWriteBufferSize]; // Because this output will be read during shuffle, its compression codec must be controlled by // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use // createTempShuffleBlock here; see SPARK-3426 for more details. final Tuple2<TempShuffleBlockId, File> spilledFileInfo = blockManager.diskBlockManager().createTempShuffleBlock(); final File file = spilledFileInfo._2(); final TempShuffleBlockId blockId = spilledFileInfo._1(); final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId); // Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter. // Our write path doesn't actually use this serializer (since we end up calling the `write()` // OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work // around this, we pass a dummy no-op serializer. 
final SerializerInstance ser = DummySerializerInstance.INSTANCE; final DiskBlockObjectWriter writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse); int currentPartition = -1; final int uaoSize = UnsafeAlignedOffset.getUaoSize(); while (sortedRecords.hasNext()) { sortedRecords.loadNext(); final int partition = sortedRecords.packedRecordPointer.getPartitionId(); assert (partition >= currentPartition); if (partition != currentPartition) { // Switch to the new partition if (currentPartition != -1) { final FileSegment fileSegment = writer.commitAndGet(); spillInfo.partitionLengths[currentPartition] = fileSegment.length(); } currentPartition = partition; } final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer(); final Object recordPage = taskMemoryManager.getPage(recordPointer); final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer); int dataRemaining = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage); long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length while (dataRemaining > 0) { final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining); Platform.copyMemory( recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer); writer.write(writeBuffer, 0, toTransfer); recordReadPosition += toTransfer; dataRemaining -= toTransfer; } writer.recordWritten(); } final FileSegment committedSegment = writer.commitAndGet(); writer.close(); // If `writeSortedFile()` was called from `closeAndGetSpills()` and no records were inserted, // then the file might be empty. Note that it might be better to avoid calling // writeSortedFile() in that case. if (currentPartition != -1) { spillInfo.partitionLengths[currentPartition] = committedSegment.length(); spills.add(spillInfo); } if (!isLastFile) { // i.e. 
this is a spill file // The current semantics of `shuffleRecordsWritten` seem to be that it's updated when records // are written to disk, not when they enter the shuffle sorting code. DiskBlockObjectWriter // relies on its `recordWritten()` method being called in order to trigger periodic updates to // `shuffleBytesWritten`. If we were to remove the `recordWritten()` call and increment that // counter at a higher-level, then the in-progress metrics for records written and bytes // written would get out of sync. // // When writing the last file, we pass `writeMetrics` directly to the DiskBlockObjectWriter; // in all other cases, we pass in a dummy write metrics to capture metrics, then copy those // metrics to the true write metrics here. The reason for performing this copying is so that // we can avoid reporting spilled bytes as shuffle write bytes. // // Note that we intentionally ignore the value of `writeMetricsToUse.shuffleWriteTime()`. // Consistent with ExternalSorter, we do not count this IO towards shuffle write time. // This means that this IO time is not accounted for anywhere; SPARK-3577 will fix this. writeMetrics.incRecordsWritten(writeMetricsToUse.recordsWritten()); taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.bytesWritten()); } }
1. 这个方法的参数isLastFile用来区分本次写盘的性质:如果isLastFile为true,写出的是排序器中尚未溢写的最后一批缓存数据(即最终的非溢写文件),写入的字节数计入shuffle写指标;反之,写出的是溢写(spill)文件,写入的字节数计入溢写指标而不计入shuffle写字节数
2. 重要:紧接着,我们终于看到排序的代码了!注释写道://This call performs the actual sort,之后我们会单独讲解这个方法
3. 排序器数组完成最终排序之后,接下来又可以看到一段内存拷贝的代码:遍历排好序的迭代器sortedRecords,逐条取出记录指针,根据记录指针定位到记录所在的内存页和页内偏移,再把这片内存中的记录内容分批拷贝到writeBuffer中,最后通过writer写出到磁盘
/**
 * Sorts the first {@code pos} entries of {@code array} and returns an iterator over them.
 *
 * When {@code useRadixSort} is set, {@code RadixSort.sort} is applied over the
 * partition-id byte range and its return value is used as the iterator offset.
 * Otherwise a comparison sort with {@code SORT_COMPARATOR} is used, with the part of
 * {@code array} beyond {@code pos} handed to the sorter as its buffer, and the
 * iterator offset is 0.
 *
 * @return an iterator over the sorted records
 */
public ShuffleSorterIterator getSortedIterator() {
  if (useRadixSort) {
    final int offset = RadixSort.sort(
        array,
        pos,
        PackedRecordPointer.PARTITION_ID_START_BYTE_INDEX,
        PackedRecordPointer.PARTITION_ID_END_BYTE_INDEX,
        false,
        false);
    return new ShuffleSorterIterator(pos, array, offset);
  }

  // Each entry is 8 bytes wide; the region after the first `pos` entries holds no
  // records and is wrapped as the sorter's buffer.
  final MemoryBlock spare = new MemoryBlock(
      array.getBaseObject(),
      array.getBaseOffset() + pos * 8L,
      (array.size() - pos) * 8L);
  final LongArray spareBuffer = new LongArray(spare);
  final Sorter<PackedRecordPointer, LongArray> comparisonSorter =
      new Sorter<>(new ShuffleSortDataFormat(spareBuffer));
  comparisonSorter.sort(array, 0, pos, SORT_COMPARATOR);
  return new ShuffleSorterIterator(pos, array, 0);
}
/** * Sorts the specified portion of the specified array using a binary * insertion sort. This is the best method for sorting small numbers * of elements. It requires O(n log n) compares, but O(n^2) data * movement (worst case). * * If the initial part of the specified range is already sorted, * this method can take advantage of it: the method assumes that the * elements from index {@code lo}, inclusive, to {@code start}, * exclusive are already sorted. * * @param a the array in which a range is to be sorted * @param lo the index of the first element in the range to be sorted * @param hi the index after the last element in the range to be sorted * @param start the index of the first element in the range that is * not already known to be sorted ({@code lo <= start <= hi}) * @param c comparator to used for the sort */ @SuppressWarnings("fallthrough") private void binarySort(Buffer a, int lo, int hi, int start, Comparator<? super K> c) { assert lo <= start && start <= hi; if (start == lo) start++; K key0 = s.newKey(); K key1 = s.newKey(); Buffer pivotStore = s.allocate(1); for ( ; start < hi; start++) { s.copyElement(a, start, pivotStore, 0); K pivot = s.getKey(pivotStore, 0, key0); // Set left (and right) to the index where a[start] (pivot) belongs int left = lo; int right = start; assert left <= right; /* * Invariants: * pivot >= all in [lo, left). * pivot < all in [right, start). */ while (left < right) { int mid = (left + right) >>> 1; if (c.compare(pivot, s.getKey(a, mid, key1)) < 0) right = mid; else left = mid + 1; } assert left == right; /* * The invariants still hold: pivot >= all in [lo, left) and * pivot < all in [left, start), so pivot belongs at left. Note * that if there are elements equal to pivot, left points to the * first slot after them -- that's why this sort is stable. * Slide elements over to make room for pivot. 
*/ int n = start - left; // The number of elements to move // Switch is just an optimization for arraycopy in default case switch (n) { case 2: s.copyElement(a, left + 1, a, left + 2); case 1: s.copyElement(a, left, a, left + 1); break; default: s.copyRange(a, left, a, left + 1, n); } s.copyElement(pivotStore, 0, a, left); } }
/**
 * Write a bunch of records to this task's output.
 *
 * The records are fed into an [[ExternalSorter]], which then writes a single partitioned
 * file to a temp location; the index file is written and committed afterwards and
 * `mapStatus` is set from the resulting partition lengths.
 *
 * @param records the key-value records to write
 */
override def write(records: Iterator[Product2[K, V]]): Unit = {
  sorter =
    if (!dep.mapSideCombine) {
      // In this case we pass neither an aggregator nor an ordering to the sorter, because we
      // don't care whether the keys get sorted in each partition; that will be done on the
      // reduce side if the operation being run is sortByKey.
      new ExternalSorter[K, V, V](
        context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
    } else {
      new ExternalSorter[K, V, C](
        context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
    }
  sorter.insertAll(records)

  // Don't bother including the time to open the merged output file in the shuffle write time,
  // because it just opens a single file, so is typically too fast to measure accurately
  // (see SPARK-3570).
  val dataFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  val tmp = Utils.tempFileWith(dataFile)
  try {
    val mergedBlockId =
      ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
    val lengths = sorter.writePartitionedFile(mergedBlockId, tmp)
    shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, lengths, tmp)
    mapStatus = MapStatus(blockManager.shuffleServerId, lengths)
  } finally {
    // Remove the temp file if it is still on disk after the commit attempt.
    if (tmp.exists() && !tmp.delete()) {
      logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
    }
  }
}
/**
 * Inserts every record from `records` into the in-memory collection, checking after each
 * record whether the collection has to be spilled.
 *
 * With an aggregator defined, values are combined per (partition, key) in `map`;
 * without one, records are appended to `buffer` unchanged.
 *
 * @param records the key-value records to insert
 */
def insertAll(records: Iterator[Product2[K, V]]): Unit = {
  // TODO: stop combining if we find that the reduction factor isn't high
  aggregator match {
    case Some(agg) =>
      // Combine values in-memory first using our AppendOnlyMap
      val mergeValue = agg.mergeValue
      val createCombiner = agg.createCombiner
      // The update function below is created once and reads the current record through
      // this variable, which is reassigned on every iteration.
      var current: Product2[K, V] = null
      val update = (hadValue: Boolean, oldValue: C) => {
        if (hadValue) mergeValue(oldValue, current._2) else createCombiner(current._2)
      }
      while (records.hasNext) {
        addElementsRead()
        current = records.next()
        map.changeValue((getPartition(current._1), current._1), update)
        maybeSpillCollection(usingMap = true)
      }

    case None =>
      // Stick values into our buffer
      while (records.hasNext) {
        addElementsRead()
        val record = records.next()
        buffer.insert(getPartition(record._1), record._1, record._2.asInstanceOf[C])
        maybeSpillCollection(usingMap = false)
      }
  }
}
/**
 * Spill the current in-memory collection to disk if needed.
 *
 * When a spill happens, the spilled collection is replaced by a fresh empty one.
 * The estimated size taken before the spill check is also used to update the
 * peak-memory figure.
 *
 * @param usingMap whether we're using a map or buffer as our current in-memory collection
 */
private def maybeSpillCollection(usingMap: Boolean): Unit = {
  val estimatedSize =
    if (usingMap) {
      val mapSize = map.estimateSize()
      if (maybeSpill(map, mapSize)) {
        map = new PartitionedAppendOnlyMap[K, C]
      }
      mapSize
    } else {
      val bufferSize = buffer.estimateSize()
      if (maybeSpill(buffer, bufferSize)) {
        buffer = new PartitionedPairBuffer[K, C]
      }
      bufferSize
    }

  if (estimatedSize > _peakMemoryUsedBytes) {
    _peakMemoryUsedBytes = estimatedSize
  }
}
/**
 * Spills the given collection to disk when memory cannot grow any further or when too
 * many elements have been read since the last spill.
 *
 * On every 32nd element read, if `currentMemory` has reached `myMemoryThreshold`, more
 * memory is requested and added to the threshold; a spill is needed when the threshold is
 * still not above `currentMemory`. Independently, a spill is forced once `_elementsRead`
 * exceeds `numElementsForceSpillThreshold`.
 *
 * @param collection    the collection to spill
 * @param currentMemory estimated size of the collection in bytes
 * @return true if the collection was spilled, false otherwise
 */
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
  val outOfMemory =
    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
      // Claim up to double our current memory from the shuffle memory pool
      val amountToRequest = 2 * currentMemory - myMemoryThreshold
      val granted = acquireMemory(amountToRequest)
      myMemoryThreshold += granted
      // If we were granted too little memory to grow further (either tryToAcquire returned 0,
      // or we already had more memory than myMemoryThreshold), spill the current collection
      currentMemory >= myMemoryThreshold
    } else {
      false
    }

  val shouldSpill = outOfMemory || _elementsRead > numElementsForceSpillThreshold

  // Actually spill
  if (shouldSpill) {
    _spillCount += 1
    logSpillage(currentMemory)
    spill(collection)
    _elementsRead = 0
    _memoryBytesSpilled += currentMemory
    releaseMemory()
  }
  shouldSpill
}
3. 总结
1. mapreduce框架下,对于数据的各种变换都要落盘,而Spark引入了算子的概念,只有当出现shuffle的行为时才会发生落盘的现象,减少了大量的IO
2. Spark采用了有向无环图的机制,只有进行action的操作时才会真正触发计算任务的执行,其余的transform操作只是在计算一个任务执行计划
3. Spark的分区也就是将数据分成多个范围的概念是mapreduce所不具备的,一个分区执行一个任务,一个任务运行在一个线程上,这样就使得Shuffle可以多线程地执行,而mapreduce只有一次shuffle
4. Spark使用了非常多的手段来减少内存消耗和磁盘IO,从而提高计算性能,如序列化技术、零拷贝技术,以及高效的缓存排序算法,这些都是mapreduce所没有的