forked from bluez/bluez
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
test: Add test app for Battery Provider API
The python test app simulates an application registering to BlueZ as a Battery Provider providing three fake batteries drained periodically. Reviewed-by: Miao-chen Chou <[email protected]>
- Loading branch information
Showing
1 changed file
with
232 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,232 @@ | ||
#!/usr/bin/env python3 | ||
# SPDX-License-Identifier: LGPL-2.1-or-later | ||
|
||
import dbus | ||
import dbus.exceptions | ||
import dbus.mainloop.glib | ||
import dbus.service | ||
|
||
try: | ||
from gi.repository import GObject | ||
except ImportError: | ||
import gobject as GObject | ||
import sys | ||
|
||
mainloop = None | ||
app = None | ||
bus = None | ||
|
||
BLUEZ_SERVICE_NAME = 'org.bluez' | ||
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' | ||
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' | ||
|
||
BATTERY_PROVIDER_MANAGER_IFACE = 'org.bluez.BatteryProviderManager1' | ||
BATTERY_PROVIDER_IFACE = 'org.bluez.BatteryProvider1' | ||
BATTERY_PROVIDER_PATH = '/path/to/provider' | ||
|
||
BATTERY_PATH1 = '11_11_11_11_11_11' | ||
BATTERY_PATH2 = '22_22_22_22_22_22' | ||
BATTERY_PATH3 = '33_33_33_33_33_33' | ||
|
||
class InvalidArgsException(dbus.exceptions.DBusException): | ||
_dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs' | ||
|
||
|
||
class Application(dbus.service.Object): | ||
def __init__(self, bus): | ||
self.path = BATTERY_PROVIDER_PATH | ||
self.services = [] | ||
self.batteries = [] | ||
dbus.service.Object.__init__(self, bus, self.path) | ||
|
||
def get_path(self): | ||
return dbus.ObjectPath(self.path) | ||
|
||
def add_battery(self, battery): | ||
self.batteries.append(battery) | ||
self.InterfacesAdded(battery.get_path(), battery.get_properties()) | ||
GObject.timeout_add(1000, drain_battery, battery) | ||
|
||
def remove_battery(self, battery): | ||
self.batteries.remove(battery) | ||
self.InterfacesRemoved(battery.get_path(), [BATTERY_PROVIDER_IFACE]) | ||
|
||
@dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}') | ||
def GetManagedObjects(self): | ||
response = {} | ||
print('GetManagedObjects called') | ||
|
||
for battery in self.batteries: | ||
response[battery.get_path()] = battery.get_properties() | ||
|
||
return response | ||
|
||
@dbus.service.signal(DBUS_OM_IFACE, signature='oa{sa{sv}}') | ||
def InterfacesAdded(self, object_path, interfaces_and_properties): | ||
return | ||
|
||
@dbus.service.signal(DBUS_OM_IFACE, signature='oas') | ||
def InterfacesRemoved(self, object_path, interfaces): | ||
return | ||
|
||
|
||
class Battery(dbus.service.Object): | ||
""" | ||
org.bluez.BatteryProvider1 interface implementation | ||
""" | ||
def __init__(self, bus, dev, percentage, source = None): | ||
self.path = BATTERY_PROVIDER_PATH + '/dev_' + dev | ||
self.dev_path = '/org/bluez/hci0/dev_' + dev | ||
self.bus = bus | ||
self.percentage = percentage | ||
self.source = source | ||
dbus.service.Object.__init__(self, bus, self.path) | ||
|
||
def get_battery_properties(self): | ||
properties = {} | ||
if self.percentage != None: | ||
properties['Percentage'] = dbus.Byte(self.percentage) | ||
if self.source != None: | ||
properties['Source'] = self.source | ||
properties['Device'] = dbus.ObjectPath(self.dev_path) | ||
return properties | ||
|
||
def get_properties(self): | ||
return { BATTERY_PROVIDER_IFACE: self.get_battery_properties() } | ||
|
||
def get_path(self): | ||
return dbus.ObjectPath(self.path) | ||
|
||
def set_percentage(self, percentage): | ||
if percentage < 0 or percentage > 100: | ||
print('percentage not valid') | ||
return | ||
|
||
self.percentage = percentage | ||
print('battery %s percentage %d' % (self.path, self.percentage)) | ||
self.PropertiesChanged( | ||
BATTERY_PROVIDER_IFACE, self.get_battery_properties()) | ||
|
||
@dbus.service.method(DBUS_PROP_IFACE, | ||
in_signature='s', | ||
out_signature='a{sv}') | ||
def GetAll(self, interface): | ||
if interface != BATTERY_PROVIDER_IFACE: | ||
raise InvalidArgsException() | ||
|
||
return self.get_properties()[BATTERY_PROVIDER_IFACE] | ||
|
||
@dbus.service.signal(DBUS_PROP_IFACE, signature='sa{sv}') | ||
def PropertiesChanged(self, interface, properties): | ||
return | ||
|
||
|
||
def add_late_battery(): | ||
app.add_battery(Battery(bus, BATTERY_PATH3, 70, 'Protocol 2')) | ||
|
||
|
||
def drain_battery(battery): | ||
new_percentage = 100 | ||
if battery.percentage != None: | ||
new_percentage = battery.percentage - 5 | ||
if new_percentage < 0: | ||
new_percentage = 0 | ||
|
||
battery.set_percentage(new_percentage) | ||
|
||
if new_percentage <= 0: | ||
return False | ||
|
||
return True | ||
|
||
def register_provider_cb(): | ||
print('Battery Provider registered') | ||
|
||
# Battery added early right after RegisterBatteryProvider succeeds | ||
app.add_battery(Battery(bus, BATTERY_PATH2, None)) | ||
# Battery added later | ||
GObject.timeout_add(5000, add_late_battery) | ||
|
||
|
||
def register_provider_error_cb(error): | ||
print('Failed to register Battery Provider: ' + str(error)) | ||
mainloop.quit() | ||
|
||
|
||
def find_manager(bus): | ||
remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), | ||
DBUS_OM_IFACE) | ||
objects = remote_om.GetManagedObjects() | ||
|
||
for o, props in objects.items(): | ||
if BATTERY_PROVIDER_MANAGER_IFACE in props.keys(): | ||
return o | ||
|
||
return None | ||
|
||
|
||
def unregister_provider_cb(): | ||
print('Battery Provider unregistered') | ||
|
||
|
||
def unregister_provider_error_cb(error): | ||
print('Failed to unregister Battery Provider: ' + str(error)) | ||
|
||
|
||
def unregister_battery_provider(battery_provider_manager): | ||
battery_provider_manager.UnregisterBatteryProvider(BATTERY_PROVIDER_PATH, | ||
reply_handler=unregister_provider_cb, | ||
error_handler=unregister_provider_error_cb) | ||
|
||
|
||
def remove_battery(app, battery): | ||
app.remove_battery(battery) | ||
|
||
|
||
""" | ||
Simulates an application registering to BlueZ as a Battery Provider providing | ||
fake batteries drained periodically. | ||
""" | ||
def main(): | ||
global mainloop, bus, app | ||
|
||
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) | ||
|
||
bus = dbus.SystemBus() | ||
|
||
manager_path = find_manager(bus) | ||
if not manager_path: | ||
print('BatteryProviderManager1 interface not found') | ||
return | ||
|
||
print('BatteryProviderManager1 path = ', manager_path) | ||
|
||
battery_provider_manager = dbus.Interface( | ||
bus.get_object(BLUEZ_SERVICE_NAME, manager_path), | ||
BATTERY_PROVIDER_MANAGER_IFACE) | ||
|
||
app = Application(bus) | ||
|
||
# Battery pre-added before RegisterBatteryProvider | ||
battery1 = Battery(bus, BATTERY_PATH1, 87, 'Protocol 1') | ||
app.add_battery(battery1) | ||
|
||
mainloop = GObject.MainLoop() | ||
|
||
print('Registering Battery Provider...') | ||
|
||
battery_provider_manager.RegisterBatteryProvider(BATTERY_PROVIDER_PATH, | ||
reply_handler=register_provider_cb, | ||
error_handler=register_provider_error_cb) | ||
|
||
# Unregister the Battery Provider after an arbitrary amount of time | ||
GObject.timeout_add( | ||
12000, unregister_battery_provider, battery_provider_manager) | ||
# Simulate battery removal by a provider | ||
GObject.timeout_add(8000, remove_battery, app, battery1) | ||
|
||
mainloop.run() | ||
|
||
|
||
if __name__ == '__main__': | ||
main() |