diff --git a/tkmail/__init__.py b/tkmail/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tkmail/__main__.py b/tkmail/__main__.py deleted file mode 100644 index 5413580..0000000 --- a/tkmail/__main__.py +++ /dev/null @@ -1,56 +0,0 @@ -import logging -import argparse -import asyncore - -from tkmail.server import TKForwarder - - -def configure_logging(): - root = logging.getLogger() - file_handler = logging.FileHandler('tkmail.log', 'a') - stream_handler = logging.StreamHandler(None) - fmt = '[%(asctime)s %(levelname)s] %(message)s' - datefmt = None - formatter = logging.Formatter(fmt, datefmt, '%') - for handler in (file_handler, stream_handler): - handler.setFormatter(formatter) - root.addHandler(handler) - root.setLevel(logging.DEBUG) - - -def get_parser(): - parser = argparse.ArgumentParser() - parser.add_argument('-p', '--port', type=int, default=25, - help='Relay port') - parser.add_argument('-P', '--listen-port', type=int, default=9000, - help='Listen port') - parser.add_argument('-y', '--gf', type=int, default=2014, - help='GF year') - return parser - - -def main(): - configure_logging() - parser = get_parser() - args = parser.parse_args() - - receiver_host = '127.0.0.1' - receiver_port = args.listen_port - relay_host = '127.0.0.1' - relay_port = args.port - - server = TKForwarder( - receiver_host, receiver_port, relay_host, relay_port, - year=args.gf) - try: - asyncore.loop(timeout=0.1, use_poll=True) - except KeyboardInterrupt: - logging.info('TKForwarder exited via KeyboardInterrupt') - except: - logging.exception('TKForwarder exited via exception') - else: - logging.error('TKForwarder exited via asyncore.loop returning') - - -if __name__ == "__main__": - main() diff --git a/tkmail/address.py b/tkmail/address.py deleted file mode 100644 index 4bf9a88..0000000 --- a/tkmail/address.py +++ /dev/null @@ -1,203 +0,0 @@ -import re - -from emailtunnel import InvalidRecipient -from tkmail.database import Database -from tkmail.config import ADMINS - - -def get_admin_emails(): - """Resolve the group "admin" or fallback if the database is unavailable. - - The default set of admins is set in the tkmail.database module. - """ - - email_addresses = [] - try: - db = Database() - email_addresses = [ - addy.replace('@', '@') - for addy in db.get_admin_emails() - ] - except: - pass - - if not email_addresses: - email_addresses = list(ADMINS) - - return email_addresses - - -def translate_recipient(year, name): - """Translate recipient `name` in GF year `year`. - - >>> translate_recipient(2010, "K3FORM") - ["mathiasrav@gmail.com"] - - >>> translate_recipient(2010, "GFORM14") - ["mathiasrav@gmail.com"] - - >>> translate_recipient(2010, "BEST2013") - ["mathiasrav@gmail.com", ...] - """ - - db = Database() - recipient_ids = parse_recipient(name.upper(), db, year) - email_addresses = [ - addy.replace('@', '@').strip() - for addy in db.get_email_addresses(recipient_ids) - ] - return email_addresses - - -prefixValues = { - 'K': -1, - 'G': 1, - 'B': 2, - 'O': 3, - 'T': 1 -} - - -def parse_recipient(recipient, db, currentYear): - """ - Evaluate each address which is divided by + and -. - Collects the resulting sets of not matched and the set of spam addresses. - And return the set of person indexes that are to receive the email. - """ - - personIdOps = [] - invalid_recipients = [] - for sign, name in re.findall(r'([+-]?)([^+-]+)', recipient): - try: - personIds = parse_alias(name, db, currentYear) - personIdOps.append((sign or '+', personIds)) - except InvalidRecipient as e: - invalid_recipients.append(e.args[0]) - - if invalid_recipients: - raise InvalidRecipient(invalid_recipients) - - recipient_ids = set() - for sign, personIds in personIdOps: - if sign == '+': # union - recipient_ids = recipient_ids.union(personIds) - else: # minus - recipient_ids = recipient_ids.difference(personIds) - - return recipient_ids - - -def parse_alias(alias, db, currentYear): - """ - Evaluates the alias, returning a non-empty list of person IDs. - Raise exception if a spam or no match email. - """ - - anciprefix = r"(?P
(?:[KGBOT][KGBOT0-9]*)?)" - ancipostfix = r"(?P(?:[0-9]{2}|[0-9]{4})?)" - try: - groups = db.get_groups() - matches = [] - for row in groups: - groupId, groupRegexp, relativ, groupType = row - groupId = int(groupId) - if relativ == 1: # Relativ = true - regexp = (r'^%s(?P %s)%s$' - % (anciprefix, groupRegexp, ancipostfix)) - else: - regexp = '^(?P %s)$' % groupRegexp - result = re.match(regexp, alias) - if result: - matches.append((groupId, groupType, result)) - - if not matches: - raise InvalidRecipient(alias) - - if len(matches) > 1: - raise ValueError("The alias %r matches more than one group" - % alias) - - groupId, groupType, result = matches[0] - - if groupType == 0: # Group, without aging - personIds = db.get_group_members(groupId) - elif groupType == 1: # Group with aging - grad = getGrad( - result.group("pre"), - result.group("post"), - currentYear) - personIds = db.get_grad_group_members(groupId, grad) - elif groupType == 2: # Titel, with/without aging - grad = getGrad( - result.group("pre"), - result.group("post"), - currentYear) - personIds = db.get_user_by_title(result.group('name'), grad) - elif groupType == 3: # Direct user - personIds = db.get_user_by_id(result.group('name')[6:]) - elif groupType == 4: # BESTFU hack - grad = getGrad( - result.group("pre"), - result.group("post"), - currentYear) - personIds = ( - db.get_grad_group_members(groupId + 1, grad) - + db.get_grad_group_members(groupId - 1, grad)) - else: - raise Exception( - "Error in table gruppe, type: %s is unknown." - % groupType) - - if not personIds: - # No users in the database fit the current alias - raise InvalidRecipient(alias) - - return personIds - - finally: - pass - - -def getGrad(preFix, postFix, currentYear): - """ - CurrentYear is the year where the current BEST was elected. - Assumes currentyear <= 2056. - Returnes the corresponding grad of pre,post and currentYear. - (Calculates as the person have the prefix in year postfix) - """ - - if not postFix: - grad = 0 - else: - if len(postFix) == 4: - first, second = int(postFix[0:2]), int(postFix[2:4]) - # Note that postFix 1920, 2021 and 2122 are technically ambiguous, - # but luckily there was no BEST in 1920 and this script hopefully - # won't live until the year 2122, so they are not actually - # ambiguous. - if (first + 1) % 100 == second: - # There should be exactly one year between the two numbers - if first > 56: - grad = currentYear - (1900 + first) - else: - grad = currentYear - (2000 + first) - elif first in (19, 20): - # 19xx or 20xx - grad = currentYear - int(postFix) - else: - raise InvalidRecipient(postFix) - elif len(postFix) == 2: - year = int(postFix[0:2]) - if year > 56: # 19?? - grad = currentYear - (1900 + year) - else: # 20?? - grad = currentYear - (2000 + year) - else: - raise InvalidRecipient(postFix) - - # Now evaluate the prefix: - for base, exponent in re.findall(r"([KGBOT])([0-9]*)", preFix): - exponent = int(exponent or 1) - grad += prefixValues[base] * int(exponent) - - return grad diff --git a/tkmail/database.py b/tkmail/database.py deleted file mode 100644 index 0543e37..0000000 --- a/tkmail/database.py +++ /dev/null @@ -1,131 +0,0 @@ -# import pkg_resources ##required on my setup to use MySQLdb -# pkg_resources.require("MySQL-python") ##required on my setup to use MySQLdb -# import MySQLdb as mdb -import mysql.connector -from tkmail.config import HOSTNAME, USERNAME, PASSWORD, DATABASE - - -class Error(Exception): - """Base class of my own error types""" - def __init__(self, msg): - super(Error, self).__init__() - self.msg = msg - - -class Database(object): - tkfolk_schema = """ - id int(11) NO None primary, auto_increment - navn varchar(50) YES None - email varchar(50) YES None - accepteremail - char(3) YES ja - accepterdirektemail - char(3) NO Ja - gade varchar(50) YES None - husnr varchar(15) YES None - postnr varchar(10) YES None - postby varchar(25) YES None - land varchar(50) YES None - gone char(3) NO nej - tlf varchar(20) YES None - note text YES None - """ - - def __init__(self): - if HOSTNAME not in ('127.0.0.1', 'localhost'): - raise ValueError('Non-local hostname not supported by ' + - 'mysql.connector') - self._mysql = mysql.connector.Connect( - user=USERNAME, password=PASSWORD, database=DATABASE) - - self._cursor = self._mysql.cursor() - - def _execute(self, statement, *args): - if args: - sql = statement % args - else: - sql = statement - - self._cursor.execute(sql) - - def _fetchall(self, *args, **kwargs): - column = kwargs.pop('column', None) - self._execute(*args) - rows = self._cursor.fetchall() - if column is not None: - return [row[column] for row in rows] - else: - return list(rows) - - def get_people(self, **kwargs): - column_list = ("id navn email accepteremail accepterdirektemail " - "gade husnr postnr postby land gone tlf note".split()) - columns = ', '.join("`%s`" % column for column in column_list) - - clauses = [] - for k, v in kwargs.items(): - if k == 'id__in': - id_string = ','.join('"%s"' % each for each in v) - clauses.append(('`id` IN %s', id_string)) - else: - raise TypeError('unknown kwarg "%s"' % k) - - if clauses: - where_clause = ' AND '.join(expr for expr, param in clauses) - else: - where_clause = "1" - - format_args = [param for expr, param in clauses] - - rows = self._fetchall( - "SELECT %s FROM `tkfolk` WHERE %s" - % (columns, where_clause), - *format_args) - - return [dict(zip(column_list, row)) for row in rows] - - def get_email_addresses(self, id_list): - id_string = ','.join(str(each) for each in id_list) - return self._fetchall(""" - SELECT `email` FROM `tkfolk` - WHERE `id` IN (%s) - AND `accepterdirektemail`='Ja' - """, id_string, column=0) - - def get_admin_emails(self): - return self._fetchall(""" - SELECT `tkfolk`.`email` - FROM `tkfolk`, `grupper`,`gruppemedlemmer` - WHERE `grupper`.`navn`='admin' - AND `gruppemedlemmer`.`gruppeid`=`grupper`.`id` - AND `gruppemedlemmer`.`personid`= `tkfolk`.`id` - """, column=0) - - def get_groups(self): - return self._fetchall(""" - SELECT `id`,`regexp`,`relativ`,`type` FROM grupper - """) - - def get_group_members(self, group_id): - return self._fetchall(""" - SELECT `personid` FROM `gruppemedlemmer` - WHERE `gruppeid`='%s' - """, group_id, column=0) - - def get_grad_group_members(self, group_id, grad): - return self._fetchall(""" - SELECT `personid` FROM `gradgruppemedlemmer` - WHERE `gruppeid`='%s' AND `grad`='%s' - """, group_id, grad, column=0) - - def get_user_by_title(self, title, grad): - return self._fetchall(""" - SELECT `personid` FROM `titler` - WHERE `inttitel`='%s' AND `grad`='%s' - """, title, grad, column=0) - - def get_user_by_id(self, user_id): - return self._fetchall(""" - SELECT `id` FROM `tkfolk` - WHERE `id`='%s' - """, user_id, column=0) diff --git a/tkmail/monitor.py b/tkmail/monitor.py deleted file mode 100644 index 9e491db..0000000 --- a/tkmail/monitor.py +++ /dev/null @@ -1,184 +0,0 @@ -import os -import sys -import json -import time -import logging -import argparse -import textwrap -import smtplib - -from emailtunnel import Message -import tkmail.address - - -MAX_SIZE = 10 -MAX_DAYS = 2 - - -def configure_logging(use_tty): - root = logging.getLogger() - if use_tty: - handler = logging.StreamHandler(None) - else: - handler = logging.FileHandler('monitor.log', 'a') - fmt = '[%(asctime)s %(levelname)s] %(message)s' - datefmt = None - formatter = logging.Formatter(fmt, datefmt, '%') - handler.setFormatter(formatter) - root.addHandler(handler) - root.setLevel(logging.DEBUG) - - -def get_report(basename): - with open('error/%s.json' % basename) as fp: - metadata = json.load(fp) - - mtime = os.stat('error/%s.txt' % basename).st_mtime - - report = dict(metadata) - report['mtime'] = int(mtime) - report['basename'] = basename - return report - - -def archive_report(basename): - for ext in 'txt json mail'.split(): - filename = '%s.%s' % (basename, ext) - try: - os.rename('error/%s' % filename, 'errorarchive/%s' % filename) - except: - logging.exception('Failed to move %s' % filename) - - -def main(): - parser = argparse.ArgumentParser() - parser.add_argument('-n', '--dry-run', action='store_true') - args = parser.parse_args() - - configure_logging(args.dry_run) - - try: - filenames = os.listdir('error') - except OSError: - filenames = [] - - now = int(time.time()) - oldest = now - reports = [] - - for filename in sorted(filenames): - if not filename.endswith('.txt'): - continue - - basename = filename[:-4] - - try: - report = get_report(basename) - except: - exc_value = sys.exc_info()[1] - logging.exception('get_report failed') - report = { - 'subject': ' ' - % (basename, exc_value), - 'basename': basename, - } - else: - oldest = min(oldest, report['mtime']) - - reports.append(report) - - age = now - oldest - - logging.info('%s report(s) / age %s (limit is %s / %s)' % - (len(reports), age, MAX_SIZE, MAX_DAYS * 24 * 60 * 60)) - - if (not args.dry_run and (len(reports) <= MAX_SIZE and - age <= MAX_DAYS * 24 * 60 * 60)): - return - - admins = tkmail.address.get_admin_emails() - - # admins = ['mathiasrav@gmail.com'] - - keys = 'mailfrom rcpttos subject date summary mtime basename'.split() - - lists = {} - for k in keys: - if k == 'rcpttos': - sort_key = lambda x: tuple(x[1].get(k, [])) - else: - sort_key = lambda x: x[1].get(k, '') - lists[k] = '\n'.join( - '%s. %s' % (i + 1, report.get(k)) - for i, report in sorted( - enumerate(reports), - key=sort_key - )) - - body = textwrap.dedent(""" - This is the mail system of TAAGEKAMMERET. - - The following emails were not delivered to anyone. - - Reasons for failed delivery: - {lists[summary]} - - Subjects: - {lists[subject]} - - Senders: - {lists[mailfrom]} - - Recipients: - {lists[rcpttos]} - - Sent: - {lists[date]} - - Received: - {lists[mtime]} - - Local reference in errorarchive folder: - {lists[basename]} - """).format(lists=lists) - - sender = recipient = 'admin@TAAGEKAMMERET.dk' - message = Message.compose( - sender, recipient, '[TK-admin] Failed email delivery', body) - - if args.dry_run: - print("Envelope sender: %r" % sender) - print("Envelope recipients: %r" % admins) - print("Envelope message:") - print(str(message)) - return - - relay_hostname = '127.0.0.1' - relay_port = 25 - relay_host = smtplib.SMTP(relay_hostname, relay_port) - relay_host.set_debuglevel(0) - - try: - relay_host.sendmail(sender, admins, str(message)) - finally: - try: - relay_host.quit() - except smtplib.SMTPServerDisconnected: - pass - - # If no exception was raised, the following code is run - for report in reports: - archive_report(report['basename']) - - -if __name__ == "__main__": - try: - main() - except KeyboardInterrupt: - logging.info('monitor exited via KeyboardInterrupt') - raise - except SystemExit: - raise - except: - logging.exception('monitor exited via exception') - raise diff --git a/tkmail/server.py b/tkmail/server.py deleted file mode 100644 index 7836c60..0000000 --- a/tkmail/server.py +++ /dev/null @@ -1,223 +0,0 @@ -import os -import re -import sys -import json -import logging -import datetime -import textwrap -import itertools -import traceback - -import email.header - -import emailtunnel -from emailtunnel import SMTPForwarder, Message - -import tkmail.address - - -def now_string(): - """Return the current date and time as a string.""" - return datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S.%f") - - -class TKForwarder(SMTPForwarder): - ERROR_TEMPLATE = """ - This is the mail system of TAAGEKAMMERET. - - The following exception was raised when processing the message below: - - {traceback} - - This exception will not be reported again before the mail server has - been restarted. - - Envelope sender: {mailfrom} - Envelope recipients: {rcpttos} - Envelope message: - - {message} - """ - - ERROR_TEMPLATE_CONSTRUCTION = """ - This is the mail system of TAAGEKAMMERET. - - The following exception was raised when CONSTRUCTING AN ENVELOPE: - - {traceback} - - This exception will not be reported again before the mail server has - been restarted. - - Raw data: - - {data} - """ - - def __init__(self, *args, **kwargs): - self.year = kwargs.pop('year') - self.exceptions = set() - self.delivered = 0 - self.deliver_recipients = {} - super(TKForwarder, self).__init__(*args, **kwargs) - - def startup_log(self): - logging.info( - 'TKForwarder listening on %s:%s, relaying to port %s, GF year %s' - % (self.host, self.port, self.relay_port, self.year)) - - def log_receipt(self, peer, envelope): - mailfrom = envelope.mailfrom - rcpttos = envelope.rcpttos - message = envelope.message - - if type(mailfrom) == str: - sender = '<%s>' % mailfrom - else: - sender = repr(mailfrom) - - if type(rcpttos) == list and all(type(x) == str for x in rcpttos): - rcpttos = [re.sub(r'@(T)AAGE(K)AMMERET\.dk$', r'@@\1\2', x, - 0, re.I) - for x in rcpttos] - if len(rcpttos) == 1: - recipients = '<%s>' % rcpttos[0] - else: - recipients = ', '.join('<%s>' % x for x in rcpttos) - else: - recipients = repr(rcpttos) - - logging.info("Subject: %r From: %s To: %s" - % (str(message.subject), sender, recipients)) - - def log_delivery(self, message, recipients, sender): - recipients_string = emailtunnel.abbreviate_recipient_list(recipients) - - if len(recipients_string) > 200: - age = self.deliver_recipients.get(recipients_string) - if age is None or self.delivered - age > 40: - self.deliver_recipients[recipients_string] = self.delivered - recipients_string += ' [%d]' % self.delivered - else: - recipients_string = '%s... [%d]' % ( - recipients_string[:197], age) - - self.delivered += 1 - - logging.info('Subject: %r To: %s' - % (str(message.subject), recipients_string)) - - def reject(self, envelope): - rcpttos = tuple(r.lower() for r in envelope.rcpttos) - subject = str(envelope.message.subject) - return (rcpttos == ('admin@taagekammeret.dk',) - and 'Delayed Mail' in subject) - - def handle_envelope(self, envelope, peer): - if self.reject(envelope): - description = summary = 'Rejected due to TKForwarder.reject' - self.store_failed_envelope(envelope, description, summary) - - else: - return super(TKForwarder, self).handle_envelope(envelope, peer) - - def translate_subject(self, envelope): - subject = envelope.message.subject - subject_decoded = str(subject) - - if '[TK' in subject_decoded: - # No change - return None - - try: - chunks = subject._chunks - except AttributeError: - logging.warning('envelope.message.subject does not have _chunks') - chunks = email.header.decode_header(subject_decoded) - - # No space in '[TK]', since the chunks are joined by spaces. - subject_chunks = [('[TK]', None)] + list(chunks) - return email.header.make_header(subject_chunks) - - def translate_recipient(self, rcptto): - name, domain = rcptto.split('@') - return tkmail.address.translate_recipient(self.year, name) - - def get_envelope_mailfrom(self, envelope): - return 'admin@TAAGEKAMMERET.dk' - - def log_invalid_recipient(self, envelope, exn): - # Use logging.info instead of the default logging.error - logging.info('Invalid recipient: %r' % (exn.args,)) - - def handle_invalid_recipient(self, envelope, exn): - self.store_failed_envelope( - envelope, str(exn), 'Invalid recipient: %s' % exn) - - def handle_error(self, envelope, str_data): - exc_value = sys.exc_info()[1] - exc_typename = type(exc_value).__name__ - filename, line, function, text = traceback.extract_tb( - sys.exc_info()[2])[0] - - tb = ''.join(traceback.format_exc()) - if envelope: - self.store_failed_envelope( - envelope, str(tb), - '%s: %s' % (exc_typename, exc_value)) - - exc_key = (filename, line, exc_typename) - - if exc_key not in self.exceptions: - self.exceptions.add(exc_key) - self.forward_to_admin(envelope, str_data, tb) - - def forward_to_admin(self, envelope, str_data, tb): - # admin_emails = tkmail.address.get_admin_emails() - admin_emails = ['mathiasrav@gmail.com'] - - sender = recipient = 'admin@TAAGEKAMMERET.dk' - - if envelope: - subject = '[TK-mail] Unhandled exception in processing' - body = textwrap.dedent(self.ERROR_TEMPLATE).format( - traceback=tb, mailfrom=envelope.mailfrom, - rcpttos=envelope.rcpttos, message=envelope.message) - - else: - subject = '[TK-mail] Could not construct envelope' - body = textwrap.dedent(self.ERROR_TEMPLATE_CONSTRUCTION).format( - traceback=tb, data=str_data) - - admin_message = Message.compose( - sender, recipient, subject, body) - admin_message.add_header('Auto-Submitted', 'auto-replied') - - self.deliver(admin_message, admin_emails, sender) - - def store_failed_envelope(self, envelope, description, summary): - now = now_string() - - try: - os.mkdir('error') - except OSError: - pass - - with open('error/%s.mail' % now, 'wb') as fp: - fp.write(envelope.message.as_binary()) - - with open('error/%s.json' % now, 'w') as fp: - metadata = { - 'mailfrom': envelope.mailfrom, - 'rcpttos': envelope.rcpttos, - 'subject': str(envelope.message.subject), - 'date': envelope.message.get_header('Date'), - 'summary': summary, - } - json.dump(metadata, fp) - - with open('error/%s.txt' % now, 'w') as fp: - fp.write('From %s\n' % envelope.mailfrom) - fp.write('To %s\n\n' % envelope.rcpttos) - fp.write('%s\n' % description) - fp.write(str(envelope.message)) diff --git a/tkmail/test.py b/tkmail/test.py deleted file mode 100644 index ca661a7..0000000 --- a/tkmail/test.py +++ /dev/null @@ -1,235 +0,0 @@ -import time -import logging -import smtplib -import asyncore -import threading - -import email.header - -from emailtunnel import SMTPReceiver, Envelope -from tkmail.server import TKForwarder -import emailtunnel.send - - -envelopes = [] - - -def deliver_local(message, recipients, sender): - logging.info("deliver_local: From: %r To: %r Subject: %r" - % (sender, recipients, str(message.subject))) - for recipient in recipients: - if '@' not in recipient: - raise smtplib.SMTPDataError(0, 'No @ in %r' % recipient) - envelope = Envelope(message, sender, recipients) - envelopes.append(envelope) - - -class DumpReceiver(SMTPReceiver): - def handle_envelope(self, envelope): - envelopes.append(envelope) - - -class RecipientTest(object): - _recipients = [] - - def get_envelopes(self): - envelopes = [] - for i, recipient in enumerate(self._recipients): - envelopes.append( - ('-F', 'recipient_test@localhost', - '-T', '%s@TAAGEKAMMERET.dk' % recipient, - '-s', '%s_%s' % (id(self), i), - '-I', 'X-test-id', self.get_test_id())) - return envelopes - - def get_test_id(self): - return str(id(self)) - - def check_envelopes(self, envelopes): - recipients = [] - for i, envelope in enumerate(envelopes): - recipients += envelope.rcpttos - self.check_recipients(recipients) - - def check_recipients(self, recipients): - raise NotImplementedError() - - -class SameRecipientTest(RecipientTest): - def __init__(self, *recipients): - self._recipients = recipients - - def check_recipients(self, recipients): - if len(recipients) != len(self._recipients): - raise AssertionError( - "Bad recipient count: %r vs %r" % - (recipients, self._recipients)) - if any(x != recipients[0] for x in recipients): - raise AssertionError("Recipients not the same: %r" % recipients) - - -class MultipleRecipientTest(RecipientTest): - def __init__(self, recipient): - self._recipients = [recipient] - - def check_recipients(self, recipients): - if len(recipients) <= 1: - raise AssertionError("Only %r recipients" % len(recipients)) - - -class SubjectRewriteTest(object): - def __init__(self, subject): - self.subject = subject - - def get_envelopes(self): - return [ - ('-F', 'subject-test@localhost', - '-T', 'FORM13@TAAGEKAMMERET.dk', - '-s', self.subject, - '-I', 'X-test-id', self.get_test_id()) - ] - - def check_envelopes(self, envelopes): - message = envelopes[0].message - - try: - output_subject_raw = message.get_unique_header('Subject') - except KeyError as e: - raise AssertionError('No Subject in message') from e - - input_header = email.header.make_header( - email.header.decode_header(self.subject)) - - input_subject = str(input_header) - output_subject = str(message.subject) - - if '[TK' in input_subject: - expected_subject = input_subject - else: - expected_subject = '[TK] %s' % input_subject - - if output_subject != expected_subject: - raise AssertionError( - 'Bad subject: %r == %r turned into %r == %r, ' - 'expected %r' % (self.subject, input_subject, - output_subject_raw, output_subject, - expected_subject)) - - def get_test_id(self): - return str(id(self)) - - -class ErroneousSubjectTest(object): - def __init__(self, subject): - self.subject = subject - - def get_envelopes(self): - return [ - ('-F', 'subject-test@localhost', - '-T', 'FORM13@TAAGEKAMMERET.dk', - '-s', self.subject, - '-I', 'X-test-id', self.get_test_id()) - ] - - def check_envelopes(self, envelopes): - # If the message did not throw an error, we are happy - pass - - def get_test_id(self): - return str(id(self)) - - -class NoSubjectTest(object): - def get_envelopes(self): - return [ - ('-F', 'no-subject-test@localhost', - '-T', 'FORM13@TAAGEKAMMERET.dk', - '-I', 'X-test-id', self.get_test_id()) - ] - - def check_envelopes(self, envelopes): - # If the message did not throw an error, we are happy - pass - - def get_test_id(self): - return str(id(self)) - - -def main(): - relayer_port = 11110 - dumper_port = 11111 - relayer = TKForwarder('127.0.0.1', relayer_port, - '127.0.0.1', dumper_port, - year=2014) - # dumper = DumpReceiver('127.0.0.1', dumper_port) - relayer.deliver = deliver_local - - poller = threading.Thread( - target=asyncore.loop, - kwargs={'timeout': 0.1, 'use_poll': True}) - poller.start() - - tests = [ - SameRecipientTest('FORM13', 'FORM2013', 'FORM1314', 'gFORM14'), - SameRecipientTest('FORM', 'BEST-CERM-INKA-KASS-nf-PR-SEKR-VC'), - MultipleRecipientTest('BEST'), - MultipleRecipientTest('BESTFU'), - MultipleRecipientTest('FU'), - MultipleRecipientTest('ADMIN'), - MultipleRecipientTest('engineering'), - MultipleRecipientTest('revy+revyteknik'), - MultipleRecipientTest('tke'), - SubjectRewriteTest('=?UTF-8?Q?Gl=C3=A6delig_jul?='), - SubjectRewriteTest('=?UTF-8?Q?Re=3A_=5BTK=5D_Gl=C3=A6delig_jul?='), - # Invalid encoding a; should be skipped by ecre in email.header - ErroneousSubjectTest('=?UTF-8?a?hello_world?='), - # Invalid base64 data; email.header raises an exception - ErroneousSubjectTest('=?UTF-8?b?hello_world?='), - NoSubjectTest(), - ] - test_envelopes = { - test.get_test_id(): [] - for test in tests - } - - for test in tests: - for envelope in test.get_envelopes(): - envelope = [str(x) for x in envelope] - envelope += ['--relay', '127.0.0.1:%s' % relayer_port] - print(repr(envelope)) - emailtunnel.send.main(*envelope, body='Hej') - - logging.debug("Sleep for a bit...") - time.sleep(1) - logging.debug("%s envelopes" % len(envelopes)) - - for envelope in envelopes: - try: - header = envelope.message.get_unique_header('X-test-id') - except KeyError: - logging.error("Envelope without X-test-id") - continue - test_envelopes[header].append(envelope) - - for i, test in enumerate(tests): - for envelope in test_envelopes[test.get_test_id()]: - received_objects = envelope.message.get_all_headers('Received') - received = [str(o) for o in received_objects] - print(repr(received)) - try: - test_id = test.get_test_id() - e = test_envelopes[test_id] - if not e: - raise AssertionError("No envelopes for test id %r" % test_id) - test.check_envelopes(e) - except AssertionError as e: - logging.error("Test %s failed: %s" % (i, e)) - else: - logging.info("Test %s succeeded" % i) - - logging.info("tkmail.test finished; you may Ctrl-C") - - -if __name__ == "__main__": - logging.basicConfig(level=logging.DEBUG) - main()