本文介绍: 数据不动代码动的最高境界是数据就在当前节点内存中。有一点非常重要,就是由于RDD有前后依赖关系,遇到宽依赖关系,例如,遇到reduceBykey等宽依赖操作算子,Spark将根据宽依赖划分Stage,Stage内部通过Pipeline操作,通过Block Manager获取相关的数据,因为具体的split要从外界读数据,也要把具体的计算结果写入外界,所以用了一个管理器,具体的split都会映射成BlockManager的Block,而具体split会被函数处理函数处理的具体形式是以任务的形式进行的。

1、什么是Apache Spark?Spark是什么?

基于内存分布式大数据并行计算框架,可用于构建大型的、低延迟数据分析应用程序包含Spark core、Spark sql、Spark streaming 、Spark MLlibspark GraphX五个核心组件

2、Spark核心组件是什么?

Spark Core:是其它组件的基础,spark内核,主要包含:有向循环图、RDD、Lingage、Cache、broadcast等,并封装底层通讯框架, 是Spark的基础。实现了 Spark 的基本功能包含任务调度内存管理错误恢复,与存储系统交互模块。还包含了对弹性分布式数据集(Resilient Distributed Dataset,简称RDD)的API 定义
Spark SQL:用于处理结构化数据的模块支持SQL查询和DataFrame API。是Spark用来操作结构化数据的程序包可以使用SQL或者HQL来对历史数据交互式查询(即席查询用户根据自己需求 自定义查询)。Spark SQL 支持多种数据源比如Hive表,Parquet 以及 JSON 等。
Spark Streaming:是一个实时数据流进行高通量、容错处理流式处理系统可以对多种数据源(如Kafka、Flume、Twitter、Zero和TCP 套接字) 进行类似Map、Reduce和Join等复杂操作,将流式计算分解成一系列短小的批处理作业
MLlib:Spark的机器学习库,提供了常见的机器学习算法工具构建在 Spark 之上的提供常见的机器学习(ML)功能程序库,支持系列数据挖掘算法。包括分类回归聚类协同过滤,还提供了模型评估、数据导入额外支持功能
GraphX:Spark的图计算库,用于处理分析任务

3、spark资源调度方式spark的有几种部署模式,每种模式特点?

1)Local(本地模式):运行一台计算机上的模式,通常就是用于本机上练手和测试
local:只启动一个executor
local[k]:启动k个executor
local[*]:启动cpu数目相同的 executor
2)Standalone模式:分布式部署集群自带完整的服务资源管理任务监控是Spark自己监控,这个模式也是其他模式的基础。构建一个由Master+Slave 构成的 Spark 集群,Spark 运行在集群中。
3)Spark on yarn模式:分布式部署集群,资源任务监控交给yarn管理,但是目前仅支持粗粒度资源分配方式包含clusterclient运行模式,cluster适合生产,driver运行在集群子节点,具有容错功能client适合调试dirver运行客户端
4)Spark On Mesos模式(国内不常用):官方推荐这种模式(当然,原因之一是血缘关系)。正是由于Spark开发之初就考虑到支持Mesos,因此,目前而言,Spark运行在Mesos上会比运行在YARN上更加灵活,更加自然。用户选择两种调度模式之一运行自己应用程序
(1)粗粒度模式(Coarse-grained Mode):每个应用程序运行环境由一个Dirver和若干个Executor组成,其中,每个Executor占用若干资源内部可运行多个Task对应多少个“slot”)。应用程序各个任务正式运行之前,需要将运行环境中的资源全部申请好,且运行过程中要一直占用这些资源,即使不用,最后程序运行结束后,回收这些资源。
(2)细粒度模式(Fine-grained Mode):鉴于粗粒度模式会造成大量资源浪费,Spark On Mesos还提供了另外一种调度模式:细粒度模式,这种模式类似于现在的云计算,思想是按需分配

4、 spark 的特点?
运行快:基于内存,Spark 实现了高效的DAG执行引擎可以通过基于内存来高效处理数据流,计算的中间结果存在于内存中的。

易于使用:Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建不同的应用。而Spark支持交互式的Python和Scala的Shell,可以非常方便地在这些Shell中使用Spark集群来验证解决问题方法

通用:Spark提供了统一解决方案,可用于批处理交互式查询,实时处理,机器学习和图计算,都可以在同一个应用中无缝使用

兼容性:Spark可以很方便的与其他的开源产品进行融合。

