forked from awslabs/amazon-timestream-tools
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request awslabs#180 from awslabs/kafka_ingestor
ingesting data to kafka topic
- Loading branch information
Showing
6 changed files
with
2,485 additions
and
15 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
195 changes: 195 additions & 0 deletions
195
tools/java/kafka_ingestor/Purchase_History_Publishing.jmx
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,195 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<jmeterTestPlan version="1.2" properties="5.0" jmeter="5.6"> | ||
<hashTree> | ||
<TestPlan guiclass="TestPlanGui" testclass="TestPlan" testname="Test Plan"> | ||
<boolProp name="TestPlan.tearDown_on_shutdown">true</boolProp> | ||
<elementProp name="TestPlan.user_defined_variables" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables"> | ||
<collectionProp name="Arguments.arguments"/> | ||
</elementProp> | ||
</TestPlan> | ||
<hashTree> | ||
<ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="Puchase History Thread Group"> | ||
<elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="Loop Controller"> | ||
<intProp name="LoopController.loops">-1</intProp> | ||
</elementProp> | ||
<stringProp name="ThreadGroup.num_threads">1</stringProp> | ||
<stringProp name="ThreadGroup.ramp_time">1</stringProp> | ||
<boolProp name="ThreadGroup.scheduler">true</boolProp> | ||
<stringProp name="ThreadGroup.duration">300</stringProp> | ||
<stringProp name="ThreadGroup.delay"></stringProp> | ||
</ThreadGroup> | ||
<hashTree> | ||
<LoopController guiclass="LoopControlPanel" testclass="LoopController" testname="Loop Controller"> | ||
<boolProp name="LoopController.continue_forever">true</boolProp> | ||
<stringProp name="LoopController.loops">1</stringProp> | ||
</LoopController> | ||
<hashTree> | ||
<JSR223Sampler guiclass="TestBeanGUI" testclass="JSR223Sampler" testname="Purchase History Ingestor"> | ||
<stringProp name="cacheKey">false</stringProp> | ||
<stringProp name="filename"></stringProp> | ||
<stringProp name="parameters">${channel} ${ip_address} ${session_id} ${user_id} ${event} ${user_group} ${current_time} ${query} ${product_id} ${product} ${quantityrequired}</stringProp> | ||
<stringProp name="script">import org.apache.kafka.clients.producer.KafkaProducer | ||
import org.apache.kafka.clients.producer.ProducerRecord | ||
import groovy.json.JsonOutput | ||
|
||
try { | ||
//Set connection properties | ||
Properties properties = new Properties() | ||
properties.put('bootstrap.servers', props.get('bootstrapServer')) | ||
properties.put('key.serializer', 'org.apache.kafka.common.serialization.StringSerializer') | ||
properties.put('value.serializer', 'org.apache.kafka.common.serialization.StringSerializer') | ||
properties.put('security.protocol', 'SASL_SSL') | ||
properties.put('sasl.mechanism', 'AWS_MSK_IAM') | ||
properties.put('sasl.jaas.config', 'software.amazon.msk.auth.iam.IAMLoginModule required;') | ||
properties.put('sasl.client.callback.handler.class', 'software.amazon.msk.auth.iam.IAMClientCallbackHandler') | ||
//Establish Connection | ||
KafkaProducer < String, String > producer = new KafkaProducer < String, String > (properties) | ||
int fail = 0 | ||
try { | ||
message = JsonOutput.toJson([current_time: new Date().getTime(), | ||
channel: vars.get('channel'), ip_address: vars.get('ip_address'), | ||
session_id: vars.get('session_id'), | ||
user_id: vars.get('user_id'), | ||
event: vars.get('event'), user_group: vars.get('user_group'), | ||
query: vars.get('query'), product_id: vars.get('product_id'), | ||
product: vars.get('product'), quantityrequired: vars.get('quantityrequired')]) | ||
log.info('Message to be published-->: ' + message) | ||
ProducerRecord < String, String > producerRecord = | ||
new ProducerRecord < String, String >(props.get('topic'), vars.get('user_id'), message) | ||
producer.send(producerRecord) | ||
log.info('Published message') | ||
producerRecord = null | ||
} catch (Exception e) { | ||
log.error('Error Publishing: ' + e) | ||
fail = fail + 1 | ||
} | ||
//Set results as per executions | ||
if (fail > 0) { | ||
SampleResult.setErrorCount(fail) | ||
SampleResult.setSuccessful(false) | ||
} else { | ||
SampleResult.setSuccessful(true) | ||
} | ||
} catch (Exception e) { | ||
log.error('ERROR: ', e) | ||
SampleResult.setSuccessful(false) | ||
} finally { | ||
producer.close() | ||
producer = null | ||
props = null | ||
} | ||
</stringProp> | ||
<stringProp name="scriptLanguage">groovy</stringProp> | ||
</JSR223Sampler> | ||
<hashTree/> | ||
<CSVDataSet guiclass="TestBeanGUI" testclass="CSVDataSet" testname="Purchase History CSV File"> | ||
<stringProp name="delimiter">,</stringProp> | ||
<stringProp name="fileEncoding">UTF-8</stringProp> | ||
<stringProp name="filename">purchase_history.csv</stringProp> | ||
<boolProp name="ignoreFirstLine">true</boolProp> | ||
<boolProp name="quotedData">false</boolProp> | ||
<boolProp name="recycle">true</boolProp> | ||
<stringProp name="shareMode">shareMode.all</stringProp> | ||
<boolProp name="stopThread">false</boolProp> | ||
<stringProp name="variableNames">channel,ip_address,session_id,user_id,event,user_group,current_time,query,product_id,product,quantity</stringProp> | ||
</CSVDataSet> | ||
<hashTree/> | ||
</hashTree> | ||
<ResultCollector guiclass="SummaryReport" testclass="ResultCollector" testname="Summary Report"> | ||
<boolProp name="ResultCollector.error_logging">false</boolProp> | ||
<objProp> | ||
<name>saveConfig</name> | ||
<value class="SampleSaveConfiguration"> | ||
<time>true</time> | ||
<latency>true</latency> | ||
<timestamp>true</timestamp> | ||
<success>true</success> | ||
<label>true</label> | ||
<code>true</code> | ||
<message>true</message> | ||
<threadName>true</threadName> | ||
<dataType>true</dataType> | ||
<encoding>false</encoding> | ||
<assertions>true</assertions> | ||
<subresults>true</subresults> | ||
<responseData>false</responseData> | ||
<samplerData>false</samplerData> | ||
<xml>false</xml> | ||
<fieldNames>true</fieldNames> | ||
<responseHeaders>false</responseHeaders> | ||
<requestHeaders>false</requestHeaders> | ||
<responseDataOnError>false</responseDataOnError> | ||
<saveAssertionResultsFailureMessage>true</saveAssertionResultsFailureMessage> | ||
<assertionsResultsToSave>0</assertionsResultsToSave> | ||
<bytes>true</bytes> | ||
<sentBytes>true</sentBytes> | ||
<url>true</url> | ||
<threadCounts>true</threadCounts> | ||
<idleTime>true</idleTime> | ||
<connectTime>true</connectTime> | ||
</value> | ||
</objProp> | ||
<stringProp name="filename"></stringProp> | ||
</ResultCollector> | ||
<hashTree/> | ||
</hashTree> | ||
<JSR223PreProcessor guiclass="TestBeanGUI" testclass="JSR223PreProcessor" testname="UDV Check"> | ||
<stringProp name="cacheKey">true</stringProp> | ||
<stringProp name="filename"></stringProp> | ||
<stringProp name="parameters">${topic} ${bootstrap_server}</stringProp> | ||
<stringProp name="script">topic = vars.get('topic') | ||
bootstrapServer = vars.get('bootstrap_server') | ||
stop = false | ||
if (topic == null || 'null'.equals(topic)) { | ||
log.error('ERROR: topic is not passed in the arguments') | ||
println('ERROR: topic is not passed in the arguments') | ||
stop = true | ||
} else { | ||
log.info('INFO: topic is set to {}', topic) | ||
props.put('topic', topic) | ||
} | ||
|
||
if (bootstrapServer == null || 'null'.equals(bootstrapServer)) { | ||
println('ERROR: bootstrapServer is not passed in the arguments') | ||
log.error('ERROR: bootstrapServer is not passed in the arguments') | ||
stop = true | ||
} else { | ||
log.info('INFO: bootstrapServer is set to {}', bootstrapServer) | ||
try { | ||
String[] splits = bootstrapServer.split(':') | ||
String uri = splits[0] | ||
String port = Integer.valueOf(splits[1]) | ||
log.info('INFO: URI: [{}], port: [{}]', uri, port) | ||
props.put('bootstrapServer', bootstrapServer) | ||
} catch (Throwable e) { | ||
stop = true | ||
println('ERROR: bootstrapServer ' + bootstrapServer + | ||
' is NOT valid. It must be of format HOSTNAME:PORT') | ||
log.error('ERROR: bootstrapServer {} is NOT valid. It must be of format HOSTNAME:PORT', bootstrapServer) | ||
} | ||
} | ||
if (stop == true) { | ||
prev.setStopTestNow(true) | ||
exit(1) | ||
}</stringProp> | ||
<stringProp name="scriptLanguage">groovy</stringProp> | ||
</JSR223PreProcessor> | ||
<hashTree/> | ||
<Arguments guiclass="ArgumentsPanel" testclass="Arguments" testname="UDV"> | ||
<collectionProp name="Arguments.arguments"> | ||
<elementProp name="topic" elementType="Argument"> | ||
<stringProp name="Argument.name">topic</stringProp> | ||
<stringProp name="Argument.value">${__P(topic,null)}</stringProp> | ||
<stringProp name="Argument.metadata">=</stringProp> | ||
</elementProp> | ||
<elementProp name="bootstrap_server" elementType="Argument"> | ||
<stringProp name="Argument.name">bootstrap_server</stringProp> | ||
<stringProp name="Argument.value">${__P(bootstrap_server,null)}</stringProp> | ||
<stringProp name="Argument.metadata">=</stringProp> | ||
</elementProp> | ||
</collectionProp> | ||
</Arguments> | ||
<hashTree/> | ||
</hashTree> | ||
</hashTree> | ||
</jmeterTestPlan> |
Oops, something went wrong.