-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkafka_json_consumer.py
150 lines (123 loc) · 3.93 KB
/
kafka_json_consumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import argparse
import os
import sys

from confluent_kafka import Consumer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry.json_schema import JSONDeserializer
# Connection settings are read from the environment so that credentials never
# live in source control. Non-secret values (endpoints, protocol, mechanism)
# fall back to fixed defaults; keys and secrets default to an empty string and
# must be supplied by the environment for authentication to succeed.
# NOTE(review): any key/secret pair that has ever been committed to this file
# must be treated as compromised and rotated.

# Kafka cluster API key (used as the SASL username).
API_KEY = os.environ.get('KAFKA_API_KEY', '')
# Base URL of the Schema Registry.
ENDPOINT_SCHEMA_URL = os.environ.get(
    'SCHEMA_REGISTRY_URL',
    'https://psrc-35wr2.us-central1.gcp.confluent.cloud',
)
# Kafka cluster API secret (used as the SASL password).
API_SECRET_KEY = os.environ.get('KAFKA_API_SECRET', '')
# host:port of the Kafka bootstrap server.
BOOTSTRAP_SERVER = os.environ.get(
    'KAFKA_BOOTSTRAP_SERVER',
    'pkc-lzvrd.us-west4.gcp.confluent.cloud:9092',
)
SECURITY_PROTOCOL = os.environ.get('KAFKA_SECURITY_PROTOCOL', 'SASL_SSL')
# Holds the SASL mechanism; the (misspelled) name is kept because other code
# in this file refers to it.
SSL_MACHENISM = os.environ.get('KAFKA_SASL_MECHANISM', 'PLAIN')
# Schema Registry credentials, combined into "key:secret" by schema_config().
SCHEMA_REGISTRY_API_KEY = os.environ.get('SCHEMA_REGISTRY_API_KEY', '')
SCHEMA_REGISTRY_API_SECRET = os.environ.get('SCHEMA_REGISTRY_API_SECRET', '')
def sasl_conf():
    """Build the base client configuration for the Kafka connection.

    Returns:
        dict: A new dictionary holding the SASL mechanism, bootstrap server,
        security protocol and the username/password pair, all taken from the
        module-level constants.
    """
    settings = {}
    settings['sasl.mechanism'] = SSL_MACHENISM
    settings['bootstrap.servers'] = BOOTSTRAP_SERVER
    # The protocol comes from SECURITY_PROTOCOL; a value of SASL_SSL enables
    # TLS, as opposed to SASL_PLAINTEXT.
    settings['security.protocol'] = SECURITY_PROTOCOL
    settings['sasl.username'] = API_KEY
    settings['sasl.password'] = API_SECRET_KEY
    return settings
def schema_config():
    """Build the Schema Registry client configuration.

    Returns:
        dict: The registry URL plus the basic-auth credentials formatted as
        "<api key>:<api secret>".
    """
    credentials = "{}:{}".format(SCHEMA_REGISTRY_API_KEY, SCHEMA_REGISTRY_API_SECRET)
    return {
        'url': ENDPOINT_SCHEMA_URL,
        'basic.auth.user.info': credentials,
    }
class Car:
    """Thin wrapper around one decoded car record.

    Each key of the record becomes an instance attribute, and the original
    dictionary stays available as ``record``.
    """

    def __init__(self, record: dict):
        # Mirror every field of the record as an attribute of this object.
        for field_name, field_value in record.items():
            setattr(self, field_name, field_value)
        # Assigned last, so it wins over any field that is itself named "record".
        self.record = record

    @staticmethod
    def dict_to_car(data: dict, ctx):
        """Create a Car from a dictionary; ``ctx`` is accepted but not used."""
        return Car(record=data)

    def __str__(self):
        """Return the string form of the underlying record dictionary."""
        return str(self.record)
# JSON Schema (draft-07) handed to the JSONDeserializer for car records.
_CAR_SCHEMA_STR = """
{
"$id": "http://example.com/myURI.schema.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"additionalProperties": false,
"description": "Sample schema to help you get started.",
"properties": {
"brand": {
"description": "The type(v) type is used.",
"type": "string"
},
"car_name": {
"description": "The type(v) type is used.",
"type": "string"
},
"engine": {
"description": "The type(v) type is used.",
"type": "number"
},
"fuel_type": {
"description": "The type(v) type is used.",
"type": "string"
},
"km_driven": {
"description": "The type(v) type is used.",
"type": "number"
},
"max_power": {
"description": "The type(v) type is used.",
"type": "number"
},
"mileage": {
"description": "The type(v) type is used.",
"type": "number"
},
"model": {
"description": "The type(v) type is used.",
"type": "string"
},
"seats": {
"description": "The type(v) type is used.",
"type": "number"
},
"seller_type": {
"description": "The type(v) type is used.",
"type": "string"
},
"selling_price": {
"description": "The type(v) type is used.",
"type": "number"
},
"transmission_type": {
"description": "The type(v) type is used.",
"type": "string"
},
"vehicle_age": {
"description": "The type(v) type is used.",
"type": "number"
}
},
"title": "SampleRecord",
"type": "object"
}
"""


def main(topic, group_id='group1'):
    """Consume car records from a Kafka topic and print them until interrupted.

    Args:
        topic: Name of the topic to subscribe to.
        group_id: Consumer group id; defaults to 'group1'.

    The loop runs until a KeyboardInterrupt (Ctrl-C) is received. The consumer
    is closed on every exit path, including when deserialization raises.
    """
    json_deserializer = JSONDeserializer(_CAR_SCHEMA_STR,
                                         from_dict=Car.dict_to_car)

    consumer_conf = sasl_conf()
    consumer_conf.update({
        'group.id': group_id,
        'auto.offset.reset': "earliest",
    })

    consumer = Consumer(consumer_conf)
    try:
        consumer.subscribe([topic])
        while True:
            # SIGINT can't be handled when polling, limit timeout to 1 second.
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            # poll() can return error events as Message objects; report and
            # skip them instead of handing them to the deserializer.
            if msg.error() is not None:
                print("Consumer error: {}".format(msg.error()), file=sys.stderr)
                continue

            car = json_deserializer(
                msg.value(),
                SerializationContext(msg.topic(), MessageField.VALUE),
            )
            if car is not None:
                print("User record {}: car: {}\n"
                      .format(msg.key(), car))
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the consumer.
        pass
    finally:
        # Always close the consumer so it is not leaked when an exception escapes.
        consumer.close()
# Run the consumer only when executed as a script, so importing this module
# does not start an endless consume loop.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Consume JSON car records from a Kafka topic and print them.")
    parser.add_argument("-t", "--topic", default="test_topic",
                        help="topic to consume from (default: test_topic)")
    main(parser.parse_args().topic)