I am a Spark newbie and am trying to run Spark on Amazon EMR. Here's my code (copied from an example, with a few small modifications):
package test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

import com.google.common.base.Optional;

public class SparkTest {

    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count"));

        // Credentials for the s3n:// filesystem (real keys redacted here)
        sc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<my-access-key-id>");
        sc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<my-secret-access-key>");

        // Customers: key on the first CSV field, value is the second, then de-duplicate
        JavaRDD<String> customerInputFile = sc.textFile("s3n://aws-logs-494322476419-ap-southeast-1/test/customers_data.txt");
        JavaPairRDD<String, String> customerPairs = customerInputFile.mapToPair(new PairFunction<String, String, String>() {
            public Tuple2<String, String> call(String s) {
                String[] customerSplit = s.split(",");
                return new Tuple2<String, String>(customerSplit[0], customerSplit[1]);
            }
        }).distinct();

        // Transactions: key on the third CSV field, value is "field4,field2"
        JavaRDD<String> transactionInputFile = sc.textFile("s3n://aws-logs-494322476419-ap-southeast-1/test/transactions_data.txt");
        JavaPairRDD<String, String> transactionPairs = transactionInputFile.mapToPair(new PairFunction<String, String, String>() {
            public Tuple2<String, String> call(String s) {
                String[] transactionSplit = s.split(",");
                return new Tuple2<String, String>(transactionSplit[2], transactionSplit[3] + "," + transactionSplit[1]);
            }
        });

        // Default join operation (inner join)
        JavaPairRDD<String, Tuple2<String, String>> joinsOutput = customerPairs.join(transactionPairs);
        System.out.println("Joins function Output: " + joinsOutput.collect());

        // Left outer join operation
        JavaPairRDD<String, Iterable<Tuple2<String, Optional<String>>>> leftJoinOutput = customerPairs.leftOuterJoin(transactionPairs).groupByKey().sortByKey();
        System.out.println("LeftOuterJoins function Output: " + leftJoinOutput.collect());

        // Right outer join operation
        JavaPairRDD<String, Iterable<Tuple2<Optional<String>, String>>> rightJoinOutput = customerPairs.rightOuterJoin(transactionPairs).groupByKey().sortByKey();
        System.out.println("RightOuterJoins function Output: " + rightJoinOutput.collect());

        sc.close();
    }
}
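I package this into spark-test.jar, upload it to my S3 bucket, and submit it on the cluster. From memory, the submit command looks roughly like this (the class name and jar path match the code and the log below; the exact master/deploy-mode flags may differ):

    spark-submit --class test.SparkTest --master yarn-cluster \
        s3://aws-logs-494322476419-ap-southeast-1/test/spark-test.jar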
But when I run it this way, it always reports an error like the following and fails:
2015-07-24 12:22:41,550 INFO [main] client.RMProxy (RMProxy.java:createRMProxy(98)) - Connecting to ResourceManager at ip-10-0-0-61.ap-southeast-1.compute.internal/10.0.0.61:8032
2015-07-24 12:22:42,619 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Requesting a new application from cluster with 2 NodeManagers
2015-07-24 12:22:42,694 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Verifying our application has not requested more than the maximum memory capability of the cluster (2048 MB per container)
2015-07-24 12:22:42,698 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Will allocate AM container, with 896 MB memory including 384 MB overhead
2015-07-24 12:22:42,700 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Setting up container launch context for our AM
2015-07-24 12:22:42,707 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Preparing resources for our AM container
2015-07-24 12:22:45,445 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Uploading resource file:/usr/lib/spark/lib/spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar -> hdfs://10.0.0.61:8020/user/hadoop/.sparkStaging/application_1437740323036_0001/spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar
2015-07-24 12:22:47,701 INFO [main] metrics.MetricsSaver (MetricsSaver.java:showConfigRecord(643)) - MetricsConfigRecord disabledInCluster: false instanceEngineCycleSec: 60 clusterEngineCycleSec: 60 disableClusterEngine: false maxMemoryMb: 3072 maxInstanceCount: 500 lastModified: 1437740335527
2015-07-24 12:22:47,713 INFO [main] metrics.MetricsSaver (MetricsSaver.java:<init>(284)) - Created MetricsSaver j-1NM41B4W6K3IP:i-525f449f:SparkSubmit:06588 period:60 /mnt/var/em/raw/i-525f449f_20150724_SparkSubmit_06588_raw.bin
2015-07-24 12:22:49,047 INFO [DataStreamer for file /user/hadoop/.sparkStaging/application_1437740323036_0001/spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar block BP-1554902524-10.0.0.61-1437740270491:blk_1073741830_1015] metrics.MetricsSaver (MetricsSaver.java:compactRawValues(464)) - 1 aggregated HDFSWriteDelay 183 raw values into 1 aggregated values, total 1
2015-07-24 12:23:03,845 INFO [main] fs.EmrFileSystem (EmrFileSystem.java:initialize(107)) - Consistency disabled, using com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem as filesystem implementation
2015-07-24 12:23:06,316 INFO [main] amazonaws.latency (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200], ServiceName=[Amazon S3], AWSRequestID=[E987B96CAE12A2B2], ServiceEndpoint=[http://ift.tt/1JJ5eUp], HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=0, ClientExecuteTime=[2266.609], HttpRequestTime=[1805.926], HttpClientReceiveResponseTime=[17.096], RequestSigningTime=[187.361], ResponseProcessingTime=[0.66], HttpClientSendRequestTime=[1.065],
2015-07-24 12:23:06,329 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Uploading resource s3://aws-logs-494322476419-ap-southeast-1/test/spark-test.jar -> hdfs://10.0.0.61:8020/user/hadoop/.sparkStaging/application_1437740323036_0001/spark-test.jar
2015-07-24 12:23:06,568 INFO [main] amazonaws.latency (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200], ServiceName=[Amazon S3], AWSRequestID=[C40A7775223B6772], ServiceEndpoint=[http://ift.tt/1JJ5eUp], HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=1, ClientExecuteTime=[237.557], HttpRequestTime=[20.943], HttpClientReceiveResponseTime=[13.247], RequestSigningTime=[29.321], ResponseProcessingTime=[186.674], HttpClientSendRequestTime=[1.998],
2015-07-24 12:23:07,265 INFO [main] s3n.S3NativeFileSystem (S3NativeFileSystem.java:open(1159)) - Opening 's3://aws-logs-494322476419-ap-southeast-1/test/spark-test.jar' for reading
2015-07-24 12:23:07,312 INFO [main] amazonaws.latency (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[206], ServiceName=[Amazon S3], AWSRequestID=[FB5C0051C241A9AC], ServiceEndpoint=[http://ift.tt/1JJ5eUp], HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=1, ClientExecuteTime=[42.753], HttpRequestTime=[31.778], HttpClientReceiveResponseTime=[20.426], RequestSigningTime=[1.266], ResponseProcessingTime=[7.357], HttpClientSendRequestTime=[1.065],
2015-07-24 12:23:07,330 INFO [main] metrics.MetricsSaver (MetricsSaver.java:<init>(915)) - Thread 1 created MetricsLockFreeSaver 1
2015-07-24 12:23:07,875 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Uploading resource file:/tmp/spark-91e17f5e-45f2-466a-b4cf-585174b9fa98/__hadoop_conf__3852777564911495008.zip -> hdfs://10.0.0.61:8020/user/hadoop/.sparkStaging/application_1437740323036_0001/__hadoop_conf__3852777564911495008.zip
2015-07-24 12:23:07,965 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Uploading resource s3://aws-logs-494322476419-ap-southeast-1/test/spark-assembly-1.4.1-hadoop2.6.0.jar -> hdfs://10.0.0.61:8020/user/hadoop/.sparkStaging/application_1437740323036_0001/spark-assembly-1.4.1-hadoop2.6.0.jar
2015-07-24 12:23:07,993 INFO [main] amazonaws.latency (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200], ServiceName=[Amazon S3], AWSRequestID=[25260792F013C91A], ServiceEndpoint=[http://ift.tt/1JJ5eUp], HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=1, ClientExecuteTime=[23.713], HttpRequestTime=[15.297], HttpClientReceiveResponseTime=[12.147], RequestSigningTime=[6.568], ResponseProcessingTime=[0.312], HttpClientSendRequestTime=[1.033],
2015-07-24 12:23:08,003 INFO [main] s3n.S3NativeFileSystem (S3NativeFileSystem.java:open(1159)) - Opening 's3://aws-logs-494322476419-ap-southeast-1/test/spark-assembly-1.4.1-hadoop2.6.0.jar' for reading
2015-07-24 12:23:08,064 INFO [main] amazonaws.latency (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[206], ServiceName=[Amazon S3], AWSRequestID=[DDF86EA9B896052A], ServiceEndpoint=[http://ift.tt/1JJ5eUp], HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=1, ClientExecuteTime=[60.109], HttpRequestTime=[55.175], HttpClientReceiveResponseTime=[43.324], RequestSigningTime=[1.067], ResponseProcessingTime=[3.409], HttpClientSendRequestTime=[1.16],
2015-07-24 12:23:09,002 INFO [main] metrics.MetricsSaver (MetricsSaver.java:commitPendingKey(1043)) - 1 MetricsLockFreeSaver 2 comitted 556 matured S3ReadDelay values
2015-07-24 12:23:24,296 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Setting up the launch environment for our AM container
2015-07-24 12:23:24,724 INFO [main] spark.SecurityManager (Logging.scala:logInfo(59)) - Changing view acls to: hadoop
2015-07-24 12:23:24,727 INFO [main] spark.SecurityManager (Logging.scala:logInfo(59)) - Changing modify acls to: hadoop
2015-07-24 12:23:24,731 INFO [main] spark.SecurityManager (Logging.scala:logInfo(59)) - SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
2015-07-24 12:23:24,912 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Submitting application 1 to ResourceManager
2015-07-24 12:23:25,818 INFO [main] impl.YarnClientImpl (YarnClientImpl.java:submitApplication(252)) - Submitted application application_1437740323036_0001
2015-07-24 12:23:26,872 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1437740323036_0001 (state: ACCEPTED)
2015-07-24 12:23:26,893 INFO [main] yarn.Client (Logging.scala:logInfo(59)) -
client token: N/A
diagnostics: N/A
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: default
start time: 1437740605459
final status: UNDEFINED
tracking URL: http://ift.tt/1JJ5d2N
user: hadoop
2015-07-24 12:23:27,902 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1437740323036_0001 (state: ACCEPTED)
2015-07-24 12:23:28,906 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1437740323036_0001 (state: ACCEPTED)
2015-07-24 12:23:29,909 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1437740323036_0001 (state: ACCEPTED)
2015-07-24 12:23:30,913 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1437740323036_0001 (state: ACCEPTED)
2015-07-24 12:23:31,917 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1437740323036_0001 (state: ACCEPTED)
2015-07-24 12:23:32,920 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1437740323036_0001 (state: ACCEPTED)
2015-07-24 12:23:33,924 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1437740323036_0001 (state: ACCEPTED)
2015-07-24 12:23:34,931 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1437740323036_0001 (state: ACCEPTED)
2015-07-24 12:23:35,936 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1437740323036_0001 (state: ACCEPTED)
2015-07-24 12:23:36,939 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1437740323036_0001 (state: ACCEPTED)
2015-07-24 12:23:37,944 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1437740323036_0001 (state: ACCEPTED)
2015-07-24 12:23:38,948 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1437740323036_0001 (state: ACCEPTED)
2015-07-24 12:23:39,951 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1437740323036_0001 (state: ACCEPTED)
2015-07-24 12:23:40,965 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1437740323036_0001 (state: ACCEPTED)
2015-07-24 12:23:41,969 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1437740323036_0001 (state: ACCEPTED)
2015-07-24 12:23:42,973 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1437740323036_0001 (state: ACCEPTED)
2015-07-24 12:23:43,978 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1437740323036_0001 (state: ACCEPTED)
2015-07-24 12:23:44,981 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1437740323036_0001 (state: ACCEPTED)
2015-07-24 12:23:45,991 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1437740323036_0001 (state: ACCEPTED)
2015-07-24 12:23:46,994 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1437740323036_0001 (state: ACCEPTED)
2015-07-24 12:23:47,999 INFO [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1437740323036_0001 (state: FAILED)
2015-07-24 12:23:48,002 INFO [main] yarn.Client (Logging.scala:logInfo(59)) -
client token: N/A
diagnostics: Application application_1437740323036_0001 failed 2 times due to AM Container for appattempt_1437740323036_0001_000002 exited with exitCode: -1000
For more detailed output, check application tracking page:http://ift.tt/1JJ5eUr, click on links to logs of each attempt.
Diagnostics: File does not exist: hdfs://10.0.0.61:8020/user/hadoop/.sparkStaging/application_1437740323036_0001/spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar
java.io.FileNotFoundException: File does not exist: hdfs://10.0.0.61:8020/user/hadoop/.sparkStaging/application_1437740323036_0001/spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar
at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1122)
at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:251)
at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:61)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:357)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:356)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:60)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Failing this attempt. Failing the application.
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: default
start time: 1437740605459
final status: FAILED
tracking URL: http://ift.tt/1D4pEdG
user: hadoop
2015-07-24 12:23:48,038 INFO [Thread-0] util.Utils (Logging.scala:logInfo(59)) - Shutdown hook called
2015-07-24 12:23:48,040 INFO [Thread-0] util.Utils (Logging.scala:logInfo(59)) - Deleting directory /tmp/spark-91e17f5e-45f2-466a-b4cf-585174b9fa98
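In case it helps with diagnosing, I believe the staging directory the error refers to can be listed on the master node with something like this (the path is copied from the log above, though the directory may already be cleaned up by the time the application fails):

    hadoop fs -ls hdfs://10.0.0.61:8020/user/hadoop/.sparkStaging/application_1437740323036_0001/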
One thing I notice in the log is that two different assembly jars get uploaded: the EMR-provided /usr/lib/spark/lib/spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar and a spark-assembly-1.4.1-hadoop2.6.0.jar from my own S3 bucket, and the failed attempt then complains that the -amzn- one does not exist. I don't know what to make of that.
Can anyone see what the problem is?
Thank you very much.