I am using Java to receive records from Kinesis, and then I have to put these records into DynamoDB. The problem is that I don't receive all the records in a single run of the application: for example, I sent 72000 records but received only 71863 in that execution. If I restart the application I receive more records, but more than the number that were missing. This is my class; I use the totalRecord variable to count the records. My Kinesis stream has 3 shards.
import java.net.InetAddress;
import java.util.UUID;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
public final class AmazonKinesisApplicationSample {
public static final String KINESIS_APPLICATION_STREAM_NAME = Configurations.getInstance().getProperty("kinesis_application_stream_name");
private static final String KINESIS_APPLICATION_NAME = Configurations.getInstance().getProperty("kinesis_application_name");
// Initial position in the stream when the application starts up for the first time.
// Position can be one of LATEST (most recent data) or TRIM_HORIZON (oldest available data)
private static final InitialPositionInStream SAMPLE_APPLICATION_INITIAL_POSITION_IN_STREAM =
//InitialPositionInStream.LATEST;
InitialPositionInStream.TRIM_HORIZON;
private static AWSCredentialsProvider credentialsProvider;
private static void init() {
// Ensure the JVM will refresh the cached IP values of AWS resources (e.g. service endpoints).
java.security.Security.setProperty("networkaddress.cache.ttl", "60");
credentialsProvider = new ProfileCredentialsProvider(Configurations.getInstance().getProperty("KinesisProfileCredentials"));
try {
credentialsProvider.getCredentials();
} catch (Exception e) {
throw new AmazonClientException("Cannot load the credentials from the credential, e);
}
try {
AmazonDynamoDBSample.init();
} catch (AmazonServiceException ase) {
System.out.println("Caught an AmazonServiceException, which means your request made it "
+ "to AWS, but was rejected with an error response for some reason.");
System.out.println("Error Message: " + ase.getMessage());
System.out.println("HTTP Status Code: " + ase.getStatusCode());
System.out.println("AWS Error Code: " + ase.getErrorCode());
System.out.println("Error Type: " + ase.getErrorType());
System.out.println("Request ID: " + ase.getRequestId());
} catch (AmazonClientException ace) {
System.out.println("Caught an AmazonClientException, which means the client encountered "
+ "a serious internal problem while trying to communicate with AWS, "
+ "such as not being able to access the network.");
System.out.println("Error Message: " + ace.getMessage());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
init();
if (args.length == 1 && "delete-resources".equals(args[0])) {
deleteResources();
return;
}
String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
KinesisClientLibConfiguration kinesisClientLibConfiguration =
new KinesisClientLibConfiguration(KINESIS_APPLICATION_NAME,
KINESIS_APPLICATION_STREAM_NAME,
credentialsProvider,
workerId);
kinesisClientLibConfiguration.withRegionName("eu-central-1");
kinesisClientLibConfiguration.withInitialPositionInStream(SAMPLE_APPLICATION_INITIAL_POSITION_IN_STREAM);
IRecordProcessorFactory recordProcessorFactory = new AmazonKinesisApplicationRecordProcessorFactory();
Worker worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration);
System.out.printf("Running %s to process stream %s as worker %s...\n",
KINESIS_APPLICATION_NAME,
KINESIS_APPLICATION_STREAM_NAME,
workerId);
int exitCode = 0;
try {
worker.run();
} catch (Throwable t) {
System.err.println("Caught throwable while processing data.");
t.printStackTrace();
exitCode = 1;
}
System.exit(exitCode);
}
public static void deleteResources() {
AWSCredentials credentials = credentialsProvider.getCredentials();
// Delete the stream
AmazonKinesis kinesis = new AmazonKinesisClient(credentials);
System.out.printf("Deleting the Amazon Kinesis stream used by the sample. Stream Name = %s.\n",
KINESIS_APPLICATION_STREAM_NAME);
try {
kinesis.deleteStream(KINESIS_APPLICATION_STREAM_NAME);
} catch (ResourceNotFoundException ex) {
// The stream doesn't exist.
}
// Delete the table
AmazonDynamoDBClient dynamoDB = new AmazonDynamoDBClient(credentialsProvider.getCredentials());
System.out.printf("Deleting the Amazon DynamoDB table used by the Amazon Kinesis Client Library. Table Name = %s.\n",
KINESIS_APPLICATION_NAME);
try {
dynamoDB.deleteTable(KINESIS_APPLICATION_NAME);
} catch (com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException ex) {
// The table doesn't exist.
}
}
}
This is the other class, the record processor:
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.model.Record;
public class AmazonKinesisApplicationSampleRecordProcessor implements IRecordProcessor {
private static long totalRecord=0;
private static final Log LOG = LogFactory.getLog(AmazonKinesisApplicationSampleRecordProcessor.class);
private String kinesisShardId;
// Backoff and retry settings
private static final long BACKOFF_TIME_IN_MILLIS = 3000L;
private static final int NUM_RETRIES = 10;
// Checkpoint about once a minute
private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L;
private long nextCheckpointTimeInMillis;
private final CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
/**
* {@inheritDoc}
*/
@Override
public void initialize(String shardId) {
System.out.println("*Initializing record processor for shard: " + shardId);
LOG.info("Initializing record processor for shard: " + shardId);
this.kinesisShardId = shardId;
}
/**
* {@inheritDoc}
*/
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
// LOG.info("Processing " + records.size() + " records from " + kinesisShardId);
// System.out.println("*Processing " + records.size() + " records from " + kinesisShardId);
totalRecord+=records.size();
System.out.println("TOTALE RECORD: "+ totalRecord);
// Process records and perform all exception handling.
// processRecordsWithRetries(records);
// Checkpoint once every checkpoint interval.
if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
checkpoint(checkpointer);
nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
}
}
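// NOTE: the two methods below were not included in my original post. They are a sketch
// of the checkpoint helper called above and of the shutdown() method that IRecordProcessor
// requires, based on the standard KCL sample (assumed, not my exact code).
// They need ShutdownException, ThrottlingException and InvalidStateException from
// com.amazonaws.services.kinesis.clientlibrary.exceptions, and ShutdownReason from
// com.amazonaws.services.kinesis.clientlibrary.lib.worker.
@Override
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
LOG.info("Shutting down record processor for shard: " + kinesisShardId);
// Checkpoint when a shard ends so processing can continue from its child shards.
if (reason == ShutdownReason.TERMINATE) {
checkpoint(checkpointer);
}
}
// Checkpoint with retries and backoff.
private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
LOG.info("Checkpointing shard " + kinesisShardId);
for (int i = 0; i < NUM_RETRIES; i++) {
try {
checkpointer.checkpoint();
break;
} catch (ShutdownException se) {
// Another worker instance may have taken over this shard; stop checkpointing.
LOG.info("Caught shutdown exception, skipping checkpoint.", se);
break;
} catch (ThrottlingException e) {
// Transient throttling; back off and retry unless this was the last attempt.
if (i >= (NUM_RETRIES - 1)) {
LOG.error("Checkpoint failed after " + (i + 1) + " attempts.", e);
break;
}
} catch (InvalidStateException e) {
// The DynamoDB table used by the KCL is in an unexpected state.
LOG.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
break;
}
try {
Thread.sleep(BACKOFF_TIME_IN_MILLIS);
} catch (InterruptedException e) {
LOG.debug("Interrupted sleep", e);
}
}
}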
}
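As a side note, since the stream has 3 shards, the KCL creates one record processor instance per shard and can invoke processRecords on separate threads, so a counter shared by all processors would need to be thread-safe before its value can be trusted. A minimal hypothetical sketch (RecordCounter is not part of my application, just an illustration of the counting):

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: thread-safe running total shared by all per-shard record processors.
public final class RecordCounter {
    private static final AtomicLong TOTAL = new AtomicLong(0);

    // Add the size of one received batch and return the new running total.
    public static long add(long batchSize) {
        return TOTAL.addAndGet(batchSize);
    }
}

In processRecords this would be called as RecordCounter.add(records.size()) instead of incrementing the static long directly.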
Has anyone had the same problem? Is there a possible solution? Thanks.