Raspberry Pi から送信したセンサーの測定値などはサーバの Kafka ブローカに一時的に格納されています。ここでは Kafka ブローカに送られたテキストデータをクライアントで表示する consumer.py
の実行手順を示します。
flowchart LR
subgraph R[Raspberry Pi]
C(SINETStream)
end
subgraph S[Server]
B[Kafka Broker]
end
subgraph V["consumer.py"]
SUB(SINETStream)
end
C-.->B==>SUB
consumer.py
を実行するための前提条件を以下に示します。
- Python
- 3.8 以降
テキストデータの取得先となる Kafka ブローカが利用可能な状態になっている必要があります。以下に示すいずれかの構成でKafkaブローカを事前に構築してください。
- option/Server/Kafka
- NumericalSensorData/Server/Kafka-Grafana
- NumericalSensorData/Server/Kafka-Zabbix
consumer.py
が利用する Python ライブラリをインストールします。
pip install -U --user sinetstream-kafka sinetstream-mqtt
既にインストールしているライブラリとconflictしてしまいエラーとなる場合は venv や pipenv などの仮想環境の利用を検討してください。また環境によっては
pip
コマンドはpip3
となっていることがあります。必要に応じて読み替えて下さい。
consumer.py
ではSINETStreamライブラリを利用して Kafka ブローカからセンサーデータを取得します。SINETStreamではアクセスするメッセージブローカのアドレス(brokers)、トピック名(topic)、タイプ(type)などのパラメータを設定ファイル.sinetstream_config.yml
に記述しておく必要があります。設定ファイルの記述例を以下に示します。
sensors:
topic: sinetstream.sensor
brokers: kafka.example.org:9092
type: kafka
consistency: AT_LEAST_ONCE
group_id: text-consumer-001
brokers
と topic
の値を実行環境に合せて修正してください。他のパラメータの指定方法など .sinetstream_config.yml
の記述方法の詳細については SINETStream - 設定ファイル を参照してください。
.sinetstream_config.yml
の記述例となるファイルが example_sinetstream_config.yml にあります。テンプレートとして利用してください。
consumer.py
のコマンドライン引数の指定方法を以下に示します。
usage: consumer.py [-s <service name>] [-c <config name>]
-s/--service サービス名
-c/--config コンフィグ名
-s
でサービス名を指定します。SINETStreamの設定ファイル .sinetstream_config.yml
には複数のパラメータセットを記述することができます。それぞれのパラメータセットの識別子をSINETStreamではサービス名と呼んでいます。サービス名は設定ファイルのトップレベルのマップのキーとして記述されています。例えば、このディレクトリにある設定ファイルの記述例 example_sinetstream_config.yml のサービス名はsensors
になります。
ローカルファイルの設定ファイルを使わずに、コンフィグサーバを利用する場合は-c
でコンフィグ名を指定してください。