From 6611411fdb7fbba01abbd743c94262755ad78197 Mon Sep 17 00:00:00 2001 From: Radek Vyhnal Date: Tue, 8 Oct 2024 11:10:00 +0200 Subject: [PATCH] reorganize tests (#41) reorganize tests, add test data use raw strings for regex (fix SyntaxWarning: invalid escape sequence '\d') --- .github/workflows/run-unittest.yml | 2 +- .gitignore | 4 +- tests/__init__.py | 0 tests/shared.py | 203 +++++++ tests/test_action.py | 173 ++++++ tests/test_columns.py | 23 + tests/{ => test_data}/bare_template.eml | 0 tests/{ => test_data}/black.gif | Bin .../{ => test_data}/combined_list_method.csv | 0 .../{ => test_data}/combined_sheet_person.csv | 0 tests/{ => test_data}/consumption.csv | 0 tests/{ => test_data}/email_template.eml | 0 tests/{ => test_data}/external_pick_base.py | 0 tests/{ => test_data}/gif.csv | 0 tests/{ => test_data}/person.csv | 0 tests/{ => test_data}/person.ods | Bin tests/{ => test_data}/person.xls | Bin tests/{ => test_data}/person.xlsx | Bin tests/{ => test_data}/person_gif.csv | 0 tests/{ => test_data}/person_header.csv | 0 tests/{ => test_data}/red-permission.gif | Bin tests/{ => test_data}/sheet.csv | 0 tests/{ => test_data}/sheet_duplicated.csv | 0 tests/{ => test_data}/sheet_header.csv | 0 tests/{ => test_data}/sheet_header_itself.csv | 0 tests/{ => test_data}/sheet_header_person.csv | 0 tests/{ => test_data}/sheet_person.csv | 0 tests/{ => test_data}/white.gif | Bin tests/test_dialect.py | 11 + tests/test_externals.py | 37 ++ tests/test_fields.py | 74 +++ tests/test_filter.py | 18 + tests/test_internal.py | 24 + tests/test_launching.py | 58 ++ tests/test_sending.py | 76 +++ tests/tests.py | 561 ------------------ 36 files changed, 701 insertions(+), 563 deletions(-) create mode 100644 tests/__init__.py create mode 100644 tests/shared.py create mode 100644 tests/test_action.py create mode 100644 tests/test_columns.py rename tests/{ => test_data}/bare_template.eml (100%) rename tests/{ => test_data}/black.gif (100%) rename tests/{ => 
test_data}/combined_list_method.csv (100%) rename tests/{ => test_data}/combined_sheet_person.csv (100%) rename tests/{ => test_data}/consumption.csv (100%) rename tests/{ => test_data}/email_template.eml (100%) rename tests/{ => test_data}/external_pick_base.py (100%) rename tests/{ => test_data}/gif.csv (100%) rename tests/{ => test_data}/person.csv (100%) rename tests/{ => test_data}/person.ods (100%) rename tests/{ => test_data}/person.xls (100%) rename tests/{ => test_data}/person.xlsx (100%) rename tests/{ => test_data}/person_gif.csv (100%) rename tests/{ => test_data}/person_header.csv (100%) rename tests/{ => test_data}/red-permission.gif (100%) rename tests/{ => test_data}/sheet.csv (100%) rename tests/{ => test_data}/sheet_duplicated.csv (100%) rename tests/{ => test_data}/sheet_header.csv (100%) rename tests/{ => test_data}/sheet_header_itself.csv (100%) rename tests/{ => test_data}/sheet_header_person.csv (100%) rename tests/{ => test_data}/sheet_person.csv (100%) rename tests/{ => test_data}/white.gif (100%) create mode 100644 tests/test_dialect.py create mode 100644 tests/test_externals.py create mode 100644 tests/test_fields.py create mode 100644 tests/test_filter.py create mode 100644 tests/test_internal.py create mode 100644 tests/test_launching.py create mode 100644 tests/test_sending.py delete mode 100644 tests/tests.py diff --git a/.github/workflows/run-unittest.yml b/.github/workflows/run-unittest.yml index e4eba91..999d6bd 100644 --- a/.github/workflows/run-unittest.yml +++ b/.github/workflows/run-unittest.yml @@ -19,4 +19,4 @@ jobs: - name: Set up config files run: mkdir -p /home/runner/.config/convey && cp convey/defaults/* "$_" - name: Run tests - run: python3 -m unittest tests.tests \ No newline at end of file + run: python3 -m unittest discover -s tests -p 'test_*.py' \ No newline at end of file diff --git a/.gitignore b/.gitignore index c1bcc7c..e4ab96e 100644 --- a/.gitignore +++ b/.gitignore @@ -12,7 +12,9 @@ /config.ini /tests/** 
!/tests/* +!/tests/test_data/* /tests/statistics.txt convey.log output_* -*@example* \ No newline at end of file +*@example* +.venv \ No newline at end of file diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/shared.py b/tests/shared.py new file mode 100644 index 0000000..9863d34 --- /dev/null +++ b/tests/shared.py @@ -0,0 +1,203 @@ +from contextlib import redirect_stdout +from io import StringIO +import shlex +from subprocess import PIPE, run +import sys +import os +import logging +from pathlib import Path +from stat import S_IRGRP, S_IRUSR +from typing import List, Union +from unittest import TestCase + +sys.path.append(str(Path(__file__).parent.parent)) + +from convey.controller import Controller +from convey.dialogue import Cancelled + +logging.basicConfig(stream=sys.stderr, level=logging.WARNING) + +# to evade project folder pollution, chdir to a temp folder +PROJECT_DIR = Path.cwd() +# temp = TemporaryDirectory() XX As the output folder appears in the file folder, this has diminished effect. 
+# os.chdir(temp.name) +# os.chdir("tests") + +TESTDATA_DIR = Path("tests") / Path("test_data") + +def p(s): + """all mentioned resource files are in the tests/test_data folder""" + return PROJECT_DIR / TESTDATA_DIR / Path(s) + + +HELLO_B64 = "aGVsbG8=" +SHEET_CSV = p("sheet.csv") +GIF_CSV = p("gif.csv") +PERSON_CSV = p("person.csv") +PERSON_XLS = p("person.xls") +PERSON_XLSX = p("person.xlsx") +PERSON_ODS = p("person.ods") +COMBINED_SHEET_PERSON = p("combined_sheet_person.csv") +PERSON_HEADER_CSV = p("person_header.csv") +COMBINED_LIST_METHOD = p("combined_list_method.csv") +SHEET_DUPLICATED_CSV = p("sheet_duplicated.csv") +SHEET_HEADER_CSV = p("sheet_header.csv") +SHEET_HEADER_ITSELF_CSV = p("sheet_header_itself.csv") +SHEET_HEADER_PERSON_CSV = p("sheet_header_person.csv") +SHEET_PERSON_CSV = p("sheet_person.csv") +PERSON_GIF_CSV = p("person_gif.csv") +CONSUMPTION = p("consumption.csv") +p("red-permission.gif").chmod(S_IRUSR | S_IRGRP) # make file unreadable to others + + +class Convey: + """While we prefer to check the results with .check method + (quicker, directly connected with the internals of the library), + this method is able to test piping and interprocess communication. 
+ """ + + def __init__( + self, + *args, + filename: Union[str, Path] = None, + text=None, + whois=False, + debug=None, + ): + """It is important that an input is flagged with --file or --input when performing tests + because otherwise, main() would hang on `not sys.stdin.isatty() -> sys.stdin.read()` + :type args: object + """ + self.debug = debug + + # XX travis will not work with daemon=true (which imposes slow testing) + self.cmd = [ + str(PROJECT_DIR / "convey.py"), + "--output", + "--reprocess", + "--headless", + "--daemon", + "false", + "--debug", + "false", + "--crash-post-mortem", + "false", + ] + if ( + filename is None + and not text + and len(args) == 1 + and not str(args[0]).startswith("-") + ): + filename = args[0] + args = None + if filename: + if not Path(filename).exists(): + raise FileNotFoundError(filename) + self.cmd.extend(("--file", str(filename))) + if text: + self.cmd.extend(("--input", text)) + + self.has_filename = bool(filename) + self.has_text = bool(text) + if not whois: + self.cmd.extend(("--whois-cache", "false")) + if args: + self.cmd.extend(args) + + def __call__(self, cmd="", text=None, debug=None, piped_text=None): + if debug is not None: + self.debug = debug + if not any( + (self.has_filename, self.has_text, piped_text) + ) and not cmd.startswith("-"): + cmd = "--input " + cmd + + cmd = [*self.cmd, *shlex.split(cmd)] + if text: + cmd.extend(("--input", text)) + if self.debug: + print(" ".join(cmd)) + # run: blocking, output + input_ = piped_text.encode() if piped_text else None + lines = ( + run(cmd, input=input_, stdout=PIPE, timeout=3) + .stdout.decode("utf-8") + .splitlines() + ) + if self.debug: + print(lines) + if lines and lines[-1] == "\x1b[0m": + # colorama put this reset string at the end. I am not able to reproduce it in bash, only in Python piping. 
+ lines = lines[:-1] + return lines + + +class TestAbstract(TestCase): + maxDiff = None + + + + def check( + self, + check: Union[List, str, None], + cmd: str = "", + text=None, + filename: Union[str, Path] = None, + debug=None, + ): + # o = Convey(filename=filename, text=text, debug=debug)(cmd) + args = [ + "--output", + "--reprocess", + "--headless", + "--daemon", + "false", + "--debug", + "false", + "--crash-post-mortem", + "false", + ] + if filename: + args.extend(("--file", str(filename))) + if text: + args.extend(("--input", text)) + args.extend(shlex.split(cmd)) + + if isinstance(check, Path): + check = Path(check).read_text().splitlines() + if debug: + print("convey", " ".join(args)) + print(check) + info = ("Cmd", "convey " + " ".join(args), "Check", check) + + with redirect_stdout(StringIO()) as buf: + c = Controller() + try: + c.run(given_args=args) + except SystemExit as e: + if e.code: + raise AssertionError(f"Bad exit code: {e.code}") + except Cancelled as e: + print(str(e)) + except Exception as e: + raise Exception(*info) from e + finally: + c.cleanup() + o = buf.getvalue().splitlines() + + try: + if isinstance(check, list): + self.assertListEqual(check, o) + elif check == "": # check empty output + self.assertFalse(o) + elif check is None: # we do not want to do any checks + pass + elif not len(o): + raise AssertionError(f"Output too short: {o}") + else: + self.assertEqual(check, o[0]) + except AssertionError as e: + raise AssertionError(*info) from e + + return c diff --git a/tests/test_action.py b/tests/test_action.py new file mode 100644 index 0000000..c5e0ff6 --- /dev/null +++ b/tests/test_action.py @@ -0,0 +1,173 @@ +import os +from pathlib import Path +from shared import ( + CONSUMPTION, + COMBINED_LIST_METHOD, + COMBINED_SHEET_PERSON, + GIF_CSV, + PERSON_CSV, + PERSON_GIF_CSV, + PERSON_HEADER_CSV, + SHEET_CSV, + SHEET_HEADER_CSV, + SHEET_HEADER_ITSELF_CSV, + SHEET_HEADER_PERSON_CSV, + SHEET_PERSON_CSV, + TestAbstract, + Convey, + p, +) + + 
+class TestAction(TestAbstract): + def test_aggregate(self): + self.check( + ["sum(price)", "972.0"], f"--aggregate price,sum", filename=CONSUMPTION + ) + self.check( + ["category,sum(price)", "total,972.0", "kettle,602.0", "bulb,370.0"], + f"--aggregate price,sum,category", + filename=CONSUMPTION, + ) + self.check( + [ + "category,sum(price),avg(consumption)", + "total,972.0,41.0", + "kettle,602.0,75.0", + "bulb,370.0,18.33", + ], + f"--aggregate price,sum,consumption,avg,category", + filename=CONSUMPTION, + ) + self.check( + [ + "category,sum(price),list(price)", + "total,972.0,(all)", + '''kettle,602.0,"['250', '352']"''', + '''bulb,370.0,"['100', '150', '120']"''', + ], + f"--aggregate price,sum,price,list,category", + filename=CONSUMPTION, + ) + + # XX this will correctly split the files, + # however, the output is poor and for a reason not readable by the check. + # self.check(['','Split location: bulb','','Split location: kettle'], + # "--agg price,sum --split category", filename=CONSUMPTION) + # Until then, following substitution is used to generate the files at least + Convey(filename=CONSUMPTION)("--agg price,sum --split category") + + # Check the contents of the files that just have been split + check1 = False + check2 = False + for f in Path().rglob("consumption.csv_convey*/*"): + if f.name == "kettle" and f.read_text() == "sum(price)\n602.0\n": + check1 = True + if f.name == "bulb" and f.read_text() == "sum(price)\n370.0\n": + check2 = True + self.assertTrue(check1) + self.assertTrue(check2) + + def test_aggregate_group_col(self): + # group by a column without any additional info means counting + self.check( + [ + "price,count(price)", + "total,5", + "100,1", + "150,1", + "250,1", + "352,1", + "120,1", + ], + f"-a price", + filename=CONSUMPTION, + ) + + # group by the same column works + self.check( + [ + "price,sum(price)", + "total,972.0", + "352,352.0", + "250,250.0", + "150,150.0", + "120,120.0", + "100,100.0", + ], + f"--aggregate 
price,sum,price", + filename=CONSUMPTION, + ) + + self.check( + [ + "price,count(price)", + "total,5", + "100,1", + "150,1", + "250,1", + "352,1", + "120,1", + ], + f"--aggregate price,count,price", + filename=CONSUMPTION, + ) + + # group by a different column when counting does not make sense + msg = "ERROR:convey.action_controller:Count column 'price' must be the same as the grouping column 'consumption'." + with self.assertLogs(level="WARNING") as cm: + self.check("", f"--aggregate price,count,consumption", filename=CONSUMPTION) + self.assertEqual([msg], cm.output) + + def test_merge(self): + # merging generally works + self.check( + COMBINED_SHEET_PERSON, f"--merge {PERSON_CSV},2,1", filename=SHEET_CSV + ) + + # rows can be duplicated due to other fields + self.check( + COMBINED_LIST_METHOD, + f"--merge {PERSON_CSV},2,1 -f external,1," + str(p("external_pick_base.py")) + ",list_method", + filename=SHEET_CSV, + ) + + # merging file with header and with a missing value + self.check( + SHEET_PERSON_CSV, f"--merge {PERSON_HEADER_CSV},2,1", filename=SHEET_CSV + ) + + # merge on a column type + self.check( + PERSON_GIF_CSV, f"--merge {GIF_CSV},email,email", filename=PERSON_CSV + ) + + # merge by a column number + self.check(PERSON_GIF_CSV, f"--merge {GIF_CSV},email,1", filename=PERSON_CSV) + + # invalid column definition + msg = "ERROR:convey.identifier:Cannot identify COLUMN invalid, put there an exact column name, its type, the numerical order starting with 1, or with -1." 
+ with self.assertLogs(level="WARNING") as cm: + self.check("", f"--merge {GIF_CSV},email,invalid", filename=PERSON_CSV) + self.assertEqual([msg], cm.output) + # merging a file with itself + self.check( + SHEET_HEADER_ITSELF_CSV, + f"--merge {SHEET_HEADER_CSV},4,2", + filename=SHEET_HEADER_CSV, + ) + + # only local file has header; different dialects + self.check( + SHEET_HEADER_PERSON_CSV, + f"--merge {PERSON_CSV},2,1", + filename=SHEET_HEADER_CSV, + ) + + def test_compute_from_merge(self): + """Computing a new column from another file currently being merged was not implemented.""" + self.check( + "Column ID 6 does not exist. We have these so far: foo, red, second.example.com", + f"--merge {PERSON_CSV},2,1 -f base64,6", + filename=SHEET_CSV, + ) diff --git a/tests/test_columns.py b/tests/test_columns.py new file mode 100644 index 0000000..a2721a7 --- /dev/null +++ b/tests/test_columns.py @@ -0,0 +1,23 @@ +from tests.shared import SHEET_CSV, Convey, TestAbstract + + +class TestColumns(TestAbstract): + def test_column_selection(self): + """ Allows specifying an index (even a negative one) """ + + self.check("com", "-f tld,-1", "one.com") + self.check('"one.com","com"', "-f tld,-1 -C", "one.com") + self.check("com", "-f tld,1", "one.com") + self.check("Column ID 2 does not exist. 
We have these so far: one.com", "-f tld,2", "one.com") + self.check('two.cz,one.com,com', "-f tld,2 -C", "two.cz,one.com") + + c = Convey(filename=SHEET_CSV) + self.assertEqual('foo,green,first.example.com,com', c("-f tld,-1")[1]) + self.assertEqual('foo,green,first.example.com,com', c("-f tld,3")[1]) + self.assertEqual('foo,green,first.example.com,com,comA', c("-f tld,-1 -f code,-1,'x+=\"A\"'")[1]) + self.assertEqual('foo,green,first.example.com,com,first.example.comA', c("-f tld,-1 -f code,-2,'x+=\"A\"'")[1]) + + def test_split(self): + lines = Convey()("--split email", "one@example.com\nsecond@example.com") + [self.assertIn(s, lines) for s in + ('* Saved to second@example.com', '"second@example.com"', '* Saved to one@example.com', '"one@example.com"')] diff --git a/tests/bare_template.eml b/tests/test_data/bare_template.eml similarity index 100% rename from tests/bare_template.eml rename to tests/test_data/bare_template.eml diff --git a/tests/black.gif b/tests/test_data/black.gif similarity index 100% rename from tests/black.gif rename to tests/test_data/black.gif diff --git a/tests/combined_list_method.csv b/tests/test_data/combined_list_method.csv similarity index 100% rename from tests/combined_list_method.csv rename to tests/test_data/combined_list_method.csv diff --git a/tests/combined_sheet_person.csv b/tests/test_data/combined_sheet_person.csv similarity index 100% rename from tests/combined_sheet_person.csv rename to tests/test_data/combined_sheet_person.csv diff --git a/tests/consumption.csv b/tests/test_data/consumption.csv similarity index 100% rename from tests/consumption.csv rename to tests/test_data/consumption.csv diff --git a/tests/email_template.eml b/tests/test_data/email_template.eml similarity index 100% rename from tests/email_template.eml rename to tests/test_data/email_template.eml diff --git a/tests/external_pick_base.py b/tests/test_data/external_pick_base.py similarity index 100% rename from tests/external_pick_base.py rename to 
tests/test_data/external_pick_base.py diff --git a/tests/gif.csv b/tests/test_data/gif.csv similarity index 100% rename from tests/gif.csv rename to tests/test_data/gif.csv diff --git a/tests/person.csv b/tests/test_data/person.csv similarity index 100% rename from tests/person.csv rename to tests/test_data/person.csv diff --git a/tests/person.ods b/tests/test_data/person.ods similarity index 100% rename from tests/person.ods rename to tests/test_data/person.ods diff --git a/tests/person.xls b/tests/test_data/person.xls similarity index 100% rename from tests/person.xls rename to tests/test_data/person.xls diff --git a/tests/person.xlsx b/tests/test_data/person.xlsx similarity index 100% rename from tests/person.xlsx rename to tests/test_data/person.xlsx diff --git a/tests/person_gif.csv b/tests/test_data/person_gif.csv similarity index 100% rename from tests/person_gif.csv rename to tests/test_data/person_gif.csv diff --git a/tests/person_header.csv b/tests/test_data/person_header.csv similarity index 100% rename from tests/person_header.csv rename to tests/test_data/person_header.csv diff --git a/tests/red-permission.gif b/tests/test_data/red-permission.gif similarity index 100% rename from tests/red-permission.gif rename to tests/test_data/red-permission.gif diff --git a/tests/sheet.csv b/tests/test_data/sheet.csv similarity index 100% rename from tests/sheet.csv rename to tests/test_data/sheet.csv diff --git a/tests/sheet_duplicated.csv b/tests/test_data/sheet_duplicated.csv similarity index 100% rename from tests/sheet_duplicated.csv rename to tests/test_data/sheet_duplicated.csv diff --git a/tests/sheet_header.csv b/tests/test_data/sheet_header.csv similarity index 100% rename from tests/sheet_header.csv rename to tests/test_data/sheet_header.csv diff --git a/tests/sheet_header_itself.csv b/tests/test_data/sheet_header_itself.csv similarity index 100% rename from tests/sheet_header_itself.csv rename to tests/test_data/sheet_header_itself.csv diff --git 
a/tests/sheet_header_person.csv b/tests/test_data/sheet_header_person.csv similarity index 100% rename from tests/sheet_header_person.csv rename to tests/test_data/sheet_header_person.csv diff --git a/tests/sheet_person.csv b/tests/test_data/sheet_person.csv similarity index 100% rename from tests/sheet_person.csv rename to tests/test_data/sheet_person.csv diff --git a/tests/white.gif b/tests/test_data/white.gif similarity index 100% rename from tests/white.gif rename to tests/test_data/white.gif diff --git a/tests/test_dialect.py b/tests/test_dialect.py new file mode 100644 index 0000000..806adba --- /dev/null +++ b/tests/test_dialect.py @@ -0,0 +1,11 @@ +from unittest import TestCase + +from tests.shared import SHEET_CSV, Convey + + +class TestDialect(TestCase): + def test_dialect(self): + convey = Convey(SHEET_CSV) + self.assertIn("foo|red|second.example.com", convey("--delimiter-output '|'")) + self.assertIn("foo|red|second.example.com", convey("--delimiter-output '|' --header-output false")) + self.assertNotIn("foo|red|second.example.com", convey("--delimiter-output '|' --header-output false --header")) diff --git a/tests/test_externals.py b/tests/test_externals.py new file mode 100644 index 0000000..f5fb6f0 --- /dev/null +++ b/tests/test_externals.py @@ -0,0 +1,37 @@ +from tests.shared import SHEET_CSV, SHEET_DUPLICATED_CSV, Convey, TestAbstract, p + + +class TestExternals(TestAbstract): + + def test_list_in_result(self): + """The method returns a list while CSV processing.""" + self.check(SHEET_DUPLICATED_CSV, "-f external," + str(p("external_pick_base.py")) + ",list_method", filename=SHEET_CSV) + + def test_bare_method(self): + # single value converted by the external + self.check("-foo-", "--field external," + str(p("external_pick_base.py")) + ",dumb_method --input 'foo'") + + # by default, first column is used + self.check("foo,bar,-foo-", "--field external," + str(p("external_pick_base.py")) + ",dumb_method -C --input 'foo,bar'") + + # specify 2nd column 
is the source for the external field + self.check("foo,bar,-bar-", "--field external,2,plaintext," + str(p("external_pick_base.py")) + ",dumb_method -C --input 'foo,bar'") + + def test_pick_input(self): + convey = Convey() + lines = convey("--field external," + str(p("external_pick_base.py")) + ",time_format --input '2016-08-08 12:00'") + self.assertEqual(lines, ["12:00"]) + + def test_pick_method(self): + convey = Convey() + # method "all" (default one) is used and "1" passes + self.assertEqual(["1"], convey("--field external," + str(p("external_pick_base.py")) + ",PickMethodTest --input '1'")) + # method "filtered" is used and "a" passes + self.assertEqual(["a"], convey("--field external," + str(p("external_pick_base.py")) + ",PickMethodTest,filtered --input 'a'")) + # method "filtered" is used which excludes "1" + self.assertNotEqual(["1"], convey("--field external," + str(p("external_pick_base.py")) + ",PickMethodTest,filtered --input '1'")) + + # XX does not work, error when resolving path Unit -> Plaintext -> External. Identifier.get_methods_from + # `lambda_ = lambda_.get_lambda(custom.pop(0) if custom is not None else None)` + # -> get_module_from_path(custom[0], ...) 
does not contain a path + # self.assertEqual(["a"], convey("--field external,external_pick_base.py,PickMethodTest --input 'mm'")) diff --git a/tests/test_fields.py b/tests/test_fields.py new file mode 100644 index 0000000..dc76738 --- /dev/null +++ b/tests/test_fields.py @@ -0,0 +1,74 @@ +from base64 import b64encode +from datetime import datetime + +from tests.shared import HELLO_B64, Convey, TestAbstract + +convey = Convey() + +class TestFields(TestAbstract): + + def test_base64_detection(self): + """Base64 detection should work if majority of the possibly decoded characters are not gibberish.""" + self.check("base64", "--single-detect", HELLO_B64) + self.check("", "--single-detect", "ahojahoj") + + def test_base64_charset(self): + """Base64 detection should work even if encoded with another charset""" + s = "Žluťoučký kůň pěl ďábelské ódy." + encoded = b64encode(s.encode("iso-8859-2")) + convey = Convey() + self.assertIn(s, convey("-f charset,,,iso-8859-2", text=encoded.decode("utf-8"))) + + def test_base64_disambiguation(self): + """Base64 must not mix up with I.E. 
hostname""" + c = Convey("--single-detect") + self.assertIn("hostname", c("example.com")) # hostname must not be confounded with base64 + self.assertFalse(c("base")) # 'base' is plaintext + + c = Convey("--single-query") + self.assertIn("m«", c("base -t base64")) # 'base' can be base64 if explicitly told + + def test_phone_detection(self): + """Various phone formats must pass.""" + c = Convey("--single-detect") + self.assertIn("timestamp", c("2020-02-29")) # date value must not be confused with the phone regex + for phone in ("+420123456789", "+1-541-754-3010", "1-541-754-3010", "001-541-754-3010", "+49-89-636-4801"): + self.assertIn("phone", c(phone), phone) + + def test_pint(self): + """Test unit conversion works""" + c = Convey() + self.assertIn("unit", c("--single-detect", text="1 kg")) + self.assertIn("2.6792288807189983 troy_pound", c("-f unit[troy_pound]", text="1 kg")) + + def test_wrong_url(self): + c = Convey() + self.assertEqual("http://example.com", c("-f url", text="hXXp://example.com")[0]) + self.assertEqual("https://an.eXAmple.com", c("-f url", text="hxxps://an[.]eXAmple[.]com")[0]) + self.assertEqual("http://185.33.144.243/main_content/", c("-f url", text="hxxp://185.33.144[.]243/main_content/")[0]) + self.assertEqual("http://80.211.218.7/fb/", c("-f url", text="80.211.218.7/fb/")[0]) + + def test_hostname(self): + self.assertIn("hostname", convey("--single-detect", text="_spf.google.com")) + + def test_timestamp(self): + self.assertIn("timestamp", convey("--single-detect", text="26. 03. 
1999")) + time = datetime.now() + self.assertIn("timestamp", convey("--single-detect", text=str(time))) + self.assertIn("timestamp", convey("--single-detect", text=str(int(time.timestamp())))) + + # as of Python3.7 use: + # distant_future = datetime.fromisoformat("3000-01-01") # it is less probable distant dates are dates + distant_future = datetime.fromtimestamp(32503680000.0) + + self.assertIn("timestamp", convey("--single-detect", text=str(distant_future))) + self.assertIn("phone", convey("--single-detect", text=str(int(distant_future.timestamp())))) + # there is no path to datetype date from a phone + self.assertIn("No suitable column found for field 'date'", convey("-S -f date", text=str(distant_future.timestamp()))) + # however, it is possible to get a date if specified + self.assertIn("3000-01-01", convey("-t timestamp -f date", text=str(int(distant_future.timestamp())))) + # works for float numbers too + self.assertIn("3000-01-01", convey("-t timestamp -f date", text=str(distant_future.timestamp()))) + + # short number is not considered a timestamp from the beginning of the Unix epoch (1970) + self.assertEqual([], convey("--single-detect", text="12345")) diff --git a/tests/test_filter.py b/tests/test_filter.py new file mode 100644 index 0000000..ae9f233 --- /dev/null +++ b/tests/test_filter.py @@ -0,0 +1,18 @@ +from unittest import TestCase + +from tests.shared import SHEET_CSV, Convey + + +class TestFilter(TestCase): + def test_filter(self): + convey = Convey(SHEET_CSV) + self.assertEqual(3, len(convey("--include-filter 1,foo"))) + self.assertEqual(2, len(convey("--exclude-filter 1,foo"))) + self.assertEqual(2, len(convey("--unique 1"))) + + def test_post_filter(self): + """Filter after a field was generated.""" + convey = Convey(SHEET_CSV) + self.assertEqual(3, len(convey("--field base64,1 --include-filter base64,Zm9v"))) + self.assertEqual(2, len(convey("--field base64,1 --exclude-filter base64,Zm9v"))) + self.assertEqual(2, len(convey("--field base64,1 
--unique base64"))) diff --git a/tests/test_internal.py b/tests/test_internal.py new file mode 100644 index 0000000..8c90469 --- /dev/null +++ b/tests/test_internal.py @@ -0,0 +1,24 @@ +from tests.shared import GIF_CSV, PERSON_CSV, PERSON_GIF_CSV, TestAbstract + + +class TestInternal(TestAbstract): + def test_similar_fields(self): + """Recommending of the similar columns""" + c1 = self.check(None, f"--merge {PERSON_CSV},2,1", filename=GIF_CSV) + parser1A, parser1B = c1.parser, c1.parser.settings["merge"][0].remote_parser + fields1A = c1.parser.fields + fields1B = parser1B.fields + self.assertListEqual([fields1A[0]], parser1A.get_similar(fields1B)) + self.assertListEqual([fields1B[0]], parser1B.get_similar(fields1A)) + self.assertListEqual([fields1B[0]], parser1B.get_similar(fields1A[0])) + self.assertListEqual([], parser1B.get_similar(fields1A[1])) + + c2 = self.check(None, f"--merge {PERSON_GIF_CSV},2,1", filename=GIF_CSV) + parser2A, parser2B = c2.parser, c2.parser.settings["merge"][0].remote_parser + fields2A = c2.parser.fields + fields2B = parser2B.fields + self.assertListEqual([fields2A[0], fields2A[1]], parser2A.get_similar(fields2B)) + self.assertListEqual([fields2A[0]], parser2A.get_similar(fields2B[0])) + self.assertListEqual([], parser2A.get_similar(fields2B[1])) + self.assertListEqual([fields2A[1]], parser2A.get_similar(fields2B[3])) + self.assertListEqual([fields2B[0], fields2B[3]], parser2B.get_similar(fields2A)) diff --git a/tests/test_launching.py b/tests/test_launching.py new file mode 100644 index 0000000..de9e50f --- /dev/null +++ b/tests/test_launching.py @@ -0,0 +1,58 @@ +from base64 import b64encode +from pathlib import Path +import shutil +from tempfile import TemporaryDirectory +from tests.shared import COMBINED_SHEET_PERSON, HELLO_B64, PERSON_ODS, PERSON_XLS, PERSON_XLSX, SHEET_CSV, Convey, TestAbstract + + +class TestLaunching(TestAbstract): + def test_piping_in(self): + convey = Convey() + # just string specified, nothing else + lines = 
convey(piped_text="3 kg") + self.assertTrue(len(lines) == 1) + self.assertTrue("'1.806642228624337e+27 dalton'" in lines[0]) + + # field base64 specified + self.assertListEqual([HELLO_B64], convey("-f base64", piped_text="hello")) + self.assertListEqual(["hello"], convey(piped_text=HELLO_B64)) + self.assertListEqual([], convey(piped_text="hello")) + + def test_single_query_processing(self): + """Stable --single-query parsing""" + + # Single field containing a comma, still must be fully converted (comma must not be mistaken for a CSV delimiter) + self.check("aGVsbG8sIGhlbGxv", "-f base64", "hello, hello", debug=True) + self.check("V=C3=A1=C5=BEen=C3=A1 Ad=C3=A9lo, ra=C4=8Dte vstoupit", "-f quoted_printable", "Vážená Adélo, račte vstoupit") + self.check([], "", "hello, hello") + + # multiline base64 string + multiline = "ahoj\nahoj" + multiline_64 = b64encode(multiline.encode()).decode() + word = "ahoj" + word_64 = b64encode(word.encode()).decode() + + self.check(multiline_64, "--single-query -f base64", multiline) + self.check([f'"{word}","{word_64}"'] * 2, "-f base64", multiline) + + def test_conversion(self): + lines = ["john@example.com", "mary@example.com", "hyacint@example.com"] + with TemporaryDirectory() as temp: + for pattern in (PERSON_XLS, PERSON_XLSX, PERSON_ODS): + f = Path(temp, pattern.name) + f_converted = Path(temp, pattern.name + ".csv") + # XX as of Python3.8, use this line: shutil.copy(pattern, f) + shutil.copy(str(pattern), str(f)) # move to temp dir to not pollute the tests folder + + self.assertFalse(f_converted.exists()) + self.check(lines, f"-s 1", filename=f) + self.assertTrue(f_converted.exists()) + # try again, as the file now exists + self.check(lines, f"-s 1", filename=f) + + # clean the converted file up and use it not as the main file but + # as a secondary Wrapper – in a merge action + f_converted.unlink() + self.check(COMBINED_SHEET_PERSON, f"--merge {f},2,1", filename=SHEET_CSV) + self.assertTrue(f_converted.exists()) + 
self.check(COMBINED_SHEET_PERSON, f"--merge {f},2,1", filename=SHEET_CSV) diff --git a/tests/test_sending.py b/tests/test_sending.py new file mode 100644 index 0000000..cf79ea2 --- /dev/null +++ b/tests/test_sending.py @@ -0,0 +1,76 @@ +import os +from pathlib import Path +from unittest import TestCase + +from tests.shared import GIF_CSV, PROJECT_DIR, SHEET_CSV, TESTDATA_DIR, Convey, p + + +class TestSending(TestCase): + def test_dynamic_template(self): + convey = Convey("--output", "False", filename=SHEET_CSV) + cmd = """--field code,3,'x="example@example.com" if "example.com" in x else x+"@example.com"'""" \ + " --split code --send-test {mail} 'email_template.eml' --headless" + + os.chdir(PROJECT_DIR / TESTDATA_DIR) + + lines = convey(cmd.format(mail="example@example.com")) + self.assertIn('Subject: My cool dynamic template demonstrating a long amount of lines!', lines) + self.assertIn('We send you lots of colours: red, green, yellow.', lines) + self.assertIn('foo,green,first.example.com,example@example.com', lines) + # attachment must not be present because we called attachment() in the template + self.assertNotIn('Attachment', lines[0]) + + lines = convey(cmd.format(mail="wikipedia.com@example.com")) + self.assertIn('Subject: My cool dynamic template demonstrating a short amount of lines!', lines) + self.assertIn('We send you single colour: orange.', lines) + self.assertIn('Attachment sheet.csv (text/csv):', lines[0]) + + cmd += " --header" # we force first row to be a header + lines = convey(cmd.format(mail="wikipedia.com@example.com")) + # even though there is header in the file, we should still get single value + self.assertIn('We send you single colour: orange.', lines) + + # XX we should test body, subject, references flag + # def test_body_flag(self): + # convey = Convey(FILTER_FILE) + # cmd = """--body "body text" """ + + def test_send(self): + BLACK = "Attachment black.gif (image/gif)" + WHITE = "Attachment white.gif (image/gif)" + COLOURS = "Attachment 
gif.csv (text/csv)" + convey = Convey("--output", "False", filename=GIF_CSV) + cmd_pattern = "-t abusemail,path --split abusemail --send-test {mail} 'bare_template.eml' " + + cmd = cmd_pattern + "--attach-files False --attach-paths-from-path-column True" + + os.chdir(PROJECT_DIR / TESTDATA_DIR) + + # Single image is attached + lines = convey(cmd.format(mail="john@example.com")) + self.assertIn(BLACK, lines[0]) + lines = convey(cmd.format(mail="mary@example.com")) + self.assertIn(WHITE, lines[0]) + + # Two images are attached + lines = convey(cmd.format(mail="jack@example.com")) + self.assertIn(BLACK, lines[0]) + self.assertIn(WHITE, lines[4]) + + # Image cannot be attached + lines = convey(cmd.format(mail="hyacint@example.com")) + self.assertIn( + "Convey crashed at For security reasons, path must be readable to others: red-permission.gif", lines[0]) + + # Flags controlling attachments work + lines = convey(cmd_pattern.format(mail="john@example.com") + + "--attach-files True --attach-paths-from-path-column False") + self.assertIn(COLOURS, lines[0]) + lines = convey(cmd_pattern.format(mail="john@example.com") + + "--attach-files True --attach-paths-from-path-column True") + self.assertIn(COLOURS, lines[0]) + self.assertIn(BLACK, lines[1]) + lines = convey(cmd_pattern.format(mail="john@example.com") + + "--attach-files False --attach-paths-from-path-column False") + self.assertNotIn(COLOURS, lines[0]) + self.assertNotIn(BLACK, lines[1]) diff --git a/tests/tests.py b/tests/tests.py deleted file mode 100644 index 9ba8d86..0000000 --- a/tests/tests.py +++ /dev/null @@ -1,561 +0,0 @@ -from contextlib import redirect_stdout -from io import StringIO -import logging -import os -import shlex -import shutil -from stat import S_IRGRP, S_IRUSR -import sys -from base64 import b64encode -from datetime import datetime -from pathlib import Path -from subprocess import run, PIPE -from tempfile import TemporaryDirectory -from typing import Union, List -from unittest import TestCase, 
main - -from convey.controller import Controller -from convey.dialogue import Cancelled - -logging.basicConfig(stream=sys.stderr, level=logging.WARNING) - -# to evade project folder pollution, chdir to a temp folder -PROJECT_DIR = Path.cwd() -# temp = TemporaryDirectory() XX As the output folder appears in the file folder, this has diminished effect. -# os.chdir(temp.name) -os.chdir("tests") - -def p(s): - """ all mentioned resources files are in the tests folder """ - return PROJECT_DIR / "tests" / Path(s) - -HELLO_B64 = 'aGVsbG8=' -SHEET_CSV = p("sheet.csv") -GIF_CSV = p("gif.csv") -PERSON_CSV = p("person.csv") -PERSON_XLS = p("person.xls") -PERSON_XLSX = p("person.xlsx") -PERSON_ODS = p("person.ods") -COMBINED_SHEET_PERSON = p("combined_sheet_person.csv") -PERSON_HEADER_CSV = p("person_header.csv") -COMBINED_LIST_METHOD = p("combined_list_method.csv") -SHEET_DUPLICATED_CSV = p("sheet_duplicated.csv") -SHEET_HEADER_CSV = p("sheet_header.csv") -SHEET_HEADER_ITSELF_CSV = p("sheet_header_itself.csv") -SHEET_HEADER_PERSON_CSV = p("sheet_header_person.csv") -SHEET_PERSON_CSV = p("sheet_person.csv") -PERSON_GIF_CSV = p("person_gif.csv") -CONSUMPTION = p("consumption.csv") -p("red-permission.gif").chmod(S_IRUSR | S_IRGRP) # make file unreadable to others - - -class Convey: - """ While we prefer to check the results with .check method - (quicker, directly connected with the internals of the library), - this method is able to test piping and interprocess communication. 
- """ - - def __init__(self, *args, filename: Union[str, Path] = None, text=None, whois=False, debug=None): - """ It is important that an input is flagged with --file or --input when performing tests - because otherwise, main() would hang on `not sys.stdin.isatty() -> sys.stdin.read()` - :type args: object - """ - self.debug = debug - - # XX travis will not work will daemon=true (which imposes slow testing) - self.cmd = [str(PROJECT_DIR / "convey.py"), "--output", "--reprocess", "--headless", - "--daemon", "false", "--debug", "false", "--crash-post-mortem", "false"] - if filename is None and not text and len(args) == 1 and not str(args[0]).startswith("-"): - filename = args[0] - args = None - if filename: - if not Path(filename).exists(): - raise FileNotFoundError(filename) - self.cmd.extend(("--file", str(filename))) - if text: - self.cmd.extend(("--input", text)) - - self.has_filename = bool(filename) - self.has_text = bool(text) - if not whois: - self.cmd.extend(("--whois-cache", "false")) - if args: - self.cmd.extend(args) - - def __call__(self, cmd="", text=None, debug=None, piped_text=None): - if debug is not None: - self.debug = debug - if not any((self.has_filename, self.has_text, piped_text)) and not cmd.startswith("-"): - cmd = "--input " + cmd - - cmd = [*self.cmd, *shlex.split(cmd)] - if text: - cmd.extend(("--input", text)) - if self.debug: - print(" ".join(cmd)) - # run: blocking, output - input_ = piped_text.encode() if piped_text else None - lines = run(cmd, input=input_, stdout=PIPE, timeout=3).stdout.decode("utf-8").splitlines() - if self.debug: - print(lines) - if lines and lines[-1] == '\x1b[0m': - # colorama put this reset string at the end. I am not able to reproduce it in bash, only in Python piping. 
- lines = lines[:-1] - return lines - - -convey = Convey() - - -class TestAbstract(TestCase): - maxDiff = None - - def check(self, check: Union[List, str, None], cmd: str = "", text=None, filename: Union[str, Path] = None, debug=None): - # o = Convey(filename=filename, text=text, debug=debug)(cmd) - args = ["--output", "--reprocess", "--headless", "--daemon", - "false", "--debug", "false", "--crash-post-mortem", "false"] - if filename: - args.extend(("--file", str(filename))) - if text: - args.extend(("--input", text)) - args.extend(shlex.split(cmd)) - - if isinstance(check, Path): - check = Path(check).read_text().splitlines() - if debug: - print("convey", " ".join(args)) - print(check) - info = ("Cmd", "convey " + " ".join(args), "Check", check) - - with redirect_stdout(StringIO()) as buf: - c = Controller() - try: - c.run(given_args=args) - except SystemExit as e: - if e.code: - raise AssertionError(f"Bad exit code: {e.code}") - except Cancelled as e: - print(str(e)) - except Exception as e: - raise Exception(*info) from e - finally: - c.cleanup() - o = buf.getvalue().splitlines() - - try: - if isinstance(check, list): - self.assertListEqual(check, o) - elif check == "": # check empty output - self.assertFalse(o) - elif check is None: # we do not want to do any checks - pass - elif not len(o): - raise AssertionError(f"Output too short: {o}") - else: - self.assertEqual(check, o[0]) - except AssertionError as e: - raise AssertionError(*info) from e - - return c - - -class TestFilter(TestCase): - def test_filter(self): - convey = Convey(SHEET_CSV) - self.assertEqual(3, len(convey("--include-filter 1,foo"))) - self.assertEqual(2, len(convey("--exclude-filter 1,foo"))) - self.assertEqual(2, len(convey("--unique 1"))) - - def test_post_filter(self): - """ Filter after a field was generated. 
""" - convey = Convey(SHEET_CSV) - self.assertEqual(3, len(convey("--field base64,1 --include-filter base64,Zm9v"))) - self.assertEqual(2, len(convey("--field base64,1 --exclude-filter base64,Zm9v"))) - self.assertEqual(2, len(convey("--field base64,1 --unique base64"))) - - -class TestDialect(TestCase): - def test_dialect(self): - convey = Convey(SHEET_CSV) - self.assertIn("foo|red|second.example.com", convey("--delimiter-output '|'")) - self.assertIn("foo|red|second.example.com", convey("--delimiter-output '|' --header-output false")) - self.assertNotIn("foo|red|second.example.com", convey("--delimiter-output '|' --header-output false --header")) - - -class TestColumns(TestAbstract): - def test_column_selection(self): - """ Allows specify index (even negative) """ - - self.check("com", "-f tld,-1", "one.com") - self.check('"one.com","com"', "-f tld,-1 -C", "one.com") - self.check("com", "-f tld,1", "one.com") - self.check("Column ID 2 does not exist. We have these so far: one.com", "-f tld,2", "one.com") - self.check('two.cz,one.com,com', "-f tld,2 -C", "two.cz,one.com") - - c = Convey(filename=SHEET_CSV) - self.assertEqual('foo,green,first.example.com,com', c("-f tld,-1")[1]) - self.assertEqual('foo,green,first.example.com,com', c("-f tld,3")[1]) - self.assertEqual('foo,green,first.example.com,com,comA', c("-f tld,-1 -f code,-1,'x+=\"A\"'")[1]) - self.assertEqual('foo,green,first.example.com,com,first.example.comA', c("-f tld,-1 -f code,-2,'x+=\"A\"'")[1]) - - def test_split(self): - lines = Convey()("--split email", "one@example.com\nsecond@example.com") - [self.assertIn(s, lines) for s in - ('* Saved to second@example.com', '"second@example.com"', '* Saved to one@example.com', '"one@example.com"')] - - -class TestFields(TestAbstract): - - def test_base64_detection(self): - """ Base64 detection should work if majority of the possibly decoded characters are not gibberish.""" - self.check("base64", "--single-detect", HELLO_B64) - self.check("", "--single-detect", 
"ahojahoj") - - def test_base64_charset(self): - """ Base64 detection should work even if encoded with another charset """ - s = "Žluťoučký kůň pěl ďábelské ódy." - encoded = b64encode(s.encode("iso-8859-2")) - convey = Convey() - self.assertIn(s, convey("-f charset,,,iso-8859-2", text=encoded.decode("utf-8"))) - - def test_base64_disambiguation(self): - """ Base64 must not mix up with I.E. hostname """ - c = Convey("--single-detect") - self.assertIn("hostname", c("example.com")) # hostname must not be confounded with base64 - self.assertFalse(c("base")) # 'base' is plaintext - - c = Convey("--single-query") - self.assertIn("m«", c("base -t base64")) # 'base' can be base64 if explicitly told - - def test_phone_detection(self): - """ Various phone formats must pass. """ - c = Convey("--single-detect") - self.assertIn("timestamp", c("2020-02-29")) # date value must not be confused with the phone regex - for phone in ("+420123456789", "+1-541-754-3010", "1-541-754-3010", "001-541-754-3010", "+49-89-636-4801"): - self.assertIn("phone", c(phone), phone) - - def test_pint(self): - """ Test unit conversion works """ - c = Convey() - self.assertIn("unit", c("--single-detect", text="1 kg")) - self.assertIn("2.6792288807189983 troy_pound", c("-f unit[troy_pound]", text="1 kg")) - - def test_wrong_url(self): - c = Convey() - self.assertEqual("http://example.com", c("-f url", text="hXXp://example.com")[0]) - self.assertEqual("https://an.eXAmple.com", c("-f url", text="hxxps://an[.]eXAmple[.]com")[0]) - self.assertEqual("http://185.33.144.243/main_content/", - c("-f url", text="hxxp://185.33.144[.]243/main_content/")[0]) - self.assertEqual("http://80.211.218.7/fb/", c("-f url", text="80.211.218.7/fb/")[0]) - - def test_hostname(self): - self.assertIn("hostname", convey("--single-detect", text="_spf.google.com")) - - def test_timestamp(self): - self.assertIn("timestamp", convey("--single-detect", text="26. 03. 
1999")) - time = datetime.now() - self.assertIn("timestamp", convey("--single-detect", text=str(time))) - self.assertIn("timestamp", convey("--single-detect", text=str(int(time.timestamp())))) - - # as of Python3.7 use: - # distant_future = datetime.fromisoformat("3000-01-01") # it is less probable distant dates are dates - distant_future = datetime.fromtimestamp(32503680000.0) - - self.assertIn("timestamp", convey("--single-detect", text=str(distant_future))) - self.assertIn("phone", convey("--single-detect", text=str(int(distant_future.timestamp())))) - # there is no path to datetype date from a phone - self.assertIn("No suitable column found for field 'date'", - convey("-S -f date", text=str(distant_future.timestamp()))) - # however, is is possible to get a date if specified - self.assertIn("3000-01-01", convey("-t timestamp -f date", text=str(int(distant_future.timestamp())))) - # works for float numbers too - self.assertIn("3000-01-01", convey("-t timestamp -f date", text=str(distant_future.timestamp()))) - - # short number is not considered a timestamp from the beginning of the Unix epoch (1970) - self.assertEqual([], convey("--single-detect", text="12345")) - - -class TestAction(TestAbstract): - def test_aggregate(self): - self.check(["sum(price)", "972.0"], f"--aggregate price,sum", filename=CONSUMPTION) - self.check(['category,sum(price)', 'total,972.0', 'kettle,602.0', 'bulb,370.0'], - f"--aggregate price,sum,category", filename=CONSUMPTION) - self.check(['category,sum(price),avg(consumption)', - 'total,972.0,41.0', - 'kettle,602.0,75.0', - 'bulb,370.0,18.33'], f"--aggregate price,sum,consumption,avg,category", filename=CONSUMPTION) - self.check(['category,sum(price),list(price)', - 'total,972.0,(all)', - '''kettle,602.0,"['250', '352']"''', - '''bulb,370.0,"['100', '150', '120']"'''], f"--aggregate price,sum,price,list,category", filename=CONSUMPTION) - - # XX this will correctly split the files, - # however, the output is poor and for a reason not 
readable by the check. - # self.check(['','Split location: bulb','','Split location: kettle'], - # "--agg price,sum --split category", filename=CONSUMPTION) - # Until then, following substitution is used to generate the files at least - Convey(filename=CONSUMPTION)("--agg price,sum --split category") - - # Check the contents of the files that just have been split - check1 = False - check2 = False - for f in Path().glob("consumption.csv_convey*/*"): - if f.name == "kettle" and f.read_text() == "sum(price)\n602.0\n": - check1 = True - if f.name == "bulb" and f.read_text() == "sum(price)\n370.0\n": - check2 = True - print("ZDEEEEEEEEE", list(Path().glob("consumption.csv_convey*/*"))) - print(check1) - self.assertTrue(check1) - self.assertTrue(check2) - - def test_aggregate_group_col(self): - # group by a column without any additional info means counting - self.check(["price,count(price)", - "total,5", - "100,1", - "150,1", - "250,1", - "352,1", - "120,1"], f"-a price", filename=CONSUMPTION) - - # group by the same column works - self.check([ - "price,sum(price)", - "total,972.0", - "352,352.0", - "250,250.0", - "150,150.0", - "120,120.0", - "100,100.0"], f"--aggregate price,sum,price", filename=CONSUMPTION) - - self.check([ - "price,count(price)", - "total,5", - "100,1", - "150,1", - "250,1", - "352,1", - "120,1"], f"--aggregate price,count,price", filename=CONSUMPTION) - - # group by a different column when counting does not make sense - msg = "ERROR:convey.action_controller:Count column 'price' must be the same as the grouping column 'consumption'." 
- with self.assertLogs(level='WARNING') as cm: - self.check("", f"--aggregate price,count,consumption", filename=CONSUMPTION) - self.assertEqual([msg], cm.output) - - def test_merge(self): - # merging generally works - self.check(COMBINED_SHEET_PERSON, f"--merge {PERSON_CSV},2,1", filename=SHEET_CSV) - - # rows can be duplicated due to other fields - self.check(COMBINED_LIST_METHOD, - f"--merge {PERSON_CSV},2,1 -f external,1,external_pick_base.py,list_method", filename=SHEET_CSV) - - # merging file with header and with a missing value - self.check(SHEET_PERSON_CSV, f"--merge {PERSON_HEADER_CSV},2,1", filename=SHEET_CSV) - - # merge on a column type - self.check(PERSON_GIF_CSV, f"--merge {GIF_CSV},email,email", filename=PERSON_CSV) - - # merge by a column number - self.check(PERSON_GIF_CSV, f"--merge {GIF_CSV},email,1", filename=PERSON_CSV) - - # invalid column definition - msg = "ERROR:convey.identifier:Cannot identify COLUMN invalid, put there an exact column name, its type, the numerical order starting with 1, or with -1." - with self.assertLogs(level='WARNING') as cm: - self.check("", f"--merge {GIF_CSV},email,invalid", filename=PERSON_CSV) - self.assertEqual([msg], cm.output) - # merging a file with itself - self.check(SHEET_HEADER_ITSELF_CSV, f"--merge {SHEET_HEADER_CSV},4,2", filename=SHEET_HEADER_CSV) - - # only local file has header; different dialects - self.check(SHEET_HEADER_PERSON_CSV, f"--merge {PERSON_CSV},2,1", filename=SHEET_HEADER_CSV) - - def test_compute_from_merge(self): - """ Computing a new column from another file currenlty being merged was not implemented. """ - self.check('Column ID 6 does not exist. 
We have these so far: foo, red, second.example.com', - f"--merge {PERSON_CSV},2,1 -f base64,6", filename=SHEET_CSV) - - -class TestInternal(TestAbstract): - def test_similar_fields(self): - """ Recommending of the similar columns """ - c1 = self.check(None, f"--merge {PERSON_CSV},2,1", filename=GIF_CSV) - parser1A, parser1B = c1.parser, c1.parser.settings["merge"][0].remote_parser - fields1A = c1.parser.fields - fields1B = parser1B.fields - self.assertListEqual([fields1A[0]], parser1A.get_similar(fields1B)) - self.assertListEqual([fields1B[0]], parser1B.get_similar(fields1A)) - self.assertListEqual([fields1B[0]], parser1B.get_similar(fields1A[0])) - self.assertListEqual([], parser1B.get_similar(fields1A[1])) - - c2 = self.check(None, f"--merge {PERSON_GIF_CSV},2,1", filename=GIF_CSV) - parser2A, parser2B = c2.parser, c2.parser.settings["merge"][0].remote_parser - fields2A = c2.parser.fields - fields2B = parser2B.fields - self.assertListEqual([fields2A[0], fields2A[1]], parser2A.get_similar(fields2B)) - self.assertListEqual([fields2A[0]], parser2A.get_similar(fields2B[0])) - self.assertListEqual([], parser2A.get_similar(fields2B[1])) - self.assertListEqual([fields2A[1]], parser2A.get_similar(fields2B[3])) - self.assertListEqual([fields2B[0], fields2B[3]], parser2B.get_similar(fields2A)) - - -class TestLaunching(TestAbstract): - def test_piping_in(self): - convey = Convey() - # just string specified, nothing else - lines = convey(piped_text="3 kg") - self.assertTrue(len(lines) == 1) - self.assertTrue("'1.806642228624337e+27 dalton'" in lines[0]) - - # field base64 specified - self.assertListEqual([HELLO_B64], convey("-f base64", piped_text="hello")) - self.assertListEqual(['hello'], convey(piped_text=HELLO_B64)) - self.assertListEqual([], convey(piped_text="hello")) - - def test_single_query_processing(self): - """ Stable --single-query parsing """ - - # Single field containing a comma, still must be fully converted (comma must not be mistaken for a CSV delimiter) - 
self.check("aGVsbG8sIGhlbGxv", "-f base64", "hello, hello", debug=True) - self.check("V=C3=A1=C5=BEen=C3=A1 Ad=C3=A9lo, ra=C4=8Dte vstoupit", - "-f quoted_printable", "Vážená Adélo, račte vstoupit") - self.check([], "", "hello, hello") - - # multiline base64 string - multiline = "ahoj\nahoj" - multiline_64 = b64encode(multiline.encode()).decode() - word = "ahoj" - word_64 = b64encode(word.encode()).decode() - - self.check(multiline_64, "--single-query -f base64", multiline) - self.check([f'"{word}","{word_64}"']*2, "-f base64", multiline) - - def test_conversion(self): - lines = ["john@example.com", "mary@example.com", "hyacint@example.com"] - with TemporaryDirectory() as temp: - for pattern in (PERSON_XLS, PERSON_XLSX, PERSON_ODS): - f = Path(temp, pattern.name) - f_converted = Path(temp, pattern.name + ".csv") - # XX as of Python3.8, use this line: shutil.copy(pattern, f) - shutil.copy(str(pattern), str(f)) # move to temp dir to not pollute the tests folder - - self.assertFalse(f_converted.exists()) - self.check(lines, f"-s 1", filename=f) - self.assertTrue(f_converted.exists()) - # try again, as the file now exists - self.check(lines, f"-s 1", filename=f) - - # clean the converted file up and use it not as the main file but - # as a secondary Wrapper – in a merge action - f_converted.unlink() - self.check(COMBINED_SHEET_PERSON, f"--merge {f},2,1", filename=SHEET_CSV) - self.assertTrue(f_converted.exists()) - self.check(COMBINED_SHEET_PERSON, f"--merge {f},2,1", filename=SHEET_CSV) - - -class TestSending(TestCase): - def test_dynamic_template(self): - convey = Convey("--output", "False", filename=SHEET_CSV) - cmd = """--field code,3,'x="example@example.com" if "example.com" in x else x+"@example.com"'""" \ - " --split code --send-test {mail} 'email_template.eml' --headless" - lines = convey(cmd.format(mail="example@example.com")) - self.assertIn('Subject: My cool dynamic template demonstrating a long amount of lines!', lines) - self.assertIn('We send you lots of 
colours: red, green, yellow.', lines) - self.assertIn('foo,green,first.example.com,example@example.com', lines) - # attachment must not be present because we called attachment() in the template - self.assertNotIn('Attachment', lines[0]) - - lines = convey(cmd.format(mail="wikipedia.com@example.com")) - self.assertIn('Subject: My cool dynamic template demonstrating a short amount of lines!', lines) - self.assertIn('We send you single colour: orange.', lines) - self.assertIn('Attachment sheet.csv (text/csv):', lines[0]) - - cmd += " --header" # we force first row to be a header - lines = convey(cmd.format(mail="wikipedia.com@example.com")) - # even though there is header in the file, we should still get single value - self.assertIn('We send you single colour: orange.', lines) - - # XX we should test body, subject, references flag - # def test_body_flag(self): - # convey = Convey(FILTER_FILE) - # cmd = """--body "body text" """ - - def test_send(self): - BLACK = "Attachment black.gif (image/gif)" - WHITE = "Attachment white.gif (image/gif)" - COLOURS = "Attachment gif.csv (text/csv)" - convey = Convey("--output", "False", filename=GIF_CSV) - cmd_pattern = """-t abusemail,path --split abusemail --send-test {mail} 'bare_template.eml' """ - - cmd = cmd_pattern + "--attach-files False --attach-paths-from-path-column True" - - # Single image is attached - lines = convey(cmd.format(mail="john@example.com")) - self.assertIn(BLACK, lines[0]) - lines = convey(cmd.format(mail="mary@example.com")) - self.assertIn(WHITE, lines[0]) - - # Two images are attached - lines = convey(cmd.format(mail="jack@example.com")) - self.assertIn(BLACK, lines[0]) - self.assertIn(WHITE, lines[4]) - - # Image cannot be attached - lines = convey(cmd.format(mail="hyacint@example.com")) - self.assertIn( - "Convey crashed at For security reasons, path must be readable to others: red-permission.gif", lines[0]) - - # Flags controlling attachments work - lines = 
convey(cmd_pattern.format(mail="john@example.com") + - "--attach-files True --attach-paths-from-path-column False") - self.assertIn(COLOURS, lines[0]) - lines = convey(cmd_pattern.format(mail="john@example.com") + - "--attach-files True --attach-paths-from-path-column True") - self.assertIn(COLOURS, lines[0]) - self.assertIn(BLACK, lines[1]) - lines = convey(cmd_pattern.format(mail="john@example.com") + - "--attach-files False --attach-paths-from-path-column False") - self.assertNotIn(COLOURS, lines[0]) - self.assertNotIn(BLACK, lines[1]) - - -class TestExternals(TestAbstract): - - def test_list_in_result(self): - """ The method returns a list while CSV processing. """ - self.check(SHEET_DUPLICATED_CSV, "-f external,external_pick_base.py,list_method", filename=SHEET_CSV) - - def test_bare_method(self): - # single value converted by the external - self.check("-foo-", "--field external,external_pick_base.py,dumb_method --input 'foo'") - - # by default, first column is used - self.check("foo,bar,-foo-", "--field external,external_pick_base.py,dumb_method -C --input 'foo,bar'") - - # specify 2nd column is the source for the external field - self.check("foo,bar,-bar-", "--field external,2,plaintext,external_pick_base.py,dumb_method -C --input 'foo,bar'") - - def test_pick_input(self): - convey = Convey() - lines = convey("--field external,external_pick_base.py,time_format --input '2016-08-08 12:00'") - self.assertEqual(lines, ['12:00']) - - def test_pick_method(self): - convey = Convey() - # method "all" (default one) is used and "1" passes - self.assertEqual(["1"], convey("--field external,external_pick_base.py,PickMethodTest --input '1'")) - # method "filtered" is used and "a" passes - self.assertEqual(["a"], convey("--field external,external_pick_base.py,PickMethodTest,filtered --input 'a'")) - # method "filtered" is used which excludes "1" - self.assertNotEqual(["1"], convey("--field external,external_pick_base.py,PickMethodTest,filtered --input '1'")) - - # XX does 
not work, error when resolving path Unit -> Plaintext -> External. Identifier.get_methods_from - # `lambda_ = lambda_.get_lambda(custom.pop(0) if custom is not None else None)` - # -> get_module_from_path(custom[0], ...) does not contain a path - # self.assertEqual(["a"], convey("--field external,external_pick_base.py,PickMethodTest --input 'mm'")) - - -if __name__ == '__main__': - main()