This folder contains implementations of the L2 Communication Layer APIs, as defined by the uProtocol L2 API Specifications.
| Interface | Implementation(s) | Description |
|---|---|---|
| Publisher | | Producer API to send publish or notification messages |
| RpcClient | | Client interface to invoke a method |
| RpcServer | | Server interface to register a listener for incoming RPC requests and automatically send a response |
| Notifier | | Notification communication pattern APIs to notify and register a listener to receive the notifications |
| All the above | UClient | Single class that implements all the interfaces above using the various implementations also from above |
The uP-L2 interfaces are designed for uEs (applications and services) that rely on these communication patterns to talk with other uEs. Their only dependency is a uTransport implementation, which is passed by reference to each API and used to carry out the communication pattern.
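The dependency described above, one transport instance handed to every L2 role, can be illustrated with a small self-contained sketch. Every name in it (`FakeTransport`, `SketchPublisher`, `SketchNotifier`) is a hypothetical stand-in written for this illustration and is not part of the uProtocol library; the real interfaces and their signatures are defined by the implementations in this folder.

```python
import asyncio
from typing import List, Tuple


class FakeTransport:
    """Hypothetical stand-in for a UTransport: it only records what is sent."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, str]] = []

    async def send(self, kind: str, topic: str) -> None:
        self.sent.append((kind, topic))


class SketchPublisher:
    """Hypothetical L2-style role that keeps a reference to the transport."""

    def __init__(self, transport: FakeTransport) -> None:
        self._transport = transport

    async def publish(self, topic: str) -> None:
        await self._transport.send("publish", topic)


class SketchNotifier:
    """A second hypothetical role sharing the same transport instance."""

    def __init__(self, transport: FakeTransport) -> None:
        self._transport = transport

    async def notify(self, topic: str) -> None:
        await self._transport.send("notification", topic)


async def demo() -> List[Tuple[str, str]]:
    transport = FakeTransport()
    # Both roles receive the same transport object; neither copies it.
    await SketchPublisher(transport).publish("topic/0x8000")
    await SketchNotifier(transport).notify("topic/0x8000")
    return transport.sent


sent_messages = asyncio.run(demo())
```

Because the roles depend only on the transport object they are given, swapping in a different transport does not require changing the calling code.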
Note: The examples below use the UClient implementation.
transport = ...  # replace with your UTransport instance
# Topic to publish
topic: UUri = UUri(ue_id=4, ue_version_major=1, resource_id=0x8000)
publisher: Publisher = UClient(transport)
# Send the publish message
publisher.publish(topic)
transport = ...  # replace with your UTransport instance
# URI of the method to be invoked
method_uri = UUri(ue_id=10, ue_version_major=1, resource_id=3)
payload = UPayload.pack_to_any(UUri())
options = CallOptions(2000, UPriority.UPRIORITY_CS5)
rpc_client: RpcClient = UClient(transport)
# Returns the asyncio Future with the response payload, or raises an exception
# with the failure reason as UStatus
response = await rpc_client.invoke_method(method_uri, payload, options)
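The comment above says the call either yields the response payload or raises an exception carrying the failure reason. A minimal sketch of how a caller might handle both outcomes follows. `FakeStatusError` and `fake_invoke_method` are hypothetical stand-ins so the snippet runs on its own; the real exception type and its status fields come from the library.

```python
import asyncio
from typing import List


class FakeStatusError(Exception):
    """Hypothetical stand-in for an exception carrying a status code and message."""

    def __init__(self, code: str, message: str) -> None:
        super().__init__(message)
        self.code = code
        self.message = message


async def fake_invoke_method(resource_id: int) -> bytes:
    """Hypothetical stand-in for invoke_method: succeeds only for resource 3."""
    await asyncio.sleep(0)  # yield to the event loop, as a real call would
    if resource_id != 3:
        raise FakeStatusError("NOT_FOUND", f"no method with resource_id={resource_id}")
    return b"response-payload"


async def call(resource_id: int) -> str:
    try:
        payload = await fake_invoke_method(resource_id)
        return f"ok: {payload!r}"
    except FakeStatusError as err:
        # The failure reason travels with the exception.
        return f"failed: {err.code}: {err.message}"


async def main() -> List[str]:
    return await asyncio.gather(call(3), call(7))


results = asyncio.run(main())
```

Catching the error at the call site keeps the success path and the failure path next to each other, which is usually easier to follow than inspecting a status object afterwards.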
transport = ...  # replace with your UTransport instance
# URI of the method to handle
uri = UUri(ue_id=10, ue_version_major=1, resource_id=3)

# Handler for processing requests
class MyRequestHandler(RequestHandler):
    def handle_request(self, message: UMessage) -> UPayload:
        # If the request was processed successfully, return the response payload, e.g.:
        #   return UPayload.EMPTY
        # If processing failed, raise a UStatusException with the appropriate
        # UCode and message, e.g.:
        #   raise UStatusException(UCode.FAILED_PRECONDITION, "Failed to process the request")
        # For this example, we return an empty response
        return UPayload.EMPTY

rpc_server: RpcServer = UClient(transport)
# Register the handler; incoming requests for the method URI are passed to it
# and its return value is sent back as the response
await rpc_server.register_request_handler(uri, MyRequestHandler())
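The handler contract described in the comments above (return a payload on success, raise an error with a code on failure) implies that something on the server side turns either outcome into a response. The following is a sketch of that idea under stated assumptions, not the library's actual code: `SketchServer`, `SketchError`, the string method names, and the dictionary-shaped response are all hypothetical.

```python
from typing import Callable, Dict


class SketchError(Exception):
    """Hypothetical error carrying a status code, raised by a handler on failure."""

    def __init__(self, code: str, message: str) -> None:
        super().__init__(message)
        self.code = code
        self.message = message


class SketchServer:
    """Hypothetical dispatcher: one handler per method (a plain string here)."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Callable[[bytes], bytes]] = {}

    def register_request_handler(self, method: str, handler: Callable[[bytes], bytes]) -> bool:
        if method in self._handlers:
            return False  # this sketch refuses to overwrite an existing handler
        self._handlers[method] = handler
        return True

    def dispatch(self, method: str, request: bytes) -> dict:
        handler = self._handlers.get(method)
        if handler is None:
            return {"code": "NOT_FOUND", "payload": b""}
        try:
            return {"code": "OK", "payload": handler(request)}
        except SketchError as err:
            # The handler's failure becomes the response status.
            return {"code": err.code, "payload": b""}


def echo(request: bytes) -> bytes:
    return request


def always_fails(request: bytes) -> bytes:
    raise SketchError("FAILED_PRECONDITION", "Failed to process the request")


server = SketchServer()
registered = server.register_request_handler("method/3", echo)
duplicate = server.register_request_handler("method/3", echo)
server.register_request_handler("method/4", always_fails)

ok_response = server.dispatch("method/3", b"ping")
error_response = server.dispatch("method/4", b"ping")
missing_response = server.dispatch("method/9", b"ping")
```

With this shape the handler never builds a response message itself; it only returns data or raises, and the dispatcher owns the mapping to a response.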
transport = ...  # replace with your UTransport instance
# Notification topic
uri: UUri = UUri(ue_id=4, ue_version_major=1, resource_id=0x8000)
# Destination for the notification
destination_uri: UUri = UUri(ue_id=3, ue_version_major=1)
notifier: Notifier = UClient(transport)
# Send the notification (without payload)
await notifier.notify(uri, destination_uri)
transport = ...  # replace with your UTransport instance
# Notification topic
uri: UUri = UUri(ue_id=4, ue_version_major=1, resource_id=0x8000)

# Listener to process incoming events on the topic
class MyListener(UListener):
    def on_receive(self, umsg: UMessage) -> None:
        # Handle received notifications here
        pass

listener = MyListener()
notifier: Notifier = UClient(transport)
# Register the listener to receive notifications
await notifier.register_notification_listener(uri, listener)