5、Spark支持哪些数据源和数据格式

Spark支持多种数据源和数据格式,包括:

文件系统:Spark可以读写各种文件系统,如HDFS、本地文件系统、S3等。
数据库:Spark可以连接读写关系型数据库,如MySQL、PostgreSQL等,也支持NoSQL数据库,如MongoDB、Cassandra等。
实时数据流:Spark支持读取实时数据流,如Kafka、Flume等。
数据格式:Spark支持常见的数据格式,如CSV、JSON、Parquet、Avro等。

6、Spark的数据处理模型中的RDD是什么?什么是RDD?

RDD(Reslilent Distributed Dataset弹性分布式数据集),是Spark中最基本的数据抽象。代码中是一个抽象类,它代表一个不可变、可分区里面元素可并行计算的集合

7、RDD的特点是什么?

分布式:RDD是一个抽象概念,RDD在spark driver中,通过RDD来引用数据,数据真正存储在节点机的partition上。

只读在Spark中RDD一旦生成了,就不能修改。 那么为什么设置只读设置只读的话,因为不存在修改并发吞吐量就上来了。

血缘关系:我们需要对RDD进行一系列操作,因为RDD是只读的,我们只能不断的生产新的RDD,这样,新的RDD与原来的RDD就会存在一些血缘关系。

Spark会记录这些血缘关系,在后期的容错上会有很大的益处。

缓存当一个 RDD 需要重复使用时,或者当任务失败重新计算的时候,这时如果将 RDD 缓存起来,就可以避免重新计算,保证程序运行性能

8、RDD的五大属性?RDD的五大特点(参考Spark之RDD的五大特性 – 简书

一组分区(Partition),即数据集的基本组成单位
一个计算每个分区函数
RDD之间的依赖关系。
一个Partitioner,即RDD的分片函数
一个列表存储存取每个Partition的优先位置

详细内容
1)分区列表(a list of partitions)。Spark RDD是被分区的,每一个分区都会被一个计算任务(Task处理分区数决定并行计算数量,RDD的并行度默认从父RDD传给子RDD。默认情况下,一个HDFS上的数据分片就是一个Partition,RDD分片数决定了并行计算的力度,可以在创建RDD时指定RDD分片个数,如果不指定分区数量,当RDD从集合创建时,则默认分区数量为该程序分配到的资源的CPU核数(每个Core可以承载2~4个Partition),如果是从HDFS文件创建默认文件的Block数。
2)每一个分区都有一个计算函数(a function for computing each split)。每个分区都会有计算函数,Spark的RDD的计算函数是以分片基本单位的,每个RDD都会实现compute函数,对具体的分片进行计算,RDD中的分片是并行的,所以是分布式并行计算。有一点非常重要,就是由于RDD有前后依赖关系,遇到宽依赖关系,例如,遇到reduceBykey等宽依赖操作算子,Spark将根据宽依赖划分Stage,Stage内部通过Pipeline操作,通过Block Manager获取相关的数据,因为具体的split要从外界读数据,也要把具体的计算结果写入外界,所以用了一个管理器,具体的split都会映射成BlockManager的Block,而具体split会被函数处理,函数处理的具体形式是以任务的形式进行的。
3)依赖于其他RDD的列表(a list of dependencies on other RDDs)。RDD的依赖关系,由于RDD每次转换都会生成新的RDD,所以RDD会形成类似流水线的前后依赖关系,当然,宽依赖就不类似于流水线了,宽依赖后面的RDD具体的数据分片会依赖前面所有的RDD的所有的数据分片,这时数据分片就不进行内存中的Pipeline,这时一般是跨机器的。因为有前后的依赖关系,所以当有分区数据丢失的时候,Spark会通过依赖关系重新计算,算出丢失的数据,而不是对RDD所有的分区进行重新计算。RDD之间的依赖有两种:窄依赖(Narrow Dependency)、宽依赖(Wide Dependency)。RDD是Spark的核心数据结构,通过RDD的依赖关系形成调度关系。通过对RDD的操作形成整个Spark程序
RDD有Narrow Dependency和Wide Dependency两种不同类型的依赖,其中的Narrow Dependency指的是每一个parent RDD的Partition最多被child RDD的一个Partition所使用,而Wide Dependency指的是多个child RDD的Partition会依赖于同一个parent RDD的Partition。可以从两个方面来理解RDD之间的依赖关系:一方面是该RDD的parent RDD是什么;另一方面是依赖于parent RDD的哪些Partitions;根据依赖于parent RDD的Partitions的不同情况,Spark将Dependency分为宽依赖和窄依赖两种。Spark中宽依赖指的是生成的RDD的每一个partition都依赖于父RDD的所有partition,宽依赖典型的操作groupByKey、sortByKey等,宽依赖意味着shuffle操作,这是Spark划分Stage边界的依据,Spark中宽依赖支持两种Shuffle Manager,即HashShuffleManager和SortShuffleManager,前者是基于Hash的Shuffle机制,后者是基于排序的Shuffle机制。Spark 2.2现在的版本中已经没有Hash Shuffle的方式。
4)keyvalue数据类型的RDD分区器(-Optionally,a Partitioner for keyvalue RDDS),控制分区策略和分区数。每个keyvalue形式的RDD都有Partitioner属性,它决定了RDD如何分区。当然,Partition的个数还决定每个Stage的Task个数。RDD的分片函数,想控制RDD的分片函数的时候可以分区(Partitioner)传入相关参数,如HashPartitioner、RangePartitioner,它本身针对keyvalue的形式,如果不是keyvalue的形式,它就不会有具体的Partitioner。Partitioner本身决定了下一步会产生多少并行的分片,同时,它本身也决定了当前并行(parallelize)Shuffle输出的并行数据,从而使Spark具有能够控制数据在不同节点上分区的特性用户可以自定义分区策略,如Hash分区等。Spark提供了“partitionBy”运算符,能通过集群对RDD进行数据再分配创建一个新的RDD。
5)每个分区都有一个优先位置列表(-Optionally,a list of preferred locations to compute each split on)。它会存储每个Partition的优先位置,对于一个HDFS文件来说,就是每个Partition块的位置。观察运行spark集群的控制台发现Spark的具体计算,具体分片前,它已经清楚地知道任务发生在什么节点上,也就是说,任务本身是计算层面的、代码层面的,代码发生运算之前已经知道它要运算的数据在什么地方,有具体节点的信息。这就符合大数据中数据不动代码动的特点。数据不动代码动的最高境界是数据就在当前节点的内存中。这时有可能是memory级别或Alluxio级别的,Spark本身在进行任务调度时候,会尽可能将任务分配到处理数据的数据块所在的具体位置。据Spark的RDD.Scala源码函数getPreferredLocations可知,每次计算都符合完美的数据本地性。

