lundi 30 mars 2015

How to do multistep or chained MapReduce on AWS (Using Python)

TL;DR, How can I use the output of one MapReduce step as the input to the next step?


Currently I'm trying to us MapReduce to count sets of 4 words from a sample data set.


Mapper.py:



#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# split the line into 4-grams (4 word sets)
ngrams = line.split()
# write the results to STDOUT (standard output)
print '%s %s %s %s<s>%s' % (ngrams[0], ngrams[1], ngrams[2], ngrams[3], ngrams[5])


Reducer.py:



#!/usr/bin/env python

import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()

# parse the input we got from mapper.py
words = line.split('<s>')
word = words[0]
count = words[1]

# convert count (currently a string) to int
try:
count = int(count)
except ValueError:
continue

# this IF-switch only works because Hadoop sorts map output
# by key (here: word) before it is passed to the reducer
if current_word == word:
current_count += count
else:
if current_word:
# write result to STDOUT
print '%s\t%s' % (current_word, current_count)
current_count = count
current_word = word

# do not forget to output the last word if needed!
if current_word == word:
print '%s\t%s' % (current_word, current_count)


Original code came from this great tutorial!


This code works perfectly for the dataset that I have; however, the output is 38 separate files (15MB each). Thus, I would like to run MapReduce again on the 38 files and further reduce the output into a single file. (Note: I think there is a way to get MR to output a single file, but I'm interested in chaining steps.)


My first attempt was simply to run the same scripts (seen above) on the output files a second time, but I get the following error:



Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads():
subprocess failed with code 1


The images below show how I have my MapReduce steps set up. I have the output of the first step set as the input for the second step. What am I doing wrong?


Streaming Step 1 Streaming Step 2





Aucun commentaire:

Enregistrer un commentaire