Spark中的数据读取保存和累加器实例详解
欣xy 人气:0数据读取与保存
Text文件
对于 Text文件的读取和保存 ,其语法和实现是最简单的,因此我只是简单叙述一下这部分相关知识点,大家可以结合demo具体分析记忆。
1)基本语法
(1)数据读取:textFile(String)
(2)数据保存:saveAsTextFile(String)
2)实现代码demo如下:
object Operate_Text { def main(args: Array[String]): Unit = { //1.创建SparkConf并设置App名称 val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]") //2.创建SparkContext,该对象是提交Spark App的入口 val sc: SparkContext = new SparkContext(conf) //3.1 读取输入文件 val inputRDD: RDD[String] = sc.textFile("input/demo.txt") //3.2 保存数据 inputRDD.saveAsTextFile("textFile") //4.关闭连接 sc.stop() } }
Sequence文件
SequenceFile文件 是Hadoop中用来存储二进制形式的 key-value对 的一种平面文件(Flat File)。在SparkContext中,可以通过调用 sequenceFile[ keyClass,valueClass ] (path) 来调用。
1)基本语法
- (1)数据读取:sequenceFile[ keyClass, valueClass ] (path)
- (2)数据保存:saveAsSequenceFile(String)
2)实现代码demo如下:
object Operate_Sequence { def main(args: Array[String]): Unit = { //1.创建SparkConf并设置App名称 val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]") //2.创建SparkContext,该对象是提交Spark App的入口 val sc: SparkContext = new SparkContext(conf) //3.1 创建rdd val dataRDD: RDD[(Int, Int)] = sc.makeRDD(Array((1,2,3),(4,5,6),(7,8,9))) //3.2 保存数据为SequenceFile dataRDD.saveAsSequenceFile("seqFile") //3.3 读取SequenceFile文件 sc.sequenceFile[Int,Int]("seqFile").collect().foreach(println) //4.关闭连接 sc.stop() } }
Object对象文件
对象文件是将对象序列化后保存的文件,采用Hadoop的序列化机制。可以通过 objectFile[ k , v ] (path) 函数接收一个路径,读取对象文件,返回对应的RDD,也可以通过调用 saveAsObjectFile() 实现对对象文件的输出。因为要序列化所以要指定类型。
1)基本语法
- (1)数据读取:objectFile[ k , v ] (path)
- (2)数据保存:saveAsObjectFile(String)
2)实现代码demo如下:
object Operate_Object { def main(args: Array[String]): Unit = { //1.创建SparkConf并设置App名称 val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]") //2.创建SparkContext,该对象是提交Spark App的入口 val sc: SparkContext = new SparkContext(conf) //3.1 创建RDD val dataRDD: RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6),2) //3.2 保存数据 dataRDD.saveAsObjectFile("objFile") //3.3 读取数据 sc.objectFile[Int]("objFile").collect().foreach(println) //4.关闭连接 sc.stop() } }
累加器
累加器概念
累加器,是一种变量---分布式共享只写变量。仅支持“add”,支持并发,但Executor和Executor之间不能读数据,可实现所有分片处理时更新共享变量的功能。
累加器用来把Executor端变量信息聚合到Driver端。在Driver中定义的一个变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行合并计算。
系统累加器
1)累加器定义(SparkContext.accumulator(initialValue)方法)
val sum: LongAccumulator = sc.longAccumulator("sum")
2)累加器添加数据(累加器.add方法)
sum.add(count)
3)累加器获取数据(累加器.value)
sum.value
注意:Executor端的任务不能读取累加器的值(例如:在Executor端调用sum.value,获取的值不是累加器最终的值)。因此我们说,累加器是一个分布式共享只写变量。
4)累加器要放在行动算子中
因为转换算子执行的次数取决于job的数量,如果一个 spark应用 有多个行动算子,那么转换算子中的累加器可能会发生不止一次更新,导致结果错误。所以,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,必须把它放在foreach()这样的行动算子中。
5) 代码实现:
object accumulator_system { package com.atguigu.cache import org.apache.spark.rdd.RDD import org.apache.spark.util.LongAccumulator import org.apache.spark.{SparkConf, SparkContext} object accumulator_system { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]") val sc = new SparkContext(conf) val dataRDD: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4))) //需求:统计a出现的所有次数 ("a",10) //普通算子实现 reduceByKey 代码会走shuffle 效率低 val rdd: RDD[(String, Int)] = dataRDD.reduceByKey(_ + _) //累加器实现 //1 声明累加器 val accSum: LongAccumulator = sc.longAccumulator("sum") dataRDD.foreach{ case (a,count) => { //2 使用累加器累加 累加器.add() accSum.add(count) // 4 不在executor端获取累加器的值,因为得到的值不准确,所以累加器叫分布式共享只写变量 //println("sum = " + accSum.value) } } //3 获取累加器的值 累加器.value println(("a",accSum.value)) sc.stop() } }
加载全部内容