diff --git a/README.md b/README.md index 2e536cd..b12fc00 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,2 @@ -# uprotocol-ulink-zenoh-python -Python uLink implementation for the Zenoh transport +# up-transport-zenoh-python +uProtocol transport implementation for Zenoh in Python diff --git a/requirements.txt b/requirements.txt index df7cf4b..4b7d565 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ -up-python==0.1.2-dev -eclipse-zenoh==0.11.0rc3 +up-python==0.2.0-dev +eclipse-zenoh==0.11.0 diff --git a/setup.py b/setup.py index cb8a84d..0205931 100644 --- a/setup.py +++ b/setup.py @@ -23,7 +23,7 @@ from setuptools import find_packages, setup -project_name = "up-client-zenoh-python" +project_name = "up-transport-zenoh-python" script_directory = os.path.realpath(os.path.dirname(__file__)) REQUIREMENTS = [i.strip() for i in open(os.path.join("requirements.txt")).readlines()] @@ -32,11 +32,11 @@ name=project_name, author="Neelam Kushwah", author_email="neelam.kushwah@gm.com", - version="0.1.0-dev", + version="0.2.0-dev", python_requires=">=3.8", packages=find_packages(), package_data={ - 'up_client_zenoh': ['**'], + 'up_transport_zenoh': ['**'], }, include_package_data=True, install_requires=REQUIREMENTS, diff --git a/up_client_zenoh/examples/publish.py b/up_client_zenoh/examples/publish.py deleted file mode 100644 index 3af6f2d..0000000 --- a/up_client_zenoh/examples/publish.py +++ /dev/null @@ -1,46 +0,0 @@ -""" -SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation - -See the NOTICE file(s) distributed with this work for additional -information regarding copyright ownership. - -This program and the accompanying materials are made available under the -terms of the Apache License Version 2.0 which is available at - - http://www.apache.org/licenses/LICENSE-2.0 - -SPDX-License-Identifier: Apache-2.0 -""" - -import time - -from uprotocol.proto.uattributes_pb2 import UPriority -from uprotocol.proto.umessage_pb2 import UMessage -from uprotocol.proto.upayload_pb2 import UPayload, UPayloadFormat -from uprotocol.proto.uri_pb2 import UUri -from uprotocol.transport.builder.uattributesbuilder import UAttributesBuilder - -from up_client_zenoh.examples import common_uuri -from up_client_zenoh.examples.common_uuri import ExampleType, authority, entity, get_zenoh_default_config, pub_resource -from up_client_zenoh.upclientzenoh import UPClientZenoh - -publisher = UPClientZenoh(get_zenoh_default_config(), authority(), entity(ExampleType.PUBLISHER)) - - -def publish_to_zenoh(): - # create uuri - uuri = UUri(entity=entity(ExampleType.PUBLISHER), resource=pub_resource()) - cnt = 0 - while True: - data = f"{cnt}" - attributes = UAttributesBuilder.publish(uuri, UPriority.UPRIORITY_CS4).build() - payload = UPayload(value=data.encode('utf-8'), format=UPayloadFormat.UPAYLOAD_FORMAT_TEXT) - umessage = UMessage(attributes=attributes, payload=payload) - common_uuri.logging.debug(f"Sending {data} to {uuri}...") - publisher.send(umessage) - time.sleep(3) - cnt += 1 - - -if __name__ == '__main__': - publish_to_zenoh() diff --git a/up_client_zenoh/examples/rpc_client.py b/up_client_zenoh/examples/rpc_client.py deleted file mode 100644 index e39c3ad..0000000 --- a/up_client_zenoh/examples/rpc_client.py +++ /dev/null @@ -1,46 +0,0 @@ -""" -SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation - -See the NOTICE file(s) distributed with this work for additional -information regarding copyright ownership. - -This program and the accompanying materials are made available under the -terms of the Apache License Version 2.0 which is available at - - http://www.apache.org/licenses/LICENSE-2.0 - -SPDX-License-Identifier: Apache-2.0 -""" - -from uprotocol.proto.uattributes_pb2 import CallOptions -from uprotocol.proto.upayload_pb2 import UPayload, UPayloadFormat -from uprotocol.proto.uri_pb2 import UUri - -from up_client_zenoh.examples import common_uuri -from up_client_zenoh.examples.common_uuri import ExampleType, authority, entity, get_zenoh_default_config, rpc_resource -from up_client_zenoh.upclientzenoh import UPClientZenoh - -rpc_client = UPClientZenoh(get_zenoh_default_config(), authority(), entity(ExampleType.RPC_CLIENT)) - - -def send_rpc_request_to_zenoh(): - # create uuri - uuri = UUri(entity=entity(ExampleType.RPC_SERVER), resource=rpc_resource()) - # create UPayload - data = "GetCurrentTime" - payload = UPayload(length=0, format=UPayloadFormat.UPAYLOAD_FORMAT_TEXT, value=bytes([ord(c) for c in data])) - # invoke RPC method - common_uuri.logging.debug(f"Send request to {uuri.entity}/{uuri.resource}") - response_future = rpc_client.invoke_method(uuri, payload, CallOptions(ttl=1000)) - # process the result - result = response_future.result() - if result and isinstance(result.payload.value, bytes): - data = list(result.payload.value) - value = ''.join(chr(c) for c in data) - common_uuri.logging.debug(f"Receive rpc response {value}") - else: - common_uuri.logging.debug("Failed to get result from invoke_method.") - - -if __name__ == '__main__': - send_rpc_request_to_zenoh() diff --git a/up_client_zenoh/examples/rpc_server.py b/up_client_zenoh/examples/rpc_server.py deleted file mode 100644 index bba35ee..0000000 --- a/up_client_zenoh/examples/rpc_server.py +++ /dev/null @@ -1,53 +0,0 @@ -""" -SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation - -See the NOTICE file(s) distributed with this work for additional -information regarding copyright ownership. - -This program and the accompanying materials are made available under the -terms of the Apache License Version 2.0 which is available at - - http://www.apache.org/licenses/LICENSE-2.0 - -SPDX-License-Identifier: Apache-2.0 -""" - -import time -from datetime import datetime - -from uprotocol.proto.umessage_pb2 import UMessage -from uprotocol.proto.upayload_pb2 import UPayload, UPayloadFormat -from uprotocol.proto.uri_pb2 import UUri -from uprotocol.proto.ustatus_pb2 import UStatus -from uprotocol.transport.builder.uattributesbuilder import UAttributesBuilder -from uprotocol.transport.ulistener import UListener - -from up_client_zenoh.examples import common_uuri -from up_client_zenoh.examples.common_uuri import ExampleType, authority, entity, get_zenoh_default_config, rpc_resource -from up_client_zenoh.upclientzenoh import UPClientZenoh - -rpc_server = UPClientZenoh(get_zenoh_default_config(), authority(), entity(ExampleType.RPC_SERVER)) - - -class RPCRequestListener(UListener): - def on_receive(self, msg: UMessage) -> UStatus: - attributes = msg.attributes - payload = msg.payload - value = ''.join(chr(c) for c in payload.value) - source = attributes.source - sink = attributes.sink - common_uuri.logging.debug(f"Receive {value} from {source} to {sink}") - response_payload = format(datetime.utcnow()).encode('utf-8') - payload = UPayload(value=response_payload, format=UPayloadFormat.UPAYLOAD_FORMAT_TEXT) - attributes = UAttributesBuilder.response(msg.attributes).build() - rpc_server.send(UMessage(attributes=attributes, payload=payload)) - - -if __name__ == '__main__': - uuri = UUri(entity=entity(ExampleType.RPC_SERVER), resource=rpc_resource()) - - common_uuri.logging.debug("Register the listener...") - rpc_server.register_listener(uuri, RPCRequestListener()) - - while True: - time.sleep(1) diff --git a/up_client_zenoh/examples/subscribe.py b/up_client_zenoh/examples/subscribe.py deleted file mode 100644 index c4b09ae..0000000 --- a/up_client_zenoh/examples/subscribe.py +++ /dev/null @@ -1,47 +0,0 @@ -""" -SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation - -See the NOTICE file(s) distributed with this work for additional -information regarding copyright ownership. - -This program and the accompanying materials are made available under the -terms of the Apache License Version 2.0 which is available at - - http://www.apache.org/licenses/LICENSE-2.0 - -SPDX-License-Identifier: Apache-2.0 -""" - -import time - -from uprotocol.proto.umessage_pb2 import UMessage -from uprotocol.proto.uri_pb2 import UUri -from uprotocol.proto.ustatus_pb2 import UStatus -from uprotocol.transport.ulistener import UListener - -from up_client_zenoh.examples import common_uuri -from up_client_zenoh.examples.common_uuri import ExampleType, authority, entity, get_zenoh_default_config, pub_resource -from up_client_zenoh.upclientzenoh import UPClientZenoh - - -class MyListener(UListener): - def on_receive(self, msg: UMessage) -> UStatus: - common_uuri.logging.debug('on receive called') - common_uuri.logging.debug(msg.payload.value) - common_uuri.logging.debug(msg.attributes.__str__()) - return UStatus(message="Received event") - - -client = UPClientZenoh(get_zenoh_default_config(), authority(), entity(ExampleType.SUBSCRIBER)) - - -def subscribe_to_zenoh(): - # create uuri - uuri = UUri(entity=entity(ExampleType.PUBLISHER), resource=pub_resource()) - client.register_listener(uuri, MyListener()) - - -if __name__ == '__main__': - subscribe_to_zenoh() - while True: - time.sleep(1) diff --git a/up_client_zenoh/upclientzenoh.py b/up_client_zenoh/upclientzenoh.py deleted file mode 100644 index 71b7ce8..0000000 --- a/up_client_zenoh/upclientzenoh.py +++ /dev/null @@ -1,469 +0,0 @@ -""" -SPDX-FileCopyrightText: Copyright (c) 2024 Contributors to the Eclipse Foundation - -See the NOTICE file(s) distributed with this work for additional -information regarding copyright ownership. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -SPDX-FileType: SOURCE -SPDX-License-Identifier: Apache-2.0 -""" - -import logging -from concurrent.futures import Future -from threading import Lock -from typing import Dict, Tuple - -import zenoh -from uprotocol.proto.uattributes_pb2 import CallOptions, UAttributes, UMessageType, UPriority -from uprotocol.proto.umessage_pb2 import UMessage -from uprotocol.proto.upayload_pb2 import UPayload, UPayloadFormat -from uprotocol.proto.uri_pb2 import UAuthority, UEntity, UUri -from uprotocol.proto.ustatus_pb2 import UCode, UStatus -from uprotocol.rpc.rpcclient import RpcClient -from uprotocol.transport.builder.uattributesbuilder import UAttributesBuilder -from uprotocol.transport.ulistener import UListener -from uprotocol.transport.utransport import UTransport -from uprotocol.transport.validate.uattributesvalidator import Validators -from uprotocol.uri.factory.uresource_builder import UResourceBuilder -from uprotocol.uri.validator.urivalidator import UriValidator -from zenoh import Config, Encoding, Query, Queryable, Sample, Subscriber, Value -from zenoh import open as zenoh_open - -from up_client_zenoh.zenohutils import ZenohUtils - -# Configure the logging -logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') - - -class UPClientZenoh(UTransport, RpcClient): - def __init__(self, config: Config, uauthority: UAuthority, uentity: UEntity): - self.session = zenoh_open(config) - self.subscriber_map: Dict[Tuple[str, UListener], Subscriber] = {} - self.queryable_map: Dict[Tuple[str, UListener], Queryable] = {} - self.query_map: Dict[str, Query] = {} - self.rpc_callback_map: Dict[str, UListener] = {} - self.source_uuri = UUri(authority=uauthority, entity=uentity) - self.rpc_callback_lock = Lock() - self.queryable_lock = Lock() - self.subscriber_lock = Lock() - - def get_response_uuri(self) -> UUri: - new_source = self.source_uuri - new_source.resource.CopyFrom(UResourceBuilder.for_rpc_response()) - return new_source - - def send_publish_notification(self, zenoh_key: str, payload: UPayload, attributes: UAttributes) -> UStatus: - # Get the data from UPayload - - if not payload.value: - msg = "The data in UPayload should be Data::Value" - logging.debug(f"ERROR: {msg}") - return UStatus(code=UCode.INVALID_ARGUMENT, message=msg) - - buf = payload.value - - # Transform UAttributes to user attachment in Zenoh - attachment = ZenohUtils.uattributes_to_attachment(attributes) - if not attachment: - msg = "Unable to transform UAttributes to attachment" - logging.debug(f"ERROR: {msg}") - return UStatus(code=UCode.INVALID_ARGUMENT, message=msg) - - # Map the priority to Zenoh - priority = ZenohUtils.map_zenoh_priority(attributes.priority) - if not priority: - msg = "Unable to map to Zenoh priority" - logging.debug(f"ERROR: {msg}") - return UStatus(code=UCode.INVALID_ARGUMENT, message=msg) - - try: - # Simulate sending data - logging.debug(f"Sending data to Zenoh with key: {zenoh_key}") - logging.debug(f"Data: {buf}") - logging.debug(f"Priority: {priority}") - logging.debug(f"Attachment: {attachment}") - encoding = Encoding.APP_CUSTOM().with_suffix(str(payload.format)) - - self.session.put(keyexpr=zenoh_key, encoding=encoding, value=buf, attachment=attachment, priority=priority) - - msg = "Successfully sent data to Zenoh" - logging.debug(f"SUCCESS:{msg}") - return UStatus(code=UCode.OK, message=msg) - except Exception as e: - msg = f"Unable to send with Zenoh: {e}" - logging.debug(f"ERROR: {msg}") - return UStatus(code=UCode.INTERNAL, message=msg) - - def send_request(self, zenoh_key: str, payload: UPayload, attributes: UAttributes) -> UStatus: - data = payload.value - if not data: - msg = "The data in UPayload should be Data::Value" - logging.debug(f"ERROR: {msg}") - return UStatus(code=UCode.INVALID_ARGUMENT, message=msg) - # Transform UAttributes to user attachment in Zenoh - attachment = ZenohUtils.uattributes_to_attachment(attributes) - if attachment is None: - msg = "Unable to transform UAttributes to attachment" - logging.debug(msg) - return UStatus(code=UCode.INVALID_ARGUMENT, message=msg) - - # Retrieve the callback - - if attributes.source is None: - msg = "Lack of source address" - logging.debug(msg) - return UStatus(code=UCode.INVALID_ARGUMENT, message=msg) - - resp_callback = self.rpc_callback_map.get(attributes.source.SerializeToString()) - if resp_callback is None: - msg = "Unable to get callback" - logging.debug(msg) - return UStatus(code=UCode.INTERNAL, message=msg) - - def zenoh_callback(reply: Query.reply) -> None: - if isinstance(reply.sample, Sample): - sample = reply.sample - # Get the encoding of UPayload - encoding = ZenohUtils.to_upayload_format(sample.encoding) - if encoding is None: - msg = "Unable to get the encoding" - logging.debug(msg) - return UStatus(code=UCode.INTERNAL, message=msg) - # Get UAttribute from the attachment - attachment = sample.attachment - if attachment is None: - msg = "Unable to get the attachment" - logging.debug(msg) - return UStatus(code=UCode.INTERNAL, message=msg) - - u_attribute = ZenohUtils.attachment_to_uattributes(attachment) - if u_attribute is None: - msg = "Transform attachment to UAttributes failed" - logging.debug(msg) - return UStatus(code=UCode.INTERNAL, message=msg) - # Create UMessage - msg = UMessage( - attributes=u_attribute, payload=UPayload(length=0, format=encoding, value=sample.payload) - ) - resp_callback.on_receive(msg) - else: - msg = f"Error while parsing Zenoh reply: {reply.error}" - logging.debug(msg) - return UStatus(code=UCode.INTERNAL, message=msg) - - # Send query - ttl = attributes.ttl / 1000 if attributes.ttl is not None else 1000 - - value = Value(payload.value, encoding=Encoding.APP_CUSTOM().with_suffix(str(payload.format))) - self.session.get( - zenoh_key, - lambda reply: zenoh_callback(reply), - target=zenoh.QueryTarget.BEST_MATCHING(), - value=value, - timeout=ttl, - ) - msg = "Successfully sent rpc request to Zenoh" - logging.debug(f"SUCCESS:{msg}") - return UStatus(code=UCode.OK, message=msg) - - def send_response(self, payload: UPayload, attributes: UAttributes) -> UStatus: - # Transform attributes to user attachment in Zenoh - attachment = ZenohUtils.uattributes_to_attachment(attributes) - # Find out the corresponding query from dictionary - reqid = attributes.reqid - query = self.query_map.pop(reqid.SerializeToString(), None) - if not query: - msg = "Query doesn't exist" - logging.debug(msg) - return UStatus(code=UCode.INTERNAL, message=msg) # Send back the query - value = Value(payload.value, Encoding.APP_CUSTOM().with_suffix(str(payload.format))) - reply = Sample(query.key_expr, value, attachment=attachment) - - try: - query.reply(reply) - msg = "Successfully sent rpc response to Zenoh" - logging.debug(f"SUCCESS:{msg}") - return UStatus(code=UCode.OK, message=msg) - - except Exception as e: - msg = "Unable to reply with Zenoh: {}".format(str(e)) - logging.debug(msg) - return UStatus(code=UCode.INTERNAL, message=msg) - - def register_publish_notification_listener(self, topic: UUri, listener: UListener) -> UStatus: - # Get Zenoh key - zenoh_key = ZenohUtils.to_zenoh_key_string(topic) - - # Setup callback - def callback(sample: Sample) -> None: - # Get the UAttribute from Zenoh user attachment - attachment = sample.attachment - if attachment is None: - msg = "Unable to get attachment" - logging.debug(msg) - return UStatus(code=UCode.INTERNAL, message=msg) - - u_attribute = ZenohUtils.attachment_to_uattributes(attachment) - if u_attribute is None: - msg = "Unable to decode attributes" - logging.debug(msg) - return UStatus(code=UCode.INTERNAL, message=msg) - # Create UPayload - format = ZenohUtils.to_upayload_format(sample.encoding) - if format: - u_payload = UPayload(length=0, format=format, value=sample.payload) - # Create UMessage - msg = UMessage(attributes=u_attribute, payload=u_payload) - listener.on_receive(msg) - else: - msg = "Unable to get payload encoding" - logging.debug(msg) - return UStatus(code=UCode.INTERNAL, message=msg) - - # Create Zenoh subscriber - try: - subscriber = self.session.declare_subscriber(zenoh_key, callback) - if subscriber: - with self.subscriber_lock: - self.subscriber_map[(topic.SerializeToString(), listener)] = subscriber - - else: - msg = "Unable to register callback with Zenoh" - logging.debug(msg) - return UStatus(code=UCode.INTERNAL, message=msg) - except Exception: - msg = "Unable to register callback with Zenoh" - logging.debug(msg) - return UStatus(code=UCode.INTERNAL, message=msg) - msg = "Successfully register callback with Zenoh" - logging.debug(msg) - return UStatus(code=UCode.OK, message=msg) - - def register_request_listener(self, topic: UUri, listener: UListener) -> UStatus: - zenoh_key = ZenohUtils.to_zenoh_key_string(topic) - - def callback(query: Query) -> None: - nonlocal self, listener, topic - attachment = query.attachment - if not attachment: - msg = "Unable to get attachment" - logging.debug(msg) - return UStatus(code=UCode.INTERNAL, message=msg) - - u_attribute = ZenohUtils.attachment_to_uattributes(attachment) - if isinstance(u_attribute, UStatus): - msg = f"Unable to transform user attachment to UAttributes: {u_attribute}" - logging.debug(msg) - return UStatus(code=UCode.INTERNAL, message=msg) - - value = query.value - if value: - encoding = ZenohUtils.to_upayload_format(value.encoding) - if not encoding: - msg = "Unable to get payload encoding" - logging.debug(msg) - return UStatus(code=UCode.INTERNAL, message=msg) - - u_payload = UPayload(format=encoding, value=value.payload) - else: - u_payload = UPayload(format=UPayloadFormat.UPAYLOAD_FORMAT_UNSPECIFIED) - - msg = UMessage(attributes=u_attribute, payload=u_payload) - - self.query_map[u_attribute.id.SerializeToString()] = query - listener.on_receive(msg) - - try: - queryable = self.session.declare_queryable(zenoh_key, callback) - with self.queryable_lock: - self.queryable_map[(topic.SerializeToString(), listener)] = queryable - except Exception: - msg = "Unable to register callback with Zenoh" - logging.debug(msg) - return UStatus(code=UCode.INTERNAL, message=msg) - - return UStatus(code=UCode.OK, message="Successfully register callback with Zenoh") - - def register_response_listener(self, topic: UUri, listener: UListener) -> UStatus: - with self.rpc_callback_lock: - self.rpc_callback_map[topic.SerializeToString()] = listener - return UStatus(code=UCode.OK, message="Successfully register response callback with Zenoh") - - def send(self, message: UMessage) -> UStatus: - payload = message.payload - attributes = message.attributes - # Check the type of UAttributes (Publish / Notification / Request / Response) - msg_type = attributes.type - if msg_type == UMessageType.UMESSAGE_TYPE_PUBLISH: - Validators.PUBLISH.validator().validate(attributes) - topic = attributes.source - zenoh_key = ZenohUtils.to_zenoh_key_string(topic) - return self.send_publish_notification(zenoh_key, payload, attributes) - elif msg_type == UMessageType.UMESSAGE_TYPE_NOTIFICATION: - Validators.NOTIFICATION.validator().validate(attributes) - topic = attributes.sink - zenoh_key = ZenohUtils.to_zenoh_key_string(topic) - return self.send_publish_notification(zenoh_key, payload, attributes) - - elif msg_type == UMessageType.UMESSAGE_TYPE_REQUEST: - Validators.REQUEST.validator().validate(attributes) - topic = attributes.sink - zenoh_key = ZenohUtils.to_zenoh_key_string(topic) - return self.send_request(zenoh_key, payload, attributes) - - elif msg_type == UMessageType.UMESSAGE_TYPE_RESPONSE: - Validators.RESPONSE.validator().validate(attributes) - return self.send_response(payload, attributes) - - else: - return UStatus(code=UCode.INVALID_ARGUMENT, message="Wrong Message type in UAttributes") - - def register_listener(self, topic: UUri, listener: UListener) -> None: - if topic.authority and not topic.entity and not topic.resource: - # This is special UUri which means we need to register for all of Publish, Request, and Response - # RPC response - # Register for all of Publish, Notification, Request, and Response - response_status = self.register_response_listener(topic, listener) - request_status = self.register_request_listener(topic, listener) - publish_status = self.register_publish_notification_listener(topic, listener) - if all(status.code == UCode.OK for status in [response_status, request_status, publish_status]): - return UStatus(code=UCode.OK, message="Successfully register listener with Zenoh") - else: - return UStatus(code=UCode.INTERNAL, message="Unsuccessful registration") - - else: - # Validate topic - UriValidator.validate(topic) - status = None - if UriValidator.is_rpc_response(topic): - status = self.register_response_listener(topic, listener) - elif UriValidator.is_rpc_method(topic): - status = self.register_request_listener(topic, listener) - else: - status = self.register_publish_notification_listener(topic, listener) - if status.code == UCode.OK: - return UStatus(code=UCode.OK, message="Successfully register listener with Zenoh") - else: - return UStatus(code=UCode.INTERNAL, message="Unsuccessful registration") - - def unregister_listener(self, topic: UUri, listener: UListener) -> None: - remove_pub_listener = False - remove_req_listener = False - remove_resp_listener = False - if topic.authority and not topic.entity and not topic.resource: - remove_pub_listener = True - remove_req_listener = True - remove_resp_listener = True - else: - # Validate topic - UriValidator.validate(topic) - if UriValidator.is_rpc_response(topic): - remove_resp_listener = True - elif UriValidator.is_rpc_method(topic): - remove_req_listener = True - else: - remove_pub_listener = True - if remove_resp_listener: - self._remove_response_listener(topic) - if remove_req_listener: - self._remove_request_listener(topic, listener) - if remove_pub_listener: - self._remove_publish_listener(topic, listener) - - def _remove_response_listener(self, topic: UUri) -> None: - with self.rpc_callback_lock: - if self.rpc_callback_map.pop(topic.SerializeToString(), None) is None: - raise ValueError("RPC response callback doesn't exist") - - def _remove_publish_listener(self, topic: UUri, listener: UListener) -> None: - with self.subscriber_lock: - if self.subscriber_map.pop((topic.SerializeToString(), listener), None) is None: - raise ValueError("Publish listener doesn't exist") - - def _remove_request_listener(self, topic: UUri, listener: UListener) -> None: - with self.queryable_lock: - if self.queryable_map.pop((topic.SerializeToString(), listener), None) is None: - raise ValueError("RPC request listener doesn't exist") - - def invoke_method( - self, - topic: UUri, - payload: UPayload, - options: CallOptions, - ) -> Future: - try: - # Validate UUri - if not UriValidator.validate(topic): - msg = "Invalid UUri for invoke_method" - logging.debug(f"{msg}") - raise UStatus(code=UCode.INVALID_ARGUMENT, message=msg) - - # Get Zenoh key - zenoh_key_result = ZenohUtils.to_zenoh_key_string(topic) - if isinstance(zenoh_key_result, UStatus): - msg = "Unable to transform UUri to Zenoh key" - logging.debug(f"{msg}") - raise UStatus(code=UCode.INVALID_ARGUMENT, message=msg) - - zenoh_key = zenoh_key_result - - # Create UAttributes and put into Zenoh user attachment - uattributes = UAttributesBuilder.request( - self.get_response_uuri(), topic, UPriority.UPRIORITY_CS4, options.ttl - ).build() - - attachment = ZenohUtils.uattributes_to_attachment(uattributes) - - # Get the data from UPayload - if not payload.value: - msg = "The data in UPayload should be Data::Value" - logging.debug(f"{msg}") - raise UStatus(code=UCode.INVALID_ARGUMENT, message=msg) - - buf = payload.value - value = Value(buf, encoding=Encoding.APP_CUSTOM().with_suffix(str(payload.format))) - - # Send the query - get_builder = self.session.get( - zenoh_key, - zenoh.Queue(), - target=zenoh.QueryTarget.BEST_MATCHING(), - value=value, - attachment=attachment, - timeout=options.ttl / 1000, - ) - - for reply in get_builder.receiver: - if reply.is_ok: - encoding = ZenohUtils.to_upayload_format(reply.ok.encoding) - if not encoding: - msg = "Error while parsing Zenoh encoding" - logging.debug(f"{msg}") - raise UStatus(code=UCode.INTERNAL, message=msg) - - umessage = UMessage( - attributes=uattributes, payload=UPayload(format=encoding, value=reply.ok.payload) - ) - future = Future() - future.set_result(umessage) - return future - else: - msg = f"Error while parsing Zenoh reply: {reply.err}" - logging.debug(f"{msg}") - raise UStatus(code=UCode.INTERNAL, message=msg) - - except Exception as e: - msg = f"Unexpected error: {e}" - logging.debug(f"{msg}") - raise UStatus(code=UCode.INTERNAL, message=msg) diff --git a/up_client_zenoh/zenohutils.py b/up_client_zenoh/zenohutils.py deleted file mode 100644 index 201d00a..0000000 --- a/up_client_zenoh/zenohutils.py +++ /dev/null @@ -1,152 +0,0 @@ -""" -SPDX-FileCopyrightText: Copyright (c) 2024 Contributors to the Eclipse Foundation - -See the NOTICE file(s) distributed with this work for additional -information regarding copyright ownership. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -SPDX-FileType: SOURCE -SPDX-License-Identifier: Apache-2.0 -""" - -import logging -from typing import Union - -from uprotocol.proto.uattributes_pb2 import UAttributes, UPriority -from uprotocol.proto.upayload_pb2 import UPayloadFormat -from uprotocol.proto.uri_pb2 import UUri -from uprotocol.proto.ustatus_pb2 import UCode, UStatus -from zenoh import Encoding, Priority -from zenoh.value import Attachment - -UATTRIBUTE_VERSION: int = 1 - -# Configure the logging -logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') - - -class ZenohUtils: - @staticmethod - def get_uauth_from_uuri(uri: UUri) -> Union[str, UStatus]: - if uri.authority: - try: - authority_bytes = uri.authority.SerializeToString() - # Iterate over each byte and formate it as a two digit hexa decimal - return "".join(f"{c:02x}" for c in authority_bytes) - except Exception as e: - msg = f"Unable to transform UAuthority into micro form: {e}" - logging.debug(msg) - return UStatus(code=UCode.INVALID_ARGUMENT, message=msg) - else: - msg = "UAuthority is empty" - logging.debug(msg) - return UStatus(code=UCode.INVALID_ARGUMENT, message=msg) - - @staticmethod - def to_zenoh_key_string(uri: UUri) -> Union[str, UStatus]: - if uri.authority and not uri.entity and not uri.resource: - try: - authority = ZenohUtils.get_uauth_from_uuri(uri) - if isinstance(authority, UStatus): - return authority - return f"upr/{authority}/**" - except Exception as e: - msg = f"Failed to generate Zenoh key: {e}" - logging.debug(msg) - return UStatus(code=UCode.INVALID_ARGUMENT, message=msg) - else: - try: - uri_bytes = uri.SerializeToString() - if len(uri_bytes) > 8: - authority_hex = ''.join(format(c, '02x') for c in uri_bytes[8:]) - micro_zenoh_key = f"upr/{authority_hex}/" - else: - micro_zenoh_key = "upl/" - rest_hex = ''.join(format(c, '02x') for c in uri_bytes[:8]) - micro_zenoh_key += rest_hex - return micro_zenoh_key - except Exception as e: - msg = f"Failed to generate Zenoh key: {e}" - logging.debug(msg) - return UStatus(code=UCode.INVALID_ARGUMENT, message=msg) - - @staticmethod - def map_zenoh_priority(upriority: UPriority) -> Priority: - mapping = { - UPriority.UPRIORITY_CS0: Priority.BACKGROUND(), - UPriority.UPRIORITY_CS1: Priority.DATA_LOW(), - UPriority.UPRIORITY_CS2: Priority.DATA(), - UPriority.UPRIORITY_CS3: Priority.DATA_HIGH(), - UPriority.UPRIORITY_CS4: Priority.INTERACTIVE_LOW(), - UPriority.UPRIORITY_CS5: Priority.INTERACTIVE_HIGH(), - UPriority.UPRIORITY_CS6: Priority.REAL_TIME(), - UPriority.UPRIORITY_UNSPECIFIED: Priority.DATA_LOW(), - } - return mapping[upriority] - - @staticmethod - def to_upayload_format(encoding: Encoding) -> UPayloadFormat: - try: - value = int(encoding.suffix) - return value if UPayloadFormat.Name(value) else None - except (ValueError, AttributeError): - return None - - @staticmethod - def uattributes_to_attachment(uattributes: UAttributes): - attachment = [("", UATTRIBUTE_VERSION.to_bytes(1, byteorder='little')), ("", uattributes.SerializeToString())] - return attachment - - @staticmethod - def attachment_to_uattributes(attachment: Attachment) -> UAttributes: - try: - version = None - version_found = False - uattributes = None - - items = attachment.items() - for pair in items: - if not version_found: - version = pair[1] - version_found = True - else: - # Process UAttributes data - uattributes = UAttributes() - uattributes.ParseFromString(pair[1]) - break - - if version is None: - msg = f"UAttributes version is empty (should be {UATTRIBUTE_VERSION})" - logging.debug(msg) - raise UStatus(code=UCode.INVALID_ARGUMENT, message=msg) - - if not version_found: - msg = "UAttributes version is missing in the attachment" - logging.debug(msg) - raise UStatus(code=UCode.INVALID_ARGUMENT, message=msg) - - if version != UATTRIBUTE_VERSION.to_bytes(1, byteorder='little'): - msg = f"UAttributes version is {version} (should be {UATTRIBUTE_VERSION})" - logging.debug(msg) - raise UStatus(code=UCode.INVALID_ARGUMENT, message=msg) - - if uattributes is None: - msg = "Unable to get the UAttributes" - logging.debug(msg) - raise UStatus(code=UCode.INVALID_ARGUMENT, message=msg) - - return uattributes - except Exception as e: - msg = f"Failed to convert Attachment to UAttributes: {e}" - logging.debug(msg) - raise UStatus(code=UCode.INVALID_ARGUMENT, message=msg) diff --git a/up_client_zenoh/__init__.py b/up_transport_zenoh/__init__.py similarity index 100% rename from up_client_zenoh/__init__.py rename to up_transport_zenoh/__init__.py diff --git a/up_client_zenoh/examples/common_uuri.py b/up_transport_zenoh/examples/common_uuri.py similarity index 61% rename from up_client_zenoh/examples/common_uuri.py rename to up_transport_zenoh/examples/common_uuri.py index b058fdb..46cc9c3 100644 --- a/up_client_zenoh/examples/common_uuri.py +++ b/up_transport_zenoh/examples/common_uuri.py @@ -17,8 +17,7 @@ from enum import Enum import zenoh -from uprotocol.proto.uri_pb2 import UAuthority, UEntity, UResource -from uprotocol.uri.factory.uresource_builder import UResourceBuilder +from uprotocol.v1.uri_pb2 import UUri # Configure the logging logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') @@ -31,32 +30,13 @@ class ExampleType(Enum): RPC_CLIENT = "rpc_client" -def authority() -> UAuthority: - return UAuthority(name="auth_name", id=bytes([1, 2, 3, 4])) - - -def entity(example_type: ExampleType) -> UEntity: - mapping = { - ExampleType.PUBLISHER: ("publisher", 1), - ExampleType.SUBSCRIBER: ("subscriber", 2), - ExampleType.RPC_SERVER: ("rpc_server", 3), - ExampleType.RPC_CLIENT: ("rpc_client", 4), - } - name, id = mapping[example_type] - return UEntity(name=name, id=1, version_major=id) - - -def pub_resource() -> UResource: - return UResource(name="door", instance="front_left", message="Door", id=5678) - - -def rpc_resource() -> UResource: - return UResourceBuilder.for_rpc_request("getTime", 5678) +def create_method_uri(): + return UUri(authority_name="Neelam", ue_id=4, ue_version_major=1, resource_id=3) def get_zenoh_config(): # start your zenoh router and provide router ip and port - zenoh_ip = "192.168.29.79" # zenoh router ip + zenoh_ip = "10.0.3.3" # zenoh router ip zenoh_port = 9090 # zenoh router port conf = zenoh.Config() if zenoh_ip is not None: diff --git a/up_transport_zenoh/examples/publish.py b/up_transport_zenoh/examples/publish.py new file mode 100644 index 0000000..57d6d3f --- /dev/null +++ b/up_transport_zenoh/examples/publish.py @@ -0,0 +1,42 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import asyncio +import time + +from uprotocol.communication.upayload import UPayload +from uprotocol.transport.builder.umessagebuilder import UMessageBuilder +from uprotocol.v1.uri_pb2 import UUri + +from up_transport_zenoh.examples import common_uuri +from up_transport_zenoh.examples.common_uuri import get_zenoh_default_config +from up_transport_zenoh.uptransportzenoh import UPTransportZenoh + +source = UUri(authority_name="vehicle1", ue_id=18) +publisher = UPTransportZenoh.new(get_zenoh_default_config(), source) + + +async def publish_to_zenoh(): + # create uuri + uuri = UUri(ue_id=4, ue_version_major=1, resource_id=0x8000) + builder = UMessageBuilder.publish(uuri) + payload = UPayload.pack(UUri()) + umessage = builder.build_from_upayload(payload) + status = await publisher.send(umessage) + common_uuri.logging.debug(f"Publish status {status}") + time.sleep(3) + + +if __name__ == '__main__': + asyncio.run(publish_to_zenoh()) diff --git a/up_transport_zenoh/examples/rpc_client.py b/up_transport_zenoh/examples/rpc_client.py new file mode 100644 index 0000000..3442c44 --- /dev/null +++ b/up_transport_zenoh/examples/rpc_client.py @@ -0,0 +1,46 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import asyncio + +from uprotocol.communication.inmemoryrpcclient import InMemoryRpcClient +from uprotocol.communication.upayload import UPayload +from uprotocol.v1.uattributes_pb2 import ( + UPayloadFormat, +) +from uprotocol.v1.uri_pb2 import UUri + +from up_transport_zenoh.examples import common_uuri +from up_transport_zenoh.examples.common_uuri import create_method_uri, get_zenoh_default_config +from up_transport_zenoh.uptransportzenoh import UPTransportZenoh + +source = UUri(authority_name="vehicle1", ue_id=18) +transport = UPTransportZenoh.new(get_zenoh_default_config(), source) + + +async def send_rpc_request_to_zenoh(): + # create uuri + uuri = create_method_uri() + # create UPayload + data = "GetCurrentTime" + payload = UPayload(format=UPayloadFormat.UPAYLOAD_FORMAT_TEXT, data=bytes([ord(c) for c in data])) + # invoke RPC method + common_uuri.logging.debug(f"Send request to {uuri}") + rpc_client = InMemoryRpcClient(transport) + response_payload = await rpc_client.invoke_method(uuri, payload) + common_uuri.logging.debug(f"Response payload {response_payload}") + + +if __name__ == '__main__': + asyncio.run(send_rpc_request_to_zenoh()) diff --git a/up_transport_zenoh/examples/rpc_server.py b/up_transport_zenoh/examples/rpc_server.py new file mode 100644 index 0000000..1f4b680 --- /dev/null +++ b/up_transport_zenoh/examples/rpc_server.py @@ -0,0 +1,60 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import asyncio +import time +from datetime import datetime + +from uprotocol.communication.inmemoryrpcserver import InMemoryRpcServer +from uprotocol.communication.requesthandler import RequestHandler +from uprotocol.communication.upayload import UPayload +from uprotocol.v1.uattributes_pb2 import ( + UPayloadFormat, +) +from uprotocol.v1.umessage_pb2 import UMessage +from uprotocol.v1.uri_pb2 import UUri + +from up_transport_zenoh.examples import common_uuri +from up_transport_zenoh.examples.common_uuri import create_method_uri, get_zenoh_default_config +from up_transport_zenoh.uptransportzenoh import UPTransportZenoh + +source = UUri(authority_name="vehicle1", ue_id=18) +transport = UPTransportZenoh.new(get_zenoh_default_config(), source) + + +class MyRequestHandler(RequestHandler): + def handle_request(self, msg: UMessage) -> UPayload: + common_uuri.logging.debug("Request Received by Service Request Handler") + attributes = msg.attributes + payload = msg.payload + source = attributes.source + sink = attributes.sink + common_uuri.logging.debug(f"Receive {payload} from {source} to {sink}") + response_payload = format(datetime.utcnow()).encode('utf-8') + payload = UPayload(data=response_payload, format=UPayloadFormat.UPAYLOAD_FORMAT_TEXT) + return payload + + +async def register_rpc(): + uuri = create_method_uri() + rpc_server = InMemoryRpcServer(transport) + status = await rpc_server.register_request_handler(uuri, MyRequestHandler()) + common_uuri.logging.debug(f"Request Handler Register status {status}") + + while True: + time.sleep(1) + + +if __name__ == '__main__': + asyncio.run(register_rpc()) diff --git a/up_transport_zenoh/examples/subscribe.py b/up_transport_zenoh/examples/subscribe.py new file mode 100644 index 0000000..688ea0a --- /dev/null +++ b/up_transport_zenoh/examples/subscribe.py @@ -0,0 +1,58 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import asyncio + +from uprotocol.client.usubscription.v3.inmemoryusubcriptionclient import InMemoryUSubscriptionClient +from uprotocol.transport.ulistener import UListener +from uprotocol.v1.umessage_pb2 import UMessage +from uprotocol.v1.uri_pb2 import UUri +from uprotocol.v1.ustatus_pb2 import UStatus + +from up_transport_zenoh.examples import common_uuri +from up_transport_zenoh.examples.common_uuri import get_zenoh_default_config +from up_transport_zenoh.uptransportzenoh import UPTransportZenoh + + +class MyListener(UListener): + async def on_receive(self, msg: UMessage) -> None: + common_uuri.logging.debug('on receive called') + common_uuri.logging.debug(msg.payload) + common_uuri.logging.debug(msg.attributes.__str__()) + return UStatus(message="Received event") + + +source = UUri(authority_name="vehicle1", ue_id=18) +transport = UPTransportZenoh.new(get_zenoh_default_config(), source) +# create topic uuri +uuri = UUri(ue_id=4, ue_version_major=1, resource_id=0x8000) + + +async def subscribe_to_zenoh_if_subscription_service_is_not_running(): + status = await transport.register_listener(uuri, MyListener()) + common_uuri.logging.debug(f"Register Listener status {status}") + while True: + await asyncio.sleep(1) + + +async def subscribe_if_subscription_service_is_running(): + client = InMemoryUSubscriptionClient(transport) + status = await client.subscribe(uuri, MyListener()) + common_uuri.logging.debug(f"Register Listener status {status}") + while True: + await asyncio.sleep(1) + + +if __name__ == '__main__': + asyncio.run(subscribe_to_zenoh_if_subscription_service_is_not_running()) diff --git a/up_transport_zenoh/tests/test_zenohutils.py b/up_transport_zenoh/tests/test_zenohutils.py new file mode 100644 index 0000000..e2013b8 --- /dev/null +++ b/up_transport_zenoh/tests/test_zenohutils.py @@ -0,0 +1,75 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import unittest + +import pytest +from uprotocol.uri.serializer.uriserializer import UriSerializer + +from up_transport_zenoh.zenohutils import MessageFlag, ZenohUtils + + +class TestZenohUtils(unittest.IsolatedAsyncioTestCase): + @pytest.mark.asyncio + async def test_to_zenoh_key_string(self): + authority = "192.168.1.100" + test_cases = [ + ("/10AB/3/80CD", None, "up/192.168.1.100/10AB/3/80CD/{}/{}/{}/{}"), + ("//192.168.1.100/10AB/3/80CD", None, "up/192.168.1.100/10AB/3/80CD/{}/{}/{}/{}"), + ( + "//192.168.1.100/10AB/3/80CD", + "//192.168.1.101/20EF/4/0", + "up/192.168.1.100/10AB/3/80CD/192.168.1.101/20EF/4/0", + ), + ("//*/FFFF/FF/FFFF", "//192.168.1.101/20EF/4/0", "up/*/*/*/*/192.168.1.101/20EF/4/0"), + ("//my-host1/10AB/3/0", "//my-host2/20EF/4/B", "up/my-host1/10AB/3/0/my-host2/20EF/4/B"), + ("//*/FFFF/FF/FFFF", "//my-host2/20EF/4/B", "up/*/*/*/*/my-host2/20EF/4/B"), + ] + for src_uri, sink_uri, expected_zenoh_key in test_cases: + src = UriSerializer().deserialize(src_uri) + if sink_uri: + sink = UriSerializer().deserialize(sink_uri) + result_key1 = ZenohUtils.to_zenoh_key_string(authority, src, sink) + print("result1 ", result_key1) + assert result_key1 == expected_zenoh_key + else: + result_key2 = ZenohUtils.to_zenoh_key_string(authority, src, None) + print("result2 ", result_key2) + assert result_key2 == expected_zenoh_key + + @pytest.mark.asyncio + async def test_get_listener_message_type(self): + test_cases = [ + ("//192.168.1.100/10AB/3/80CD", None, MessageFlag.PUBLISH), + ("//192.168.1.100/10AB/3/80CD", "//192.168.1.101/20EF/4/0", MessageFlag.NOTIFICATION), + ("//192.168.1.100/10AB/3/0", "//192.168.1.101/20EF/4/B", MessageFlag.REQUEST), + ("//192.168.1.101/20EF/4/B", "//192.168.1.100/10AB/3/0", MessageFlag.RESPONSE), + ("//*/FFFF/FF/FFFF", "//192.168.1.100/10AB/3/0", MessageFlag.NOTIFICATION | MessageFlag.RESPONSE), + ("//*/FFFF/FF/FFFF", "//192.168.1.101/20EF/4/B", MessageFlag.REQUEST), + ("//192.168.1.100/10AB/3/0", "//*/FFFF/FF/FFFF", MessageFlag.REQUEST), + ("//192.168.1.101/20EF/4/B", "//*/FFFF/FF/FFFF", MessageFlag.RESPONSE), + ("//192.168.1.100/10AB/3/80CD", "//*/FFFF/FF/FFFF", MessageFlag.NOTIFICATION | MessageFlag.PUBLISH), + ] + + for src_uri, sink_uri, expected_result in test_cases: + src = UriSerializer().deserialize(src_uri) + if sink_uri: + dst = UriSerializer().deserialize(sink_uri) + assert ZenohUtils.get_listener_message_type(src, dst) == expected_result + else: + assert ZenohUtils.get_listener_message_type(src, None) == expected_result + + +if __name__ == "__main__": + unittest.main() diff --git a/up_transport_zenoh/uptransportzenoh.py b/up_transport_zenoh/uptransportzenoh.py new file mode 100644 index 0000000..974c2c2 --- /dev/null +++ b/up_transport_zenoh/uptransportzenoh.py @@ -0,0 +1,381 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import asyncio +import logging +import threading +from threading import Lock +from typing import Dict, Tuple + +import zenoh +from uprotocol.communication.ustatuserror import UStatusError +from uprotocol.transport.ulistener import UListener +from uprotocol.transport.utransport import UTransport +from uprotocol.transport.validator.uattributesvalidator import Validators +from uprotocol.uri.factory.uri_factory import UriFactory +from uprotocol.v1.uattributes_pb2 import UAttributes, UMessageType +from uprotocol.v1.ucode_pb2 import UCode +from uprotocol.v1.umessage_pb2 import UMessage +from uprotocol.v1.uri_pb2 import UUri +from uprotocol.v1.ustatus_pb2 import UStatus +from zenoh import Config, Query, Queryable, Sample, Session, Subscriber, Value +from zenoh.keyexpr import KeyExpr + +from up_transport_zenoh.zenohutils import MessageFlag, ZenohUtils + +# Configure the logging +logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') + + +class UPTransportZenoh(UTransport): + def get_source(self) -> UUri: + return self.source + + def close(self) -> None: + pass + + def __init__(self, session: Session, source: UUri): + self.session = session + self.subscriber_map: Dict[Tuple[str, UListener], Subscriber] = {} + self.queryable_map: Dict[Tuple[str, UListener], Queryable] = {} + self.query_map: Dict[str, Query] = {} + self.rpc_callback_map: Dict[str, UListener] = {} + self.source = source + self.authority_name = source.authority_name + self.rpc_callback_lock = Lock() + self.queryable_lock = Lock() + self.subscriber_lock = Lock() + + @classmethod + def new(cls, config: Config, source: UUri): + try: + session = zenoh.open(config) + except Exception: + msg = "Unable to open Zenoh session" + logging.error(msg) + raise UStatus.fail_with_code(UCode.INTERNAL, msg) + + return cls( + session=session, + source=source, + ) + + def send_publish_notification(self, zenoh_key: str, payload: bytes, attributes: UAttributes) -> UStatus: + # Transform UAttributes to user attachment in Zenoh + attachment = ZenohUtils.uattributes_to_attachment(attributes) + if not attachment: + msg = "Unable to transform UAttributes to attachment" + logging.debug(f"ERROR: {msg}") + return UStatus(code=UCode.INVALID_ARGUMENT, message=msg) + + # Map the priority to Zenoh + priority = ZenohUtils.map_zenoh_priority(attributes.priority) + if not priority: + msg = "Unable to map to Zenoh priority" + logging.debug(f"ERROR: {msg}") + return UStatus(code=UCode.INVALID_ARGUMENT, message=msg) + + try: + # Simulate sending data + logging.debug(f"Sending data to Zenoh with key: {zenoh_key}") + logging.debug(f"Data: {payload}") + logging.debug(f"Priority: {priority}") + logging.debug(f"Attachment: {attachment}") + + self.session.put(keyexpr=zenoh_key, value=payload, attachment=attachment, priority=priority) + msg = "Successfully sent data to Zenoh" + logging.debug(f"SUCCESS:{msg}") + return UStatus(code=UCode.OK, message=msg) + except Exception as e: + msg = f"Unable to send with Zenoh: {e}" + logging.debug(f"ERROR: {msg}") + return UStatus(code=UCode.INTERNAL, message=msg) + + def send_request(self, zenoh_key: str, payload: bytes, attributes: UAttributes) -> UStatus: + # Transform UAttributes to user attachment in Zenoh + attachment = ZenohUtils.uattributes_to_attachment(attributes) + if attachment is None: + msg = "Unable to transform UAttributes to attachment" + logging.debug(msg) + return UStatus(code=UCode.INVALID_ARGUMENT, message=msg) + resp_callback = None + for saved_zenoh_key, listener in self.rpc_callback_map.items(): + keyexpr_zenohkey = KeyExpr.new(zenoh_key) + keyexpr_savedkey = KeyExpr.new(saved_zenoh_key) + + if keyexpr_zenohkey.intersects(keyexpr_savedkey): + resp_callback = self.rpc_callback_map.get(saved_zenoh_key) + break + if resp_callback is None: + msg = "Unable to get callback" + logging.debug(msg) + return UStatus(code=UCode.INTERNAL, message=msg) + + def handle_response(reply: Query.reply) -> None: + try: + sample = reply.ok + # Get UAttribute from the attachment + attachment = sample.attachment + if attachment is None: + msg = "Unable to get the attachment" + logging.debug(msg) + return UStatus(code=UCode.INTERNAL, message=msg) + + u_attribute = ZenohUtils.attachment_to_uattributes(attachment) + if u_attribute is None: + msg = "Transform attachment to UAttributes failed" + logging.debug(msg) + return UStatus(code=UCode.INTERNAL, message=msg) + # Create UMessage + msg = UMessage(attributes=u_attribute, payload=sample.payload) + asyncio.run(resp_callback.on_receive(msg)) + except Exception: + msg = f"Error while parsing Zenoh reply: {reply.error}" + logging.debug(msg) + return UStatus(code=UCode.INTERNAL, message=msg) + + # Send query + ttl = attributes.ttl / 1000 if attributes.ttl is not None else 1000 + + value = Value(payload) + # Send the query + get_builder = self.session.get( + zenoh_key, + zenoh.Queue(), + target=zenoh.QueryTarget.BEST_MATCHING(), + attachment=attachment, + value=value, + timeout=ttl, + ) + + def get_response(): + try: + for reply in get_builder.receiver: + if reply.is_ok: + handle_response(reply) + break + except Exception: + pass + + thread = threading.Thread(target=get_response) + # Start the thread + thread.start() + + msg = "Successfully sent rpc request to Zenoh" + logging.debug(f"SUCCESS:{msg}") + return UStatus(code=UCode.OK, message=msg) + + def send_response(self, payload: bytes, attributes: UAttributes) -> UStatus: + # Transform attributes to user attachment in Zenoh + attachment = ZenohUtils.uattributes_to_attachment(attributes) + if attachment is None: + msg = "Unable to transform UAttributes to attachment" + logging.debug(msg) + return UStatus(code=UCode.INVALID_ARGUMENT, message=msg) + + # Find out the corresponding query from dictionary + reqid = attributes.reqid + + query = self.query_map.pop(reqid.SerializeToString(), None) + if not query: + msg = "Query doesn't exist" + logging.debug(msg) + return UStatus(code=UCode.INTERNAL, message=msg) # Send back the query + value = Value(payload) + reply = Sample(query.key_expr, value, attachment=attachment) + + try: + query.reply(reply) + msg = "Successfully sent rpc response to Zenoh" + logging.debug(f"SUCCESS:{msg}") + return UStatus(code=UCode.OK, message=msg) + + except Exception as e: + msg = "Unable to reply with Zenoh: {}".format(str(e)) + logging.debug(msg) + return UStatus(code=UCode.INTERNAL, message=msg) + + def register_publish_notification_listener(self, zenoh_key: str, listener: UListener) -> UStatus: + def callback(sample: Sample) -> None: + # Get the UAttribute from Zenoh user attachment + attachment = sample.attachment + if attachment is None: + msg = "Unable to get attachment" + logging.debug(msg) + return UStatus(code=UCode.INTERNAL, message=msg) + try: + u_attribute = ZenohUtils.attachment_to_uattributes(attachment) + except UStatusError as error: + logging.debug(error.get_message()) + return UStatus(code=error.get_code(), message=error.get_message()) + if u_attribute is None: + msg = "Unable to decode attributes" + logging.debug(msg) + return UStatus(code=UCode.INTERNAL, message=msg) + message = UMessage(attributes=u_attribute, payload=sample.payload) + asyncio.run(listener.on_receive(message)) + + # Create Zenoh subscriber + try: + subscriber = self.session.declare_subscriber(zenoh_key, callback) + if subscriber: + with self.subscriber_lock: + self.subscriber_map[(zenoh_key, listener)] = subscriber + except Exception: + msg = "Unable to register callback with Zenoh" + logging.debug(msg) + raise UStatus.fail_with_code(UCode.INTERNAL, msg) + + msg = "Successfully register callback with Zenoh" + logging.debug(msg) + return UStatus(code=UCode.OK, message=msg) + + def register_request_listener(self, zenoh_key: str, listener: UListener) -> UStatus: + def callback(query: Query) -> None: + nonlocal self, listener, zenoh_key + attachment = query.attachment + if not attachment: + msg = "Unable to get attachment" + logging.debug(msg) + return UStatus(code=UCode.INTERNAL, message=msg) + + try: + u_attribute = ZenohUtils.attachment_to_uattributes(attachment) + except UStatusError as error: + logging.debug(error.get_message()) + return UStatus(code=error.get_code(), message=error.get_message()) + if u_attribute is None: + msg = "Unable to decode attributes" + logging.debug(msg) + return UStatus(code=UCode.INTERNAL, message=msg) + + message = UMessage(attributes=u_attribute, payload=query.value.payload if query.value else None) + self.query_map[u_attribute.id.SerializeToString()] = query + asyncio.run(listener.on_receive(message)) + + try: + with self.queryable_lock: + queryable = self.session.declare_queryable(zenoh_key, callback) + self.queryable_map[(zenoh_key, listener)] = queryable + + except Exception: + msg = "Unable to register callback with Zenoh" + logging.debug(msg) + return UStatus(code=UCode.INTERNAL, message=msg) + + return UStatus(code=UCode.OK, message="Successfully register callback with Zenoh") + + def register_response_listener(self, zenoh_key: str, listener: UListener) -> UStatus: + with self.rpc_callback_lock: + self.rpc_callback_map[zenoh_key] = listener + return UStatus(code=UCode.OK, message="Successfully register response callback with Zenoh") + + async def send(self, message: UMessage) -> UStatus: + attributes = message.attributes + source = attributes.source + sink = attributes.sink + zenoh_key = ZenohUtils.to_zenoh_key_string(self.authority_name, source, sink) + + if not source: + return UStatus(code=UCode.INVALID_ARGUMENT, message="attributes.source shouldn't be empty") + payload = message.payload or b'' + # Check the type of UAttributes (Publish / Notification / Request / Response) + msg_type = attributes.type + if msg_type == UMessageType.UMESSAGE_TYPE_PUBLISH: + Validators.PUBLISH.validator().validate(attributes) + return self.send_publish_notification(zenoh_key, payload, attributes) + elif msg_type == UMessageType.UMESSAGE_TYPE_NOTIFICATION: + Validators.NOTIFICATION.validator().validate(attributes) + return self.send_publish_notification(zenoh_key, payload, attributes) + + elif msg_type == UMessageType.UMESSAGE_TYPE_REQUEST: + Validators.REQUEST.validator().validate(attributes) + return self.send_request(zenoh_key, payload, attributes) + + elif msg_type == UMessageType.UMESSAGE_TYPE_RESPONSE: + Validators.RESPONSE.validator().validate(attributes) + return self.send_response(payload, attributes) + + else: + return UStatus(code=UCode.INVALID_ARGUMENT, message="Wrong Message type in UAttributes") + + async def register_listener( + self, source_filter: UUri, listener: UListener, sink_filter: UUri = UriFactory.ANY + ) -> UStatus: + flag = ZenohUtils.get_listener_message_type(source_filter, sink_filter) + + # RPC request + if flag & MessageFlag.REQUEST: + # Get Zenoh key + zenoh_key = ZenohUtils.to_zenoh_key_string(self.authority_name, source_filter, sink_filter) + return self.register_request_listener(zenoh_key, listener) # RPC response + if flag & MessageFlag.RESPONSE: + if sink_filter is not None: + # Get Zenoh key + zenoh_key = ZenohUtils.to_zenoh_key_string(self.authority_name, sink_filter, source_filter) + return self.register_response_listener(zenoh_key, listener) + else: + return UStatus(code=UCode.INVALID_ARGUMENT, message="Sink should not be None in Response") + # Publish & Notification + if flag & (MessageFlag.PUBLISH | MessageFlag.NOTIFICATION): + # Get Zenoh key + zenoh_key = ZenohUtils.to_zenoh_key_string(self.authority_name, source_filter, sink_filter) + return self.register_publish_notification_listener(zenoh_key, listener) + + async def unregister_listener( + self, source_filter: UUri, listener: UListener, sink_filter: UUri = UriFactory.ANY + ) -> UStatus: + flag = ZenohUtils.get_listener_message_type(source_filter, sink_filter) + # Publish & Notification + if flag & (MessageFlag.PUBLISH | MessageFlag.NOTIFICATION): + # Get Zenoh key + zenoh_key = ZenohUtils.to_zenoh_key_string(source_filter, sink_filter) + return self._remove_publish_listener(zenoh_key, listener) + # RPC request + if flag & MessageFlag.REQUEST: + # Get Zenoh key + zenoh_key = ZenohUtils.to_zenoh_key_string(source_filter, sink_filter) + return self._remove_request_listener(zenoh_key, listener) # RPC response + if flag & MessageFlag.RESPONSE: + if sink_filter is not None: + # Get Zenoh key + zenoh_key = ZenohUtils.to_zenoh_key_string(sink_filter, source_filter) + return self._remove_response_listener(zenoh_key) + else: + return UStatus(code=UCode.INVALID_ARGUMENT, message="Sink should not be None in Response") + + def _remove_response_listener(self, zenoh_key: str) -> UStatus: + with self.rpc_callback_lock: + if self.rpc_callback_map.pop(zenoh_key, None) is None: + msg = "RPC response callback doesn't exist" + logging.error(msg) + return UStatus(code=UCode.NOT_FOUND, message=msg) + return UStatus(code=UCode.OK) + + def _remove_publish_listener(self, zenoh_key: str, listener: UListener) -> UStatus: + with self.subscriber_lock: + if self.subscriber_map.pop((zenoh_key, listener), None) is None: + msg = "Listener not registered for filters: {source_filter}, {sink_filter}" + logging.error(msg) + return UStatus(code=UCode.NOT_FOUND, message=msg) + + return UStatus(code=UCode.OK, message="Listener removed successfully") + + def _remove_request_listener(self, zenoh_key: str, listener: UListener) -> UStatus: + with self.queryable_lock: + if self.queryable_map.pop((zenoh_key, listener), None) is None: + msg = "RPC request listener doesn't exist" + logging.error(msg) + return UStatus(code=UCode.NOT_FOUND, message=msg) + return UStatus(code=UCode.OK, message="Listener removed successfully") diff --git a/up_transport_zenoh/zenohutils.py b/up_transport_zenoh/zenohutils.py new file mode 100644 index 0000000..e19e824 --- /dev/null +++ b/up_transport_zenoh/zenohutils.py @@ -0,0 +1,229 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import logging +from enum import IntFlag +from typing import Union + +from uprotocol.communication.ustatuserror import UStatusError +from uprotocol.uri.factory.uri_factory import UriFactory +from uprotocol.v1.uattributes_pb2 import ( + UAttributes, + UPayloadFormat, + UPriority, +) +from uprotocol.v1.ucode_pb2 import UCode +from uprotocol.v1.uri_pb2 import UUri +from uprotocol.v1.ustatus_pb2 import UStatus +from zenoh import Encoding, Priority +from zenoh.value import Attachment + +UATTRIBUTE_VERSION: int = 1 + +# Configure the logging +logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') + + +class MessageFlag(IntFlag): + PUBLISH = 1 + NOTIFICATION = 2 + REQUEST = 4 + RESPONSE = 8 + + +class ZenohUtils: + @staticmethod + def uri_to_zenoh_key(authority_name: str, uri: UUri) -> str: + authority = authority_name if not uri.authority_name else uri.authority_name + ue_id = "*" if uri.ue_id == UriFactory.WILDCARD_ENTITY_ID else f"{uri.ue_id:X}" + ue_version_major = ( + "*" if uri.ue_version_major == UriFactory.WILDCARD_ENTITY_VERSION else f"{uri.ue_version_major:X}" + ) + resource_id = "*" if uri.resource_id == UriFactory.WILDCARD_RESOURCE_ID else f"{uri.resource_id:X}" + return f"{authority}/{ue_id}/{ue_version_major}/{resource_id}" + + @staticmethod + def get_uauth_from_uuri(uri: UUri) -> Union[str, UStatus]: + if uri.authority: + try: + authority_bytes = uri.authority.SerializeToString() + # Iterate over each byte and formate it as a two digit hexa decimal + return "".join(f"{c:02x}" for c in authority_bytes) + except Exception as e: + msg = f"Unable to transform UAuthority into micro form: {e}" + logging.debug(msg) + return UStatus(code=UCode.INVALID_ARGUMENT, message=msg) + else: + msg = "UAuthority is empty" + logging.debug(msg) + return UStatus(code=UCode.INVALID_ARGUMENT, message=msg) + + @staticmethod + def to_zenoh_key_string(authority_name: str, src_uri: UUri, dst_uri: UUri = None) -> str: + src = ZenohUtils.uri_to_zenoh_key(authority_name, src_uri) + dst = ZenohUtils.uri_to_zenoh_key(authority_name, dst_uri) if dst_uri else "{}/{}/{}/{}" + return f"up/{src}/{dst}" + + @staticmethod + def map_zenoh_priority(upriority: UPriority) -> Priority: + mapping = { + UPriority.UPRIORITY_CS0: Priority.BACKGROUND(), + UPriority.UPRIORITY_CS1: Priority.DATA_LOW(), + UPriority.UPRIORITY_CS2: Priority.DATA(), + UPriority.UPRIORITY_CS3: Priority.DATA_HIGH(), + UPriority.UPRIORITY_CS4: Priority.INTERACTIVE_LOW(), + UPriority.UPRIORITY_CS5: Priority.INTERACTIVE_HIGH(), + UPriority.UPRIORITY_CS6: Priority.REAL_TIME(), + UPriority.UPRIORITY_UNSPECIFIED: Priority.DATA_LOW(), + } + return mapping[upriority] + + @staticmethod + def to_upayload_format(encoding: Encoding) -> UPayloadFormat: + try: + value = int(encoding.suffix) + return value if UPayloadFormat.Name(value) else None + except (ValueError, AttributeError): + return None + + @staticmethod + def uattributes_to_attachment(uattributes: UAttributes): + attachment = [("", UATTRIBUTE_VERSION.to_bytes(1, byteorder='little')), ("", uattributes.SerializeToString())] + return attachment + + @staticmethod + def attachment_to_uattributes(attachment: Attachment) -> UAttributes: + try: + version = None + version_found = False + uattributes = None + + items = attachment.items() + for pair in items: + if not version_found: + version = pair[1] + version_found = True + else: + # Process UAttributes data + uattributes = UAttributes() + uattributes.ParseFromString(pair[1]) + break + + if version is None: + msg = f"UAttributes version is empty (should be {UATTRIBUTE_VERSION})" + logging.debug(msg) + raise UStatusError.from_code_message(code=UCode.INVALID_ARGUMENT, message=msg) + + if not version_found: + msg = "UAttributes version is missing in the attachment" + logging.debug(msg) + raise UStatusError.from_code_message(code=UCode.INVALID_ARGUMENT, message=msg) + + if version != UATTRIBUTE_VERSION.to_bytes(1, byteorder='little'): + msg = f"UAttributes version is {version} (should be {UATTRIBUTE_VERSION})" + logging.debug(msg) + raise UStatusError.from_code_message(code=UCode.INVALID_ARGUMENT, message=msg) + + if uattributes is None: + msg = "Unable to get the UAttributes" + logging.debug(msg) + raise UStatusError.from_code_message(code=UCode.INVALID_ARGUMENT, message=msg) + + return uattributes + except Exception as e: + msg = f"Failed to convert Attachment to UAttributes: {e}" + logging.debug(msg) + raise UStatusError.from_code_message(code=UCode.INVALID_ARGUMENT, message=msg) + + @staticmethod + def get_listener_message_type(source_uuri: UUri, sink_uuri: UUri = None) -> Union[MessageFlag, UStatusError]: + """ + The table for mapping resource ID to message type: + + | src rid | sink rid | Publish | Notification | Request | Response | + |-------------|----------|---------|--------------|---------|----------| + | [8000-FFFF) | None | V | | | | + | [8000-FFFF) | 0 | | V | | | + | 0 | (0-8000) | | | V | | + | (0-8000) | 0 | | | | V | + | FFFF | 0 | | V | | V | + | FFFF | (0-8000) | | | V | | + | 0 | FFFF | | | V | | + | (0-8000) | FFFF | | | | V | + | [8000-FFFF) | FFFF | | V | | | + | FFFF | FFFF | | V | V | V | + + Some organization: + - Publish: {[8000-FFFF), None} + - Notification: {[8000-FFFF), 0}, {[8000-FFFF), FFFF}, {FFFF, 0}, {FFFF, FFFF} + - Request: {0, (0-8000)}, {0, FFFF}, {FFFF, (0-8000)}, {FFFF, FFFF} + - Response: {(0-8000), 0}, {(0-8000), FFFF}, (FFFF, 0), {FFFF, FFFF} + + :param source_uuri: The source UUri. + :param sink_uuri: Optional sink UUri for request-response types. + :return: MessageFlag indicating the type of message. + :raises Exception: If the combination of source UUri and sink UUri is invalid. + """ + flag = MessageFlag(0) + + rpc_range = range(1, 0x7FFF) + nonrpc_range = range(0x8000, 0xFFFE) + wildcard_resource_id = UriFactory.WILDCARD_RESOURCE_ID + + src_resource = source_uuri.resource_id + + # Notification / Request / Response + if sink_uuri: + dst_resource = sink_uuri.resource_id + + if ( + (src_resource in nonrpc_range and dst_resource == 0) + or (src_resource in nonrpc_range and dst_resource == wildcard_resource_id) + or (src_resource == wildcard_resource_id and dst_resource == 0) + or (src_resource == wildcard_resource_id and dst_resource == wildcard_resource_id) + ): + flag |= MessageFlag.NOTIFICATION + + if ( + (src_resource == 0 and dst_resource in rpc_range) + or (src_resource == 0 and dst_resource == wildcard_resource_id) + or (src_resource == wildcard_resource_id and dst_resource in rpc_range) + or (src_resource == wildcard_resource_id and dst_resource == wildcard_resource_id) + ): + flag |= MessageFlag.REQUEST + + if ( + (src_resource in rpc_range and dst_resource == 0) + or (src_resource in rpc_range and dst_resource == wildcard_resource_id) + or (src_resource == wildcard_resource_id and dst_resource == 0) + or (src_resource == wildcard_resource_id and dst_resource == wildcard_resource_id) + ): + flag |= MessageFlag.RESPONSE + + if dst_resource == wildcard_resource_id and ( + src_resource in nonrpc_range or src_resource == wildcard_resource_id + ): + flag |= MessageFlag.PUBLISH + + # Publish + elif src_resource in nonrpc_range or src_resource == wildcard_resource_id: + flag |= MessageFlag.PUBLISH + + # Error handling + if flag == MessageFlag(0): + raise UStatusError.from_code_message( + code=UCode.INTERNAL, message="Wrong combination of source UUri and sink " "UUri" + ) + else: + return flag