I have a script written with Spark GraphX (Scala 2.10) and other Spark libraries that computes PageRank scores for a Wikipedia dump and retrieves the top results. I am able to get the script to run locally by putting it in the examples folder and shoehorning in the dependencies it needs, but I can find no way to compile it as a standalone application that will run on Amazon EC2.
I have tried:
- Compiling the fat jar for the whole Spark project, with my script as a new class, and running the class from examples: org.apache.spark.examples.graphx.PageRankGraphX (a very roundabout path to successful compilation, and ultimate failure on the cluster with SparkException: Application finished with failed status). This is the approach where I've had the most success on a local set-up, but I suspect my methods were not standard.
- Adding the script to another module in the Spark source tree, e.g. GraphX (this makes it impossible to compile: many library classes are not found, e.g. java.lang.NoClassDefFoundError: com/google/common/util/concurrent/ThreadFactoryBuilder).
- Building an entirely new project without the Spark source, adding only my script as source code along with the Spark dependencies (spark-core, spark-streaming, spark-graphx, etc.) in the sbt file. This compiles, but at run-time the application fails because classes are missing for almost every dependency (e.g. ClassDefNotFoundException: Scala/Serializer). The sbt file from that attempt is sketched just below this list.
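For concreteness, the build definition in that third attempt looked roughly like this (reconstructed from memory, so the version numbers are approximate):

name := "wikipedia-pagerank"

version := "0.1"

scalaVersion := "2.10.4"

// Spark dependencies pulled from Maven Central; only spark-core and
// spark-graphx are actually used by the script below
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.2.0",
  "org.apache.spark" %% "spark-streaming" % "1.2.0",
  "org.apache.spark" %% "spark-graphx" % "1.2.0"
)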
What I am hoping is that somebody has some easy-to-follow instructions for how to run a standalone jar, written with GraphX, on Amazon EMR. My entire script is posted below; if it were yours, what would you do to make it run and produce an output file on AWS? I don't think any detail is too basic here:
- Is the correct way to do this to build the entire downloaded Spark project with my script added as a new class within it? If so, which folder within the Spark source tree does the script belong in?
- Or is the correct way to build it as a brand-new project? If so, how do I make sure that sbt includes every single dependency that might be needed at run-time, so that the application will run correctly?
- Otherwise, if neither of these is really right, what should I do with my script to run it successfully on Amazon AWS EMR? I have run clustered MapReduce projects before without this kind of trouble. (My current working assumption is sketched just after these questions.)
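That working assumption, for what it's worth, is that the standalone route needs the sbt-assembly plugin to produce a fat jar, which is then launched with spark-submit. What I have been aiming for is something like the following, though I have not got it working; the plugin version, jar name, and paths are my guesses:

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

Then, after sbt assembly, submit the jar with the six arguments the script expects (input path, output path, iterations, reset probability, partition strategy, master URL; the master is read from args(5), so no --master flag):

spark-submit --class org.apache.spark.graphx.PageRankGraph \
  target/scala-2.10/wikipedia-pagerank-assembly-0.1.jar \
  s3n://my-bucket/links.tsv /home/hadoop/top100.txt 10 0.15 1 yarn-client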
The script I'm trying to run is below. I can definitely confirm that it works as it should when all of its dependencies are compiled properly; however, my compiled version still failed on the actual job in EMR.
package org.apache.spark.graphx
import java.io._
import java.nio.charset.StandardCharsets
import java.security.MessageDigest
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx.lib.PageRank
import org.apache.spark.rdd.RDD
object PageRankGraph extends Serializable {
// Pack the first 8 bytes of the string's MD5 digest into a 64-bit vertex ID
def hashId(str: String): Long = {
val bytes = MessageDigest.getInstance("MD5").digest(str.getBytes(StandardCharsets.UTF_8))
(bytes(0) & 0xFFL) |
((bytes(1) & 0xFFL) << 8) |
((bytes(2) & 0xFFL) << 16) |
((bytes(3) & 0xFFL) << 24) |
((bytes(4) & 0xFFL) << 32) |
((bytes(5) & 0xFFL) << 40) |
((bytes(6) & 0xFFL) << 48) |
((bytes(7) & 0xFFL) << 56)
}
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
  .setAppName("WikipediaGraphXPageRank")
  .setMaster(args(5)) // args(5): master URL, e.g. "local[4]" or "yarn-client"
  .set("spark.executor.memory", "1g")
val sc = new SparkContext(sparkConf)
// args(0): tab-separated dump; column 0 is the article title
val topics: RDD[String] = sc.textFile(args(0)).map(line => line.split("\t").head)
val vertices = topics map (topic => hashId(topic) -> topic)
val uniqueHashes = vertices.map(_._1).countByValue()
val uniqueTopics = vertices.map(_._2).countByValue()
// Sanity check: the 64-bit hashes should be collision-free over the titles
require(uniqueHashes.size == uniqueTopics.size, "MD5-based vertex IDs collided")
val linksall = (sc.textFile(args(0))).map(l => l.split("\t"))
val links = for (l <- linksall; l2 <- l(2).split(",")) yield (l(0), l2)
val edges = for (l <- links) yield Edge(hashId(l._1), hashId(l._2), 0)
val graph = Graph(vertices, edges, "").cache()
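// "" is the default attribute given to vertex IDs that appear only in edges;
// those placeholder vertices are filtered out before writing the output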
graph.vertices.count // force materialization of the cached graph
// partitionBy returns a new Graph, so the result must be captured;
// args(4) selects the partition strategy
val partitionedGraph =
  if (args(4).toInt == 1) graph.partitionBy(PartitionStrategy.RandomVertexCut)
  else if (args(4).toInt == 2) graph.partitionBy(PartitionStrategy.EdgePartition2D)
  else if (args(4).toInt == 3) graph.partitionBy(PartitionStrategy.CanonicalRandomVertexCut)
  else graph.partitionBy(PartitionStrategy.EdgePartition1D)
val prGraph = PageRank.run(partitionedGraph, args(2).toInt, args(3).toDouble)
// Join the computed ranks back onto the topic titles
val titleAndPrGraph = graph.outerJoinVertices(prGraph.vertices) {
(v, title, rank) => (rank.getOrElse(0.0), title)
}
val pw = new PrintWriter(new File(args(1))) // args(1): path on the driver's local filesystem
titleAndPrGraph.vertices.top(100) {
Ordering.by((entry: (VertexId, (Double, String))) => entry._2._1)
}.foreach(t =>
if(t._2._2 != "") {
pw.write("\nTitle: " + t._2._2 + " : " + t._2._1 + "\n")
})
pw.close()
sc.stop()
}
}
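Side question: since the PrintWriter writes to the driver's local disk, I assume that even a successful EMR run would leave the results on the master node rather than in S3. Would I need to save through the RDD API instead, roughly like this (a sketch; the bucket name is a placeholder)?

val top100 = titleAndPrGraph.vertices.top(100)(
  Ordering.by((entry: (VertexId, (Double, String))) => entry._2._1))
sc.parallelize(top100.filter(_._2._2 != "").map(t => "Title: " + t._2._2 + " : " + t._2._1))
  .saveAsTextFile("s3n://my-bucket/pagerank-top100") // placeholder bucket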
Any light you can shed on this will be very helpful. And like I said, I've found the level of detail lacking in almost every tutorial and guide I've read about this, so the more detail you can afford, the better. (PS: I have tried all of the above on a Windows machine.)