mercredi 27 mai 2015

Fewer records retrieved from Kinesis by the AWS SDK for Java

I am using Java to receive records from Kinesis, and then I have to put these records into DynamoDB. The problem is that I don't receive all the records in the same instance of Java: for example, I sent 72000 records and received only 71863 records in the same execution. If I restart my application I receive further records, but more than the number I lost. This is my class; I use the totalRecord variable to count the records. My Kinesis stream has 3 shards.

import java.net.InetAddress;
import java.util.UUID;

import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;




public final class AmazonKinesisApplicationSample { 
        public static final String KINESIS_APPLICATION_STREAM_NAME = Configurations.getInstance().getProperty("kinesis_application_stream_name");
        private static final String KINESIS_APPLICATION_NAME = Configurations.getInstance().getProperty("kinesis_application_name");;

        // Initial position in the stream when the application starts up for the first time.
        // Position can be one of LATEST (most recent data) or TRIM_HORIZON (oldest available data)
        private static final InitialPositionInStream SAMPLE_APPLICATION_INITIAL_POSITION_IN_STREAM =
               //InitialPositionInStream.LATEST;
               InitialPositionInStream.TRIM_HORIZON;


        private static AWSCredentialsProvider credentialsProvider;

        private static void init() {
            // Ensure the JVM will refresh the cached IP values of AWS resources (e.g. service endpoints).
            java.security.Security.setProperty("networkaddress.cache.ttl", "60"); 
            credentialsProvider = new ProfileCredentialsProvider(Configurations.getInstance().getProperty("KinesisProfileCredentials"));
            try {

                credentialsProvider.getCredentials();
            } catch (Exception e) {
                throw new AmazonClientException("Cannot load the credentials from the credential, e);
            }


            try {
                AmazonDynamoDBSample.init();
             } catch (AmazonServiceException ase) {
                    System.out.println("Caught an AmazonServiceException, which means your request made it "
                            + "to AWS, but was rejected with an error response for some reason.");
                    System.out.println("Error Message:    " + ase.getMessage());
                    System.out.println("HTTP Status Code: " + ase.getStatusCode());
                    System.out.println("AWS Error Code:   " + ase.getErrorCode());
                    System.out.println("Error Type:       " + ase.getErrorType());
                    System.out.println("Request ID:       " + ase.getRequestId());
                } catch (AmazonClientException ace) {
                    System.out.println("Caught an AmazonClientException, which means the client encountered "
                            + "a serious internal problem while trying to communicate with AWS, "
                            + "such as not being able to access the network.");
                    System.out.println("Error Message: " + ace.getMessage());
                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

        }



        public static void main(String[] args) throws Exception {
            init();

            if (args.length == 1 && "delete-resources".equals(args[0])) {
                deleteResources();
                return;
            }

            String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
            KinesisClientLibConfiguration kinesisClientLibConfiguration =
                    new KinesisClientLibConfiguration(KINESIS_APPLICATION_NAME,
                            KINESIS_APPLICATION_STREAM_NAME,
                            credentialsProvider,
                            workerId);

            kinesisClientLibConfiguration.withRegionName("eu-central-1");
            kinesisClientLibConfiguration.withInitialPositionInStream(SAMPLE_APPLICATION_INITIAL_POSITION_IN_STREAM);

            IRecordProcessorFactory recordProcessorFactory = new AmazonKinesisApplicationRecordProcessorFactory();
            Worker worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration);

            System.out.printf("Running %s to process stream %s as worker %s...\n",
                    KINESIS_APPLICATION_NAME,
                    KINESIS_APPLICATION_STREAM_NAME,
                    workerId);

            int exitCode = 0;
            try {
                worker.run();

            } catch (Throwable t) {
                System.err.println("Caught throwable while processing data.");
                t.printStackTrace();
                exitCode = 1;
            }
            System.exit(exitCode);
        }

        public static void deleteResources() {
            AWSCredentials credentials = credentialsProvider.getCredentials();

            // Delete the stream
            AmazonKinesis kinesis = new AmazonKinesisClient(credentials);
            System.out.printf("Deleting the Amazon Kinesis stream used by the sample. Stream Name = %s.\n",
                    KINESIS_APPLICATION_STREAM_NAME);
            try {
                kinesis.deleteStream(KINESIS_APPLICATION_STREAM_NAME);
            } catch (ResourceNotFoundException ex) {
                // The stream doesn't exist.
            }

            // Delete the table
            AmazonDynamoDBClient dynamoDB = new AmazonDynamoDBClient(credentialsProvider.getCredentials());
            System.out.printf("Deleting the Amazon DynamoDB table used by the Amazon Kinesis Client Library. Table Name = %s.\n",
                    KINESIS_APPLICATION_NAME);
            try {
                dynamoDB.deleteTable(KINESIS_APPLICATION_NAME);
            } catch (com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException ex) {
                // The table doesn't exist.
            }
        }
    }

Other class

    /**
     * Record processor that counts the records handed to it and checkpoints about once a minute.
     *
     * <p>The running total is shared by all instances of this class. It is kept in an
     * {@code AtomicLong} because the stream has several shards and the Kinesis Client Library may
     * call {@link #processRecords} for different shards from different threads; a plain
     * {@code long += n} can lose updates under concurrent access and under-report the total.
     *
     * <p>NOTE(review): {@code checkpoint(...)} and the interface's {@code shutdown(...)} are not
     * part of this excerpt and are assumed to be defined elsewhere in the class.
     */
    public class AmazonKinesisApplicationSampleRecordProcessor implements IRecordProcessor {
    // Total number of records seen by all processor instances in this JVM (thread-safe).
    private static final java.util.concurrent.atomic.AtomicLong totalRecord =
            new java.util.concurrent.atomic.AtomicLong();
    private static final Log LOG = LogFactory.getLog(AmazonKinesisApplicationSampleRecordProcessor.class);
    private String kinesisShardId;

    // Backoff and retry settings
    private static final long BACKOFF_TIME_IN_MILLIS = 3000L;
    private static final int NUM_RETRIES = 10;

    // Checkpoint about once a minute
    private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L;
    // Starts at 0, so the first processRecords call always checkpoints.
    private long nextCheckpointTimeInMillis;

    private final CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();

    /**
     * Remembers the shard this processor is bound to and logs the assignment.
     *
     * @param shardId identifier of the shard handled by this instance
     */
    @Override
    public void initialize(String shardId) {

        System.out.println("*Initializing record processor for shard: " + shardId);

        LOG.info("Initializing record processor for shard: " + shardId);
        this.kinesisShardId = shardId;
    }

    /**
     * Adds the batch size to the shared total, prints the new total, and checkpoints when the
     * checkpoint interval has elapsed.
     *
     * @param records      batch of records delivered for this shard
     * @param checkpointer checkpointer passed on to {@code checkpoint(...)}
     */
    @Override
    public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
        // Atomic add-and-read: the printed value is the total right after this batch was
        // counted, even when other shards' processors are updating the counter concurrently.
        long totalSoFar = totalRecord.addAndGet(records.size());
        System.out.println("TOTALE RECORD: "+ totalSoFar);

        // Checkpoint once every checkpoint interval.
        if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
            checkpoint(checkpointer);
            nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
        }
    }
}

Has anyone had the same problem? Is there a possible solution? Thanks.




Aucun commentaire:

Enregistrer un commentaire