diff --git a/.github/ruff.toml b/.github/ruff.toml index 226dc5c..dec56e2 100644 --- a/.github/ruff.toml +++ b/.github/ruff.toml @@ -34,12 +34,4 @@ indent-style = "space" docstring-code-format = true docstring-code-line-length = 100 -[lint.per-file-ignores] -# Ignore N999 in the specified folder -"*" = ["N999"] # Invalid module name: -# Ignore E402 in specific files -"testagent.py" = ["E402"] -"environment.py" = ["E402"] -"tck_step_implementations.py" = ["E402"] - diff --git a/setup.py b/setup.py index 1edd6b6..cb8a84d 100644 --- a/setup.py +++ b/setup.py @@ -21,14 +21,12 @@ import os -from setuptools import setup, find_packages +from setuptools import find_packages, setup project_name = "up-client-zenoh-python" script_directory = os.path.realpath(os.path.dirname(__file__)) -REQUIREMENTS = [ - i.strip() for i in open(os.path.join("requirements.txt")).readlines() -] +REQUIREMENTS = [i.strip() for i in open(os.path.join("requirements.txt")).readlines()] setup( name=project_name, diff --git a/up_client_zenoh/examples/common_uuri.py b/up_client_zenoh/examples/common_uuri.py index f6fabe4..b058fdb 100644 --- a/up_client_zenoh/examples/common_uuri.py +++ b/up_client_zenoh/examples/common_uuri.py @@ -17,9 +17,7 @@ from enum import Enum import zenoh -from uprotocol.proto.uri_pb2 import UAuthority -from uprotocol.proto.uri_pb2 import UEntity -from uprotocol.proto.uri_pb2 import UResource +from uprotocol.proto.uri_pb2 import UAuthority, UEntity, UResource from uprotocol.uri.factory.uresource_builder import UResourceBuilder # Configure the logging @@ -38,8 +36,12 @@ def authority() -> UAuthority: def entity(example_type: ExampleType) -> UEntity: - mapping = {ExampleType.PUBLISHER : ("publisher", 1), ExampleType.SUBSCRIBER: ("subscriber", 2), - ExampleType.RPC_SERVER: ("rpc_server", 3), ExampleType.RPC_CLIENT: ("rpc_client", 4)} + mapping = { + ExampleType.PUBLISHER: ("publisher", 1), + ExampleType.SUBSCRIBER: ("subscriber", 2), + ExampleType.RPC_SERVER: ("rpc_server", 3), + ExampleType.RPC_CLIENT: ("rpc_client", 4), + } name, id = mapping[example_type] return UEntity(name=name, id=1, version_major=id) @@ -70,7 +72,4 @@ def get_zenoh_default_config(): # Create a Zenoh configuration object with default settings config = zenoh.Config() - # # Set the mode to Peer (or Router, Client depending on your use case) - # config = "peer" - return config diff --git a/up_client_zenoh/examples/publish.py b/up_client_zenoh/examples/publish.py index 5064d55..3af6f2d 100644 --- a/up_client_zenoh/examples/publish.py +++ b/up_client_zenoh/examples/publish.py @@ -16,19 +16,18 @@ from uprotocol.proto.uattributes_pb2 import UPriority from uprotocol.proto.umessage_pb2 import UMessage -from uprotocol.proto.upayload_pb2 import UPayloadFormat, UPayload +from uprotocol.proto.upayload_pb2 import UPayload, UPayloadFormat from uprotocol.proto.uri_pb2 import UUri from uprotocol.transport.builder.uattributesbuilder import UAttributesBuilder from up_client_zenoh.examples import common_uuri -from up_client_zenoh.examples.common_uuri import authority, entity, ExampleType, pub_resource, \ - get_zenoh_default_config +from up_client_zenoh.examples.common_uuri import ExampleType, authority, entity, get_zenoh_default_config, pub_resource from up_client_zenoh.upclientzenoh import UPClientZenoh publisher = UPClientZenoh(get_zenoh_default_config(), authority(), entity(ExampleType.PUBLISHER)) -def publishtoZenoh(): +def publish_to_zenoh(): # create uuri uuri = UUri(entity=entity(ExampleType.PUBLISHER), resource=pub_resource()) cnt = 0 @@ -44,4 +43,4 @@ def publishtoZenoh(): if __name__ == '__main__': - publishtoZenoh() + publish_to_zenoh() diff --git a/up_client_zenoh/examples/rpc_client.py b/up_client_zenoh/examples/rpc_client.py index a44bac5..e39c3ad 100644 --- a/up_client_zenoh/examples/rpc_client.py +++ b/up_client_zenoh/examples/rpc_client.py @@ -17,7 +17,7 @@ from uprotocol.proto.uri_pb2 import UUri from up_client_zenoh.examples import common_uuri -from up_client_zenoh.examples.common_uuri import authority, entity, ExampleType, rpc_resource, get_zenoh_default_config +from up_client_zenoh.examples.common_uuri import ExampleType, authority, entity, get_zenoh_default_config, rpc_resource from up_client_zenoh.upclientzenoh import UPClientZenoh rpc_client = UPClientZenoh(get_zenoh_default_config(), authority(), entity(ExampleType.RPC_CLIENT)) diff --git a/up_client_zenoh/examples/rpc_server.py b/up_client_zenoh/examples/rpc_server.py index 49400d5..bba35ee 100644 --- a/up_client_zenoh/examples/rpc_server.py +++ b/up_client_zenoh/examples/rpc_server.py @@ -16,23 +16,20 @@ from datetime import datetime from uprotocol.proto.umessage_pb2 import UMessage -from uprotocol.proto.upayload_pb2 import UPayload -from uprotocol.proto.upayload_pb2 import UPayloadFormat +from uprotocol.proto.upayload_pb2 import UPayload, UPayloadFormat from uprotocol.proto.uri_pb2 import UUri from uprotocol.proto.ustatus_pb2 import UStatus from uprotocol.transport.builder.uattributesbuilder import UAttributesBuilder from uprotocol.transport.ulistener import UListener from up_client_zenoh.examples import common_uuri -from up_client_zenoh.examples.common_uuri import authority, entity, ExampleType, rpc_resource, \ - get_zenoh_default_config +from up_client_zenoh.examples.common_uuri import ExampleType, authority, entity, get_zenoh_default_config, rpc_resource from up_client_zenoh.upclientzenoh import UPClientZenoh rpc_server = UPClientZenoh(get_zenoh_default_config(), authority(), entity(ExampleType.RPC_SERVER)) class RPCRequestListener(UListener): - def on_receive(self, msg: UMessage) -> UStatus: attributes = msg.attributes payload = msg.payload diff --git a/up_client_zenoh/examples/subscribe.py b/up_client_zenoh/examples/subscribe.py index cab1b83..c4b09ae 100644 --- a/up_client_zenoh/examples/subscribe.py +++ b/up_client_zenoh/examples/subscribe.py @@ -20,13 +20,11 @@ from uprotocol.transport.ulistener import UListener from up_client_zenoh.examples import common_uuri -from up_client_zenoh.examples.common_uuri import authority, entity, ExampleType, pub_resource, \ - get_zenoh_default_config +from up_client_zenoh.examples.common_uuri import ExampleType, authority, entity, get_zenoh_default_config, pub_resource from up_client_zenoh.upclientzenoh import UPClientZenoh class MyListener(UListener): - def on_receive(self, msg: UMessage) -> UStatus: common_uuri.logging.debug('on receive called') common_uuri.logging.debug(msg.payload.value) @@ -37,13 +35,13 @@ def on_receive(self, msg: UMessage) -> UStatus: client = UPClientZenoh(get_zenoh_default_config(), authority(), entity(ExampleType.SUBSCRIBER)) -def subscribeToZenoh(): +def subscribe_to_zenoh(): # create uuri uuri = UUri(entity=entity(ExampleType.PUBLISHER), resource=pub_resource()) client.register_listener(uuri, MyListener()) if __name__ == '__main__': - subscribeToZenoh() + subscribe_to_zenoh() while True: time.sleep(1) diff --git a/up_client_zenoh/upclientzenoh.py b/up_client_zenoh/upclientzenoh.py index 68533d9..71b7ce8 100644 --- a/up_client_zenoh/upclientzenoh.py +++ b/up_client_zenoh/upclientzenoh.py @@ -25,13 +25,11 @@ from typing import Dict, Tuple import zenoh -from uprotocol.proto.uattributes_pb2 import CallOptions -from uprotocol.proto.uattributes_pb2 import UAttributes, UMessageType -from uprotocol.proto.uattributes_pb2 import UPriority +from uprotocol.proto.uattributes_pb2 import CallOptions, UAttributes, UMessageType, UPriority from uprotocol.proto.umessage_pb2 import UMessage -from uprotocol.proto.upayload_pb2 import UPayloadFormat, UPayload -from uprotocol.proto.uri_pb2 import UUri, UAuthority, UEntity -from uprotocol.proto.ustatus_pb2 import UStatus, UCode +from uprotocol.proto.upayload_pb2 import UPayload, UPayloadFormat +from uprotocol.proto.uri_pb2 import UAuthority, UEntity, UUri +from uprotocol.proto.ustatus_pb2 import UCode, UStatus from uprotocol.rpc.rpcclient import RpcClient from uprotocol.transport.builder.uattributesbuilder import UAttributesBuilder from uprotocol.transport.ulistener import UListener @@ -39,7 +37,8 @@ from uprotocol.transport.validate.uattributesvalidator import Validators from uprotocol.uri.factory.uresource_builder import UResourceBuilder from uprotocol.uri.validator.urivalidator import UriValidator -from zenoh import open as zenoh_open, Subscriber, Encoding, Config, Query, Sample, Value, Queryable +from zenoh import Config, Encoding, Query, Queryable, Sample, Subscriber, Value +from zenoh import open as zenoh_open from up_client_zenoh.zenohutils import ZenohUtils @@ -48,7 +47,6 @@ class UPClientZenoh(UTransport, RpcClient): - def __init__(self, config: Config, uauthority: UAuthority, uentity: UEntity): self.session = zenoh_open(config) self.subscriber_map: Dict[Tuple[str, UListener], Subscriber] = {} @@ -97,8 +95,7 @@ def send_publish_notification(self, zenoh_key: str, payload: UPayload, attribute logging.debug(f"Attachment: {attachment}") encoding = Encoding.APP_CUSTOM().with_suffix(str(payload.format)) - put_builder = self.session.put(keyexpr=zenoh_key, encoding=encoding, value=buf, attachment=attachment, - priority=priority) + self.session.put(keyexpr=zenoh_key, encoding=encoding, value=buf, attachment=attachment, priority=priority) msg = "Successfully sent data to Zenoh" logging.debug(f"SUCCESS:{msg}") @@ -156,7 +153,9 @@ def zenoh_callback(reply: Query.reply) -> None: logging.debug(msg) return UStatus(code=UCode.INTERNAL, message=msg) # Create UMessage - msg = UMessage(attributes=u_attribute, payload=UPayload(length=0, format=encoding, value=sample.payload)) + msg = UMessage( + attributes=u_attribute, payload=UPayload(length=0, format=encoding, value=sample.payload) + ) resp_callback.on_receive(msg) else: msg = f"Error while parsing Zenoh reply: {reply.error}" @@ -167,8 +166,13 @@ def zenoh_callback(reply: Query.reply) -> None: ttl = attributes.ttl / 1000 if attributes.ttl is not None else 1000 value = Value(payload.value, encoding=Encoding.APP_CUSTOM().with_suffix(str(payload.format))) - self.session.get(zenoh_key, lambda reply: zenoh_callback(reply), target=zenoh.QueryTarget.BEST_MATCHING(), - value=value, timeout=ttl) + self.session.get( + zenoh_key, + lambda reply: zenoh_callback(reply), + target=zenoh.QueryTarget.BEST_MATCHING(), + value=value, + timeout=ttl, + ) msg = "Successfully sent rpc request to Zenoh" logging.debug(f"SUCCESS:{msg}") return UStatus(code=UCode.OK, message=msg) @@ -229,7 +233,6 @@ def callback(sample: Sample) -> None: # Create Zenoh subscriber try: - subscriber = self.session.declare_subscriber(zenoh_key, callback) if subscriber: with self.subscriber_lock: @@ -239,7 +242,7 @@ def callback(sample: Sample) -> None: msg = "Unable to register callback with Zenoh" logging.debug(msg) return UStatus(code=UCode.INTERNAL, message=msg) - except Exception as e: + except Exception: msg = "Unable to register callback with Zenoh" logging.debug(msg) return UStatus(code=UCode.INTERNAL, message=msg) @@ -285,7 +288,7 @@ def callback(query: Query) -> None: queryable = self.session.declare_queryable(zenoh_key, callback) with self.queryable_lock: self.queryable_map[(topic.SerializeToString(), listener)] = queryable - except Exception as e: + except Exception: msg = "Unable to register callback with Zenoh" logging.debug(msg) return UStatus(code=UCode.INTERNAL, message=msg) @@ -393,7 +396,12 @@ def _remove_request_listener(self, topic: UUri, listener: UListener) -> None: if self.queryable_map.pop((topic.SerializeToString(), listener), None) is None: raise ValueError("RPC request listener doesn't exist") - def invoke_method(self, topic: UUri, payload: UPayload, options: CallOptions, ) -> Future: + def invoke_method( + self, + topic: UUri, + payload: UPayload, + options: CallOptions, + ) -> Future: try: # Validate UUri if not UriValidator.validate(topic): @@ -411,8 +419,9 @@ def invoke_method(self, topic: UUri, payload: UPayload, options: CallOptions, ) zenoh_key = zenoh_key_result # Create UAttributes and put into Zenoh user attachment - uattributes = UAttributesBuilder.request(self.get_response_uuri(), topic, UPriority.UPRIORITY_CS4, - options.ttl).build() + uattributes = UAttributesBuilder.request( + self.get_response_uuri(), topic, UPriority.UPRIORITY_CS4, options.ttl + ).build() attachment = ZenohUtils.uattributes_to_attachment(uattributes) @@ -426,11 +435,16 @@ def invoke_method(self, topic: UUri, payload: UPayload, options: CallOptions, ) value = Value(buf, encoding=Encoding.APP_CUSTOM().with_suffix(str(payload.format))) # Send the query - get_builder = self.session.get(zenoh_key, zenoh.Queue(), target=zenoh.QueryTarget.BEST_MATCHING(), value=value, - attachment=attachment, timeout=options.ttl / 1000) + get_builder = self.session.get( + zenoh_key, + zenoh.Queue(), + target=zenoh.QueryTarget.BEST_MATCHING(), + value=value, + attachment=attachment, + timeout=options.ttl / 1000, + ) for reply in get_builder.receiver: - if reply.is_ok: encoding = ZenohUtils.to_upayload_format(reply.ok.encoding) if not encoding: @@ -438,7 +452,9 @@ def invoke_method(self, topic: UUri, payload: UPayload, options: CallOptions, ) logging.debug(f"{msg}") raise UStatus(code=UCode.INTERNAL, message=msg) - umessage = UMessage(attributes=uattributes, payload=UPayload(format=encoding, value=reply.ok.payload)) + umessage = UMessage( + attributes=uattributes, payload=UPayload(format=encoding, value=reply.ok.payload) + ) future = Future() future.set_result(umessage) return future diff --git a/up_client_zenoh/zenohutils.py b/up_client_zenoh/zenohutils.py index 4cea3c7..201d00a 100644 --- a/up_client_zenoh/zenohutils.py +++ b/up_client_zenoh/zenohutils.py @@ -22,11 +22,11 @@ import logging from typing import Union -from uprotocol.proto.uattributes_pb2 import UPriority, UAttributes +from uprotocol.proto.uattributes_pb2 import UAttributes, UPriority from uprotocol.proto.upayload_pb2 import UPayloadFormat from uprotocol.proto.uri_pb2 import UUri -from uprotocol.proto.ustatus_pb2 import UStatus, UCode -from zenoh import Priority, Encoding +from uprotocol.proto.ustatus_pb2 import UCode, UStatus +from zenoh import Encoding, Priority from zenoh.value import Attachment UATTRIBUTE_VERSION: int = 1 @@ -36,7 +36,6 @@ class ZenohUtils: - @staticmethod def get_uauth_from_uuri(uri: UUri) -> Union[str, UStatus]: if uri.authority: @@ -83,12 +82,16 @@ def to_zenoh_key_string(uri: UUri) -> Union[str, UStatus]: @staticmethod def map_zenoh_priority(upriority: UPriority) -> Priority: - mapping = {UPriority.UPRIORITY_CS0 : Priority.BACKGROUND(), UPriority.UPRIORITY_CS1: Priority.DATA_LOW(), - UPriority.UPRIORITY_CS2 : Priority.DATA(), UPriority.UPRIORITY_CS3: Priority.DATA_HIGH(), - UPriority.UPRIORITY_CS4 : Priority.INTERACTIVE_LOW(), - UPriority.UPRIORITY_CS5 : Priority.INTERACTIVE_HIGH(), - UPriority.UPRIORITY_CS6 : Priority.REAL_TIME(), - UPriority.UPRIORITY_UNSPECIFIED: Priority.DATA_LOW()} + mapping = { + UPriority.UPRIORITY_CS0: Priority.BACKGROUND(), + UPriority.UPRIORITY_CS1: Priority.DATA_LOW(), + UPriority.UPRIORITY_CS2: Priority.DATA(), + UPriority.UPRIORITY_CS3: Priority.DATA_HIGH(), + UPriority.UPRIORITY_CS4: Priority.INTERACTIVE_LOW(), + UPriority.UPRIORITY_CS5: Priority.INTERACTIVE_HIGH(), + UPriority.UPRIORITY_CS6: Priority.REAL_TIME(), + UPriority.UPRIORITY_UNSPECIFIED: Priority.DATA_LOW(), + } return mapping[upriority] @staticmethod @@ -128,7 +131,7 @@ def attachment_to_uattributes(attachment: Attachment) -> UAttributes: raise UStatus(code=UCode.INVALID_ARGUMENT, message=msg) if not version_found: - msg = f"UAttributes version is missing in the attachment" + msg = "UAttributes version is missing in the attachment" logging.debug(msg) raise UStatus(code=UCode.INVALID_ARGUMENT, message=msg)