Skip to content

Commit

Permalink
Replace print statements with logging.debug
Browse files Browse the repository at this point in the history
  • Loading branch information
neelam-kushwah committed May 23, 2024
1 parent c0fb3f9 commit 2668a54
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 115 deletions.
33 changes: 0 additions & 33 deletions NOTICE.adoc

This file was deleted.

118 changes: 54 additions & 64 deletions up_client_zenoh/upclientzenoh.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import logging
from concurrent.futures import Future
from threading import Lock
from typing import Dict, Tuple, Any
from typing import Dict, Tuple

import zenoh
from uprotocol.proto.uattributes_pb2 import CallOptions
Expand All @@ -39,33 +39,27 @@
from uprotocol.transport.validate.uattributesvalidator import Validators
from uprotocol.uri.factory.uresource_builder import UResourceBuilder
from uprotocol.uri.validator.urivalidator import UriValidator
from zenoh import open as zenoh_open, Subscriber, Session, Encoding, Config, Query, Sample, Value, Queryable
from zenoh import open as zenoh_open, Subscriber, Encoding, Config, Query, Sample, Value, Queryable

from up_client_zenoh.zenohutils import ZenohUtils

logger = logging.getLogger(__name__)
# Configure the logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')


class UPClientZenoh(UTransport, RpcClient):

def __init__(self, session: Session, subscriber_map: Dict[Tuple[str, UListener], Subscriber],
queryable_map: Dict[Tuple[str, UListener], Queryable], query_map: Dict[str, Query],
rpc_callback_map: Dict[str, UListener], uauthority: UAuthority, uentity: UEntity) -> None:
self.session = session
self.subscriber_map = subscriber_map
self.queryable_map = queryable_map
self.query_map = query_map
self.rpc_callback_map = rpc_callback_map
def __init__(self, config: Config, uauthority: UAuthority, uentity: UEntity):
self.session = zenoh_open(config)
self.subscriber_map: Dict[Tuple[str, UListener], Subscriber] = {}
self.queryable_map: Dict[Tuple[str, UListener], Queryable] = {}
self.query_map: Dict[str, Query] = {}
self.rpc_callback_map: Dict[str, UListener] = {}
self.source_uuri = UUri(authority=uauthority, entity=uentity)
self.rpc_callback_lock = Lock()
self.queryable_lock = Lock()
self.subscriber_lock = Lock()

@classmethod
def new(cls, config: Config, uauthority: UAuthority, uentity: UEntity) -> 'UPClientZenoh':
session = zenoh_open(config)
return cls(session, {}, {}, {}, {}, uauthority, uentity)

