使用Python编写MapReduce程序的最佳实践

目录

WordCount示例

去重(投影)示例

分 区


Hadoop原生态语言为Java,但现实要求用python编写MR程序,这就得借助于Hadoop Streaming,其介绍看上一篇博文。

mapper按行读取输入的数据,并将每行转换为键/值对,作为mapper的输出。默认情况下,第一个制表符之前的行内容作为键key,行的其余部分(不包括制表符)作为值value。如果行中没有制表符,则整行被视为键key,值为 null。

当 reducer 任务运行时,它会将其输入的key/value对转换为行,并将行馈送到进程的 stdin。同时,reducer将每行转换为一个键/值对作为输出。默认情况下,第一个制表符之前的行内容作为键key,行的其余部分(不包括制表符)作为值value

当然,我们也可根据实际情况去做更改。要执行程序,需要用到一下命令

mapred streaming [genericOptions] [streamingOptions]

注意:请务必将通用generic选项放在streaming选项之前,否则命令将失败,具体见上一篇博客。

WordCount示例

大数据入门第一个示例wordcount单词词频统计。主要是实现map和reduce的逻辑即可,在mapper阶段,输入是从标准流中读入的一行行文本,输出是<单词, 1> 的键值对。具体示例map01.py如下:

import sys
for line in sys.stdin:
   line = line.strip()     #去除字符串首尾的空格
   words = line.split()   #将字符串按照指定的分隔符进行分割,返回一个包含分割后子字符串的列表,参数为空时,表示使用空格作为分隔符
   for word in words:
      print('{}\t{}'.format(word,1))    #输出采用了制表符作为键值对的划分

在reducer阶段,输入的是分区排序后的数据,但与Java不同,这里输入的键值对是是mapper输出的每对key/value,而不是<k, <v1,v2,…,vn>>。因此我们编写这部分逻辑代码时需要自己去控制是否遇到了新的键key,稍微麻烦一点。具体如下reduce01.py(代码参考自其他博客)。

import sys

current_word = None
current_count = 0
word = None

for line in sys.stdin:
   line = line.strip()
   
   word, count = line.split('\t',1)   #读到的是一行字符数据,需要通过制表符(由mapper输出决定)划分key/value
   
   try:
      count = int(count)
   except ValueError:
      continue
      
   if current_word == word:     #与上一次处理的是同一个单词,词频需要+1
      current_count += count
   else:
      if current_word is not None:    #与上一次处理的单词不相同,说明遇到了新的单词,且上次的单词词频已经统计完毕,需要将上次的单词输出。
         print('{}\t{}'.format(current_word, current_count))
      current_count = count
      current_word = word
      
if current_word == word:   #输出最后一个单词及词频
   print('{}\t{}'.format(current_word, current_count))

下面需要提交程序,为了方便,建议先建立一个脚本 run.sh,赋予可执行权限。

chmod  +x  run.sh

编辑run.sh输入下面命令:

hadoop jar /home/peng/bigdata/hadoop-3.1.3/share/hadoop/tools/lib/hadoop-streaming-3.1.3.jar  \
-file /home/peng/Desktop/map01.py  \
-mapper "python3 map01.py"  \
-file /home/peng/Desktop/reduce01.py  \
-reducer "python3 reduce01.py"  \
-input /test.txt  \
-output /output01

注意修改相应文件的路径,input的文件先上传到hdfs中,output输出路径不能存在。如有存在需要先删除 hadoop fs -rm -r /output01

执行文件  :

./run.sh

查看结果,hadoop fs -cat /output01/part-00000

去重(投影)示例

数据如下

1,tom,18,60
2,lily,5,50
3,maly,18,78
4,kafi,18,55
5,jojo,9,90
6,kawe,20,89

去重操作在数据处理中经常用到,现在想输出第3列age数据,重复的只保留一个。

Map的逻辑代码如下:

import sys

