一、概念

1.Spack是什么?
Apache Spark是用于大规模数据处理的统一分析引擎,是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃至EB级别的海量数据。

2.PySpark是什么?
pyspark是用spark官方开发的python第三方库,可以使用pip程序快速安装,并像其他第三方库那样使用。PySpark可以作为Python库进行数据处理,提交至Spark集群进行分布式集群计算。

二、准备工作

1.安装PySpark
按win+r键,输入cmd打开命令提示符程序,输入
pip install pystark
或使用国内代理镜像站(清华大学源)
pip install -i https://pypi.tuna/tsinghua.edu.cn/simple pyspark
也可以在Pycharm里直接安装

2.除此之外,还需要安装java,地址:https://www.oracle.com/java/technologies/downloads/
配置环境:

变量名:JAVA_HOME
变量值:C:\Program Files (x86)\Java\jdk-20     // 要根据自己的实际路径配置

变量名:CLASSPATH
变量值:.;%JAVA_HOME%\lib\dt.jar;%JAVA_HOME%\lib\tools.jar;     

变量名:Path
变量值:%JAVA_HOME%\bin;%JAVA_HOME%\jre\bin;

重启Pycharm,选择Run—>Edit Configurations,添加JAVAHOME

3.测试是否安装成功

想要使用Pyspark库完成数据处理,首先需要构建一个执行环境入口对象
PySpark的执行环境入口对象是:类SparkContext的类对象

# 导包
from pyspark import SparkConf, SparkContext

# 创建SparkConf类对象

#conf=SparkConf()
#conf.setMaster("local[*]")  # setMaster()指定spark的运行模式,local指以单机模式运行在本机上
#conf.setAppName("test_spark_app") #指定名称

#链式调用的原则是调用的方法返回值都是同一个对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")  

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)  #sc就是执行环境入口对象

# 打印PySpark的运行版本
print(sc.version)

# 停止SparkContext对象的运行(停止运行PyStark程序)
sc.stop()

运行代码,成功显示PySpark的运行版本,证明安装成功

三、PySpark的编程模型

SparkContext类对象,是PySpark编程中一切功能的入口

PySpark的编程,主要分为如下三大步骤:
1.数据输入
通过SparkContext类对象的成员方法完成数据的读取操作,读取后得到RDD类对象
2.数据处理计算
通过RDD类对象的成员方法,完成各种数据计算的需求
3.数据输出
将处理完成后的RDD对像调用各种成员方法完成写出文件,转换为list,tuple,dict等操作

