-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathRpiGpioObject.py
91 lines (67 loc) · 2.17 KB
/
RpiGpioObject.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import RPi.GPIO as GPIO
from GpioExceptions import *
import threading
GPIO.setmode(GPIO.BOARD)
INPUT = True
OUTPUT = False
class RpiGpioPin(object):
"""The wrapper for an individual pin on the RPi"""
def __init__(self,pinNum,ioType):
self.pinNum = pinNum
self.ioType = ioType
GPIO.setup(pinNum, ioType)
def get(self):
if self.ioType:
return GPIO.input(self.pinNum)
else:
raise InvalidPinAccessException(self.pinNum,ioType)
def set(self,val):
if self.ioType:
raise InvalidPinAccessException(self.pinNum,ioType)
else:
GPIO.output(self.pinNum,val)
class PinManager(object):
"""The manages the instantiation and destruction of pins"""
validPins = {8, 10, 12, 16, 18, 22, 24, 26, 3, 5, 7, 11, 13, 15, 19, 21, 23}
def __init__(self):
self.pins = dict()
def registerDeviceOnPin(self, pinNum, ioType):
#test against valid pins
if pinNum not in self.validPins:
raise InvalidPinException(pinNum)
#test for IO conflicts
if (pinNum in self.pins) and (self.pins[pinNum].ioType is ioType):
raise PinConflictException(pinNum,ioType)
#register the device
self.pins[pinNum] = self.pins.get(pinNum,[RpiGpioPin(pinNum,ioType),0])
self.pins[pinNum][1] += 1
#TODO perhaps in the future all of the GPIO devices will have a device id
#and we will actually track each of these device ids
return self.pins[pinNum][0]
def unregisterDeviceOnPin(self, pinNum):
#test against instantiated pins
if pinNum not in self.pins:
raise GhostPinException
self.pins[pinNum][1] -= 1
#if the key is unregistered from all of its users delete it
if self.pins[pinNum][1] == 0:
del self.pins[pinNum]
class RpiGpioDevice(object):
"""The parent object for all GPIO devices"""
manager = PinManager()
def __init__(self, pinLayout):
self.pins = []
self.thread = None
for pin, ioType in pinLayout.items():
self.pins.append(self.manager.registerDeviceOnPin(pin,ioType))
def __call__(self):
self.run()
def run(self):
raise NotImplementedError
def start(self):
self.thread = threading.Thread(target = self)
self.thread.start()
def __del__(self):
#unregister all of the pins
for pin in self.pins:
self.manager.unregisterDeviceOnPin(pin.pinNum)