Spark 入门详解 α

什么是Spark

spark是基于内存的用于大规模数据处理(离线计算、实时计算、快速查询)的统一分析引擎。
也是一个生态系统。

Spark的特点

1、速度快
比MapReduce块10-100倍
2、易用(算法多)
MR只支持一种计算 算法,Spark支持多种算法。
3、通用
Spark可以支持离线计算、实时计算、快速查询(交互式)、机器学习、图计算
4、兼容性强
支持大数据中现有的Yarn. Mesos等多种调度平台,可以处理hadoop支持的数据。

Spark发展史

2009 年诞生于加州大学伯克利分校AMP 实验室
2014年成为 Apache 的顶级项目

Spark为什么会流行

原因1:优秀的数据模型和计算抽
支持多种计算模型,而且基于内存(内存比硬盘速度快)
RDD 是一个可以容错且并行的数据结构
原因2:完善的生态圈(Spark生态圈)

Spark Core:实现Spark 基本功能(RDD)
SparK SQL: 操作结构化数据
Spark Streaming : 对实时数据进行流式计算
Spark MLlib : 机器学习(ML)功能
GraphX(图计算) : 用于图计算的API

Hadoop VS Spark

Hadoop(HDFS-MR-YARN) Spark
类型 基础平台, 包含计算, 存储, 调度 分布式计算工具
场景 大规模数据集上的批处理 迭代计算, 交互式计算, 流计算
价格 对机器要求低, 便宜 对内存有要求, 相对较贵
编程范式 Map+Reduce, API 较为底层, 算法适应性差 RDD组成DAG有向无环图, API 较为顶层, 方便使用
数据存储结构 MapReduce中间计算结果存在HDFS磁盘上, 延迟大 RDD中间运算结果存在内存中 , 延迟小
运行方式 Task以进程方式维护, 任务启动慢 Task以线程方式维护, 任务启动快

Spark运行模式

1.local本地模式(单机)--开发测试使用
2.standalone独立集群模式--开发测试使用
3.standalone-HA高可用模式--生产环境使用
4.on yarn集群模式--生产环境使用
5.on mesos集群模式--国内使用较少
6.on cloud集群模式--中小公司未来会更多的使用云服务

Spark安装部署

Local模式安装部署

使用CDH5.14.0-Spark2.2版本

第一步:上传解压
第二步:开箱即用(local模式)
进入spark-shell方式
1、Spark-shell
2、Spark-shell --master local[*]
*表示使用当前机器上所有可用的资源
3、Spark-shell --master local[n]
数字N表示在本地模拟N个线程来运行当前任务

本地数据计算
val textFile = sc.textFile("file:////opt/spark01/tt.txt")
val counts = textFile.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
集群上的数据计算
val textFile = sc.textFile("hdfs://node01:8020/tt.txt")
val counts = textFile.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://node01:8020/ttt")

standalone集群模式部署

第一步:上传并解压
第二步:修改配置

#配置java环境变量
export JAVA_HOME=/export/servers/jdk1.8
#指定spark Master的IP
export SPARK_MASTER_HOST=node01
#指定spark Master的端口
export SPARK_MASTER_PORT=7077

第三步:分发到其他节点
scp -r /export/servers/spark node02:/export/servers
scp -r /export/servers/spark node03:/export/servers

说明:Spark的环境变量可以添加到服务器环境变量内,但是spark和hadoop有部分脚冲突,需要修改冲突的脚本中的一个。
第四步:启动
sbin/start-all.sh
sbin/stop-all.sh

standaloneHA集群模式部署

第一步:上传并解压
第二步:修改配置

#配置java环境变量
export JAVA_HOME=/export/servers/jdk1.8
#指定zookeeper
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node01:2181,node02:2181,node03:2181 -Dspark.deploy.zookeeper.dir=/spark"
#指定spark Master的端口
export SPARK_MASTER_PORT=7077

第三步同步到其他节点
scp -r spark-2.2.0-bin-2.6.0-cdh5.14.0 node02:$PWD
scp -r spark-2.2.0-bin-2.6.0-cdh5.14.0 node03:$PWD

第四步: 先启动ZK ,再启动spark

on yarn集群模式 安装部署

准备工作
Hadoop正常安装、 单机版本的spark安装成功
第一步:上传解压
第二步:修改配置

#配置java环境变量
export JAVA_HOME=/export/servers/jdk1.8
#指定spark Master的IP
export SPARK_MASTER_HOST=node01
#指定spark Master的端口
export SPARK_MASTER_PORT=7077
#设置hadoop配置路径
export HADOOP_CONF_DIR=/export/servers/hadoop/etc/hadoop

第三步:使用spark-submit提交任务(不需要开启spark)

/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master S \
--deploy-mode cluster \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 2 \
--queue default \
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/jars/spark-examples_2.11-2.2.0.jar \
10

参数说明

-deploy-mode 任务运行模式
cluster模式:生产环境中使用该模式
1.Driver程序在YARN集群中
2.应用的运行结果不能在客户端显示
3.该模式下Driver运行ApplicattionMaster这个进程中,如果出现问题,yarn会重启ApplicattionMaster(Driver)

●client模式:
1.Driver运行在Client上的Spark Submit进程中
2.应用程序运行结果会在客户端显示

spark-submit命令用来提交jar包给spark集群/YARN

--master spark://node01:7077 指定 Master 的地址
--name "appName" 指定程序运行的名称
--class 程序的main方法所在的类
--jars xx.jar 程序额外使用的 jar 包
--driver-memory 512m Driver运行所需要的内存, 默认1g
--executor-memory 2g 指定每个 executor 可用内存为 2g, 默认1g
--executor-cores 1 指定每一个 executor 可用的核数
--total-executor-cores 2 指定整个集群运行任务使用的 cup 核数为 2 个
--queue default 指定任务的对列
--deploy-mode S 指定运行模式(client/cluster)

