-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathkcl_reader.rb
48 lines (39 loc) · 1.32 KB
/
kcl_reader.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
require 'aws/kclrb'
require 'base64'
module Stats
  module Input
    # Record processor for the Amazon Kinesis Client Library (KCL) Ruby
    # bindings. For each non-empty batch it Base64-decodes every record's
    # 'data' field, parses the result as JSON, hands the decoded list to
    # +processor.process+, and then checkpoints at the batch's last
    # 'sequenceNumber'.
    class KclReader < Aws::KCLrb::RecordProcessorBase
      # @return [#process] the object that receives decoded event batches
      attr_reader :processor

      # @param processor [#process] receiver of the decoded event batches.
      #   NOTE(review): the default is $stderr, yet process_records calls
      #   +processor.process+ on it, which a plain IO does not define —
      #   confirm that callers always pass an explicit processor.
      def initialize(processor = $stderr)
        @processor = processor
      end

      # Called with the shard id before any records are delivered.
      # This processor keeps no per-shard state, so nothing is done here.
      #
      # @param shard_id [String] identifier of the shard (unused)
      def init_processor(shard_id)
      end

      # Decodes a batch of records, forwards it to the processor and
      # checkpoints at the last record of the batch. Empty batches are
      # ignored entirely (no processing, no checkpoint).
      #
      # @param records [Array<Hash>] records carrying 'data' (Base64 text)
      #   and 'sequenceNumber' keys
      # @param checkpointer [Aws::KCLrb::Checkpointer] used to record progress
      def process_records(records, checkpointer)
        return if records.empty?

        events = records.map do |record|
          # SECURITY NOTE(review): JSON.load is documented as intended for
          # trusted input only (depending on the json version and its load
          # defaults it may honour `json_class` additions). If stream
          # producers are not fully trusted, switch to JSON.parse. It is kept
          # here because JSON.parse treats blank input, NaN and nesting
          # limits differently, which would change behaviour for callers.
          JSON.load(Base64.decode64(record['data']))
        end

        # Checkpoint only after the processor returned without raising, so a
        # failed batch is not marked as consumed.
        processor.process(events)
        checkpoint_helper(checkpointer, records.last['sequenceNumber'])
      end

      # Checkpoints (without an explicit sequence number) when the shutdown
      # reason is 'TERMINATE'; any other reason does nothing.
      #
      # @param checkpointer [Aws::KCLrb::Checkpointer] used to record progress
      # @param reason [String] shutdown reason supplied by the KCL
      def shutdown(checkpointer, reason)
        checkpoint_helper(checkpointer) if reason == 'TERMINATE'
      end

      private

      # Helper method that checkpoints and, on failure, retries once.
      #
      # NOTE(review): the retry only happens when a sequence number was
      # given. With the default nil (the shutdown path) a CheckpointError is
      # swallowed without a retry — confirm this best-effort behaviour is
      # intended. An error raised by the retry itself propagates to the caller.
      #
      # @param checkpointer [Aws::KCLrb::Checkpointer] The checkpointer instance to use.
      # @param sequence_number (see Aws::KCLrb::Checkpointer#checkpoint)
      def checkpoint_helper(checkpointer, sequence_number = nil)
        checkpointer.checkpoint(sequence_number)
      rescue Aws::KCLrb::CheckpointError
        # Here, we simply retry once.
        # More sophisticated retry logic is recommended.
        checkpointer.checkpoint(sequence_number) if sequence_number
      end
    end
  end
end