-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
reorganize tests, add test data use raw strings for regex (fix SyntaxWarning: invalid escape sequence '\d')
- Loading branch information
Showing
36 changed files
with
701 additions
and
563 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,203 @@ | ||
from contextlib import redirect_stdout | ||
from io import StringIO | ||
import shlex | ||
from subprocess import PIPE, run | ||
import sys | ||
import os | ||
import logging | ||
from pathlib import Path | ||
from stat import S_IRGRP, S_IRUSR | ||
from typing import List, Union | ||
from unittest import TestCase | ||
|
||
sys.path.append(str(Path(__file__).parent.parent)) | ||
|
||
from convey.controller import Controller | ||
from convey.dialogue import Cancelled | ||
|
||
logging.basicConfig(stream=sys.stderr, level=logging.WARNING) | ||
|
||
# to evade project folder pollution, chdir to a temp folder | ||
PROJECT_DIR = Path.cwd() | ||
# temp = TemporaryDirectory() XX As the output folder appears in the file folder, this has diminished effect. | ||
# os.chdir(temp.name) | ||
# os.chdir("tests") | ||
|
||
TESTDATA_DIR = Path("tests") / Path("test_data") | ||
|
||
def p(s): | ||
"""all mentioned resources files are in the tests folder""" | ||
return PROJECT_DIR / TESTDATA_DIR / Path(s) | ||
|
||
|
||
HELLO_B64 = "aGVsbG8=" | ||
SHEET_CSV = p("sheet.csv") | ||
GIF_CSV = p("gif.csv") | ||
PERSON_CSV = p("person.csv") | ||
PERSON_XLS = p("person.xls") | ||
PERSON_XLSX = p("person.xlsx") | ||
PERSON_ODS = p("person.ods") | ||
COMBINED_SHEET_PERSON = p("combined_sheet_person.csv") | ||
PERSON_HEADER_CSV = p("person_header.csv") | ||
COMBINED_LIST_METHOD = p("combined_list_method.csv") | ||
SHEET_DUPLICATED_CSV = p("sheet_duplicated.csv") | ||
SHEET_HEADER_CSV = p("sheet_header.csv") | ||
SHEET_HEADER_ITSELF_CSV = p("sheet_header_itself.csv") | ||
SHEET_HEADER_PERSON_CSV = p("sheet_header_person.csv") | ||
SHEET_PERSON_CSV = p("sheet_person.csv") | ||
PERSON_GIF_CSV = p("person_gif.csv") | ||
CONSUMPTION = p("consumption.csv") | ||
p("red-permission.gif").chmod(S_IRUSR | S_IRGRP) # make file unreadable to others | ||
|
||
|
||
class Convey: | ||
"""While we prefer to check the results with .check method | ||
(quicker, directly connected with the internals of the library), | ||
this method is able to test piping and interprocess communication. | ||
""" | ||
|
||
def __init__( | ||
self, | ||
*args, | ||
filename: Union[str, Path] = None, | ||
text=None, | ||
whois=False, | ||
debug=None, | ||
): | ||
"""It is important that an input is flagged with --file or --input when performing tests | ||
because otherwise, main() would hang on `not sys.stdin.isatty() -> sys.stdin.read()` | ||
:type args: object | ||
""" | ||
self.debug = debug | ||
|
||
# XX travis will not work will daemon=true (which imposes slow testing) | ||
self.cmd = [ | ||
str(PROJECT_DIR / "convey.py"), | ||
"--output", | ||
"--reprocess", | ||
"--headless", | ||
"--daemon", | ||
"false", | ||
"--debug", | ||
"false", | ||
"--crash-post-mortem", | ||
"false", | ||
] | ||
if ( | ||
filename is None | ||
and not text | ||
and len(args) == 1 | ||
and not str(args[0]).startswith("-") | ||
): | ||
filename = args[0] | ||
args = None | ||
if filename: | ||
if not Path(filename).exists(): | ||
raise FileNotFoundError(filename) | ||
self.cmd.extend(("--file", str(filename))) | ||
if text: | ||
self.cmd.extend(("--input", text)) | ||
|
||
self.has_filename = bool(filename) | ||
self.has_text = bool(text) | ||
if not whois: | ||
self.cmd.extend(("--whois-cache", "false")) | ||
if args: | ||
self.cmd.extend(args) | ||
|
||
def __call__(self, cmd="", text=None, debug=None, piped_text=None): | ||
if debug is not None: | ||
self.debug = debug | ||
if not any( | ||
(self.has_filename, self.has_text, piped_text) | ||
) and not cmd.startswith("-"): | ||
cmd = "--input " + cmd | ||
|
||
cmd = [*self.cmd, *shlex.split(cmd)] | ||
if text: | ||
cmd.extend(("--input", text)) | ||
if self.debug: | ||
print(" ".join(cmd)) | ||
# run: blocking, output | ||
input_ = piped_text.encode() if piped_text else None | ||
lines = ( | ||
run(cmd, input=input_, stdout=PIPE, timeout=3) | ||
.stdout.decode("utf-8") | ||
.splitlines() | ||
) | ||
if self.debug: | ||
print(lines) | ||
if lines and lines[-1] == "\x1b[0m": | ||
# colorama put this reset string at the end. I am not able to reproduce it in bash, only in Python piping. | ||
lines = lines[:-1] | ||
return lines | ||
|
||
|
||
class TestAbstract(TestCase): | ||
maxDiff = None | ||
|
||
|
||
|
||
def check( | ||
self, | ||
check: Union[List, str, None], | ||
cmd: str = "", | ||
text=None, | ||
filename: Union[str, Path] = None, | ||
debug=None, | ||
): | ||
# o = Convey(filename=filename, text=text, debug=debug)(cmd) | ||
args = [ | ||
"--output", | ||
"--reprocess", | ||
"--headless", | ||
"--daemon", | ||
"false", | ||
"--debug", | ||
"false", | ||
"--crash-post-mortem", | ||
"false", | ||
] | ||
if filename: | ||
args.extend(("--file", str(filename))) | ||
if text: | ||
args.extend(("--input", text)) | ||
args.extend(shlex.split(cmd)) | ||
|
||
if isinstance(check, Path): | ||
check = Path(check).read_text().splitlines() | ||
if debug: | ||
print("convey", " ".join(args)) | ||
print(check) | ||
info = ("Cmd", "convey " + " ".join(args), "Check", check) | ||
|
||
with redirect_stdout(StringIO()) as buf: | ||
c = Controller() | ||
try: | ||
c.run(given_args=args) | ||
except SystemExit as e: | ||
if e.code: | ||
raise AssertionError(f"Bad exit code: {e.code}") | ||
except Cancelled as e: | ||
print(str(e)) | ||
except Exception as e: | ||
raise Exception(*info) from e | ||
finally: | ||
c.cleanup() | ||
o = buf.getvalue().splitlines() | ||
|
||
try: | ||
if isinstance(check, list): | ||
self.assertListEqual(check, o) | ||
elif check == "": # check empty output | ||
self.assertFalse(o) | ||
elif check is None: # we do not want to do any checks | ||
pass | ||
elif not len(o): | ||
raise AssertionError(f"Output too short: {o}") | ||
else: | ||
self.assertEqual(check, o[0]) | ||
except AssertionError as e: | ||
raise AssertionError(*info) from e | ||
|
||
return c |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,173 @@ | ||
import os | ||
from pathlib import Path | ||
from shared import ( | ||
CONSUMPTION, | ||
COMBINED_LIST_METHOD, | ||
COMBINED_SHEET_PERSON, | ||
GIF_CSV, | ||
PERSON_CSV, | ||
PERSON_GIF_CSV, | ||
PERSON_HEADER_CSV, | ||
SHEET_CSV, | ||
SHEET_HEADER_CSV, | ||
SHEET_HEADER_ITSELF_CSV, | ||
SHEET_HEADER_PERSON_CSV, | ||
SHEET_PERSON_CSV, | ||
TestAbstract, | ||
Convey, | ||
p, | ||
) | ||
|
||
|
||
class TestAction(TestAbstract): | ||
def test_aggregate(self): | ||
self.check( | ||
["sum(price)", "972.0"], f"--aggregate price,sum", filename=CONSUMPTION | ||
) | ||
self.check( | ||
["category,sum(price)", "total,972.0", "kettle,602.0", "bulb,370.0"], | ||
f"--aggregate price,sum,category", | ||
filename=CONSUMPTION, | ||
) | ||
self.check( | ||
[ | ||
"category,sum(price),avg(consumption)", | ||
"total,972.0,41.0", | ||
"kettle,602.0,75.0", | ||
"bulb,370.0,18.33", | ||
], | ||
f"--aggregate price,sum,consumption,avg,category", | ||
filename=CONSUMPTION, | ||
) | ||
self.check( | ||
[ | ||
"category,sum(price),list(price)", | ||
"total,972.0,(all)", | ||
'''kettle,602.0,"['250', '352']"''', | ||
'''bulb,370.0,"['100', '150', '120']"''', | ||
], | ||
f"--aggregate price,sum,price,list,category", | ||
filename=CONSUMPTION, | ||
) | ||
|
||
# XX this will correctly split the files, | ||
# however, the output is poor and for a reason not readable by the check. | ||
# self.check(['','Split location: bulb','','Split location: kettle'], | ||
# "--agg price,sum --split category", filename=CONSUMPTION) | ||
# Until then, following substitution is used to generate the files at least | ||
Convey(filename=CONSUMPTION)("--agg price,sum --split category") | ||
|
||
# Check the contents of the files that just have been split | ||
check1 = False | ||
check2 = False | ||
for f in Path().rglob("consumption.csv_convey*/*"): | ||
if f.name == "kettle" and f.read_text() == "sum(price)\n602.0\n": | ||
check1 = True | ||
if f.name == "bulb" and f.read_text() == "sum(price)\n370.0\n": | ||
check2 = True | ||
self.assertTrue(check1) | ||
self.assertTrue(check2) | ||
|
||
def test_aggregate_group_col(self): | ||
# group by a column without any additional info means counting | ||
self.check( | ||
[ | ||
"price,count(price)", | ||
"total,5", | ||
"100,1", | ||
"150,1", | ||
"250,1", | ||
"352,1", | ||
"120,1", | ||
], | ||
f"-a price", | ||
filename=CONSUMPTION, | ||
) | ||
|
||
# group by the same column works | ||
self.check( | ||
[ | ||
"price,sum(price)", | ||
"total,972.0", | ||
"352,352.0", | ||
"250,250.0", | ||
"150,150.0", | ||
"120,120.0", | ||
"100,100.0", | ||
], | ||
f"--aggregate price,sum,price", | ||
filename=CONSUMPTION, | ||
) | ||
|
||
self.check( | ||
[ | ||
"price,count(price)", | ||
"total,5", | ||
"100,1", | ||
"150,1", | ||
"250,1", | ||
"352,1", | ||
"120,1", | ||
], | ||
f"--aggregate price,count,price", | ||
filename=CONSUMPTION, | ||
) | ||
|
||
# group by a different column when counting does not make sense | ||
msg = "ERROR:convey.action_controller:Count column 'price' must be the same as the grouping column 'consumption'." | ||
with self.assertLogs(level="WARNING") as cm: | ||
self.check("", f"--aggregate price,count,consumption", filename=CONSUMPTION) | ||
self.assertEqual([msg], cm.output) | ||
|
||
def test_merge(self): | ||
# merging generally works | ||
self.check( | ||
COMBINED_SHEET_PERSON, f"--merge {PERSON_CSV},2,1", filename=SHEET_CSV | ||
) | ||
|
||
# rows can be duplicated due to other fields | ||
self.check( | ||
COMBINED_LIST_METHOD, | ||
f"--merge {PERSON_CSV},2,1 -f external,1," + str(p("external_pick_base.py")) + ",list_method", | ||
filename=SHEET_CSV, | ||
) | ||
|
||
# merging file with header and with a missing value | ||
self.check( | ||
SHEET_PERSON_CSV, f"--merge {PERSON_HEADER_CSV},2,1", filename=SHEET_CSV | ||
) | ||
|
||
# merge on a column type | ||
self.check( | ||
PERSON_GIF_CSV, f"--merge {GIF_CSV},email,email", filename=PERSON_CSV | ||
) | ||
|
||
# merge by a column number | ||
self.check(PERSON_GIF_CSV, f"--merge {GIF_CSV},email,1", filename=PERSON_CSV) | ||
|
||
# invalid column definition | ||
msg = "ERROR:convey.identifier:Cannot identify COLUMN invalid, put there an exact column name, its type, the numerical order starting with 1, or with -1." | ||
with self.assertLogs(level="WARNING") as cm: | ||
self.check("", f"--merge {GIF_CSV},email,invalid", filename=PERSON_CSV) | ||
self.assertEqual([msg], cm.output) | ||
# merging a file with itself | ||
self.check( | ||
SHEET_HEADER_ITSELF_CSV, | ||
f"--merge {SHEET_HEADER_CSV},4,2", | ||
filename=SHEET_HEADER_CSV, | ||
) | ||
|
||
# only local file has header; different dialects | ||
self.check( | ||
SHEET_HEADER_PERSON_CSV, | ||
f"--merge {PERSON_CSV},2,1", | ||
filename=SHEET_HEADER_CSV, | ||
) | ||
|
||
def test_compute_from_merge(self): | ||
"""Computing a new column from another file currenlty being merged was not implemented.""" | ||
self.check( | ||
"Column ID 6 does not exist. We have these so far: foo, red, second.example.com", | ||
f"--merge {PERSON_CSV},2,1 -f base64,6", | ||
filename=SHEET_CSV, | ||
) |
Oops, something went wrong.