编写spark代码的流程

前提:创建一个maven项目
编写代码
1、创建spark conf
2、实例一个sparkcontext
3、读物数据,对数据进行操作(业务逻辑)
4、保存最终的结果

Jar包执行
讲代码到导出成为jar文件,上传到集群,通过spark-submit提交任务

。。。。。。

SparkCore

什么是RDD
弹性分布式数据集(保存在内存中)
弹性的,RDD中的数据可以保存在内存中或者磁盘里面
分布式存储的,可以用于分布式计算
集合,可以存放很多元素
代表一个不可变、可分区、里面的元素可并行计算的集合。
rdd1 rdd2 rdd3 不能改变

RDD的主要属性

1、数据集的基本组成单位,一组分片或多分区
每个分片(每个分区)都会被一个计算任务处理,分片数决定并行度(与kafka相同)
用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值(默认值是2)
2、Spark中RDD的计算是以分区为单位的,计算函数会被作用在每一个分区。
3、一个RDD会依赖于其他多个RDD。
RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算(Spark的容错机制)

4、对于KV类型的RDD会有一个Partitioner函数, 即RDD的分区函数(可选项)
非key-value的RDD的Parititioner的值是None
Partitioner函数决定了RDD本身的分区数量,也决定了parent RDD Shuffle输出时的分区数量。

5、一个列表,存储每个Partition的位置(preferred location)。
计算程序通过列表找到数据

RDD-API

创建RDD
1、由外部存储系统的数据集创建
val rdd1 = sc.textFile("hdfs://node01:8020/wordcount/input/words.txt")
2、通过已有的RDD经过算子转换生成新的RDD
val rdd2=rdd1.flatMap(_.split(" "))
3、由一个已经存在的Scala集合创建
A: val rdd3 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
B: val rdd4 = sc.makeRDD(List(1,2,3,4,5,6,7,8))

RDD的方法/算子分类

RDD的算子分为两类:
1.Transformation转换操作:返回一个新的RDD

2.Action动作操作:返回值不是RDD(无返回值或返回其他的)

如何判断一个方法是Transformation?还是Action?
当经过转换后返回 值是rdd表示此操作是个Transformation,反之就是一个Actions。

如何理解Spark的惰性计算?
RDD中的所有转换都是惰性求值/延迟执行的,也就是说并不会直接计算
遇到Action动作时,这些转换才会真正运行。没有遇到不执行。

 

 

之所以使用惰性求值/延迟执行,是因为这样可以在Action时对RDD操作形成DAG有向无环图进行Stage的划分和并行优化,这种设计让Spark更加有效率地运行。

详细的Transformation和Action

Transformation

转换 含义
map(func) 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
filter(func) 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
flatMap(func) 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
mapPartitions(func) 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是

(Int, Interator[T]) => Iterator[U]

sample(withReplacement, fraction, seed) 根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子
union(otherDataset) 对源RDD和参数RDD求并集后返回一个新的RDD
intersection(otherDataset) 对源RDD和参数RDD求交集后返回一个新的RDD
distinct([numTasks])) 对源RDD进行去重后返回一个新的RDD
groupByKey([numTasks]) 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
sortByKey([ascending], [numTasks]) 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]) 与sortByKey类似,但是更灵活
join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD
cartesian(otherDataset) 笛卡尔积
pipe(command, [envVars]) 对rdd进行管道操作
coalesce(numPartitions) 减少 RDD 的分区数到指定值。在过滤大量数据之后,可以执行此操作
repartition(numPartitions) 重新给 RDD 分区

Action

动作 含义
reduce(func) 通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的
collect() 在驱动程序中,以数组的形式返回数据集的所有元素
count() 返回RDD的元素个数
first() 返回RDD的第一个元素(类似于take(1))
take(n) 返回一个由数据集的前n个元素组成的数组
takeSample(withReplacement,num, [seed]) 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
takeOrdered(n, [ordering]) 返回自然顺序或者自定义顺序的前 n 个元素
saveAsTextFile(path) 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
saveAsSequenceFile(path) 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
saveAsObjectFile(path) 将数据集的元素,以 Java 序列化的方式保存到指定的目录下
countByKey() 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
foreach(func) 在数据集的每一个元素上,运行函数func进行更新。
foreachPartition(func) 在数据集的每一个分区上,运行函数func

Spark RDD分区原则

1.启动的时候指定的CPU核数确定了一个参数值:
spark.default.parallelism=指定的CPU核数(集群模式最小2)
2.对于Scala集合调用parallelize(集合,分区数)方法,
如果没有指定分区数,就使用spark.default.parallelism,
如果指定了就使用指定的分区数(不要指定大于spark.default.parallelism)
3.对于textFile(文件,分区数) defaultMinPartitions
如果没有指定分区数sc.defaultMinPartitions=min(defaultParallelism,2)
如果指定了就使用指定的分区数sc.defaultMinPartitions=指定的分区数
rdd的分区数
对于本地文件:
rdd的分区数 = max(本地file的分片数, sc.defaultMinPartitions)
对于HDFS文件:
rdd的分区数 = max(hdfs文件的block数目, sc.defaultMinPartitions)
所以如果分配的核数为多个,且从文件中读取数据创建RDD,即使hdfs文件只有1个切片,最后的Spark的RDD的partition数也有可能是2

点赞

发表评论

电子邮件地址不会被公开。必填项已用 * 标注