-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathvera_control_point.py
120 lines (81 loc) · 2.82 KB
/
vera_control_point.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# Licensed under the MIT license
# http://opensource.org/licenses/mit-license.php or see LICENSE file.
# Copyright 2007-2008 Brisa Team <[email protected]>
# modified by Guido De Rosa
import time
from brisa.core.reactors import SelectReactor
reactor = SelectReactor()
from brisa.upnp.control_point import ControlPoint
from brisa.core.threaded_call import run_async_function
devices = []
def on_new_device(dev):
print 'Got new device:', dev
if dev.udn not in [d.udn for d in devices]:
devices.append(dev)
def on_removed_device(udn):
print 'Device is gone:', udn
for dev in devices:
if dev.udn == udn:
devices.remove(dev)
def create():
c = ControlPoint()
c.subscribe('new_device_event', on_new_device)
c.subscribe('removed_device_event', on_removed_device)
return c
def list_devices(devices):
count = 0
for d in devices:
print 'Device number: ', count
print_device(d, 1)
print
count += 1
def print_device(dev, indent=1):
print '\t'*indent, 'UDN (id): ', dev.udn
print '\t'*indent, 'Name: ', dev.friendly_name
print '\t'*indent, 'Type: ', dev.device_type
print '\t'*indent, 'Services: ', dev.services.keys()
print '\t'*indent, 'Embedded devices: '
for d in dev.devices.values():
if d.device_type == 'urn:schemas-upnp-org:device:BinaryLight:1':
turn_on_off(d)
if d.device_type == 'urn:schemas-micasaverde-com:device:TemperatureSensor:1':
print_temerature(d)
print_device(d, indent+1)
def print_temerature(d):
hadev = d.get_service_by_type('urn:schemas-micasaverde-com:service:HaDevice:1')
if (hasattr(hadev, 'Poll')):
print 'Poll'
print hadev.Poll()
time.sleep(2)
def turn_on_off(d):
switch_power = d.get_service_by_type('urn:schemas-upnp-org:service:SwitchPower:1')
if (hasattr(switch_power,'SetTarget')):
print "\nNow I am turning on/off the light!\n"
time.sleep(1)
switch_power.SetTarget(newTargetValue='1')
print switch_power.GetTarget()
time.sleep(2)
switch_power.SetTarget(newTargetValue='0')
print switch_power.GetTarget()
time.sleep(1)
def list_services(dev):
count = 0
for k, serv in dev.services.items():
print 'Service number: ', count
print 'Service id: ' + serv.id
print
count += 1
def main():
c = create() # create control point
c.start()
reactor.add_after_stop_func(c.stop)
run_async_function(run, (c, ))
reactor.main()
def run(c):
c.start_search(600, 'urn:schemas-micasaverde-com:device:HomeAutomationGateway:1') # async / threaded
time.sleep(1) # necessary...
list_devices(devices)
c.stop_search()
reactor.main_quit()
if __name__ == '__main__':
main()