-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcsvcomm
executable file
·236 lines (204 loc) · 7.52 KB
/
csvcomm
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
#!/usr/bin/env python3
"""Find the first common cell in a column shared between two CSV files
Assumptions:
1. The CSV files are very large (millions of records)
2. The two CSV files are almost the same
3. The common row is near the beginning of each CSV file
4. Small substrings of records are likely to be unique (enables using a sliding window)
Example workflow:
$ csvcomm -f trace1.csv trace2.csv -c data
Found common row indices:
trace1.csv: 0
trace2.csv: 723
# will also drop header, so prepend it again to the output
$ echo "timestamp,id,data" >trace2-common.csv
$ csvtool drop 724 trace2.csv >>trace2-common.csv
# Grab just the id and data columns for use in diff
$ csvtool namedcol id,data trace1-common.csv >trace1-data.csv
$ csvtool namedcol id,data trace2-common.csv >trace2-data.csv
# Use delta for better word diffing, side-by-side, and colored diffs
$ diff -u trace1-data.csv trace2-data.csv | delta
"""
import argparse
import csv
import logging
import sys
import unittest
from typing import List, Optional, T, Tuple
from csvutils import column_name_to_index, detect_dialect
def parse_args(argv=None):
    """Build the csvcomm command-line parser and parse the arguments.

    Args:
        argv: Optional list of argument strings to parse. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``, which is what the script entry point relies on.

    Returns:
        The populated ``argparse.Namespace`` with the attributes ``log_level``, ``files``,
        ``self_test``, ``column``, ``window_size``, ``early_search_size``, ``no_header`` and
        ``delimiter``.
    """
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "--log-level",
        "-l",
        type=str,
        default="INFO",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        help="Set the logging output level. Defaults to INFO.",
    )

    # The tool either compares two files or runs its self tests. With neither there is nothing
    # to do, so let argparse report a usage error instead of failing later on a missing value.
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument(
        "--files",
        "-f",
        type=argparse.FileType("r"),
        nargs=2,
        metavar=("FILE1", "FILE2"),
        help="The CSV files to compare",
    )
    group.add_argument(
        "--self-test",
        # None means "flag not given"; an empty list means "run every test".
        default=None,
        nargs="*",
        help="Run builtin unit tests",
    )

    parser.add_argument(
        "--column",
        "-c",
        default=1,
        help="The data column to compare for equality, defaults to the second column",
    )
    parser.add_argument(
        "--window-size",
        "-w",
        type=int,
        default=10,
        help="The sliding window size used to find the first common row",
    )
    parser.add_argument(
        "--early-search-size",
        type=int,
        help=(
            "Assume the common row is probably within the first N records of each file "
            "and search there first"
        ),
    )
    parser.add_argument(
        "--no-header",
        action="store_true",
        default=False,
        help="Indicate that the CSV files have no header row",
    )
    parser.add_argument(
        "--delimiter",
        "-d",
        default=",",
        help="Specify the column delimiter. Default is ','",
    )
    return parser.parse_args(argv)
def csv_reader(args, input):
    """Open a CSV row iterator over ``input`` and resolve the column to compare.

    Args:
        args: Parsed command-line namespace; ``args.column`` is read here and the whole
            namespace is forwarded to ``detect_dialect``.
        input: The text stream to read. ``detect_dialect`` hands back the stream to actually
            read from, which replaces this one.

    Returns:
        A ``(reader, column_index)`` tuple: the ``csv.reader`` positioned after the header
        row (when there is one) and the index produced by ``column_name_to_index``.

    Exits the process with status 1 when a header is present and the resolved index does not
    fit inside it.
    """
    dialect, has_header, input = detect_dialect(args, input)
    rows = csv.reader(input, dialect)

    # Consume the header row so that only data rows are left in the iterator.
    header = next(rows) if has_header else None

    # NOTE(review): column_name_to_index presumably maps a name or a number to a 0-based
    # index and tolerates header=None -- confirm against csvutils.
    index = column_name_to_index(args.column, header)

    # The bounds check is only possible when there is a header to measure against.
    if has_header and index >= len(header):
        logging.critical(
            "Given column '%s' not found in header '%s'", args.column, ",".join(header)
        )
        sys.exit(1)

    return rows, index
def _first_window_match(
    col1: List[T],
    col2: List[T],
    window_size: int,
    lim1: int,
    lim2: int,
) -> Optional[Tuple[int, int]]:
    """Brute-force search for equal windows lying fully inside col1[:lim1] and col2[:lim2]."""
    # A window starting at idx only fits when idx + window_size <= lim, hence the range bound
    # (an empty range when the limit is smaller than the window).
    for idx1 in range(lim1 - window_size + 1):
        # Slicing builds a new list (of references to the same cells), so take the col1
        # window once per outer step rather than once per comparison.
        window1 = col1[idx1 : idx1 + window_size]
        for idx2 in range(lim2 - window_size + 1):
            if window1 == col2[idx2 : idx2 + window_size]:
                return (idx1, idx2)
    return None


