使用Python和Hadoop Streaming编写MapReduce


Hadoop 和 MapReduce已经如日中天。Hadoop 不仅可以使用Java进行MapReduce的编写,也通过Hadoop Streaming的方式提供了其他语言编写MR的接口。更重要的是,使用python来编写MR,比使用亲儿子Java编写MR要更简单和方便……所以在一些不非常复杂的任务中使用python来编写MR比起使用Java,是更加划算的。

上图是MR的workflow,在介绍Hadoop Streaming的时候,可以拿来做参照.

Hadoop Streaming

Hadoop Streaming提供了一个便于进行MapReduce编程的工具包,使用它可以基于一些可执行命令、脚本语言或其他编程语言来实现Mapper和 Reducer,从而充分利用Hadoop并行计算框架的优势和能力。

Hadoop Streaming比较独特的一点是利用的UNIX标准输入输出stdin和stdout,所以只要能处理stdin和stdout的编程语言都能够使用Hadoop Streaming来进行MR的编写。甚至,wc、awk这些linux自带的能处理标准输入输出的程序,也能被用来编写Hadoop Streaming。

How it works

Hadoop Streaming提供了一个便于进行MapReduce编程的工具包,使用它可以基于一些可执行命令、脚本语言或其他编程语言来实现Mapper和 Reducer,从而充分利用Hadoop并行计算框架的优势和能力。

Hadoop Streaming比较独特的一点是利用的UNIX标准输入输出stdin和stdout,所以只要能处理stdin和stdout的编程语言都能够使用Hadoop Streaming来进行MR的编写。甚至,wc、awk这些linux自带的能处理标准输入输出的程序,也能被用来编写Hadoop Streaming。

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper /path/to/your/mapper \
    -reducer /path/to/your/reducer

指定了input、output、mapper和reducer四个必选参数。

Workflow

Hadoop Streaming的工作流程大概如下:

  1. hadoop-streaming.jar向Hadoop集群注册一个新的job,传入input path和output path等
  2. 开始mapper时,Hadoop Streaming会将输入文件按行传入stdin
  3. 我们自己编写的mapper需要读取stdin每一行,对其进行处理
  4. mapper处理完毕的中间结果也要写入stdout,在Python中print语句默认即输出到stdout,当然若是不放心,也可以手动指定输出流。对于输出到stdout中的每一行,hadoop将默认以’\t’作为分隔符切割成k-v的格式。
  5. mapper处理结束后,Hadoop 自动进行partition、sort、group,准备进行reduce任务
  6. Hadoop Streaming将中间结果按行传给reducer
  7. 我们编写的reducer需要读取stdin的每一行,对其进行处理
  8. 处理结束之后仍然输出到stdout中
  9. Hadoop 转存到output path中
  10. 结束

局限性

Hadoop Streaming 是一个非常易用的MR工具,然而对于MR的很多细节,我们并不能随心所欲的掌控,比如Hadoop Streaming虽然提供了自定义Partitioner的参数,但是这个参数仅限于提供Java Class—-所以如果需要自定义Partitioner,还不如直接用Java来写MR。

此外,因为增加了标准输入输出流中的流转处理时间,所以在执行速度上,Hadoop Streaming实际上比原生的Hadoop是要慢一点的。

所以Hadoop Streaming 适合进行数据统计等简单的纯粹的MR job。

使用Python编写MapReduce

使用Python来编写MapReduce,就需要编写一个mapper,一个reducer,把他们放入相应参数中。

我们以word count为例编写Hadoop Streaming的MapReduce

Mapper

import sys

def map():
    for line in sys.stdin:
        words = line.strip().split()
        for word in words:
            value = 1
            print "%s\t%d" % (word, value)

if __name__ == '__main__':
    map()
  • python的stdin位于sys.stdin
  • 从stdin中读入一行,然后进行相应处理
  • 写入stdout中,key和value以‘\t’隔开

Reducer

在Hadoop 内部,经过了mapper、partitioner和sorter之后,传给reducer的格式和用Java写不同,传统方式到达reduce这一步骤的时候会自动将同样的key合并,value是一个iterator,但是在Hadoop Streaming不会这样,到达reducer的格式仍然是key-value的格式。