9、RDD 的缓存三种方式?

cache、persist、checkPoint

1、cache 方法不是在被调用的时候立即进行缓存,而是当触发action 类型算子之后,才会进行缓存
2、cache 和 persist 的区别 其实 cache 底层实际调用的就是 persist 方法,只是缓存级别默认是 MEMORY_ONLY,而 persist 方法可以指定其他的缓存级别。
3、cachecheckPoint区别 checkPoint 是将数据缓存本地或者 HDFS 文件存储系统中,当某个节点的 executor 宕机了之后,缓存的数据不会丢失,而通过 cache 缓存的数据就会丢掉。
checkPoint 的时候会把 job 从开始重新再计算一遍,因此在 checkPoint 之前最好先进行一步 cache 操作,cache 不需要重新计算,这样可以节省计算的时间
4、persist 和 checkPoint 的区别 persist 也可以选择将数据缓存磁盘当中,但是它交给 blockManager 管理的,一旦程序运行结束blockManager 也会被停止,这时候缓存的数据就会被释放掉。而 checkPoint 持久化的数据并不会被释放,是一直存在的,可以被其它的程序所使用。

10、RDD的创建方式?

1)由一个已经存在的Scala数据集合创建(由内存中创建
val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8))
val rdd = sc.makeRDD(Array(1,2,3,4,5,6,7,8))
2)由外部存储系统的数据集创建,包括本地文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等,关系型数据库mysql
val rdd =sc.textFile(“hdfs://hadoop10/spark/wc/input/words.txt“)
3)从其他RDD转化而来

11、什么是RDD宽依赖和窄依赖?

RDD和它依赖的parent RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency
1)窄依赖指的是每一个parent RDD的Partition最多被子RDD的一个Partition使用
2)宽依赖指的是多个子RDD的Partition会依赖同一个parent RDD的Partition

12、RDD的弹性表现在哪几点?

