Massive log collection based on the OTEL (OTLP) standard protocol, with real-time anomaly analysis and alerting.
# Write the Flink CEP pattern definition to a temp file; it is later passed to
# the job as a base64-encoded --cepPatterns argument. The pattern matches 1..3
# ERROR/FATAL log records following a non-error record within a 5-minute window.
# The heredoc delimiter is quoted ('EOF') so the shell performs no parameter
# expansion or command substitution inside the JSON literal.
cat << 'EOF' > /tmp/cep-pattern-for-log-alarm.json
[
{
"engine": "FLINK_CEP_GRAPH",
"name": "root",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"times": {
"from": 1,
"to": 3,
"windowTime": {
"unit": "MINUTES",
"size": 5
}
},
"untilCondition": null,
"properties": ["SINGLE"]
},
"condition": null,
"nodes": [{
"name": "middle",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"times": null,
"untilCondition": null,
"properties": ["SINGLE"]
},
"condition": {
"nestedConditions": [{
"expression": "body.level == 'ERROR'",
"type": "AVIATOR"
}, {
"expression": "body.level == 'FATAL'",
"type": "AVIATOR"
}],
"type": "CLASS",
"className": "org.apache.flink.cep.pattern.conditions.RichOrCondition"
},
"attributes": {
"top": "10px"
},
"type": "ATOMIC"
}, {
"name": "start",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"times": null,
"untilCondition": null,
"properties": ["SINGLE"]
},
"condition": {
"expression": "body.level == 'TRACE' || body.level == 'DEBUG' || body.level == 'INFO' || body.level == 'WARN'",
"type": "AVIATOR"
},
"attributes": {
"top": "20px"
},
"type": "ATOMIC"
}],
"edges": [{
"source": "start",
"target": "middle",
"type": "SKIP_TILL_NEXT",
"attributes": {}
}],
"window": {
"type": "PREVIOUS_AND_CURRENT",
"time": {
"unit": "MINUTES",
"size": 5
}
},
"afterMatchStrategy": {
"type": "NO_SKIP",
"patternName": null
},
"type": "COMPOSITE",
"version": 1
}
]
EOF
- Environment
# Shared Flink configuration, passed to both the JobManager and TaskManager
# containers below via `docker run -e FLINK_PROPERTIES=...`.
FLINK_PROPERTIES="jobmanager.rpc.address: 127.0.0.1"
- JobManager
# Start the JobManager in standalone application mode.
# Optional extras:
#   e.g: --fromSavepoint file:///tmp/flinksavepoint
#   e.g: -e JVM_ARGS=" -Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=8000,suspend=n "
# Quoting notes: the -v path is quoted so a working directory containing
# spaces does not break the mount; --cepPatterns is quoted and reads the
# pattern file directly via redirection (no useless `cat | base64`).
docker run \
--name=rengine_job_jm \
--rm \
-e FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
-v "$(pwd)/job/src/main/resources/log4j.properties:/opt/flink/conf/log4j.properties" \
--network host \
--security-opt=seccomp:unconfined \
wl4g/rengine-job:1.0.0 \
standalone-job \
--job-classname com.wl4g.rengine.job.kafka.RengineKafkaFlinkCepStreaming \
--allowNonRestoredState \
--checkpointDir=file:///tmp/flinksavepoint \
--inProcessingTime=true \
--parallelism=4 \
--brokers=localhost:9092 \
--groupId=rengine_test \
--eventTopic=rengine_applog \
--keyByExprPath=body.service \
--alertTopic=rengine_alert \
--cepPatterns="$(base64 -w 0 < /tmp/cep-pattern-for-log-alarm.json)"
- TaskManager
# Start the TaskManager; it finds the JobManager through FLINK_PROPERTIES
# (jobmanager.rpc.address) on the shared host network.
# The -v path is quoted so a working directory with spaces does not break it.
docker run \
--name=rengine_job_tm \
--rm \
-e FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
-v "$(pwd)/job/src/main/resources/log4j.properties:/opt/flink/conf/log4j.properties" \
--network host \
--security-opt=seccomp:unconfined \
wl4g/rengine-job:1.0.0 \
taskmanager
# Submit the job to Kubernetes in application mode from inside the job image.
# Quoting fixes as above: -v path and the base64 command substitution are
# quoted; the pattern file is read via redirection instead of `cat |`.
# NOTE(review): unlike the standalone invocation above, this one passes no
# --brokers argument — confirm whether the job's default broker address is
# intended here or the flag was accidentally dropped.
docker run \
--name=rengine_job \
--rm \
-e FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
-e JVM_ARGS=" -Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=8000,suspend=n " \
-v "$(pwd)/job/src/main/resources/log4j.properties:/opt/flink/conf/log4j.properties" \
--security-opt=seccomp:unconfined \
wl4g/rengine-job:1.0.0 \
flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=rengine-job-1 \
-Dkubernetes.container.image=wl4g/rengine-job:1.0.0 \
local:///opt/flink/usrlib/rengine-job-1.0.0-jar-with-dependencies.jar \
--checkpointDir=file:///tmp/flinksavepoint \
--inProcessingTime=true \
--parallelism=4 \
--groupId=rengine_test \
--eventTopic=rengine_applog \
--keyByExprPath=body.service \
--alertTopic=rengine_alert \
--cepPatterns="$(base64 -w 0 < /tmp/cep-pattern-for-log-alarm.json)"
- print help
# Print the streaming job's CLI help locally (no cluster needed).
# Expansions are quoted so paths containing spaces do not word-split.
export JAVA_HOME=/usr/local/jdk-11.0.10/
export JOB_CLASSPATH="job/target/rengine-job-1.0.0-jar-with-dependencies.jar"
"$JAVA_HOME/bin/java" -cp "$JOB_CLASSPATH" com.wl4g.rengine.job.kafka.RengineKafkaFlinkCepStreaming --help
- using flink k8s cli
# Submit via the Flink CLI (CliFrontend) directly, in Kubernetes application
# mode, reusing JAVA_HOME/JOB_CLASSPATH exported above. Expansions quoted.
# NOTE(review): Flink application-mode 'local:' URLs are normally absolute
# (local:///...); 'local://job/...' with a relative path looks suspicious —
# verify against the deployment before use.
"$JAVA_HOME/bin/java" -cp "$JOB_CLASSPATH" org.apache.flink.client.cli.CliFrontend run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=rengine-base-job-cluster-1 \
-Dkubernetes.container.image=flink:1.14.4-scala_2.11-java11 \
local://job/job-base/target/rengine-job-base-1.0.0-jar-with-dependencies.jar
- Mock for logs producer
# Send each line of the sample log to Kafka, keyed as "rengine_applog:<line>".
# A `while read` loop replaces the original `for line in \`cat ...\`` pattern:
# it preserves empty lines and whitespace, performs no glob expansion, and
# does not mutate the global IFS for the rest of the script.
while IFS= read -r line; do
printf 'Sending => %s\n' "$line"
printf '%s\n' "rengine_applog:$line" | docker exec -i kafka1 kafka-console-producer.sh \
--broker-list 127.0.0.1:9092 \
--topic rengine_applog \
--property parse.key=true \
--property key.separator=:
done < docs/en/user-examples/applog-realtime-analysis-alarm/applog-sample.log
- Mock for consumer
# Consume the raw event topic to verify the mock logs were produced
# (run in a separate terminal while the producer loop above is sending).
docker exec -it kafka1 kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic rengine_applog