def get_response_uuri(self) -> UUri:
new_source = self.source_uuri
new_source.resource.CopyFrom(UResourceBuilder.for_rpc_response())
Expand All @@ -76,7 +70,7 @@ def send_publish_notification(self, zenoh_key: str, payload: UPayload, attribute

if not payload.value:
msg = "The data in UPayload should be Data::Value"
print(f"ERROR: {msg}")
logging.debug(f"ERROR: {msg}")
return UStatus(code=UCode.INVALID_ARGUMENT, message=msg)

buf = payload.value
Expand All @@ -85,59 +79,59 @@ def send_publish_notification(self, zenoh_key: str, payload: UPayload, attribute
attachment = ZenohUtils.uattributes_to_attachment(attributes)
if not attachment:
msg = "Unable to transform UAttributes to attachment"
print(f"ERROR: {msg}")
logging.debug(f"ERROR: {msg}")
return UStatus(code=UCode.INVALID_ARGUMENT, message=msg)

# Map the priority to Zenoh
priority = ZenohUtils.map_zenoh_priority(attributes.priority)
if not priority:
msg = "Unable to map to Zenoh priority"
print(f"ERROR: {msg}")
logging.debug(f"ERROR: {msg}")
return UStatus(code=UCode.INVALID_ARGUMENT, message=msg)

try:
# Simulate sending data
print(f"Sending data to Zenoh with key: {zenoh_key}")
print(f"Data: {buf}")
print(f"Priority: {priority}")
print(f"Attachment: {attachment}")
logging.debug(f"Sending data to Zenoh with key: {zenoh_key}")
logging.debug(f"Data: {buf}")
logging.debug(f"Priority: {priority}")
logging.debug(f"Attachment: {attachment}")
encoding = Encoding.APP_CUSTOM().with_suffix(str(payload.format))

put_builder = self.session.put(keyexpr=zenoh_key, encoding=encoding, value=buf, attachment=attachment,
priority=priority)

msg = "Successfully sent data to Zenoh"
print(f"SUCCESS:{msg}")
logging.debug(f"SUCCESS:{msg}")
return UStatus(code=UCode.OK, message=msg)
except Exception as e:
msg = f"Unable to send with Zenoh: {e}"
print(f"ERROR: {msg}")
logging.debug(f"ERROR: {msg}")
return UStatus(code=UCode.INTERNAL, message=msg)

def send_request(self, zenoh_key: str, payload: UPayload, attributes: UAttributes) -> UStatus:
data = payload.value
if not data:
msg = "The data in UPayload should be Data::Value"
print(f"ERROR: {msg}")
logging.debug(f"ERROR: {msg}")
return UStatus(code=UCode.INVALID_ARGUMENT, message=msg)
# Transform UAttributes to user attachment in Zenoh
attachment = ZenohUtils.uattributes_to_attachment(attributes)
if attachment is None:
msg = "Unable to transform UAttributes to attachment"
print(msg)
logging.debug(msg)
return UStatus(code=UCode.INVALID_ARGUMENT, message=msg)

# Retrieve the callback

if attributes.source is None:
msg = "Lack of source address"
print(msg)
logging.debug(msg)
return UStatus(code=UCode.INVALID_ARGUMENT, message=msg)

resp_callback = self.rpc_callback_map.get(attributes.source.SerializeToString())
if resp_callback is None:
msg = "Unable to get callback"
print(msg)
logging.debug(msg)
return UStatus(code=UCode.INTERNAL, message=msg)

def zenoh_callback(reply: Query.reply) -> None:
Expand All @@ -147,26 +141,26 @@ def zenoh_callback(reply: Query.reply) -> None:
encoding = ZenohUtils.to_upayload_format(sample.encoding)
if encoding is None:
msg = "Unable to get the encoding"
print(msg)
logging.debug(msg)
return UStatus(code=UCode.INTERNAL, message=msg)
# Get UAttribute from the attachment
attachment = sample.attachment
if attachment is None:
msg = "Unable to get the attachment"
print(msg)
logging.debug(msg)
return UStatus(code=UCode.INTERNAL, message=msg)

u_attribute = ZenohUtils.attachment_to_uattributes(attachment)
if u_attribute is None:
msg = "Transform attachment to UAttributes failed"
print(msg)
logging.debug(msg)
return UStatus(code=UCode.INTERNAL, message=msg)
# Create UMessage
msg = UMessage(attributes=u_attribute, payload=UPayload(length=0, format=encoding, value=sample.payload))
resp_callback.on_receive(msg)
else:
msg = f"Error while parsing Zenoh reply: {reply.error}"
print(msg)
logging.debug(msg)
return UStatus(code=UCode.INTERNAL, message=msg)

# Send query
Expand All @@ -176,7 +170,7 @@ def zenoh_callback(reply: Query.reply) -> None:
self.session.get(zenoh_key, lambda reply: zenoh_callback(reply), target=zenoh.QueryTarget.BEST_MATCHING(),
value=value, timeout=ttl)
msg = "Successfully sent rpc request to Zenoh"
print(f"SUCCESS:{msg}")
logging.debug(f"SUCCESS:{msg}")
return UStatus(code=UCode.OK, message=msg)

def send_response(self, payload: UPayload, attributes: UAttributes) -> UStatus:
Expand All @@ -187,20 +181,20 @@ def send_response(self, payload: UPayload, attributes: UAttributes) -> UStatus:
query = self.query_map.pop(reqid.SerializeToString(), None)
if not query:
msg = "Query doesn't exist"
print(msg)
logging.debug(msg)
return UStatus(code=UCode.INTERNAL, message=msg) # Send back the query
value = Value(payload.value, Encoding.APP_CUSTOM().with_suffix(str(payload.format)))
reply = Sample(query.key_expr, value, attachment=attachment)

try:
query.reply(reply)
msg = "Successfully sent rpc response to Zenoh"
print(f"SUCCESS:{msg}")
logging.debug(f"SUCCESS:{msg}")
return UStatus(code=UCode.OK, message=msg)

except Exception as e:
msg = "Unable to reply with Zenoh: {}".format(str(e))
print(msg)
logging.debug(msg)
return UStatus(code=UCode.INTERNAL, message=msg)

def register_publish_notification_listener(self, topic: UUri, listener: UListener) -> UStatus:
Expand All @@ -213,13 +207,13 @@ def callback(sample: Sample) -> None:
attachment = sample.attachment
if attachment is None:
msg = "Unable to get attachment"
print(msg)
logging.debug(msg)
return UStatus(code=UCode.INTERNAL, message=msg)

u_attribute = ZenohUtils.attachment_to_uattributes(attachment)
if u_attribute is None:
msg = "Unable to decode attributes"
print(msg)
logging.debug(msg)
return UStatus(code=UCode.INTERNAL, message=msg)
# Create UPayload
format = ZenohUtils.to_upayload_format(sample.encoding)
Expand All @@ -230,7 +224,7 @@ def callback(sample: Sample) -> None:
listener.on_receive(msg)
else:
msg = "Unable to get payload encoding"
print(msg)
logging.debug(msg)
return UStatus(code=UCode.INTERNAL, message=msg)

# Create Zenoh subscriber
Expand All @@ -243,14 +237,14 @@ def callback(sample: Sample) -> None:

else:
msg = "Unable to register callback with Zenoh"
print(msg)
logging.debug(msg)
return UStatus(code=UCode.INTERNAL, message=msg)
except Exception as e:
msg = "Unable to register callback with Zenoh"
print(msg)
logging.debug(msg)
return UStatus(code=UCode.INTERNAL, message=msg)
msg = "Successfully register callback with Zenoh"
print(msg)
logging.debug(msg)
return UStatus(code=UCode.OK, message=msg)

def register_request_listener(self, topic: UUri, listener: UListener) -> UStatus:
Expand All @@ -261,21 +255,21 @@ def callback(query: Query) -> None:
attachment = query.attachment
if not attachment:
msg = "Unable to get attachment"
print(msg)
logging.debug(msg)
return UStatus(code=UCode.INTERNAL, message=msg)

u_attribute = ZenohUtils.attachment_to_uattributes(attachment)
if isinstance(u_attribute, UStatus):
msg = f"Unable to transform user attachment to UAttributes: {u_attribute}"
print(msg)
logging.debug(msg)
return UStatus(code=UCode.INTERNAL, message=msg)

value = query.value
if value:
encoding = ZenohUtils.to_upayload_format(value.encoding)
if not encoding:
msg = "Unable to get payload encoding"
print(msg)
logging.debug(msg)
return UStatus(code=UCode.INTERNAL, message=msg)

u_payload = UPayload(format=encoding, value=value.payload)
Expand All @@ -287,18 +281,14 @@ def callback(query: Query) -> None:
self.query_map[u_attribute.id.SerializeToString()] = query
listener.on_receive(msg)

def declare_queryable(callback: Any) -> Any:
nonlocal self, zenoh_key, topic, listener
try:
queryable = self.session.declare_queryable(zenoh_key, callback)
with self.queryable_lock:
self.queryable_map[(topic.SerializeToString(), listener)] = queryable
except Exception as e:
msg = "Unable to register callback with Zenoh"
print(msg)
return UStatus(code=UCode.INTERNAL, message=msg)

declare_queryable(callback)
try:
queryable = self.session.declare_queryable(zenoh_key, callback)
with self.queryable_lock:
self.queryable_map[(topic.SerializeToString(), listener)] = queryable
except Exception as e:
msg = "Unable to register callback with Zenoh"
logging.debug(msg)
return UStatus(code=UCode.INTERNAL, message=msg)

return UStatus(code=UCode.OK, message="Successfully register callback with Zenoh")

Expand Down Expand Up @@ -408,14 +398,14 @@ def invoke_method(self, topic: UUri, payload: UPayload, options: CallOptions, )
# Validate UUri
if not UriValidator.validate(topic):
msg = "Invalid UUri for invoke_method"
print(f"{msg}")
logging.debug(f"{msg}")
raise UStatus(code=UCode.INVALID_ARGUMENT, message=msg)

# Get Zenoh key
zenoh_key_result = ZenohUtils.to_zenoh_key_string(topic)
if isinstance(zenoh_key_result, UStatus):
msg = "Unable to transform UUri to Zenoh key"
print(f"{msg}")
logging.debug(f"{msg}")
raise UStatus(code=UCode.INVALID_ARGUMENT, message=msg)

zenoh_key = zenoh_key_result
Expand All @@ -429,7 +419,7 @@ def invoke_method(self, topic: UUri, payload: UPayload, options: CallOptions, )
# Get the data from UPayload
if not payload.value:
msg = "The data in UPayload should be Data::Value"
print(f"{msg}")
logging.debug(f"{msg}")
raise UStatus(code=UCode.INVALID_ARGUMENT, message=msg)

buf = payload.value
Expand All @@ -445,7 +435,7 @@ def invoke_method(self, topic: UUri, payload: UPayload, options: CallOptions, )
encoding = ZenohUtils.to_upayload_format(reply.ok.encoding)
if not encoding:
msg = "Error while parsing Zenoh encoding"
print(f"{msg}")
logging.debug(f"{msg}")
raise UStatus(code=UCode.INTERNAL, message=msg)

umessage = UMessage(attributes=uattributes, payload=UPayload(format=encoding, value=reply.ok.payload))
Expand All @@ -454,10 +444,10 @@ def invoke_method(self, topic: UUri, payload: UPayload, options: CallOptions, )
return future
else:
msg = f"Error while parsing Zenoh reply: {reply.err}"
print(f"{msg}")
logging.debug(f"{msg}")
raise UStatus(code=UCode.INTERNAL, message=msg)

except Exception as e:
msg = f"Unexpected error: {e}"
print(f"{msg}")
logging.debug(f"{msg}")
raise UStatus(code=UCode.INTERNAL, message=msg)
Loading

0 comments on commit 2668a54

Please sign in to comment.