Tuesday, 26 May 2015

Cannot locate MySQL driver using Pig 0.12.0 and Hadoop 2.4.0 on an Amazon EMR cluster

I am using SWF to run a workflow that creates an EMR cluster, on which a Pig script is run. I am attempting to run this with Pig 0.12.0 and Hadoop 2.4.0, and at the point where the script attempts to store to our MySQL database in RDS, using org.apache.pig.piggybank.storage.DBStorage, an exception is thrown:

2015-05-26 14:36:47,057 [main] ERROR org.apache.pig.piggybank.storage.DBStorage - 
    can't load DB driver:com.mysql.jdbc.Driver
java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
  at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:191)
  at org.apache.pig.piggybank.storage.DBStorage.<init>(DBStorage.java:66)

This previously worked with Pig 0.11.1 and Hadoop 1.0.3. The SWF workflow and activity are written in Java, using the AWS SDK for Java version 1.9.19. Searching more widely suggests that the PIG_CLASSPATH needs to be amended to include the MySQL connector JAR. Currently the script includes

REGISTER $LIB_PATH/mysql-connector-java-5.1.26.jar;

where $LIB_PATH is an S3 location, but there are suggestions that this is no longer sufficient for Pig 0.12.0 + Hadoop 2.4.0.
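For illustration, one form that amendment might take is an extra bootstrap action that copies the connector JAR from S3 into Pig's lib directory on each node, so the driver is on the classpath at launch rather than only REGISTERed by the script. The sketch below assumes a hypothetical copy script and a Pig lib path of /home/hadoop/pig/lib; I have not confirmed either on the newer AMIs.

final BootstrapActionConfig copyConnectorJar = new BootstrapActionConfig()
        .withName("Copy MySQL connector to Pig lib")
        .withScriptBootstrapAction(new ScriptBootstrapActionConfig()
                // hypothetical shell script that copies its first argument (an S3 path)
                // to its second argument (a local directory) on each node
                .withPath("s3://my-bucket/bootstrap/copy-mysql-connector.sh")
                .withArgs("s3://my-bucket/lib/mysql-connector-java-5.1.26.jar",
                          "/home/hadoop/pig/lib/"));

This would then be passed alongside the existing bootStrap action in the request below, e.g. .withBootstrapActions(bootStrap, copyConnectorJar).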

The code that constructs the request used to launch the cluster is shown below:

public final RunJobFlowRequest constructRequest(final List<String> params) {
    ConductorContext config = ContextHolder.get();

    final JobFlowInstancesConfig instances = new JobFlowInstancesConfig().withInstanceCount(config.getEmrInstanceCount())
            .withMasterInstanceType(config.getEmrMasterType()).withSlaveInstanceType(config.getEmrSlaveType())
            .withKeepJobFlowAliveWhenNoSteps(false).withHadoopVersion(config.getHadoopVersion());

    if (!StringUtils.isBlank(config.getEmrEc2SubnetId())) {
        instances.setEc2SubnetId(config.getEmrEc2SubnetId());
    }

    final BootstrapActionConfig bootStrap = new BootstrapActionConfig().withName("Bootstrap Pig").withScriptBootstrapAction(
            new ScriptBootstrapActionConfig().withPath(config.getEmrBootstrapPath()).withArgs(config.getEmrBootstrapArgs()));

    final StepFactory stepFactory = new StepFactory();
    final List<StepConfig> steps = new LinkedList<>();

    steps.add(new StepConfig().withName("Enable Debugging").withActionOnFailure(ActionOnFailure.TERMINATE_JOB_FLOW)
            .withHadoopJarStep(stepFactory.newEnableDebuggingStep()));

    steps.add(new StepConfig().withName("Install Pig").withActionOnFailure(ActionOnFailure.TERMINATE_JOB_FLOW)
            .withHadoopJarStep(stepFactory.newInstallPigStep(config.getPigVersion())));

    for (final PigScript originalScript : config.getScripts()) {
        ArrayList<String> newParams = new ArrayList<>();

        newParams.addAll(Arrays.asList(originalScript.getScriptParams()));
        newParams.addAll(params);

        final PigScript script = new PigScript(originalScript.getName(), originalScript.getScriptUrl(),
                AWSHelper.burstParameters(newParams.toArray(new String[newParams.size()])));

        steps.add(new StepConfig()
                .withName(script.getName())
                .withActionOnFailure(ActionOnFailure.CONTINUE)
                .withHadoopJarStep(
                        stepFactory.newRunPigScriptStep(script.getScriptUrl(), config.getPigVersion(), script.getScriptParams())));
    }

    final RunJobFlowRequest request = new RunJobFlowRequest().withName(makeRunJobName()).withSteps(steps).withVisibleToAllUsers(true)
            .withBootstrapActions(bootStrap).withLogUri(config.getEmrLogUrl()).withInstances(instances);

    return request;
}
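For completeness, the SWF activity submits the constructed request roughly as follows; the client construction and region here are simplified placeholders rather than the production code:

AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient();
emr.setRegion(Region.getRegion(Regions.EU_WEST_1)); // region is a placeholder
RunJobFlowResult result = emr.runJobFlow(constructRequest(params));
String jobFlowId = result.getJobFlowId(); // used to track the cluster from the activity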