1)自动的进行内存和磁盘存储切换
2)基于Lineage的高效容错;
3)task如果失败自动进行特定次数的重试
4)stage如果失败自动进行特定次数的重试,而且只会计算失败的分片;
5)checkpoint和persist,数据计算之后持久缓存
6)数据调度弹性,DAG TASK调度和资源无关;
7)数据分片的高度弹性

13、RDD有哪些缺陷

1)不支持细粒度的写和更新操作(如网络爬虫),spark写数据是粗粒度的。所谓粗粒度,就是批量写入数据,为了提高效率。但是读数据是细粒度的也就是说可以一条条的读。
2)不支持增量迭代计算,Flink支持。

14、Spark为什么比mapreduce快?

1)基于内存计算,减少低效的磁盘交互
2)高效的调度算法基于DAG;
3)容错机制Linage,精华部分就是DAG和Lingae

15、简单说一下hadoop和spark的shuffle相同和差异?

1)从 high-level 的角度来看,两者并没有大的差别。 都是将 mapper(Spark 里是 ShuffleMapTask)的输出进行 partition,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一个 stage 里的 ShuffleMapTask,也可能是 ResultTask)。Reducer 以内存作缓冲区,边 shuffle 边 aggregate 数据,等到数据 aggregate 好以后进行 reduce() (Spark 里可能是后续的一系列操作)。
2)从 low-level 的角度来看,两者差别不小。 Hadoop MapReduce 是 sortbased,进入 combine() 和 reduce() 的 records 必须先 sort。这样的好处在于 combine/reduce() 可以处理大规模的数据,因为其输入数据可以通过外排得到(mapper 对每段数据先做排序reducer 的 shuffle 对排好序的每段数据做归并)。目前的 Spark 默认选择的是 hashbased,通常使用 HashMap 来对 shuffle 来的数据进行 aggregate,不会对数据进行提前排序。如果用户需要经过排序的数据,那么需要自己调用类似 sortByKey() 的操作;如果你是Spark 1.1的用户,可以将spark.shuffle.manager设置sort,则会对数据进行排序。在Spark 1.2中,sort将作为默认的Shuffle实现。
3)从实现角度来看,两者也有不少差别。 Hadoop MapReduce 将处理流程划分出明显的几个阶段map(), spill, merge, shuffle, sort, reduce() 等。每个阶段各司其职,可以按照过程式的编程思想来逐一实现每个阶段功能。在 Spark 中,没有这样功能明确的阶段,只有不同的 stage 和一系列transformation(),所以 spill, merge, aggregate 等操作需要蕴含在 transformation() 中。
如果我们将 map划分数据、持久化数据的过程称为 shuffle write,而将 reducer 读入数据、aggregate 数据的过程称为 shuffle read。那么在 Spark 中,问题就变为怎么在 job逻辑或者物理执行图中加入 shuffle write 和 shuffle read的处理逻辑?以及两个处理逻辑应该怎么高效实现?
Shuffle write由于不要求数据有序,shuffle write 的任务很简单:将数据 partition 好,并持久化。之所以要持久化,一方面是要减少内存存储空间压力,另一方面也是为了 fault-tolerance。

16、Spark为什么要持久化,一般什么场景下要进行persist操作?

为什么要进行持久化?
spark所有复杂一点的算法都会有persist身影,spark默认数据放在内存,spark很多内容都是放在内存的,非常适合高速迭代,1000个步骤只有第一个输入数据,中间不产生临时数据,但分布式系统风险很高,所以容易出错,就要容错,rdd出错或者分片可以根据血统算出来,如果没有对父rdd进行persist 或者cache的化,就需要重头做。 以下场景会使用persist
1)某个步骤计算非常耗时,需要进行persist持久化
2)计算链条非常长,重新恢复要算很多步骤,很好使,persist
3)checkpoint所在的rdd要持久化persist。checkpoint前,要持久化,写个rdd.cache或者rdd.persist,将结果保存起来,再写checkpoint操作,这样执行起来会非常快,不需要重新计算rdd链条了。checkpoint之前一定会进行persist。
4)shuffle之后要persist,shuffle要进性网络传输,风险很大,数据丢失重来,恢复代价很大
5)shuffle之前进行persist,框架默认将数据持久化到磁盘,这个是框架自动做的。

17、介绍一下join操作优化经验?

