Friday, July 31, 2015

AWS Kinesis Consumer Python 3.4 Boto

I am trying to build a Kinesis consumer script using Python 3.4; below is an example of my code. I want the records saved to a local file that I can later push to S3:

from boto import kinesis
import time
import json

# AWS Connection Credentials
aws_access_key = 'your_key'
aws_access_secret = 'your_secret key'

# Selected Kinesis Stream
stream = 'TwitterTesting'

# Aws Authentication
auth = {"aws_access_key_id": aws_access_key, "aws_secret_access_key": aws_access_secret}
conn = kinesis.connect_to_region('us-east-1',**auth)

# Targeted file to be pushed to S3 bucket
fileName = "KinesisDataTest2.txt"
file = open("C:\\Users\\csanders\\PycharmProjects\\untitled\\KinesisDataTest.txt", "a")

# Describe stream and get shards
tries = 0
while tries < 10:
    tries += 1
    time.sleep(1)
    response = conn.describe_stream(stream)
    if response['StreamDescription']['StreamStatus'] == 'ACTIVE':
        break
else:
    raise TimeoutError('Stream is still not active, aborting...')

# Get Shard Iterator and get records from stream
shard_ids = []
stream_name = None
if response and 'StreamDescription' in response:
    stream_name = response['StreamDescription']['StreamName']
    for shard_id in response['StreamDescription']['Shards']:
        shard_id = shard_id['ShardId']
        shard_iterator = conn.get_shard_iterator(stream, shard_id, 'TRIM_HORIZON')
        shard_ids.append({'shard_id': shard_id, 'shard_iterator': shard_iterator['ShardIterator']})
        tries = 0
        result = []
        while tries < 100:
            tries += 1
            response = json.load(conn.get_records(shard_ids, 100))
            shard_iterator = response['NextShardIterator']
            if len(response['Records'])> 0:
                for res in response['Records']:
                    result.append(res['Data'])
                    print(result, shard_iterator)

For some reason, every time I run this script I get the following error:

Traceback (most recent call last):
  File "C:/Users/csanders/PycharmProjects/untitled/Get_records_Kinesis.py", line 57, in <module>
    response = json.load(conn.get_records(shard_ids, 100))
  File "C:\Python34\lib\site-packages\boto-2.38.0-py3.4.egg\boto\kinesis\layer1.py", line 327, in get_records
    body=json.dumps(params))
  File "C:\Python34\lib\site-packages\boto-2.38.0-py3.4.egg\boto\kinesis\layer1.py", line 874, in make_request
    body=json_body)
boto.exception.JSONResponseError: JSONResponseError: 400 Bad Request
{'Message': 'Start of list found where not expected', '__type': 'SerializationException'}
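For reference, the 400 SerializationException appears to come from the get_records call receiving a list: the script passes shard_ids (a list of dicts) where boto 2's KinesisConnection.get_records expects a single shard-iterator string, so the request body ends up with a JSON array in the ShardIterator field. Two further problems would surface next: get_records already returns a parsed dict (so json.load on it fails), and the loop never feeds NextShardIterator back into the next call. The sketch below addresses all three. It assumes the boto 2.38 signatures get_shard_iterator(stream_name, shard_id, shard_iterator_type) and get_records(shard_iterator, limit=None, b64_decode=True), with the default b64_decode=True returning each record's Data as a decoded string. The helper names read_shard, consume_stream, and connect, as well as the polling bounds, are hypothetical.

```python
import time


def read_shard(conn, stream_name, shard_id, out_file, max_polls=100, delay=1.0):
    """Poll one shard from TRIM_HORIZON, appending each record's Data to out_file.

    conn is assumed to be a boto 2 KinesisConnection, or any object exposing the
    same get_shard_iterator / get_records methods. Returns the record count.
    """
    # get_shard_iterator returns a dict; the iterator itself is a plain string
    shard_iterator = conn.get_shard_iterator(
        stream_name, shard_id, 'TRIM_HORIZON')['ShardIterator']
    count = 0
    for _ in range(max_polls):
        if shard_iterator is None:
            break  # the shard was closed and has been fully read
        # Pass ONE iterator string, not a list of shard dicts
        response = conn.get_records(shard_iterator, limit=100)
        for record in response['Records']:
            out_file.write(record['Data'] + '\n')  # one JSON document per line
            count += 1
        # Always continue from the iterator returned by the previous call
        shard_iterator = response.get('NextShardIterator')
        time.sleep(delay)  # stay under the per-shard GetRecords rate limit
    return count


def consume_stream(conn, stream_name, out_file, max_polls=100, delay=1.0):
    """Read every shard listed by describe_stream; returns the total record count."""
    description = conn.describe_stream(stream_name)['StreamDescription']
    total = 0
    for shard in description['Shards']:
        total += read_shard(conn, stream_name, shard['ShardId'],
                            out_file, max_polls, delay)
    return total


def connect(region, key_id, secret):
    """Open a boto 2 Kinesis connection (imported here so the helpers above
    can also be exercised with a stand-in object)."""
    from boto import kinesis
    return kinesis.connect_to_region(region, aws_access_key_id=key_id,
                                     aws_secret_access_key=secret)


# Hypothetical usage with the stream and file from the question:
# conn = connect('us-east-1', 'your_key', 'your_secret key')
# with open("KinesisDataTest.txt", "a") as f:
#     print(consume_stream(conn, 'TwitterTesting', f), "records written")
```

Because an open shard always hands back a fresh NextShardIterator, max_polls is what eventually ends the loop; a long-running consumer would loop indefinitely instead.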

My end goal is to push this data into an S3 bucket, but first I just need to get these records returned and printed. Any suggestions or advice would be great; I am still new to Python and at a complete loss. The data going into the stream is JSON-dumped Twitter data written with the "put_record" function. I can post that code too if needed.
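For the S3 step, once the consumer has finished appending records to the local file, boto 2 can upload that file with Bucket.new_key(...) followed by Key.set_contents_from_filename(...). The following is a sketch under those assumptions: open_bucket and push_to_s3 are hypothetical helpers, the bucket must already exist, and the bucket name in the usage comment is a placeholder.

```python
import os


def open_bucket(key_id, secret, bucket_name):
    """Return an existing boto 2 Bucket object."""
    import boto  # imported here so push_to_s3 can be tried with a stand-in bucket
    s3 = boto.connect_s3(aws_access_key_id=key_id, aws_secret_access_key=secret)
    return s3.get_bucket(bucket_name)


def push_to_s3(bucket, local_path, key_name=None):
    """Upload local_path to bucket; the S3 key defaults to the file's base name."""
    if key_name is None:
        key_name = os.path.basename(local_path)
    key = bucket.new_key(key_name)
    key.set_contents_from_filename(local_path)
    return key


# Hypothetical usage (needs real credentials and an existing bucket):
# bucket = open_bucket('your_key', 'your_secret key', 'my-twitter-bucket')
# push_to_s3(bucket, "C:\\Users\\csanders\\PycharmProjects\\untitled\\KinesisDataTest.txt")
```

Uploading only after the file has been closed avoids shipping a half-written last line.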

Thanks!!



