TL;DR, How can I use the output of one MapReduce step as the input to the next step?
Currently I'm trying to us MapReduce to count sets of 4 words from a sample data set.
Mapper.py:
#!/usr/bin/env python
import sys
# input comes from STDIN (standard input)
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# split the line into 4-grams (4 word sets)
ngrams = line.split()
# write the results to STDOUT (standard output)
print '%s %s %s %s<s>%s' % (ngrams[0], ngrams[1], ngrams[2], ngrams[3], ngrams[5])
Reducer.py:
#!/usr/bin/env python
import sys
current_word = None
current_count = 0
word = None
# input comes from STDIN
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# parse the input we got from mapper.py
words = line.split('<s>')
word = words[0]
count = words[1]
# convert count (currently a string) to int
try:
count = int(count)
except ValueError:
continue
# this IF-switch only works because Hadoop sorts map output
# by key (here: word) before it is passed to the reducer
if current_word == word:
current_count += count
else:
if current_word:
# write result to STDOUT
print '%s\t%s' % (current_word, current_count)
current_count = count
current_word = word
# do not forget to output the last word if needed!
if current_word == word:
print '%s\t%s' % (current_word, current_count)
Original code came from this great tutorial!
This code works perfectly for the dataset that I have; however, the output is 38 separate files (15MB each). Thus, I would like to run MapReduce again on the 38 files and further reduce the output into a single file. (Note: I think there is a way to get MR to output a single file, but I'm interested in chaining steps.)
My first attempt was simply to run the same scripts (seen above) on the output files a second time, but I get the following error:
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads():
subprocess failed with code 1
The images below show how I have my MapReduce steps set up. I have the output of the first step set as the input for the second step. What am I doing wrong?
Aucun commentaire:
Enregistrer un commentaire