因此需要我们自己进行相同key的合并操作。在python的itertools有groupby,提供了类似的功能.

groupby功能的说明,见 https://docs.python.org/2.7/library/itertools.html#itertools.groupby

import sys
from itertools import groupby

def fromstdin():
    for line in sys.stdin:
        word, count = line.rstrip('\n').split('\t')
        yield (word, count)

def reduce():
    for word, group in groupby(self.fromstdin(), key=lambda x: x[0]):
        count = sum([1 for i in group])
        print '%s\t%s' % (word, str(count))


if __name__ == '__main__':
    reduce()

调试Mapper和Reducer

Hadoop Streaming 选择stdin和stdout作为输入输出流给我们的调试带来了极大的方便,我们只需要让自己的测试输入和输出在标准输出流里面就可以让mapper和reducer跑起来,而不依赖Hadoop 和 HDFS。

我们利用管道命令符号|来组成pipeline,在bash中就能够完成调试。命令如下:

$ cat input.txt | ./mapper.py | sort | ./reducer.py > output.txt

我们使用cat作为read input到标准输入输出流的工具,然后经过mapper,sorter,reducer最后输出到output.txt文件中。可以通过查看output.txt的内容判断是否正确处理。

注意:在这种方式进行调试不能使用print大法来打出进程信息,因为你的输入输出已经进入管道,不能显示在屏幕上,建议使用logging 模块等方式来打日志

参数

命令行参数

Hadoop Streaming也提供了很多其他参数。

ParameterOptional/Required Description
-input directoryname or filename Required Input location for mapper
-output directoryname Required Output location for reducer
-mapper executable or JavaClassName Required Mapper executable
-reducer executable or JavaClassName Required Reducer executable
-file filename Optional Make the mapper, reducer, or combiner executable available locally on the compute nodes
-inputformat JavaClassName Optional Class you supply should return key/value pairs of Text class. If not specified, TextInputFormat is used as the default
-outputformat JavaClassName Optional Class you supply should take key/value pairs of Text class. If not specified, TextOutputformat is used as the default
-partitioner JavaClassName Optional Class that determines which reduce a key is sent to
-combiner streamingCommand or JavaClassName Optional Combiner executable for map output
-cmdenv name=value Optional Pass environment variable to streaming commands
-inputreader Optional For backwards-compatibility: specifies a record reader class (instead of an input format class)
-verbose Optional Verbose output
-lazyOutput Optional Create output lazily. For example, if the output format is based on FileOutputFormat, the output file is created only on the first call to output.collect (or Context.write)
-numReduceTasks Optional Specify the number of reducers
-mapdebug Optional Script to call when map task fails
-reducedebug Optional Script to call when reduce task fails

通用参数

值得注意的是,Hadoop Streaming 提供了通用参数,使得一些不属于mapper或者reducer的参数可以被指定。

其中使用最多的参数是-D, 这个参数可以指定自定义的kv,极大地提高了自由度。

如:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -D map.output.key.field.separa=. \
    -D mapred.text.key.partitioner.options=-k1,2 \
    -D mapred.data.field.separator=. \
    -D map.output.key.value.fields.spec=6,5,1-3:0- \
    -D reduce.output.key.value.fields.spec=0-2:5- \
    -D mapred.reduce.tasks=12 \
    -input myInputDirs \
    -output myOutputDir \
    -mapper org.apache.hadoop.mapred.lib.FieldSelectionMapReduce \
    -reducer org.apache.hadoop.mapred.lib.FieldSelectionMapReduce \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner w
ParameterOptional/Required Description
-conf configuration_file Optional Specify an application configuration file
-D property=value Optional Use value for given property
-fs host:port or local Optional Specify a namenode
-jt host:port or local Optional Specify a job tracker
-files Optional Specify comma-separated files to be copied to the Map/Reduce cluster
-libjars Optional Specify comma-separated jar files to include in the classpath
-archives Optional Specify comma-separated archives to be unarchived on the compute machines
Comments
Write a Comment