I am trying to build a Kinesis consumer script using Python 3.4. Below is an example of my code. I want the records to be saved to a local file that I can later push to S3:
from boto import kinesis
import time
import json
# AWS Connection Credentials
aws_access_key = 'your_key'
aws_access_secret = 'your_secret key'
# Selected Kinesis Stream
stream = 'TwitterTesting'
# Aws Authentication
auth = {"aws_access_key_id": aws_access_key, "aws_secret_access_key": aws_access_secret}
conn = kinesis.connect_to_region('us-east-1',**auth)
# Targeted file to be pushed to S3 bucket
fileName = "KinesisDataTest2.txt"
file = open("C:\\Users\\csanders\\PycharmProjects\\untitled\\KinesisDataTest.txt", "a")
# Describe stream and get shards
tries = 0
while tries < 10:
    tries += 1
    time.sleep(1)
    response = conn.describe_stream(stream)
    if response['StreamDescription']['StreamStatus'] == 'ACTIVE':
        break
else:
    raise TimeoutError('Stream is still not active, aborting...')
# Get Shard Iterator and get records from stream
shard_ids = []
stream_name = None
if response and 'StreamDescription' in response:
    stream_name = response['StreamDescription']['StreamName']
    for shard_id in response['StreamDescription']['Shards']:
        shard_id = shard_id['ShardId']
        shard_iterator = conn.get_shard_iterator(stream,
                                                 shard_id, 'TRIM_HORIZON')
        shard_ids.append({'shard_id': shard_id, 'shard_iterator': shard_iterator['ShardIterator']})
tries = 0
result = []
while tries < 100:
    tries += 1
    response = json.load(conn.get_records(shard_ids, 100))
    shard_iterator = response['NextShardIterator']
    if len(response['Records']) > 0:
        for res in response['Records']:
            result.append(res['Data'])
            print(result, shard_iterator)
For some reason when I run this script I get the following error each time:
Traceback (most recent call last):
  File "C:/Users/csanders/PycharmProjects/untitled/Get_records_Kinesis.py", line 57, in <module>
    response = json.load(conn.get_records(shard_ids, 100))
  File "C:\Python34\lib\site-packages\boto-2.38.0-py3.4.egg\boto\kinesis\layer1.py", line 327, in get_records
    body=json.dumps(params))
  File "C:\Python34\lib\site-packages\boto-2.38.0-py3.4.egg\boto\kinesis\layer1.py", line 874, in make_request
    body=json_body)
boto.exception.JSONResponseError: JSONResponseError: 400 Bad Request
{'Message': 'Start of list found where not expected', '__type': 'SerializationException'}
My end goal is to push this data into an S3 bucket, but first I just need to get these records to return and print. Any suggestions and advice would be great. I am still new to Python and at a complete loss. The data going into the stream is Twitter data dumped as JSON and written with the "put_record" function. I can post that code too if needed.
Thanks!!
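One possible reading of the traceback, offered as a guess rather than a confirmed diagnosis: the message "Start of list found where not expected" suggests the service received a list where it expected a single shard iterator string, which is what happens when the whole shard_ids list is passed to get_records. Below is a minimal sketch of reading one shard at a time and appending the payloads to a local file. It assumes boto 2's get_records already returns a parsed dict (so json.load would not be needed) and that 'Data' arrives as text or UTF-8 bytes. The function names iter_stream_records and save_records, and the pause parameter, are made up for illustration and are not part of boto.

```python
import time


def iter_stream_records(conn, stream_name, limit=100, max_polls=100, pause=1.0):
    """Yield the 'Data' payload of each record, reading one shard at a time.

    `conn` is assumed to be a boto 2 Kinesis connection, e.g. the result of
    kinesis.connect_to_region(...). This helper is hypothetical, not a boto API.
    """
    description = conn.describe_stream(stream_name)['StreamDescription']
    for shard in description['Shards']:
        # get_shard_iterator returns a dict; the iterator itself is a string.
        shard_iterator = conn.get_shard_iterator(
            stream_name, shard['ShardId'], 'TRIM_HORIZON')['ShardIterator']
        polls = 0
        while shard_iterator and polls < max_polls:
            polls += 1
            # Pass ONE iterator string here, not the list of shard dicts.
            response = conn.get_records(shard_iterator, limit)
            for record in response['Records']:
                yield record['Data']
            # Continue from where this call stopped; a missing or empty value
            # means the shard has been closed and fully read.
            shard_iterator = response.get('NextShardIterator')
            time.sleep(pause)


def save_records(conn, stream_name, path, **kwargs):
    """Append every payload to `path`, one per line, and return the count."""
    count = 0
    with open(path, 'a', encoding='utf-8') as out:
        for data in iter_stream_records(conn, stream_name, **kwargs):
            if isinstance(data, bytes):  # assumption: bytes are UTF-8 text
                data = data.decode('utf-8')
            out.write(data + '\n')
            count += 1
    return count
```

With the connection from the script above, this could be called as save_records(conn, stream, "KinesisDataTest.txt"). On an open shard NextShardIterator keeps coming back, so max_polls is what ends the loop, mirroring the 100 tries in the original code.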