Skip to content

Commit

Permalink
Adding deaggregator support for Boto3 Kinesis client
Browse files Browse the repository at this point in the history
  • Loading branch information
Oliver Yang committed Jan 28, 2021
1 parent d21496b commit 4d0ff0f
Showing 1 changed file with 26 additions and 3 deletions.
29 changes: 26 additions & 3 deletions python/aws_kinesis_agg/deaggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,26 @@ def _convert_from_kf_format(record):
return new_record


def _convert_from_boto3_format(record):
"""Convert From Boto3 Kinesis client record format to Kinesis Stream record format.
record - Raw Boto3 Kinesis client record to deaggregate. (dict)
return value - Each yield returns a single Kinesis user record. (dict)"""

new_record = {
'kinesis': {
'kinesisSchemaVersion': '1.0',
'sequenceNumber': record['SequenceNumber'],
'partitionKey': record['PartitionKey'],
'approximateArrivalTimestamp': record['ApproximateArrivalTimestamp'],
'data': record['Data']
}
}

return new_record


def deaggregate_records(records):
"""Given a set of Kinesis records, deaggregate any records that were packed using the
Kinesis Producer Library into individual records. This method will be a no-op for any
Expand All @@ -166,7 +186,7 @@ def deaggregate_records(records):
return return_records


def iter_deaggregate_records(records):
def iter_deaggregate_records(records, data_format=None):
"""Generator function - Given a set of Kinesis records, deaggregate them one at a time
using the Kinesis aggregated message format. This method will not affect any
records that are not aggregated (but will still return them).
Expand All @@ -183,7 +203,7 @@ def iter_deaggregate_records(records):
is_aggregated = True
sub_seq_num = 0

if 'kinesis' not in r and 'data' in r:
if 'kinesis' not in r and 'data' in r or 'Data' in r:
# Kinesis Analytics preprocessors & Firehose transformers use a different format for aggregated
# Kinesis Stream records, so we're going to convert KA / KF style records to KS style records.
if 'kinesisStreamRecordMetadata' in r:
Expand All @@ -192,11 +212,14 @@ def iter_deaggregate_records(records):
elif 'kinesisRecordMetadata' in r:
# Kinesis Firehose style record
r = _convert_from_kf_format(r)
elif data_format == 'Boto3':
# Boto3 Kinesis client style record
r = _convert_from_boto3_format(r)

# Decode the incoming data
raw_data = r['kinesis']['data']

decoded_data = base64.b64decode(raw_data)
decoded_data = base64.b64decode(raw_data) #if data_format != 'Boto3' else raw_data

# Verify the magic header
data_magic = None
Expand Down

0 comments on commit 4d0ff0f

Please sign in to comment.