join其实常见的就分为两类: map-side join 和 reduce-side join。当大表和小表join时,用map-side join能显著提高效率。将多份数据进行关联数据处理过程中非常普遍的用法,不过在分布式计算系统中,这个问题往往会变的非常麻烦,因为框架提供的 join 操作一般会将所有数据根据 key 发送到所有的 reduce 分区中去,也就是 shuffle 的过程。造成大量的网络以及磁盘IO消耗,运行效率极其低下,这个过程一般被称为 reduce-side-join。如果其中有张表较小的话,我们则可以自己实现在 map 端实现数据关联,跳过大量数据进行 shuffle 的过程,运行时间得到大量缩短,根据不同数据可能会有几倍到数十倍的性能提升。
备注:这个题目面试中非常非常大概率见到,务必搜索相关资料掌握这里抛砖引玉。

18、Spark使用parquet文件存储格式能带来哪些好处?

1)如果说HDFS是大数据时代分布式文件系统首选标准,那么parquet则是整个大数据时代文件存储格式实时首选标准
2)速度更快:从使用spark sql操作普通文件CSV和parquet文件速度对比上看,绝大多数情况会比使用csv等普通文件速度提升10倍左右,在一些普通文件系统无法在spark上成功运行的情况下,使用parquet很多时候可以成功运行。
3)parquet的压缩技术非常稳定出色,在spark sql中对压缩技术的处理可能无法正常的完成工作(例如会导致lost tasklost executor)但是此时如果使用parquet就可以正常的完成。
4)极大的减少磁盘I/o,通常情况下能够减少75%的存储空间,由此可以极大的减少spark sql处理数据的时候的数据输入内容,尤其是在spark1.6x有个下推过滤器在一些情况下可以极大的减少磁盘的IO和内存的占用,(下推过滤器)。
5)spark 1.6x parquet方式极大的提升了扫描吞吐量,极大提高了数据的查找速度spark1.6和spark1.5x相比而言,提升了大约1倍的速度,在spark1.6X中,操作parquet时候cpu也进行了极大的优化,有效的降低了cpu消耗。
6)采用parquet可以极大的优化spark的调度和执行。我们测试spark如果用parquet可以有效的减少stage的执行消耗,同时可以优化执行路径

19、介绍parition和block有什么关联关系?

1)hdfs中的block是分布式存储的最小单元,等分,可设置冗余,这样设计有一部分磁盘空间的浪费,但是整齐的block大小,便于快速找到、读取对应内容
2)Spark中的partion是弹性分布式数据集RDD的最小单元,RDD是由分布在各个节点上的partion组成的。partion是指的spark在计算过程中,生成的数据在计算空间最小单元,同一份数据(RDD)的partion大小不一,数量不定,是根据application里的算子和最初读入的数据分块数量决定;
3)block位于存储空间、partion位于计算空间block的大小是固定的、partion大小是不固定的,是从两个不同的角度去看数据。

20、不需要排序的hash shuffle是否一定比需要排序的sort shuffle速度快?

不一定,当数据规模小,Hash shuffle快于Sorted Shuffle数据规模大的时候;当数据量大,sorted Shuffle会比Hash shuffle快很多,因为数量大的有很多小文件,不均匀,甚至出现数据倾斜,消耗内存大,1.x之前spark使用hash,适合处理中小规模,1.x之后,增加了Sorted shuffle,Spark更能胜任大规模处理了。

21、Sort-based shuffle的缺陷?

1)如果mapper中task的数量过大,依旧会产生很多小文件,此时在shuffle传递数据的过程中reducer段,reduce会需要同时大量的记录进行反序列化,导致大量的内存消耗和GC的巨大负担,造成系统缓慢甚至崩溃
2)如果需要在分片内也进行排序,此时需要进行mapper段和reducer段的两次排序。

22、Spark有哪两种算子

Transformation(转化)算子和Action(执行)算子。

23、Spark有哪些聚合类的算子,我们应该尽量避免什么类型的算子?

在我们的开发过程中,能避免则尽可能避免使用reduceByKey、joindistinct、repartition等会进行shuffle的算子,尽量使用map类的非shuffle算子。 这样的话,没有shuffle操作或者仅有较少shuffle操作的Spark作业,可以大大减少性能开销。

24、Spark并行度怎么设置比较合适?

spark并行度,每个core承载2~4个partition,如,32个core,那么64~128之间的并行度,也就是设置64~128个partion,并行读和数据规模无关, 只和内存使用量和cpu使用时间有关。

原文地址:https://blog.csdn.net/Flychuer/article/details/134654340

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_19844.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注