-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathmatch_pair.py
executable file
·58 lines (51 loc) · 1.97 KB
/
match_pair.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#!/bin/python
import argparse
import csv
from multiprocessing import cpu_count
from concurrent.futures import ProcessPoolExecutor as Pool
from loguru import logger
from tqdm.auto import tqdm
from matcher import match, name_preprocessing
if __name__ == '__main__':
    # CLI: positional input CSV of value pairs; optional output path and a
    # CPU fraction (e.g. 0.5 uses half the cores).
    parser = argparse.ArgumentParser()
    parser.add_argument("file")
    parser.add_argument("-o", "--output", default='match_result.csv')
    parser.add_argument("-c", "--cpu", default=1, type=float)
    parser.add_argument("--debug", action='store_true')
    args = parser.parse_args()

    # Worker count, clamped to at least 1: a small --cpu fraction would
    # otherwise yield 0 and ProcessPoolExecutor rejects max_workers=0.
    n_workers = max(1, int(cpu_count() * args.cpu))

    def do(row):
        """Return the raw pair (a_raw, b_raw) when the preprocessed values
        match (``match(...) > 0``); otherwise return None.

        *row* is a pair of tuples, each ``(raw_value, preprocessed_value)``.
        """
        a, b = row
        if a[1] and b[1]:
            if match(a[1], b[1]) > 0:
                return a[0], b[0]

    def process_chunk(wr, chunk, pbar):
        """Preprocess and match one chunk of rows, writing matches to *wr*.

        NOTE(review): ``do`` is a locally-scoped function, so shipping it to
        pool workers relies on the 'fork' start method — confirm before
        running on Windows/macOS where 'spawn' is the default.
        """
        with Pool(n_workers) as p:
            chunksize = len(chunk) // 10000 + 1
            # Preprocess each distinct value exactly once, in parallel.
            distinct = list({x[0] for x in chunk} | {x[1] for x in chunk})
            d = dict(zip(distinct,
                         p.map(name_preprocessing, distinct,
                               chunksize=chunksize)))
            a = [(x[0], d[x[0]]) for x in chunk]
            b = [(x[1], d[x[1]]) for x in chunk]
            for res in p.map(do, zip(a, b), chunksize=chunksize):
                pbar.update(1)
                if res:
                    wr.writerow(res)

    logger.info(f"running on {n_workers} of CPUs")

    # Count input lines for the progress bar; use a context manager so the
    # handle is closed (the original leaked it).
    with open(args.file) as f:
        num_lines = sum(1 for _ in f)
    logger.info(f"the file has {num_lines} lines")

    # newline='' is required by the csv module when writing (otherwise
    # blank lines appear on Windows); close the progress bar via `with`.
    with tqdm(total=num_lines) as pbar, \
            open(args.output, 'w', newline='') as wf:
        wr = csv.writer(wf)
        wr.writerow(['a', 'b'])
        with open(args.file) as f:
            rd = csv.reader(f)
            chunk = []
            for i, row in enumerate(rd):
                chunk.append(row)
                # Flush every 10M rows to bound memory on huge inputs.
                if (i % 10_000_000 == 0) and (i > 0):
                    process_chunk(wr, chunk, pbar)
                    chunk = []
            # Flush the final partial chunk.
            if chunk:
                process_chunk(wr, chunk, pbar)