MSK project with serverless Producer & Consumer
Step 1:
--------
Create a NAT Gateway and add a default route to it in the private subnet's route table (a minimal boto3 sketch follows).
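A minimal boto3 sketch of this step; the resource IDs in braces are placeholders, not values from the original project:

import boto3

ec2 = boto3.client('ec2')

# Allocate an Elastic IP and create the NAT Gateway in a public subnet
eip = ec2.allocate_address(Domain='vpc')
nat = ec2.create_nat_gateway(
    SubnetId='{public subnet id}',
    AllocationId=eip['AllocationId'])
nat_id = nat['NatGateway']['NatGatewayId']

# Wait until the gateway is available, then send the private subnet's
# outbound traffic through it
ec2.get_waiter('nat_gateway_available').wait(NatGatewayIds=[nat_id])
ec2.create_route(
    RouteTableId='{private subnet route table id}',
    DestinationCidrBlock='0.0.0.0/0',
    NatGatewayId=nat_id)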
Step 2:
---------
Launch an MSK cluster in the private subnets (sketch below).
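A hedged boto3 sketch of the cluster launch; the cluster name, instance size, and broker count are illustrative assumptions, and the IDs in braces are placeholders:

import boto3

kafka = boto3.client('kafka')

# Broker count must be a multiple of the number of client subnets
response = kafka.create_cluster(
    ClusterName='demo-msk-cluster',        # assumed name
    KafkaVersion='2.8.1',                  # matches the CLI tools used in Step 14
    NumberOfBrokerNodes=2,
    BrokerNodeGroupInfo={
        'InstanceType': 'kafka.t3.small',  # assumed size
        'ClientSubnets': ['{private subnet id 1}', '{private subnet id 2}'],
        'SecurityGroups': ['{msk security group id}'],
    })
print(response['ClusterArn'])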
Step 3:
----------
Create the producer Lambda function code (Python 3.8):

from json import dumps
import json

from kafka import KafkaProducer  # kafka-python, supplied via a Lambda layer (Step 4)

topic_name = '{Provide the topic name here}'

# Created once per container and reused across warm invocations
producer = KafkaProducer(
    bootstrap_servers=['{Put the broker URLs here}',
                       '{Put the broker URLs here}'],
    value_serializer=lambda x: dumps(x).encode('utf-8'))


def lambda_handler(event, context):
    print(event)
    for i in event['Records']:
        # Each SQS record carries the original JSON payload in its 'body'
        sqs_message = json.loads(i['body'])
        print(sqs_message)
        producer.send(topic_name, value=sqs_message)
    producer.flush()
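For a quick local sanity check, the handler can be invoked with a hand-built, SQS-shaped event; only the 'body' field of each record is read, and the payload below mirrors the Step 17 samples:

# Minimal SQS-shaped test event
test_event = {
    'Records': [
        {'body': '{"station": "OH", "temp": "26.39f"}'}
    ]
}
lambda_handler(test_event, None)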
Step 4:
----------
Increase the Lambda timeout to 2 minutes, grant the function SQS, MSK, and VPC access, and place it in the private VPC (where the MSK brokers are running).
Configure a Lambda layer for the kafka-python dependency --
Reference:
------------
https://youtube.com/watch?v=uleTVY7LkMM&feature=shares
Step 5:
---------
Launch an SQS queue with the visibility timeout set to 240 seconds (sketch below).
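A one-call boto3 sketch; the queue name is an assumption:

import boto3

sqs = boto3.client('sqs')
queue = sqs.create_queue(
    QueueName='msk-producer-queue',
    Attributes={'VisibilityTimeout': '240'})  # seconds, as above
print(queue['QueueUrl'])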
Step 6:
----------
Create an API Gateway and set up an integration with the SQS queue.
Step 7:
---------
Test the integration; if it works, configure the SQS queue as the event source for the producer Lambda (a test sketch follows).
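One way to exercise the endpoint, assuming the API is deployed, the integration forwards the request body to the queue, and the URL placeholder is replaced with the real invoke URL:

import requests  # third-party: pip install requests

url = '{API Gateway invoke URL}'
payload = {'station': 'OH', 'temp': '26.39f'}
resp = requests.post(url, json=payload)
print(resp.status_code, resp.text)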
Step 8:
---------
Create an S3 bucket for data archival (sketch below).
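A minimal sketch; bucket names must be globally unique, and outside us-east-1 a CreateBucketConfiguration with a LocationConstraint is also required:

import boto3

# Bucket name is a placeholder
boto3.client('s3').create_bucket(Bucket='{archive bucket name}')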
Step 9:
---------
Configure a Kinesis Data Firehose delivery stream that writes to the S3 bucket (sketch below).
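A hedged boto3 sketch of a DirectPut delivery stream into the archival bucket; the role and bucket values in braces are placeholders, and the role needs write access to the bucket:

import boto3

firehose = boto3.client('firehose')
firehose.create_delivery_stream(
    DeliveryStreamName='{Kinesis Delivery Stream Name}',
    DeliveryStreamType='DirectPut',
    ExtendedS3DestinationConfiguration={
        'RoleARN': '{firehose role arn}',
        'BucketARN': 'arn:aws:s3:::{archive bucket name}',
        # Small buffers so test records land in S3 quickly
        'BufferingHints': {'SizeInMBs': 1, 'IntervalInSeconds': 60},
    })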
Step 10:
-----------
Create the consumer Lambda function code:

import base64
import json

import boto3

client = boto3.client('firehose')


def lambda_handler(event, context):
    print(event)
    # MSK events key 'records' by "topic-partition"; each value is a list of records
    for partition_key in event['records']:
        partition_value = event['records'][partition_key]
        for record_value in partition_value:
            # Kafka message values arrive base64-encoded
            actual_message = json.loads(
                base64.b64decode(record_value['value']).decode('utf-8'))
            print(actual_message)
            # Newline-delimit the JSON so the archived objects stay parseable
            newImage = (json.dumps(actual_message) + '\n').encode('utf-8')
            print(newImage)
            response = client.put_record(
                DeliveryStreamName='{Kinesis Delivery Stream Name}',
                Record={'Data': newImage})
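For reference, an MSK trigger delivers events shaped like the sketch below: the keys under 'records' are "topic-partition" strings and each Kafka value is base64-encoded. The payload here is illustrative:

# Minimal MSK-shaped test event
sample_event = {
    'records': {
        'demo_testing2-0': [
            {'value': base64.b64encode(
                b'{"station": "WA", "temp": "40.00F"}').decode('utf-8')}
        ]
    }
}
lambda_handler(sample_event, None)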
Step 11:
-----------
Grant this Lambda write access to Kinesis Firehose, plus VPC access and MSK access.
Step 12:
----------
Launch one EC2 instance in a public subnet of the same VPC as the MSK cluster, and another EC2 instance in a private subnet of that VPC.
Step 13:
-----------
Allow all traffic between the private EC2 instance's security group and the MSK security group, in both directions (sketch below).
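A boto3 sketch of the two rules; the group IDs in braces are placeholders:

import boto3

ec2 = boto3.client('ec2')
private_ec2_sg = '{private ec2 security group id}'
msk_sg = '{msk security group id}'

# Allow all traffic in both directions between the two groups
for sg, peer in [(msk_sg, private_ec2_sg), (private_ec2_sg, msk_sg)]:
    ec2.authorize_security_group_ingress(
        GroupId=sg,
        IpPermissions=[{
            'IpProtocol': '-1',  # all protocols and ports
            'UserIdGroupPairs': [{'GroupId': peer}],
        }])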
Step 14:
-------------
SSH into the public EC2 instance, then from there SSH into the private one, and set up the Kafka CLI tools:
sudo yum install java-1.8.0-openjdk
wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz
tar -xvf kafka_2.12-2.8.1.tgz
cd kafka_2.12-2.8.1
bin/kafka-topics.sh --create --topic demo_testing2 --bootstrap-server {} --replication-factor 1 --partitions 2
Step 15:
------------
Start a Kafka console consumer and check whether the messages published by the producer Lambda are arriving in the Kafka topic:
bin/kafka-console-consumer.sh --topic demo_testing2 --bootstrap-server {}
Step 16:
------------
Add an MSK trigger to the consumer Lambda (sketch below).
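This can also be done with one boto3 call; the cluster ARN and function name in braces are placeholders:

import boto3

boto3.client('lambda').create_event_source_mapping(
    EventSourceArn='{msk cluster arn}',
    FunctionName='{consumer lambda name}',
    Topics=['demo_testing2'],
    StartingPosition='LATEST')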
Step 17:
---------
Perform end-to-end testing with sample records such as:
{"station":"OH","temp":"26.39f"}
{"station":"WA","temp":"40.00F"}
{"station":"TX","temp":"15.01F"}
{"station":"NC","temp":"32.36f"}
{"station":"WA","temp":"62.86F"}
{"station":"NC","temp":"49.43f"}
{"station":"MD","temp":"2.30f"}