2.PySpark基础入门(二)
文章目录
传送门:
一、PySpark库
1.框架与类库
2.什么是PySpark
PySpark是可以在Python代码中:import pyspark
来进行调用的。PySpark 是Spark官方提供的一个Python类库,内置了完全的Spark API,可以通过PySpark类库来编写Spark应用程序,并将其提交到Spark集群中运行。
PySpark类库和标准Spark框架对比:
3. PySpark安装
在合适虚拟环境下,执行如下命令即可安装:
(shayun)C:\Users\admin>pip install pyspark -i https://pypi.tuna.tsinghua.edu.cn/simple
二、本机开发环境搭建
1.本机PySpark环境配置(Window系统下)
- 将课程资料中提供的: hadoop-3.3.0 文件, 复制到一个地方, 比如E:\softs\hadoop-3.0.0
- 将文件夹内bin内的hadoop.dll复制到: C:\Windows\System32里面去
- 配置HADOOP_HOME环境变量指向 hadoop-3.3.0文件夹的路径, 如下图
配置这些的原因是:
hadoop设计用于linux运行, 我们写spark的时候,在windows上开发不可避免的会用到部分hadoop功能。为了避免在windows上报错, 我们给windows打补丁。
2.Pycharm本地与远程解释器配置
配置本地解释器:
配置远程SSH Linux解释器:
- 设置远程SSH python pySpark 环境
- 添加新的远程连接
- 设置虚拟机Python环境路径
- 将Windows文件夹与Linux文件夹进行同步
3.应用入口:SparkContext
Spark Application程序入口为:SparkContext,任何一个应用首先需要构建SparkContext对象,如下两步构建:
# 创建SparkConf对象并且设置应用的名称AppName和应用运行Master
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
文档:https://spark.apache.org/docs/3.1.2/rdd-programming-guide.html
4.Wordcount代码实战
#!usr/bin/env python
# -*- coding:utf-8 -*-
"""
需求:wordcount单词计数,读取HDFS上的word.txt文件,对齐内部出现的单词统计出现的数量
"""
from pyspark import SparkConf, SparkContext
# 如果在windows上运行,需要配置环境变量PYSPARK_SPARK
if __name__ == '__main__':
# local[*]:Local模式并使用本地的所有资源
conf = SparkConf().setMaster('local[*]').setAppName('WordCountHelloWord')
# 通过SparkConf对象构建SparkContext对象
sc = SparkContext(conf=conf)
# 读取文件
file_rdd = sc.textFile('hdfs://node1:8020/test/input/wordcount.txt')
# file_rdd = sc.textFile('../data/input/words.txt')
# 将单词进行切割,得到一个存储全部单词的集合对象
words_rdd = file_rdd.flatMap(lambda line: line.split(" "))
# 将单词转换为元祖对象,key是单词,value是数字1
words_with_ones_rdd = words_rdd.map(lambda x: (x, 1))
# 将元祖的value按照key来分组,对所有value执行聚合相加操作
result_rdd = words_with_ones_rdd.reduceByKey(lambda a, b: a + b)
# 通过collect方法手机RDD的数据打印输出结果
print(result_rdd.collect())
ssh://root@node1:22/root/anaconda3/envs/pyspark_env/bin/python -u /tmp/pycharm_project_189/00_example/Hello_Word.py
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/20 23:13:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[('hadoop', 4), ('world', 1), ('hive', 4), ('hello', 2)]
原理解析:
5.提交Wordcount代码到Linux集群运行
Local模式
[root@node1 00_example]# /export/server/spark/bin/spark-submit --master local[*] /tmp/pycharm_project_189/00_example/Hello_Word.py
22/06/21 12:06:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[('hadoop', 4), ('world', 1), ('hive', 4), ('hello', 2)]
YARN模式
[root@node1 00_example]# /export/server/spark/bin/spark-submit --master yarn /tmp/pycharm_project_189/00_example/Hello_Word.py
22/06/21 12:07:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[('hadoop', 4), ('hive', 4), ('world', 1), ('hello', 2)]
[root@node1 00_example]# /export/server/spark/bin/spark-submit --master yarn --deploy-mode cluster /tmp/pycharm_project_189/00_example/Hello_Word.py
22/06/21 12:14:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
6.总结
Python语言开发Spark程序步骤?
主要是获取SparkContext对象,基于SparkContext对象作为执行环境入口
如何提交Spark应用?
将程序代码上传到服务器上, 通过spark-submit客户端工具进行提交
1.在代码中不要设置master,如果设置以代码为准 spark-submit工具的设置就无效了
2.提交程序到集群中的时候,读取的文件一定是各个机器都能访问到的地址。比如HDFS
三、分布式代码执行分析
1.Spark集群角色回顾(YARN)
当Spark Application运行在集群上时,主要有四个部分组成, 如下示意图:
- Master(ResourceManager):集群大管家,整个集群的资源管理和分配
- Worker(NodeManager):单个机器的管家,负责在单个服务器上提供运行容器,管理当前机器的资源
- Driver:单个Spark任务的管理者,管理Executor的任务执行和任务分解分配,类似YARN的ApplicationMaster;
- Executor:具体干活的进程, Spark的工作任务(Task)都由Executor来负责执行
2.分布式代码执行分析
Spark Application应用程序运行时,无论client还是cluster部署模式,当Driver Program和Executors启动完成以后,就要开始执行应用程序中MAIN函数的代码,以词频统计WordCount程序
为例剖析讲解。
第一、构建SparkContex对象和关闭SparkContext资源,都是在Driver Program中执行,上图中①和③都是,如下图所示:
第二、上图中②的加载数据【A】、处理数据【B】和输出数据【C】代码,都在Executors上执行,从WEB UI监控页面可以看到此Job(RDD#action触发一个Job)对应DAG图,如下所示:
所以对于刚刚的WordCount代码,简单分析后得知:SparkContext对象的构建以及 Spark程序的退出,由 Driver 负责执行 具体的数据处理步骤,由Executor在执行 其实简单来说就是:
非数据处理的部分由Driver工作 数据处理的部分(干活)由Executor工作 要知道: Executor不仅仅是一个,视集群规模,Executor的数量可以是很多的。那么在这里一定要有一个概念: 代码中的数据处理部分,是由非常多的服务器(Executor)执行的。这也是分布式代码执行的概念
3.Python On Spark 执行原理
Spark是典型的JVM框架,也就是说其运行在Java或者Scala平台上是源生的。Spark为了支持Python做了一定修改。PySpark宗旨是在不破坏Spark已有的运行时架构,在Spark架构外层包装一层Python API,借助Py4j实现Python和Java的交互,进而实现通过Python编写Spark应用程序,其运行时架构如下图所示。
在Driver端,Python的Driver代码翻译成JVM代码(py4j模块)变成JVM Driver运行。 在Exeutor端,Driver的操作指令发送给JVM Exeutor。JVM Exeutor 通过PySpark守护进程将指令发送给PySpark守护进程。pyspark守护进程,将指令调度到运行的Python进程上。Executor端本质上是Python进程在工作,指令是由JVM Executor发送而来。 总体来说,
Python代码 -> py4j -> JVM代码 -> JVM Driver -> RPC -> 调度JVM Executor -> PySpark中转 -> Python Executor进程
。Driver翻译过去跑JVM,Executor中转调度跑python。
4.总结
分布式代码执行的重要特征是什么?
代码在集群上运行,是被分布式运行的。
在Spark中,非任务处理部分由Driver执行(非RDD代码);任务处理部分由Executor执行(RDD代码)。Executor的数量可以很多,所以任务的计算是分布式在运行的。
简述PySpark的架构体系
Driver端由JVM执行, Executor端由JVM做命令转发,底层由Python解释器进行工作。
来源:跟乌龟赛跑