一、广播变量和累加器

通常情况下,当向Spark操作(如map,reduce)传递一个函数时,它会在一个远程集群节点上执行,它会使用函数中所有变量的副本。这些变量被复制到所有的机器上,远程机器上并没有被更新的变量会向驱动程序回传。在任务之间使用通用的,支持读写的共享变量是低效的。尽管如此,Spark提供了两种有限类型的共享变量,广播变量和累加器。
1.1 广播变量:
广播变量允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量可被用于有效地给每个节点一个大输入数据集的副本。Spark还尝试使用高效地广播算法来分发变量,进而减少通信的开销。
Spark的动作通过一系列的步骤执行,这些步骤由分布式的shuffle操作分开。Spark自动地广播每个步骤每个任务需要的通用数据。这些广播数据被序列化地缓存,在运行任务之前被反序列化出来。这意味着当我们需要在多个阶段的任务之间使用相同的数据,或者以反序列化形式缓存数据是十分重要的时候,显式地创建广播变量才有用。
通过在一个变量v上调用SparkContext.broadcast(v)可以创建广播变量。广播变量是围绕着v的封装,可以通过value方法访问这个变量。举例如下:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0) scala> broadcastVar.value res0: Array[Int] = Array(1, 2, 3)
在创建了广播变量之后,在集群上的所有函数中应该使用它来替代使用v.这样v就不会不止一次地在节点之间传输了。另外,为了确保所有的节点获得相同的变量,对象v在被广播之后就不应该再修改。
1.2 累加器:
累加器是仅仅被相关操作累加的变量,因此可以在并行中被有效地支持。它可以被用来实现计数器和总和。Spark原生地只支持数字类型的累加器,编程者可以添加新类型的支持。如果创建累加器时指定了名字,可以在Spark的UI界面看到。这有利于理解每个执行阶段的进程。(对于python还不支持)
累加器通过对一个初始化了的变量v调用SparkContext.accumulator(v)来创建。在集群上运行的任务可以通过add或者”+=”方法在累加器上进行累加操作。但是,它们不能读取它的值。只有驱动程序能够读取它的值,通过累加器的value方法。
下面的代码展示了如何把一个数组中的所有元素累加到累加器上:
scala> val accum = sc.accumulator(0, "My Accumulator") accum: spark.Accumulator[Int] = 0 scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s scala> accum.value res2: Int = 10
尽管上面的例子使用了内置支持的累加器类型Int,但是开发人员也可以通过继承AccumulatorParam类来创建它们自己的累加器类型。AccumulatorParam接口有两个方法:
zero方法为你的类型提供一个0值。
addInPlace方法将两个值相加。
假设我们有一个代表数学vector的Vector类。我们可以向下面这样实现:
object VectorAccumulatorParam extends AccumulatorParam[Vector] {
def zero(initialValue: Vector): Vector = {
Vector.zeros(initialValue.size)
}
def addInPlace(v1: Vector, v2: Vector): Vector = {
v1 += v2
}
}
// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)
在Scala里,Spark提供更通用的累加接口来累加数据,尽管结果的类型和累加的数据类型可能不一致(例如,通过收集在一起的元素来创建一个列表)。同时,SparkContext..accumulableCollection方法来累加通用的Scala的集合类型。
累加器仅仅在动作操作内部被更新,Spark保证每个任务在累加器上的更新操作只被执行一次,也就是说,重启任务也不会更新。在转换操作中,用户必须意识到每个任务对累加器的更新操作可能被不只一次执行,如果重新执行了任务和作业的阶段。
累加器并没有改变Spark的惰性求值模型。如果它们被RDD上的操作更新,它们的值只有当RDD因为动作操作被计算时才被更新。因此,当执行一个惰性的转换操作,比如map时,不能保证对累加器值的更新被实际执行了。下面的代码片段演示了此特性:
val accum = sc.accumulator(0)
data.map { x => accum += x; f(x) }
//在这里,accum的值仍然是0,因为没有动作操作引起map被实际的计算.
二.Java和Scala版本的实战演示
2.1 Java版本:
/**
* 实例:利用广播进行黑名单过滤!
* 检查新的数据 根据是否在广播变量-黑名单内,从而实现过滤数据。
*/
public class BroadcastAccumulator {
/**
* 创建一个List的广播变量
*
*/
private static volatile Broadcast<List<String>> broadcastList = null;
/**
* 计数器!
*/
private static volatile Accumulator<Integer> accumulator = null;
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local[2]").
setAppName("WordCountOnlineBroadcast");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
/**
* 注意:分发广播需要一个action操作触发。
* 注意:广播的是Arrays的asList 而非对象的引用。广播Array数组的对象引用会出错。
* 使用broadcast广播黑名单到每个Executor中!
*/
broadcastList = jsc.sc().broadcast(Arrays.asList("Hadoop","Mahout","Hive"));
/**
* 累加器作为全局计数器!用于统计在线过滤了多少个黑名单!
* 在这里实例化。
*/
accumulator = jsc.sparkContext().accumulator(0,"OnlineBlackListCounter");
JavaReceiverInputDStream<String> lines = jsc.socketTextStream("Master", 9999);
/**
* 这里省去flatmap因为名单是一个个的!
*/
JavaPairDStream<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String word) {
return new Tuple2<String, Integer>(word, 1);
}
});
JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) {
return v1 + v2;
}
});
/**
* Funtion里面 前几个参数是 入参。
* 后面的出参。
* 体现在call方法里面!
*
*/
wordsCount.foreach(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
@Override
public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws Exception {
rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
@Override
public Boolean call(Tuple2<String, Integer> wordPair) throws Exception {
if (broadcastList.value().contains(wordPair._1)) {
/**
* accumulator不仅仅用来计数。
* 可以同时写进数据库或者缓存中。
*/
accumulator.add(wordPair._2);
return false;
}else {
return true;
}
};
/**
* 广播和计数器的执行,需要进行一个action操作!
*/
}).collect();
System.out.println("广播器里面的值"+broadcastList.value());
System.out.println("计时器里面的值"+accumulator.value());
return null;
}
});
jsc.start();
jsc.awaitTermination();
jsc.close();
}
}
2.2 Scala版本
package com.Streaming
import java.util
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.{Accumulable, Accumulator, SparkContext, SparkConf}
import org.apache.spark.broadcast.Broadcast
/**
* Created by lxh on 2016/6/30.
*/
object BroadcastAccumulatorStreaming {
/**
* 声明一个广播和累加器!
*/
private var broadcastList:Broadcast[List[String]] = _
private var accumulator:Accumulator[Int] = _
def main(args: Array[String]) {
val sparkConf = new SparkConf().setMaster("local[4]").setAppName("broadcasttest")
val sc = new SparkContext(sparkConf)
/**
* duration是ms
*/
val ssc = new StreamingContext(sc,Duration(2000))
// broadcastList = ssc.sparkContext.broadcast(util.Arrays.asList("Hadoop","Spark"))
broadcastList = ssc.sparkContext.broadcast(List("Hadoop","Spark"))
accumulator= ssc.sparkContext.accumulator(0,"broadcasttest")
/**
* 获取数据!
*/
val lines = ssc.socketTextStream("localhost",9999)
/**
* 1.flatmap把行分割成词。
* 2.map把词变成tuple(word,1)
* 3.reducebykey累加value
* (4.sortBykey排名)
* 4.进行过滤。 value是否在累加器中。
* 5.打印显示。
*/
val words = lines.flatMap(line => line.split(" "))
val wordpair = words.map(word => (word,1))
wordpair.filter(record => {broadcastList.value.contains(record._1)})
val pair = wordpair.reduceByKey(_+_)
/**
* 这个pair 是PairDStream<String, Integer>
* 查看这个id是否在黑名单中,如果是的话,累加器就+1
*/
/* pair.foreachRDD(rdd => {
rdd.filter(record => {
if (broadcastList.value.contains(record._1)) {
accumulator.add(1)
return true
} else {
return false
}
})
})*/
val filtedpair = pair.filter(record => {
if (broadcastList.value.contains(record._1)) {
accumulator.add(record._2)
true
} else {
false
}
}).print
println("累加器的值"+accumulator.value)
// pair.filter(record => {broadcastList.value.contains(record._1)})
/* val keypair = pair.map(pair => (pair._2,pair._1))*/
/**
* 如果DStream自己没有某个算子操作。就通过转化transform!
*/
/* keypair.transform(rdd => {
rdd.sortByKey(false)//TODO
})*/
pair.print()
ssc.start()
ssc.awaitTermination()
}
}
总结
以上就是本文关于Spark的广播变量和累加器使用方法代码示例的全部内容,希望对大家有所帮助。感兴趣的朋友可以参阅:详解Java编写并运行spark应用程序的方法 、 Spark入门简介等,有什么问题可以随时留言,小编会及时回复大家。感谢朋友们对网站的支持。
# spark
# 累加器的使用
# 广播变量
# 使用
# Spark学习笔记之Spark中的RDD的具体使用
# Spark学习笔记之Spark SQL的具体使用
# Spark学习笔记Spark Streaming的使用
# 初识Spark入门
# 详解Java编写并运行spark应用程序的方法
# Python中用Spark模块的使用教程
# pyspark 读取csv文件创建DataFrame的两种方法
# 使用docker快速搭建Spark集群的方法教程
# Spark学习笔记(一)Spark初识【特性、组成、应用】
# 累加器
# 可以通过
# 在这里
# 器上
# 有效地
# 序列化
# 它会
# 创建一个
# 自己的
# 机器上
# 的是
# 是一个
# 有什么
# 几个
# 多个
# 就不
# 两种
# 还不
# 计时器
# 感兴趣
相关文章:
,交易猫的商品怎么发布到网站上去?
正规网站制作公司有哪些,目前国内哪家网页网站制作设计公司比较专业靠谱?口碑好?
北京专业网站制作设计师招聘,北京白云观官方网站?
家具网站制作软件,家具厂怎么跑业务?
建站之星24小时客服电话如何获取?
网站制作怎么样才能赚钱,用自己的电脑做服务器架设网站有什么利弊,能赚钱吗?
建站之星代理如何优化在线客服效率?
大连网站制作公司哪家好一点,大连买房网站哪个好?
如何在云虚拟主机上快速搭建个人网站?
如何确保西部建站助手FTP传输的安全性?
建站三合一如何选?哪家性价比更高?
香港服务器网站搭建教程-电商部署、配置优化与安全稳定指南
网站制作中优化长尾关键字挖掘的技巧,建一个视频网站需要多少钱?
临沂网站制作公司有哪些,临沂第四中学官网?
广州网站设计制作一条龙,广州巨网网络科技有限公司是干什么的?
已有域名和空间如何快速搭建网站?
如何快速查询网址的建站时间与历史轨迹?
C#怎么创建控制台应用 C# Console App项目创建方法
香港代理服务器配置指南:高匿IP选择、跨境加速与SEO优化技巧
非常酷的网站设计制作软件,酷培ai教育官方网站?
如何快速选择适合个人网站的云服务器配置?
儿童网站界面设计图片,中国少年儿童教育网站-怎么去注册?
设计网站制作公司有哪些,制作网页教程?
武清网站制作公司,天津武清个人营业执照注销查询系统网站?
全景视频制作网站有哪些,全景图怎么做成网页?
个人摄影网站制作流程,摄影爱好者都去什么网站?
b2c电商网站制作流程,b2c水平综合的电商平台?
重庆网站制作公司哪家好,重庆中考招生办官方网站?
制作门户网站的参考文献在哪,小说网站怎么建立?
如何做静态网页,sublimetext3.0制作静态网页?
建站DNS解析失败?如何正确配置域名服务器?
再谈Python中的字符串与字符编码(推荐)
c++怎么实现高并发下的无锁队列_c++ std::atomic原子变量与CAS操作【详解】
如何在阿里云香港服务器快速搭建网站?
网站制作公司,橙子建站是合法的吗?
如何快速搭建个人网站并优化SEO?
c# 服务器GC和工作站GC的区别和设置
高配服务器限时抢购:企业级配置与回收服务一站式优惠方案
如何用PHP快速搭建高效网站?分步指南
公司网站制作费用多少,为公司建立一个网站需要哪些费用?
国美网站制作流程,国美电器蒸汽鍋怎么用官方网站?
官网自助建站系统:SEO优化+多语言支持,快速搭建专业网站
如何高效利用200m空间完成建站?
头像制作网站在线制作软件,dw网页背景图像怎么设置?
如何在IIS服务器上快速部署高效网站?
专业网站建设制作报价,网页设计制作要考什么证?
西安大型网站制作公司,西安招聘网站最好的是哪个?
如何通过宝塔面板实现本地网站访问?
如何通过虚拟主机空间快速建站?
深圳网站制作的公司有哪些,dido官方网站?
*请认真填写需求信息,我们会在24小时内与您取得联系。