def find_first_common_row(
    col1: List[T],
    col2: List[T],
    window_size: int,
    early_chunk_size: Optional[int] = None,
) -> Optional[Tuple[int, int]]:
    """Find the first position where ``window_size`` consecutive cells agree in both columns.

    Args:
        col1: Cells of the compared column from the first file.
        col2: Cells of the compared column from the second file.
        window_size: Number of consecutive cells that must be equal for a match.
        early_chunk_size: When given, only the first ``early_chunk_size`` cells of each column
            are searched first; the full search runs only if that finds nothing.

    Returns:
        ``(idx1, idx2)``, the start indices of the matching windows in ``col1`` and ``col2``,
        or ``None`` when no window matches. The search walks ``col1`` in order and, for each
        start, ``col2`` in order, returning the first hit. ``None`` is also returned for a
        non-positive ``window_size``, since an empty window would trivially "match" at (0, 0).
    """
    # The data I care about has the following properties that could be taken advantage of to
    # make this test cheaper:
    #
    # 1. Filter out timestamps that are clearly not common
    # 2. The first row in one of the files is likely to be common
    # 3. The first common row is likely to be early on in the data set

    if window_size < 1:
        return None

    # Do O(n^2) search on small n, because it's likely the match is early on in the columns
    if early_chunk_size is not None:
        early = _first_window_match(
            col1,
            col2,
            window_size,
            min(len(col1), early_chunk_size),
            min(len(col2), early_chunk_size),
        )
        if early is not None:
            return early
        logging.warning(
            "Early search optimization failed (N=%d). Falling back on brute force",
            early_chunk_size,
        )

    # Fall back on starting O(n^2) over from the beginning with the full range. We have to
    # start over, so that we hit every possibility
    return _first_window_match(col1, col2, window_size, len(col1), len(col2))
class FindCommonRowTests(unittest.TestCase):
    """Built-in unit tests for ``find_first_common_row``."""

    def test_common_is_first(self):
        """Identical columns match at the first row of each."""
        left = [1, 2, 3, 4]
        right = [1, 2, 3, 4]
        self.assertEqual(find_first_common_row(left, right, window_size=2), (0, 0))

    def test_no_common(self):
        """Columns that never share two consecutive cells yield no match."""
        left = [1, 2, 3, 4]
        right = [1, 3, 2, 4]
        self.assertIsNone(find_first_common_row(left, right, window_size=2))

    def test_common_on_early_search_boundary(self):
        """A match straddling the end of the early chunk is still found by the fallback."""
        # fmt: off
        #       |------ early search chunk ------|
        #                           |--- match ---|
        left  = [ 0,   1,   2,   3,  4,  5,  6,  7,   8,   9 ]
        right = ['a', 'b', 'c', 'd', 4,  5,  6, 'h', 'i', 'j']
        # fmt: on
        found = find_first_common_row(left, right, window_size=3, early_chunk_size=6)
        self.assertEqual(found, (4, 4))

    def test_ragged(self):
        """Columns of different lengths match well past the early chunk of the longer one."""
        # fmt: off
        left  = ['z', 1, 2, 3, 4]
        right = ["a", "b", "c", "d", "e", "f", "g", "h", 0, 1, 2, 3, 4]
        # fmt: on
        found = find_first_common_row(left, right, window_size=2, early_chunk_size=3)
        self.assertEqual(found, (1, 9))
def main(args):
    """Compare the chosen column of the two CSV files and print the first common row indices.

    Args:
        args: Parsed command-line namespace. Reads ``files`` (two open text streams),
            ``window_size`` and ``early_search_size``; the namespace is also passed on to
            ``csv_reader``.

    Exits the process with status 1 when no files were given or no common row is found.
    """
    if args.files is None:
        # Unpacking None below would die with an unhelpful TypeError.
        logging.critical("No input files given; use --files FILE1 FILE2")
        sys.exit(1)

    file1, file2 = args.files
    name1, name2 = file1.name, file2.name
    try:
        rows1, col_idx1 = csv_reader(args, file1)
        rows2, col_idx2 = csv_reader(args, file2)

        logging.debug("Reading data ...")
        col1 = [r[col_idx1] for r in rows1]
        col2 = [r[col_idx2] for r in rows2]
    finally:
        # Both columns are fully in memory now, so release the handles argparse opened.
        # Leave stdin alone in case "-" was given as a file name.
        for handle in (file1, file2):
            if handle is not sys.stdin:
                handle.close()

    # The window can never be larger than the shorter of the two columns.
    min_length = min(len(col1), len(col2))
    window_size = min(args.window_size, min_length)

    logging.debug("Finding first common row")
    common = find_first_common_row(col1, col2, window_size, args.early_search_size)
    if common is None:
        logging.critical("Failed to find common row between %s and %s", name1, name2)
        sys.exit(1)

    common1, common2 = common
    print(f"Found common row indices:\n\t{name1}: {common1}\n\t{name2}: {common2}")
if __name__ == "__main__":
    # Script entry point: configure logging, then either run the built-in tests or compare.
    args = parse_args()

    fmt = "%(module)s - %(levelname)s: %(message)s"
    logging.basicConfig(
        format=fmt,
        level=args.log_level,
        stream=sys.stderr,
    )

    # Color log output if possible, because I'm a sucker
    try:
        import coloredlogs
    except ImportError:
        # Optional dependency: plain logging output is fine without it.
        pass
    else:
        coloredlogs.install(fmt=fmt, level=args.log_level)

    if args.self_test is not None:
        # unittest takes one pattern per -k option; anything after the first value of a
        # single -k would be read as a test name, so repeat the flag for every pattern.
        argv = sys.argv[:1]
        for pattern in args.self_test:
            argv += ["-k", pattern]
        unittest.main(argv=argv)
    else:
        main(args)