-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsignal_accumulator.py
68 lines (58 loc) · 2.06 KB
/
signal_accumulator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#!/usr/bin/python2
# -*- coding: utf-8 -*-
#
# NightOwl - who tests your unit tests?
#
# Copyright (C) 2012 DResearch Fahrzeugelektronik GmbH
# Written and maintained by Erik Bernoth <[email protected]>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version
# 2 of the License, or (at your option) any later version.
#
"""
manages a group of like minded signals
"""
from __future__ import print_function
import sys
import re
import argparse
import json
from constants import *
class SignalIgnoredWarning(Warning):
pass
class SignalAccumulator(object):
"""
maintains a group of signals
"""
def __init__(self, type_, attempt=None, count=None)
self.type_ = type_
self.attempt = attempt if attempt else -1
self.count = count if count else 0
def add_signal(self,signal):
if signal.attempt != self.attempt:
raise SignalIgnoredWarning("Attempt {} isn't mine ({}) -> ignored".format(
signal.attempt,
self.attempt
))
else:
self.count +=1
def add_signals(self,signal_iterator):
map(self.add_signal,signal_iterator)
def make_many(signal_iterator):
"""
can be used, when you have a long list of signals of different attempts and
you want to automatically create a list of :py:class:SignalAccumulator
objects for each attempt.
:param signal_iterator: should deliver :py:class:signal.Signal objects
:return: a generator for all :py:class:SignalAccumulator objects
"""
accumulators = {}
for signal in signal_iterator:
if signal.attempt not in accumulators:
accumulators[signal.attempt] = SignalAccumulator(signal.attempt)
accumulators[signal.attempt].add_signal(signal)
keys = accumulators.keys()
keys.sort()
return (k, accumulators[k] for k in keys)