if __name__=="__main__":
    for line in sys.stdin:
        student = line.strip().split(",")
        if len(student) == 4:
              print("{}\t{}".format(student[2],1))

Reduce的逻辑代码如下:

import sys

age = None
if __name__=="__main__":
    for line in sys.stdin:
        k ,v = line.split("\t")
        tmp = int(k)
        if age is None:
            age = tmp
            print(age)
            continue
        if tmp != age:
            age = tmp
            print(age)

最终要求输出是降序排序,需要根据前一篇博客添加一些配置即可。如:

mapred streaming  \
-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
-D mapreduce.job.reduces=1 \
-D mapreduce.partition.keycomparator.options=-k1,1nr \
-file /home/peng/Desktop/projectMR/mr.py  \
-mapper "python3 mr.py"  \
-file /home/peng/Desktop/projectMR/re.py  \
-reducer "python3 re.py"  \
-input /RelationB.txt  \
-output /output04 \

-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \   与-D mapreduce.partition.keycomparator.options=-k1,1nr 的配合使用。 -r可以使得我们的输出是按key倒序排序的,k1,1说明排序的是key中的第一个字段(现实中可能会有多个字段组成一个key的情况,那么也要指定分隔符,具体看前一篇博客)。如果没有 -n 则默认是文本排序,需要根据数字排序,那么就添加 -n。

分 区

分区可以决定当前的键值对(行)最终落到哪个reducer处理。当key是多个字段的组合,且我们需要特定的字段作为分区字段时,需要进行特别设置。

有如下数据:

11.12.12.14
11.12.12.15
11.13.13.12
11.14.14.15
11.13.13.16
11.15.15.16
12.15.15.16

Map仅作简单的输出,逻辑如下:

import sys
if __name__=="__main__":
    for line in sys.stdin:
        words = line.strip().split(".")
        if len(words) == 4 :
           print(line)

Reduce仅作简单的输出,逻辑如下:

if __name__ == "__main__":
    for line in sys.stdin:
        print(line)

Hadoop streaming 执行文件如下:

mapred streaming  \
-D stream.map.output.field.separator=. \
-D stream.num.map.output.key.fields=2 \
-D map.output.key.field.separator=.  \
-D mapreduce.partition.keypartitioner.options=-k1,1  \
-D mapreduce.job.reduces=2 \
-file /home/peng/Desktop/mr.py  \
-mapper "python3 mr.py"  \
-file /home/peng/Desktop/re.py  \
-reducer "python3 re.py"   \
-input /testnumber.txt  \
-output /output02 \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

其中 

  1.  

            -D stream.map.output.field.separator=. \

            -D stream.num.map.output.key.fields=2 \

    设置了mapper输出数据的分隔符为” . ” , key为第2个分隔符之前的内容

  2.  

             -D map.output.key.field.separator=.  \

             -D mapreduce.partition.keypartitioner.options=-k1,1  \

    设置了mapper输出的key中字段分隔符为” . ” , 默认会将整个key作为分区的依据,相同的key对应的键值对(行)会发送给同一个reducer处理,现在需要根据key中的第1个字段去分区,需要设置-k1,1。 如果需要设置前两个字段则为 -k1,2 ,如果需要设置第二个字段则为 -k2,2  。

  3. 允许Map/Reduce 框架根据某些键字段(而不是整个键)对map输出进行分区则还需要添加  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
  4. 如果想数据落到多个reducer,则需设置reduce数量,如下设为2

          -D mapreduce.job.reduces=2

查看结果:hadoop fs -cat /output02/part-00000

 hadoop fs -cat /output02/part-00001

显然,hadoop streaming使得其他不擅长Java编程的人员也可自由使用hadoop,总而言之,非常方便。

其实也可借助linux的管道技术,先进行本地的调试。

至此完成了hadoop streaming的最基础的学习。 

物联沃分享整理
物联沃-IOTWORD物联网 » 使用Python编写MapReduce程序的最佳实践

发表评论