RDD(Resilient Distributed Datasets),全称:弹性分布式数据集
RDD对象:PySpark支持多种数据的输入,在输入完成之后,都会得到一个RDD的对象。
PySpark针对数据的处理,都是以RDD对象作为载体,即:

  • 数据存储在RDD中内
  • 各类数据的计算方式也都是RDD的成员方法
  • RDD的数据计算方法,返回值依旧是RDD对象
  • 四、数据输入

    from pyspark import SparkConf, SparkContext
    
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
    sc = SparkContext(conf=conf)
    
    # 通过parallelize方法将Python对象加载到Spark内,成为RDD对象
    rdd1 = sc.parallelize([1, 2, 3, 4, 5])
    rdd2 = sc.parallelize((1, 2, 3, 4, 5))
    rdd3 = sc.parallelize("PySpark")
    rdd4 = sc.parallelize({1, 2, 3, 4, 5})
    rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"})
    
    #读取文件转RDD对象
    rdd6 = sc.textFile("C:/test.txt")
    
    # 如果要查看RDD中有什么内容,需要用collect()方法
    print(rdd1.collect())  # [1, 2, 3, 4, 5]
    print(rdd2.collect())  # [1, 2, 3, 4, 5]
    print(rdd3.collect())  # ['P', 'y', 'S', 'p', 'a', 'r', 'k']
    print(rdd4.collect())  # [1, 2, 3, 4, 5]
    print(rdd5.collect())  # ['key1', 'key2']
    print(rdd6.collect())  # ['测试1', '', '测试2', '', '测试3']
    sc.stop()
    
    

    五、数据处理

    RDD内置丰富的成员方法(算子)

    map算子
    功能:map算子是将RDD的数据一条条处理(处理的逻辑基于map算子中接受的处理函数),返回新的RDD

    from pyspark import SparkConf, SparkContext
    #Spark不能自动找到python解释器,需要指定
    import os
    os.environ['PYSPARK_PYTHON'] = 'C:/Users/tangling/venv/Scripts/python.exe'
    
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1, 2, 3, 4, 5])
    
    # 通过map方法将全部数据都乘10
    def func(data):
        return data * 10
    
    rdd2 = rdd.map(func)
    print(rdd2.collect())    #[10, 20, 30, 40, 50]
    # 链式调用
    # 给rdd中的每一个元素乘10后再加5
    rdd3=rdd.map(lambda x:x*10).map(lambda x:x+5)
    print(rdd3.collect())	 #[15, 25, 35, 45, 55]
    sc.stop()
    
    

    flatmap算子
    功能:对RDD执行map操作,然后进行解除嵌套操作(与map相比之多了一层解嵌套)
    解除嵌套:
    list=[[1,2,3],[4,5,6],[7,8,9]] ——> list=[1,2,3,4,5,6,7,8,9]

    from pyspark import SparkConf,SparkContext
    import os
    os.environ['PYSPARK_PYTHON']='C:/Users/tangling/venv/Scripts/python.exe'
    
    conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
    sc=SparkContext(conf=conf)
    rdd=sc.parallelize(["python project","hello world"])
    
    #需求:将RDD数据中的一个个单词提取出来
    rdd2=rdd.flatMap(lambda x:x.split(" "))
    print(rdd2.collect())  #['python', 'project', 'hello', 'world']
    sc.close()
    

    reduceByKey算子
    功能:针对KV型RDD,自动按照Key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作

    func:(V,V)—>V
    接受2个传入参数(类型要一致),返回一个返回值,类型和传入要求一致

    from pyspark import SparkConf, SparkContext
    import os
    
    os.environ['PYSPARK_PYTHON'] = 'C:/Users/tangling/venv/Scripts/python.exe'
    
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([('男', 99), ('男', 88), ('男', 79), ('女', 97), ('女', 89)])
    
    # 需求:求男生和女生两个组的成绩之和
    result = rdd.reduceByKey(lambda a, b: a + b)
    print(result.collect())	   #[('男', 266), ('女', 186)]
    
    

    案例:统计指定文件的词频

    from pyspark import SparkConf, SparkContext
    import os
    
    os.environ['PYSPARK_PYTHON'] = 'C:/Users/tangling/venv/Scripts/python.exe'
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
    sc = SparkContext(conf=conf)
    
    rdd = sc.textFile('C:/test.txt')
    #取出全部单词
    words=rdd.flatMap(lambda x:x.split(" "))
    #将所有单词都转化成二元元组,单词为key,value设置为1
    word_with_one_rdd=words.map(lambda word:(word,1)) 
    #分组求和
    result=word_with_one_rdd.reduceByKey(lambda a,b:a+b)
    
    print(result.collect())  #[('world', 1), ('i', 3), ('love', 2), ('yuanyuan', 1), ('pangpang', 1), ('am', 1), ('tangling', 1), ('hello', 1)]
    

    Filter算子
    功能:过滤想要的数据进行保留
    func:(T)—>bool 传入一个参数进来类型随意,返回值必须是True或False,返回值是True的被留下来,False的数据被丢弃

    from pyspark import SparkConf, SparkContext
    import os
    
    os.environ['PYSPARK_PYTHON'] = 'C:/Users/tangling/venv/Scripts/python.exe'
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
    sc = SparkContext(conf=conf)
    
    rdd = sc.parallelize([1, 2, 3, 4, 5])
    #只保留rdd中的偶数
    result = rdd.filter(lambda num: num % 2 == 0)
    print(result.collect())    #[2,4]
    
    

    distinct算子
    功能:对RDD数据进行去重,返回新的RDD,无需传参

    rdd = sc.parallelize([1, 1,2,3,3,4])
    result = rdd.distinct()
    print(result.collect())   #[1,2,3,4]
    

    sortBy算子
    功能:对RDD数据进行排序,基于你指定的排序依据
    语法:rdd.sortBy(func,ascending=False,numPartitions=1)
    func:(T)—>U:告知按照RDD中的哪一个数据进行排序,比如lambda x:x[1]表示按照rdd中的第二列元素进行排序
    ascending=True(升序)False(降序)
    numPartitons:用多少分区排序,全局排序需要设置分区数为1

    对之前案例中的结果进行排序

    from pyspark import SparkConf, SparkContext
    import os
    
    os.environ['PYSPARK_PYTHON'] = 'C:/Users/tangling/venv/Scripts/python.exe'
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
    sc = SparkContext(conf=conf)
    
    rdd = sc.textFile('C:/test.txt')
    #取出全部单词
    words=rdd.flatMap(lambda x:x.split(" "))
    #将所有单词都转化成二元元组,单词为key,value设置为1
    word_with_one_rdd=words.map(lambda word:(word,1)) 
    #分组求和
    result_rdd=word_with_one_rdd.reduceByKey(lambda a,b:a+b)
    #对结果进行排序
    final_rdd=result_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)
    
    print(final_rdd.collect()) #[('i', 3), ('love', 2), ('world', 1), ('yuanyuan', 1), ('pangpang', 1), ('am', 1), ('tangling', 1), ('hello', 1)]
    

    综合案例:结合所学知识,完成以下需求:
    需求1:城市销售额排名
    需求2:全部城市有哪些商品类别在售卖
    需求3:北京有哪些商品类型在售卖

    {“id”:1,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“平板电脑”,“areaName”:“北京”,“money”:“1450”}|{“id”:2,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“手机”,“areaName”:“北京”,“money”:“1450”}|{“id”:3,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“手机”,“areaName”:“北京”,“money”:“8412”}
    {“id”:4,“timestamp”:“2019-05-08T05:01.00Z”,“category”:“电脑”,“areaName”:“上海”,“money”:“1513”}|{“id”:5,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“家电”,“areaName”:“北京”,“money”:“1550”}|{“id”:6,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“电脑”,“areaName”:“杭州”,“money”:“1550”}
    {“id”:7,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“电脑”,“areaName”:“北京”,“money”:“5611”}|{“id”:8,“timestamp”:“2019-05-08T03:01.00Z”,“category”:“家电”,“areaName”:“北京”,“money”:“4410”}|{“id”:9,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“家具”,“areaName”:“郑州”,“money”:“1120”}
    {“id”:10,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“家具”,“areaName”:“北京”,“money”:“6661”}|{“id”:11,“timestamp”:“2019-05-08T05:03.00Z”,“category”:“家具”,“areaName”:“杭州”,“money”:“1230”}|{“id”:12,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“书籍”,“areaName”:“北京”,“money”:“5550”}
    {“id”:13,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“书籍”,“areaName”:“北京”,“money”:“5550”}|{“id”:14,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“电脑”,“areaName”:“北京”,“money”:“1261”}|{“id”:15,“timestamp”:“2019-05-08T03:03.00Z”,“category”:“电脑”,“areaName”:“杭州”,“money”:“6660”}
    {“id”:16,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“电脑”,“areaName”:“天津”,“money”:“6660”}|{“id”:17,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“书籍”,“areaName”:“北京”,“money”:“9000”}|{“id”:18,“timestamp”:“2019-05-08T05:01.00Z”,“category”:“书籍”,“areaName”:“北京”,“money”:“1230”}
    {“id”:19,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“电脑”,“areaName”:“杭州”,“money”:“5551”}|{“id”:20,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“电脑”,“areaName”:“北京”,“money”:“2450”}
    {“id”:21,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“食品”,“areaName”:“北京”,“money”:“5520”}|{“id”:22,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“食品”,“areaName”:“北京”,“money”:“6650”}
    {“id”:23,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“服饰”,“areaName”:“杭州”,“money”:“1240”}|{“id”:24,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“食品”,“areaName”:“天津”,“money”:“5600”}
    {“id”:25,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“食品”,“areaName”:“北京”,“money”:“7801”}|{“id”:26,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“服饰”,“areaName”:“北京”,“money”:“9000”}
    {“id”:27,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“服饰”,“areaName”:“杭州”,“money”:“5600”}|{“id”:28,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“食品”,“areaName”:“北京”,“money”:“8000”}|{“id”:29,“timestamp”:“2019-05-08T02:03.00Z”,“category”:“服饰”,“areaName”:“杭州”,“money”:“7000”}

    from pyspark import SparkConf, SparkContext
    import json
    import os
    
    os.environ['PYSPARK_PYTHON'] = 'C:/Users/tangling/venv/Scripts/python.exe'
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
    sc = SparkContext(conf=conf)
    # TODO 需求1:城市销售额排名
    file_rdd = sc.textFile('C:/orders.txt')
    # 取出一个个json字符串
    json_str_rdd = file_rdd.flatMap(lambda x: x.split("|"))
    # 将json字符串都转化成字典
    dict_rdd = json_str_rdd.map(lambda x: json.loads(x))
    #取出城市和销售额数据
    city=dict_rdd.map(lambda x:(x['areaName'],int(x['money'])))
    #按城市分组按销售额聚合
    city_result_rdd=city.reduceByKey(lambda a,b:a+b)
    #按销售额聚合结果进行排序
    city_result_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)
    print(f"需求1的结果{city_result_rdd.collect()}")
    
    #TODO  需求2:全部城市有哪些商品类别在售卖
    category_rdd=dict_rdd.map(lambda x:x['category']).distinct()
    print(f"需求2的结果{category_rdd.collect()}")
    
    #TODO   需求3:北京有哪些商品类型在售卖
    #过滤出北京的数据
    beijing_data_rdd=dict_rdd.filter(lambda x:x['areaName']=='北京')
    #取出全部商品类别
    result3=beijing_data_rdd.map(lambda x:x['category']).distinct()
    print(f"需求3的结果{result3.collect()}")
    

    六、数据输出

    数据输出的方法:

    collect算子
    功能:将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象 即 RDD—>LIST

    rdd = sc.parallelize([1, 2, 3, 4, 5])
    # collect算子,输出RDD为list对象
    rdd_list: list = rdd.collect()
    print(rdd_list)  # [1, 2, 3, 4, 5]
    print(type(rdd_list))  #<class 'list'>
    

    reduce算子
    功能:对RDD数据集按照你传入的逻辑进行聚合
    语法:rdd.reduce(func)
    func:(T,T)—>T 2个参数,一个返回值,返回值和参数要求类型一致
    逻辑图:

    rdd = sc.parallelize(range(1,10))
    # 将rdd的数据进行累加求和
    print(rdd.reduce(lambda a,b:a+b))    #45
    

    take算子
    功能:取出RDD的前n个元素,组合成list返回

    rdd=sc.parallelize([3,2,1,3,4,5]).take(5)
    print(rdd)    #[3, 2, 1, 3, 4]
    

    count算子
    功能:计算RDD有多少条数据,返回值是一个数字

    rdd_count=sc.parallelize([3,2,1,3,4,5]).count()
    print(rdd_count)		#6
    

    saveAsTextFile算子 功能:将RDD的数据写入文本文件中,支持本地写出,hdfs等文件系统

    rdd1=sc.parallelize([3,2,1,3,4,5])
    rdd1.saveAsTextFile("D:/output1")
    

    修改rdd分区为1个
    方法一:SparkConf对象设置属性全局并行度为1

    conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
    conf.set("spark.default.parallelism","1")
    sc = SparkContext(conf=conf)
    

    方法二:在创建RDD的时候,设置parallelize方法传入numSlices参数为1

    rdd1=sc.parallelize([3,2,1,3,4,5],numSlices=1)
    #rdd1=sc.parallelize([1,2,3,4,5],1)
    
    物联沃分享整理
    物联沃-IOTWORD物联网 » PySpark

    发表评论