-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathHPC.py
34 lines (27 loc) · 918 Bytes
/
HPC.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import collections
import itertools
import multiprocessing
class HPC(object):
"""
HPC
"""
def __init__(self, map_func, reduce_func, num_workers=None):
"""
"""
self.map_func = map_func
self.reduce_func = reduce_func
self.pool = multiprocessing.Pool(num_workers)
def partition(self, mapped_values):
"""
"""
partitioned_data = collections.defaultdict(list)
for key, value in mapped_values:
partitioned_data[key].append(value)
return partitioned_data.items()
def __call__(self, inputs, chunksize=1):
"""
"""
map_responses = self.pool.map(self.map_func, inputs, chunksize=chunksize)
partitioned_data = self.partition(itertools.chain(*map_responses))
reduced_values = self.pool.map(self.reduce_func, partitioned_data)
return reduced_values