I'm using Python, MRJob, and Amazon EMR to run a job based on the Common Crawl example (http://ift.tt/1LDUrNK).
This streams WET files from Amazon S3 and works well for small jobs (when the input is ~50 WET file paths). If I increase the input size to > 50 WET file paths, the job always completes 63/64 tasks, but fails on the last one.
In my code, I surround all of the mapper logic with a try/except block to silence any errors (as I understand it, any uncaught exception kills the whole job):
try:
    # my code
except:
    pass
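For context, my mapper follows the general shape of the linked example. This is only a rough sketch, not my exact code: process_record is a placeholder for my real per-record logic, and the S3 wiring (anonymous boto connection, the aws-publicdatasets bucket, GzipStreamFile, warc.WARCFile) is taken from that example.

import boto
import warc
from gzipstream import GzipStreamFile
from mrjob.job import MRJob


def process_record(record):
    # placeholder for the real per-record logic
    return []


class TagGrabber(MRJob):

    def mapper(self, _, line):
        try:
            # stream the gzipped WET file from S3 and read it as a WARC archive
            conn = boto.connect_s3(anon=True)
            key = conn.get_bucket('aws-publicdatasets').get_key(line)
            f = warc.WARCFile(fileobj=GzipStreamFile(key))
            for i, record in enumerate(f):
                for out_key, out_value in process_record(record):
                    yield out_key, out_value
        except:
            pass  # swallow anything that goes wrong


if __name__ == '__main__':
    TagGrabber.run()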
Here is the error output available from Amazon EMR:
+ __mrjob_PWD=/mnt2/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1432626749504_0001/container_1432626749504_0001_01_000076
+ exec
+ python -c 'import fcntl; fcntl.flock(9, fcntl.LOCK_EX)'
+ export PYTHONPATH=/mnt2/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1432626749504_0001/container_1432626749504_0001_01_000076/mrcc.py.tar.gz:
+ PYTHONPATH=/mnt2/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1432626749504_0001/container_1432626749504_0001_01_000076/mrcc.py.tar.gz:
+ exec
+ cd /mnt2/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1432626749504_0001/container_1432626749504_0001_01_000076
+ python2.7 tag_grabber.py --step-num=0 --mapper --source s3
Traceback (most recent call last):
File "tag_grabber.py", line 45, in <module>
TagGrabber.run()
File "/usr/lib/python2.7/site-packages/mrjob/job.py", line 461, in run
mr_job.execute()
File "/usr/lib/python2.7/site-packages/mrjob/job.py", line 470, in execute
self.run_mapper(self.options.step_num)
File "/usr/lib/python2.7/site-packages/mrjob/job.py", line 535, in run_mapper
for out_key, out_value in mapper(key, value) or ():
File "/mnt2/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1432626749504_0001/container_1432626749504_0001_01_000076/http://ift.tt/1HIZ5fv", line 40, in mapper
for i, record in enumerate(f):
File "/usr/lib/python2.7/site-packages/warc/warc.py", line 393, in __iter__
record = self.read_record()
File "/usr/lib/python2.7/site-packages/warc/warc.py", line 364, in read_record
self.finish_reading_current_record()
File "/usr/lib/python2.7/site-packages/warc/warc.py", line 359, in finish_reading_current_record
self.expect(self.current_payload.fileobj, "\r\n")
File "/usr/lib/python2.7/site-packages/warc/warc.py", line 352, in expect
raise IOError(message)
IOError: Expected '\r\n', found '
2015-05-26 08:25:37,365 WARN [main] org.apache.hadoop.conf.Configuration: job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval; Ignoring.
2015-05-26 08:25:37,417 WARN [main] org.apache.hadoop.conf.Configuration: job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts; Ignoring.
2015-05-26 08:25:37,954 INFO [main] org.apache.hadoop.metrics2.impl.MetricsConfig: loaded properties from hadoop-metrics2.properties
2015-05-26 08:25:37,976 INFO [main] org.apache.hadoop.metrics2.sink.cloudwatch.CloudWatchSink: Initializing the CloudWatchSink for metrics.
2015-05-26 08:25:38,115 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSinkAdapter: Sink file started
2015-05-26 08:25:38,222 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Scheduled snapshot period at 300 second(s).
2015-05-26 08:25:38,222 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: MapTask metrics system started
2015-05-26 08:25:38,240 INFO [main] org.apache.hadoop.mapred.YarnChild: Executing with tokens:
2015-05-26 08:25:38,240 INFO [main] org.apache.hadoop.mapred.YarnChild: Kind: mapreduce.job, Service: job_1432626749504_0001, Ident: (org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier@568c11d5)
2015-05-26 08:25:38,355 INFO [main] org.apache.hadoop.mapred.YarnChild: Sleeping for 0ms before retrying again. Got null now.
2015-05-26 08:25:38,796 INFO [main] org.apache.hadoop.mapred.YarnChild: mapreduce.cluster.local.dir for child: /mnt/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1432626749504_0001,/mnt1/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1432626749504_0001,/mnt2/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1432626749504_0001,/mnt3/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1432626749504_0001
2015-05-26 08:25:38,931 WARN [main] org.apache.hadoop.conf.Configuration: job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval; Ignoring.
2015-05-26 08:25:38,947 WARN [main] org.apache.hadoop.conf.Configuration: job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts; Ignoring.
2015-05-26 08:25:39,348 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
2015-05-26 08:25:40,088 INFO [main] amazon.emr.metrics.MetricsSaver: MetricsSaver YarnChild root:hdfs:///mnt/var/em/ period:120 instanceId:i-fc835515 jobflow:j-3CNU7SCUOLBJB
2015-05-26 08:25:40,282 WARN [main] org.apache.hadoop.conf.Configuration: job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval; Ignoring.
2015-05-26 08:25:40,285 WARN [main] org.apache.hadoop.conf.Configuration: job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts; Ignoring.
2015-05-26 08:25:40,287 INFO [main] com.amazon.ws.emr.hadoop.fs.guice.EmrFSBaseModule: Consistency disabled, using com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem as FileSystem implementation.
2015-05-26 08:25:41,348 INFO [main] com.amazon.ws.emr.hadoop.fs.EmrFileSystem: Using com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem as filesystem implementation
2015-05-26 08:25:42,324 WARN [main] org.apache.hadoop.conf.Configuration: job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval; Ignoring.
2015-05-26 08:25:42,325 WARN [main] org.apache.hadoop.conf.Configuration: job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts; Ignoring.
2015-05-26 08:25:42,339 INFO [main] org.apache.hadoop.mapred.Task: Using ResourceCalculatorProcessTree : [ ]
2015-05-26 08:25:42,587 INFO [main] org.apache.hadoop.mapred.MapTask: Processing split: s3://mrjob-91ef8460071eeea6/tmp/tag_grabber.root.20150526.073515.773670/files/process.wet:440+220
2015-05-26 08:25:42,595 INFO [main] com.amazon.ws.emr.hadoop.fs.guice.EmrFSBaseModule: Consistency disabled, using com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem as FileSystem implementation.
2015-05-26 08:25:42,665 INFO [main] com.amazon.ws.emr.hadoop.fs.EmrFileSystem: Using com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem as filesystem implementation
2015-05-26 08:25:42,769 INFO [main] com.hadoop.compression.lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries
2015-05-26 08:25:42,772 INFO [main] com.hadoop.compression.lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 77cfa96225d62546008ca339b7c2076a3da91578]
2015-05-26 08:25:42,804 INFO [main] com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem: Opening 's3://mrjob-91ef8460071eeea6/tmp/tag_grabber.root.20150526.073515.773670/files/process.wet' for reading
2015-05-26 08:25:43,139 INFO [main] com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem: Stream for key 'tmp/tag_grabber.root.20150526.073515.773670/files/process.wet' seeking to position '440'
2015-05-26 08:25:43,471 INFO [main] org.apache.hadoop.mapred.MapTask: numReduceTasks: 0
2015-05-26 08:25:43,637 INFO [main] org.apache.hadoop.streaming.PipeMapRed: PipeMapRed exec [/bin/sh, -ex, setup-wrapper.sh, python2.7, tag_grabber.py, --step-num=0, --mapper, --source, s3]
2015-05-26 08:25:43,680 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=1/0/0 in:NA [rec/s] out:NA [rec/s]
2015-05-26 08:25:44,484 INFO [Thread-13] org.apache.hadoop.streaming.PipeMapRed: Records R/W=1/1
2015-05-26 08:35:35,384 INFO [Thread-13] org.apache.hadoop.streaming.PipeMapRed: Records R/W=1/1143
2015-05-26 08:35:35,422 INFO [Thread-14] org.apache.hadoop.streaming.PipeMapRed: MRErrorThread done
2015-05-26 08:35:35,424 INFO [main] org.apache.hadoop.streaming.PipeMapRed: PipeMapRed failed!
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:330)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:543)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:432)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
2015-05-26 08:35:36,315 INFO [main] com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore: s3.putObject tagscout tag-1408500800767.23-251-351/part-00002 160997
2015-05-26 08:35:36,319 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:330)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:543)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:432)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
2015-05-26 08:35:36,322 INFO [main] org.apache.hadoop.mapred.Task: Runnning cleanup for the task
2015-05-26 08:35:36,322 INFO [main] org.apache.hadoop.mapred.DirectFileOutputCommitter: Nothing to clean up on abort since there are no temporary files written
If I remove the pass under the except block and instead print any errors, I get the same errors as above, plus an additional traceback.
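The change is roughly this (a minimal sketch rather than my exact code; run_mapper_logic is a placeholder for the real mapper body, and I write the traceback to stderr so it ends up in the task logs):

import sys
import traceback


def run_mapper_logic():
    # placeholder for the real mapper body; here it just fails immediately
    raise IOError("stand-in for whatever actually goes wrong")


try:
    run_mapper_logic()
except Exception:
    # write the full traceback to stderr so it shows up in the task attempt's
    # stderr log instead of being silently swallowed by "pass"
    traceback.print_exc(file=sys.stderr)

With that change, the extra output in the task logs is: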
+ __mrjob_PWD=/mnt2/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1432538379516_0001/container_1432538379516_0001_01_000029
+ exec
+ python -c 'import fcntl; fcntl.flock(9, fcntl.LOCK_EX)'
+ export PYTHONPATH=/mnt2/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1432538379516_0001/container_1432538379516_0001_01_000029/mrcc.py.tar.gz:
+ PYTHONPATH=/mnt2/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1432538379516_0001/container_1432538379516_0001_01_000029/mrcc.py.tar.gz:
+ exec
+ cd /mnt2/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1432538379516_0001/container_1432538379516_0001_01_000029
+ python2.7 tag_grabber.py --step-num=0 --mapper --source s3
Traceback (most recent call last):
File "tag_grabber.py", line 56, in <module>
TagGrabber.run()
File "/usr/lib/python2.7/site-packages/mrjob/job.py", line 461, in run
mr_job.execute()
File "/usr/lib/python2.7/site-packages/mrjob/job.py", line 470, in execute
self.run_mapper(self.options.step_num)
File "/usr/lib/python2.7/site-packages/mrjob/job.py", line 535, in run_mapper
for out_key, out_value in mapper(key, value) or ():
File "/mnt2/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1432538379516_0001/container_1432538379516_0001_01_000029/http://ift.tt/1HIZ5fv", line 39, in mapper
for i, record in enumerate(f):
File "/usr/lib/python2.7/site-packages/warc/warc.py", line 393, in __iter__
record = self.read_record()
File "/usr/lib/python2.7/site-packages/warc/warc.py", line 373, in read_record
header = self.read_header(fileobj)
File "/usr/lib/python2.7/site-packages/warc/warc.py", line 338, in read_header
line = fileobj.readline()
File "/usr/lib/python2.7/site-packages/gzipstream/gzipstreamfile.py", line 75, in readline
result = super(GzipStreamFile, self).readline(*args, **kwargs)
File "/usr/lib/python2.7/site-packages/gzipstream/gzipstreamfile.py", line 48, in readinto
data = self.read(len(b))
File "/usr/lib/python2.7/site-packages/gzipstream/gzipstreamfile.py", line 38, in read
raw = self.stream.read(io.DEFAULT_BUFFER_SIZE)
File "/usr/lib/python2.7/site-packages/boto/s3/key.py", line 400, in read
data = self.resp.read(size)
File "/usr/lib/python2.7/site-packages/boto/connection.py", line 413, in read
return http_client.HTTPResponse.read(self, amt)
File "/usr/lib64/python2.7/httplib.py", line 567, in read
s = self.fp.read(amt)
File "/usr/lib64/python2.7/socket.py", line 380, in read
data = self._sock.recv(left)
File "/usr/lib64/python2.7/ssl.py", line 246, in recv
return self.read(buflen)
File "/usr/lib64/python2.7/ssl.py", line 165, in read
return self._sslobj.read(len)
socket.error: [Errno 104] Connection reset by peer
It looks to me like the root issue is a failure to stream the file from S3, which then causes the second error (the IOError from the warc reader).
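To make that distinction concrete, here is a minimal sketch, not my actual code, of how the two failure modes could be counted separately with mrjob's increment_counter (socket.error is a subclass of IOError on Python 2.6+, so it has to be caught first); open_wet_file repeats the placeholder S3 wiring from the sketch above:

import socket

import boto
import warc
from gzipstream import GzipStreamFile
from mrjob.job import MRJob


def open_wet_file(path):
    # same placeholder wiring as above: stream the gzipped WET file from S3
    # and wrap it as a WARC archive
    conn = boto.connect_s3(anon=True)
    key = conn.get_bucket('aws-publicdatasets').get_key(path)
    return warc.WARCFile(fileobj=GzipStreamFile(key))


class TagGrabberSketch(MRJob):

    def mapper(self, _, line):
        f = open_wet_file(line)
        try:
            for i, record in enumerate(f):
                pass  # placeholder for the real per-record logic
        except socket.error:
            # the S3 stream was cut off mid-read ("Connection reset by peer")
            self.increment_counter('errors', 's3-connection-reset', 1)
        except IOError:
            # the warc reader hit a stream that ended or was corrupted mid-record
            self.increment_counter('errors', 'warc-read-error', 1)


if __name__ == '__main__':
    TagGrabberSketch.run()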
1. Am I correct in thinking that this is an error related to S3?
2. Is S3 unreliable with > 50 input files at a time, particularly WET files (which are ~200GB each)?