2.PySpark基础入门(二)

文章目录

  • 一、PySpark库
  • 1.框架与类库
  • 2.什么是PySpark
  • 3. PySpark安装
  • 二、本机开发环境搭建
  • 1.本机PySpark环境配置(Window系统下)
  • 2.Pycharm本地与远程解释器配置
  • 3.应用入口:SparkContext
  • 4.Wordcount代码实战
  • 5.提交Wordcount代码到Linux集群运行
  • 6.总结
  • 三、分布式代码执行分析
  • 1.Spark集群角色回顾(YARN)
  • 2.分布式代码执行分析
  • 3.Python On Spark 执行原理
  • 4.总结

  • 传送门:

  • 视频地址:黑马程序员Spark全套视频教程
  • 1.PySpark基础入门(一)
  • 2.PySpark基础入门(二)
  • 3.PySpark核心编程(一)
  • 4.PySpark核心编程(二)
  • 5.PySaprk——SparkSQL学习(一)
  • 6.PySaprk——SparkSQL学习(二)
  • 7.Spark综合案例——零售业务统计分析
  • 8. Spark3新特性及核心概念(背)

  • 一、PySpark库

    1.框架与类库

  • 类库:—堆别人写好的代码,你可以导入进行使用。PySpark就是一个类库。
  • 框架:可以独立运行,并提供编程结构的—种软件产品。Spark就是一个独立的框架。
  • 2.什么是PySpark

      PySpark是可以在Python代码中:import pyspark来进行调用的。PySpark 是Spark官方提供的一个Python类库,内置了完全的Spark API,可以通过PySpark类库来编写Spark应用程序,并将其提交到Spark集群中运行。
    PySpark类库和标准Spark框架对比:

    3. PySpark安装

      在合适虚拟环境下,执行如下命令即可安装:

    (shayun)C:\Users\admin>pip install pyspark -i https://pypi.tuna.tsinghua.edu.cn/simple
    

    二、本机开发环境搭建

    1.本机PySpark环境配置(Window系统下)

    1. 将课程资料中提供的: hadoop-3.3.0 文件, 复制到一个地方, 比如E:\softs\hadoop-3.0.0
    2. 将文件夹内bin内的hadoop.dll复制到: C:\Windows\System32里面去
    3. 配置HADOOP_HOME环境变量指向 hadoop-3.3.0文件夹的路径, 如下图

    配置这些的原因是:
    hadoop设计用于linux运行, 我们写spark的时候,在windows上开发不可避免的会用到部分hadoop功能。为了避免在windows上报错, 我们给windows打补丁。

    2.Pycharm本地与远程解释器配置

    配置本地解释器


    配置远程SSH Linux解释器

    1. 设置远程SSH python pySpark 环境
    2. 添加新的远程连接
    3. 设置虚拟机Python环境路径
    4. 将Windows文件夹与Linux文件夹进行同步

    3.应用入口:SparkContext

      Spark Application程序入口为:SparkContext,任何一个应用首先需要构建SparkContext对象,如下两步构建:

  • 第一步:创建SparkConf对象
  • 设置Spark Application基本信息,比如应用的名称AppName和应用运行Master
  • 第二步:基于SparkConf对象,创建SparkContext对象
  • # 创建SparkConf对象并且设置应用的名称AppName和应用运行Master
    conf = SparkConf().setAppName(appName).setMaster(master)
    sc = SparkContext(conf=conf)
    

    文档:https://spark.apache.org/docs/3.1.2/rdd-programming-guide.html

    4.Wordcount代码实战

    #!usr/bin/env python
    # -*- coding:utf-8 -*-
    """
        需求:wordcount单词计数,读取HDFS上的word.txt文件,对齐内部出现的单词统计出现的数量
    """
    from pyspark import SparkConf, SparkContext
    # 如果在windows上运行,需要配置环境变量PYSPARK_SPARK
    
    if __name__ == '__main__':
        # local[*]:Local模式并使用本地的所有资源
        conf = SparkConf().setMaster('local[*]').setAppName('WordCountHelloWord')
        # 通过SparkConf对象构建SparkContext对象
        sc = SparkContext(conf=conf)
    
        # 读取文件
        file_rdd = sc.textFile('hdfs://node1:8020/test/input/wordcount.txt')
        # file_rdd = sc.textFile('../data/input/words.txt')
        # 将单词进行切割,得到一个存储全部单词的集合对象
        words_rdd = file_rdd.flatMap(lambda line: line.split(" "))
        # 将单词转换为元祖对象,key是单词,value是数字1
        words_with_ones_rdd = words_rdd.map(lambda x: (x, 1))
        # 将元祖的value按照key来分组,对所有value执行聚合相加操作
        result_rdd = words_with_ones_rdd.reduceByKey(lambda a, b: a + b)
        # 通过collect方法手机RDD的数据打印输出结果
        print(result_rdd.collect())
    
    ssh://root@node1:22/root/anaconda3/envs/pyspark_env/bin/python -u /tmp/pycharm_project_189/00_example/Hello_Word.py
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    22/06/20 23:13:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    [('hadoop', 4), ('world', 1), ('hive', 4), ('hello', 2)]
    

    原理解析

    5.提交Wordcount代码到Linux集群运行

  • Local模式

    [root@node1 00_example]# /export/server/spark/bin/spark-submit --master local[*] /tmp/pycharm_project_189/00_example/Hello_Word.py
    22/06/21 12:06:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    [('hadoop', 4), ('world', 1), ('hive', 4), ('hello', 2)]
    
  • YARN模式

  • client模式
    [root@node1 00_example]# /export/server/spark/bin/spark-submit --master yarn /tmp/pycharm_project_189/00_example/Hello_Word.py
    22/06/21 12:07:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    [('hadoop', 4), ('hive', 4), ('world', 1), ('hello', 2)]
    
  • cluster模式
    [root@node1 00_example]# /export/server/spark/bin/spark-submit --master yarn --deploy-mode cluster /tmp/pycharm_project_189/00_example/Hello_Word.py
    22/06/21 12:14:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    

  • 6.总结

  • Python语言开发Spark程序步骤?
    主要是获取SparkContext对象,基于SparkContext对象作为执行环境入口

  • 如何提交Spark应用?
    将程序代码上传到服务器上, 通过spark-submit客户端工具进行提交

    1.在代码中不要设置master,如果设置以代码为准 spark-submit工具的设置就无效了
    2.提交程序到集群中的时候,读取的文件一定是各个机器都能访问到的地址。比如HDFS

  • 三、分布式代码执行分析

    1.Spark集群角色回顾(YARN)

      当Spark Application运行在集群上时,主要有四个部分组成, 如下示意图:

    1. Master(ResourceManager):集群大管家,整个集群的资源管理和分配
    2. Worker(NodeManager):单个机器的管家,负责在单个服务器上提供运行容器,管理当前机器的资源
    3. Driver:单个Spark任务的管理者,管理Executor的任务执行和任务分解分配,类似YARN的ApplicationMaster;
    4. Executor:具体干活的进程, Spark的工作任务(Task)都由Executor来负责执行

    2.分布式代码执行分析

      Spark Application应用程序运行时,无论client还是cluster部署模式,当Driver Program和Executors启动完成以后,就要开始执行应用程序中MAIN函数的代码,以词频统计WordCount程序为例剖析讲解。

    第一、构建SparkContex对象和关闭SparkContext资源,都是在Driver Program中执行,上图中①和③都是,如下图所示:

    第二、上图中②的加载数据【A】、处理数据【B】和输出数据【C】代码,都在Executors上执行,从WEB UI监控页面可以看到此Job(RDD#action触发一个Job)对应DAG图,如下所示:

    所以对于刚刚的WordCount代码,简单分析后得知:

  • SparkContext对象的构建以及 Spark程序的退出,由 Driver 负责执行
  • 具体的数据处理步骤,由Executor在执行
  • 其实简单来说就是:

  • 非数据处理的部分由Driver工作
  • 数据处理的部分(干活)由Executor工作
  • 要知道: Executor不仅仅是一个,视集群规模,Executor的数量可以是很多的。那么在这里一定要有一个概念: 代码中的数据处理部分,是由非常多的服务器(Executor)执行的。这也是分布式代码执行的概念

    3.Python On Spark 执行原理

      Spark是典型的JVM框架,也就是说其运行在Java或者Scala平台上是源生的。Spark为了支持Python做了一定修改。PySpark宗旨是在不破坏Spark已有的运行时架构,在Spark架构外层包装一层Python API,借助Py4j实现Python和Java的交互,进而实现通过Python编写Spark应用程序,其运行时架构如下图所示。

  • 在Driver端,Python的Driver代码翻译成JVM代码(py4j模块)变成JVM Driver运行。
  • 在Exeutor端,Driver的操作指令发送给JVM Exeutor。JVM Exeutor 通过PySpark守护进程将指令发送给PySpark守护进程。pyspark守护进程,将指令调度到运行的Python进程上。Executor端本质上是Python进程在工作,指令是由JVM Executor发送而来。
  • 总体来说,Python代码 -> py4j -> JVM代码 -> JVM Driver -> RPC -> 调度JVM Executor -> PySpark中转 -> Python Executor进程。Driver翻译过去跑JVM,Executor中转调度跑python。

    4.总结

  • 分布式代码执行的重要特征是什么?
    代码在集群上运行,是被分布式运行的。

    在Spark中,非任务处理部分由Driver执行(非RDD代码);任务处理部分由Executor执行(RDD代码)。Executor的数量可以很多,所以任务的计算是分布式在运行的。

  • 简述PySpark的架构体系
    Driver端由JVM执行, Executor端由JVM做命令转发,底层由Python解释器进行工作。

  • 来源:跟乌龟赛跑

    物联沃分享整理
    物联沃-IOTWORD物联网 » 2.PySpark基础入门(二)

    发表评论