-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add Zenoh publish/subscribe and RPC examples (#4)
* Add Zenoh publish/subscribe and RPC examples * Removed time sleep * Add zenoh default config method
- Loading branch information
1 parent
2668a54
commit ac112fe
Showing
5 changed files
with
274 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
""" | ||
SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation | ||
See the NOTICE file(s) distributed with this work for additional | ||
information regarding copyright ownership. | ||
This program and the accompanying materials are made available under the | ||
terms of the Apache License Version 2.0 which is available at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
SPDX-License-Identifier: Apache-2.0 | ||
""" | ||
|
||
import json | ||
import logging | ||
from enum import Enum | ||
|
||
import zenoh | ||
from uprotocol.proto.uri_pb2 import UAuthority | ||
from uprotocol.proto.uri_pb2 import UEntity | ||
from uprotocol.proto.uri_pb2 import UResource | ||
from uprotocol.uri.factory.uresource_builder import UResourceBuilder | ||
|
||
# Configure the logging | ||
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') | ||
|
||
|
||
class ExampleType(Enum): | ||
PUBLISHER = "publisher" | ||
SUBSCRIBER = "subscriber" | ||
RPC_SERVER = "rpc_server" | ||
RPC_CLIENT = "rpc_client" | ||
|
||
|
||
def authority() -> UAuthority: | ||
return UAuthority(name="auth_name", id=bytes([1, 2, 3, 4])) | ||
|
||
|
||
def entity(example_type: ExampleType) -> UEntity: | ||
mapping = {ExampleType.PUBLISHER : ("publisher", 1), ExampleType.SUBSCRIBER: ("subscriber", 2), | ||
ExampleType.RPC_SERVER: ("rpc_server", 3), ExampleType.RPC_CLIENT: ("rpc_client", 4)} | ||
name, id = mapping[example_type] | ||
return UEntity(name=name, id=1, version_major=id) | ||
|
||
|
||
def pub_resource() -> UResource: | ||
return UResource(name="door", instance="front_left", message="Door", id=5678) | ||
|
||
|
||
def rpc_resource() -> UResource: | ||
return UResourceBuilder.for_rpc_request("getTime", 5678) | ||
|
||
|
||
def get_zenoh_config(): | ||
# start your zenoh router and provide router ip and port | ||
zenoh_ip = "192.168.29.79" # zenoh router ip | ||
zenoh_port = 9090 # zenoh router port | ||
conf = zenoh.Config() | ||
if zenoh_ip is not None: | ||
endpoint = [f"tcp/{zenoh_ip}:{zenoh_port}"] | ||
logging.debug(f"EEE: {endpoint}") | ||
conf.insert_json5(zenoh.config.MODE_KEY, json.dumps("client")) | ||
conf.insert_json5(zenoh.config.CONNECT_KEY, json.dumps(endpoint)) | ||
return conf | ||
|
||
|
||
# Initialize Zenoh with default configuration | ||
def get_zenoh_default_config(): | ||
# Create a Zenoh configuration object with default settings | ||
config = zenoh.Config() | ||
|
||
# # Set the mode to Peer (or Router, Client depending on your use case) | ||
# config = "peer" | ||
|
||
return config |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
""" | ||
SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation | ||
See the NOTICE file(s) distributed with this work for additional | ||
information regarding copyright ownership. | ||
This program and the accompanying materials are made available under the | ||
terms of the Apache License Version 2.0 which is available at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
SPDX-License-Identifier: Apache-2.0 | ||
""" | ||
|
||
import time | ||
|
||
from uprotocol.proto.uattributes_pb2 import UPriority | ||
from uprotocol.proto.umessage_pb2 import UMessage | ||
from uprotocol.proto.upayload_pb2 import UPayloadFormat, UPayload | ||
from uprotocol.proto.uri_pb2 import UUri | ||
from uprotocol.transport.builder.uattributesbuilder import UAttributesBuilder | ||
|
||
from up_client_zenoh.examples import common_uuri | ||
from up_client_zenoh.examples.common_uuri import authority, entity, ExampleType, pub_resource, \ | ||
get_zenoh_default_config | ||
from up_client_zenoh.upclientzenoh import UPClientZenoh | ||
|
||
publisher = UPClientZenoh(get_zenoh_default_config(), authority(), entity(ExampleType.PUBLISHER)) | ||
|
||
|
||
def publishtoZenoh(): | ||
# create uuri | ||
uuri = UUri(entity=entity(ExampleType.PUBLISHER), resource=pub_resource()) | ||
cnt = 0 | ||
while True: | ||
data = f"{cnt}" | ||
attributes = UAttributesBuilder.publish(uuri, UPriority.UPRIORITY_CS4).build() | ||
payload = UPayload(value=data.encode('utf-8'), format=UPayloadFormat.UPAYLOAD_FORMAT_TEXT) | ||
umessage = UMessage(attributes=attributes, payload=payload) | ||
common_uuri.logging.debug(f"Sending {data} to {uuri}...") | ||
publisher.send(umessage) | ||
time.sleep(3) | ||
cnt += 1 | ||
|
||
|
||
if __name__ == '__main__': | ||
publishtoZenoh() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
""" | ||
SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation | ||
See the NOTICE file(s) distributed with this work for additional | ||
information regarding copyright ownership. | ||
This program and the accompanying materials are made available under the | ||
terms of the Apache License Version 2.0 which is available at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
SPDX-License-Identifier: Apache-2.0 | ||
""" | ||
|
||
from uprotocol.proto.uattributes_pb2 import CallOptions | ||
from uprotocol.proto.upayload_pb2 import UPayload, UPayloadFormat | ||
from uprotocol.proto.uri_pb2 import UUri | ||
|
||
from up_client_zenoh.examples import common_uuri | ||
from up_client_zenoh.examples.common_uuri import authority, entity, ExampleType, rpc_resource, get_zenoh_default_config | ||
from up_client_zenoh.upclientzenoh import UPClientZenoh | ||
|
||
rpc_client = UPClientZenoh(get_zenoh_default_config(), authority(), entity(ExampleType.RPC_CLIENT)) | ||
|
||
|
||
def send_rpc_request_to_zenoh(): | ||
# create uuri | ||
uuri = UUri(entity=entity(ExampleType.RPC_SERVER), resource=rpc_resource()) | ||
# create UPayload | ||
data = "GetCurrentTime" | ||
payload = UPayload(length=0, format=UPayloadFormat.UPAYLOAD_FORMAT_TEXT, value=bytes([ord(c) for c in data])) | ||
# invoke RPC method | ||
common_uuri.logging.debug(f"Send request to {uuri.entity}/{uuri.resource}") | ||
response_future = rpc_client.invoke_method(uuri, payload, CallOptions(ttl=1000)) | ||
# process the result | ||
result = response_future.result() | ||
if result and isinstance(result.payload.value, bytes): | ||
data = list(result.payload.value) | ||
value = ''.join(chr(c) for c in data) | ||
common_uuri.logging.debug(f"Receive rpc response {value}") | ||
else: | ||
common_uuri.logging.debug("Failed to get result from invoke_method.") | ||
|
||
|
||
if __name__ == '__main__': | ||
send_rpc_request_to_zenoh() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
""" | ||
SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation | ||
See the NOTICE file(s) distributed with this work for additional | ||
information regarding copyright ownership. | ||
This program and the accompanying materials are made available under the | ||
terms of the Apache License Version 2.0 which is available at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
SPDX-License-Identifier: Apache-2.0 | ||
""" | ||
|
||
import time | ||
from datetime import datetime | ||
|
||
from uprotocol.proto.umessage_pb2 import UMessage | ||
from uprotocol.proto.upayload_pb2 import UPayload | ||
from uprotocol.proto.upayload_pb2 import UPayloadFormat | ||
from uprotocol.proto.uri_pb2 import UUri | ||
from uprotocol.proto.ustatus_pb2 import UStatus | ||
from uprotocol.transport.builder.uattributesbuilder import UAttributesBuilder | ||
from uprotocol.transport.ulistener import UListener | ||
|
||
from up_client_zenoh.examples import common_uuri | ||
from up_client_zenoh.examples.common_uuri import authority, entity, ExampleType, rpc_resource, \ | ||
get_zenoh_default_config | ||
from up_client_zenoh.upclientzenoh import UPClientZenoh | ||
|
||
rpc_server = UPClientZenoh(get_zenoh_default_config(), authority(), entity(ExampleType.RPC_SERVER)) | ||
|
||
|
||
class RPCRequestListener(UListener): | ||
|
||
def on_receive(self, msg: UMessage) -> UStatus: | ||
attributes = msg.attributes | ||
payload = msg.payload | ||
value = ''.join(chr(c) for c in payload.value) | ||
source = attributes.source | ||
sink = attributes.sink | ||
common_uuri.logging.debug(f"Receive {value} from {source} to {sink}") | ||
response_payload = format(datetime.utcnow()).encode('utf-8') | ||
payload = UPayload(value=response_payload, format=UPayloadFormat.UPAYLOAD_FORMAT_TEXT) | ||
attributes = UAttributesBuilder.response(msg.attributes).build() | ||
rpc_server.send(UMessage(attributes=attributes, payload=payload)) | ||
|
||
|
||
if __name__ == '__main__': | ||
uuri = UUri(entity=entity(ExampleType.RPC_SERVER), resource=rpc_resource()) | ||
|
||
common_uuri.logging.debug("Register the listener...") | ||
rpc_server.register_listener(uuri, RPCRequestListener()) | ||
|
||
while True: | ||
time.sleep(1) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
""" | ||
SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation | ||
See the NOTICE file(s) distributed with this work for additional | ||
information regarding copyright ownership. | ||
This program and the accompanying materials are made available under the | ||
terms of the Apache License Version 2.0 which is available at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
SPDX-License-Identifier: Apache-2.0 | ||
""" | ||
|
||
import time | ||
|
||
from uprotocol.proto.umessage_pb2 import UMessage | ||
from uprotocol.proto.uri_pb2 import UUri | ||
from uprotocol.proto.ustatus_pb2 import UStatus | ||
from uprotocol.transport.ulistener import UListener | ||
|
||
from up_client_zenoh.examples import common_uuri | ||
from up_client_zenoh.examples.common_uuri import authority, entity, ExampleType, pub_resource, \ | ||
get_zenoh_default_config | ||
from up_client_zenoh.upclientzenoh import UPClientZenoh | ||
|
||
|
||
class MyListener(UListener): | ||
|
||
def on_receive(self, msg: UMessage) -> UStatus: | ||
common_uuri.logging.debug('on receive called') | ||
common_uuri.logging.debug(msg.payload.value) | ||
common_uuri.logging.debug(msg.attributes.__str__()) | ||
return UStatus(message="Received event") | ||
|
||
|
||
client = UPClientZenoh(get_zenoh_default_config(), authority(), entity(ExampleType.SUBSCRIBER)) | ||
|
||
|
||
def subscribeToZenoh(): | ||
# create uuri | ||
uuri = UUri(entity=entity(ExampleType.PUBLISHER), resource=pub_resource()) | ||
client.register_listener(uuri, MyListener()) | ||
|
||
|
||
if __name__ == '__main__': | ||
subscribeToZenoh() | ||
while True: | ||
time.sleep(1) |