From a6bc2f5ecf556e6c491804fb272e70f6cb672047 Mon Sep 17 00:00:00 2001 From: Marek Dobransky Date: Fri, 17 Jan 2025 11:44:15 +0100 Subject: [PATCH 1/2] refactoring prep --- rialto/runner/config_loader.py | 1 + rialto/runner/mailer.py | 237 +++++++++++++++++++++++ rialto/runner/record.py | 33 ++++ rialto/runner/runner.py | 7 +- rialto/runner/tracker.py | 237 ++--------------------- tests/runner/test_runner.py | 18 +- tests/runner/transformations/config.yaml | 1 + 7 files changed, 304 insertions(+), 230 deletions(-) create mode 100644 rialto/runner/mailer.py create mode 100644 rialto/runner/record.py diff --git a/rialto/runner/config_loader.py b/rialto/runner/config_loader.py index 86c142d..5504e16 100644 --- a/rialto/runner/config_loader.py +++ b/rialto/runner/config_loader.py @@ -59,6 +59,7 @@ class RunnerConfig(BaseModel): watched_period_units: str watched_period_value: int mail: MailConfig + bookkeeping: Optional[str] = None class TargetConfig(BaseModel): diff --git a/rialto/runner/mailer.py b/rialto/runner/mailer.py new file mode 100644 index 0000000..4616053 --- /dev/null +++ b/rialto/runner/mailer.py @@ -0,0 +1,237 @@ +# Copyright 2022 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import smtplib +from datetime import datetime +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +from typing import List + +from rialto.runner.record import Record + +__all__ = ["Mailer"] + + +class HTMLMessage: + bck_colors = ["#00ded6", "#acfcfa"] + borderless_table = 'role="presentation" style="border:0;border-spacing:0;"' + bordered_table = ( + 'role="presentation" style="background-repeat:no-repeat; margin:0;" cellpadding="1" cellspacing="1" border="1""' + ) + + @staticmethod + def _get_status_color(status: str): + if status == "Success": + return "#398f00" + elif status == "Error": + return "#ff0000" + else: + return "#ff8800" + + @staticmethod + def _make_rows(rows): + html = "" + data_options = 'align="center"' + for row, i in zip(rows, range(len(rows))): + r = f""" + + {row.job} + {row.target.split('.')[0]}.
+ {row.target.split('.')[1]}.
+ {row.target.split('.')[2]} + + {row.date} + {str(row.time).split(".")[0]} + {f'{row.records:,}'} + + {row.status} + + {row.reason} + + """ + html += r + return html + + @staticmethod + def _make_overview_header(): + return """ + + Job + Target + Date + Time elapsed + Rows created + Status + Reason + + """ + + @staticmethod + def _make_header(start: datetime): + return f""" +
+ + + + + + + +

This is is Rialto Feature Runner report

+ Jobs started {str(start).split('.')[0]} +
+
+ """ + + @staticmethod + def _make_overview(records: List[Record]): + return f""" + + + + +

Overview

+ + {HTMLMessage._make_overview_header()} + {HTMLMessage._make_rows(records)} +
+ """ + + @staticmethod + def _head(): + return """ + + + + + + + + + + + """ + + @staticmethod + def _body_open(): + return """ + +
+
+ """ + + @staticmethod + def _body_close(): + return """ +
+
+ + """ + + @staticmethod + def _make_exceptions(records: List[Record]): + html = "" + for record, i in zip(records, range(len(records))): + if record.exception is not None: + r = f""" + + + + + +
{record.job}{record.date}
+ + Expand +
+ + + + +
{record.exception}
+
+ """ + html += r + return html + + @staticmethod + def _make_insights(records: List[Record]): + return f""" + + + + +

Exceptions

+ {HTMLMessage._make_exceptions(records)} + """ + + @staticmethod + def make_report(start: datetime, records: List[Record]) -> str: + """Create html email report""" + html = [ + """ + """, + HTMLMessage._head(), + HTMLMessage._body_open(), + HTMLMessage._make_header(start), + HTMLMessage._make_overview(records), + HTMLMessage._make_insights(records), + HTMLMessage._body_close(), + ] + return "\n".join(html) + + +class Mailer: + """Send email reports""" + + @staticmethod + def create_message(subject: str, sender: str, receiver: str, body: str) -> MIMEMultipart: + """ + Create email message + + :param subject: email subject + :param sender: email sender + :param receiver: email receiver + :param body: email body + """ + msg = MIMEMultipart() + msg["Subject"] = subject + msg["From"] = sender + msg["To"] = receiver + body = MIMEText(body, "html") + msg.attach(body) + return msg + + @staticmethod + def send_mail(smtp: str, message: MIMEMultipart): + """ + Send email + + :param smtp: smtp server + :param message: email message + """ + s = smtplib.SMTP(host=smtp, port=25) + s.sendmail(from_addr=message["From"], to_addrs=message["To"], msg=message.as_string()) + s.quit() diff --git a/rialto/runner/record.py b/rialto/runner/record.py new file mode 100644 index 0000000..cbfa1f9 --- /dev/null +++ b/rialto/runner/record.py @@ -0,0 +1,33 @@ +# Copyright 2022 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__all__ = ["Record"] + +from dataclasses import dataclass +from datetime import datetime, timedelta +from typing import Optional + + +@dataclass +class Record: + """Dataclass with information about one run of one pipeline.""" + + job: str + target: str + date: datetime.date + time: timedelta + records: int + status: str + reason: str + exception: Optional[str] = None diff --git a/rialto/runner/runner.py b/rialto/runner/runner.py index 49280b6..979ee9e 100644 --- a/rialto/runner/runner.py +++ b/rialto/runner/runner.py @@ -26,8 +26,9 @@ from rialto.common import TableReader from rialto.runner.config_loader import PipelineConfig, get_pipelines_config from rialto.runner.date_manager import DateManager +from rialto.runner.record import Record from rialto.runner.table import Table -from rialto.runner.tracker import Record, Tracker +from rialto.runner.tracker import Tracker from rialto.runner.transformation import Transformation @@ -50,7 +51,7 @@ def __init__( self.rerun = rerun self.skip_dependencies = skip_dependencies self.op = op - self.tracker = Tracker() + self.tracker = Tracker(mail_cfg=self.config.runner.mail, bookkeeping=self.config.runner.bookkeeping) if run_date: run_date = DateManager.str_to_date(run_date) @@ -297,5 +298,5 @@ def __call__(self): self._run_pipeline(pipeline) finally: print(self.tracker.records) - self.tracker.report(self.config.runner.mail) + self.tracker.report_by_mail() logger.info("Execution finished") diff --git a/rialto/runner/tracker.py b/rialto/runner/tracker.py index 57a24e6..e0943a5 100644 --- a/rialto/runner/tracker.py +++ b/rialto/runner/tracker.py @@ -12,249 +12,36 @@ # See the License for the specific language governing permissions and # limitations under the License. -__all__ = ["Record", "Tracker"] +__all__ = ["Tracker"] -import smtplib -from dataclasses import dataclass -from datetime import datetime, timedelta -from email.mime.multipart import MIMEMultipart -from email.mime.text import MIMEText -from typing import List, Optional +from datetime import datetime from rialto.runner.config_loader import MailConfig - - -@dataclass -class Record: - """Dataclass with information about one run of one pipeline.""" - - job: str - target: str - date: datetime.date - time: timedelta - records: int - status: str - reason: str - exception: Optional[str] = None +from rialto.runner.mailer import HTMLMessage, Mailer +from rialto.runner.record import Record class Tracker: """Collect information about runs and sent them out via email""" - def __init__(self): + def __init__(self, mail_cfg: MailConfig, bookkeeping: str = None): self.records = [] self.last_error = None self.pipeline_start = datetime.now() self.exceptions = [] + self.mail_cfg = mail_cfg + self.bookkeeping = bookkeeping def add(self, record: Record) -> None: """Add record for one run""" self.records.append(record) - def report(self, mail_cfg: MailConfig): + def report_by_mail(self): """Create and send html report""" - if len(self.records) or mail_cfg.sent_empty: + if len(self.records) or self.mail_cfg.sent_empty: report = HTMLMessage.make_report(self.pipeline_start, self.records) - for receiver in mail_cfg.to: + for receiver in self.mail_cfg.to: message = Mailer.create_message( - subject=mail_cfg.subject, sender=mail_cfg.sender, receiver=receiver, body=report + subject=self.mail_cfg.subject, sender=self.mail_cfg.sender, receiver=receiver, body=report ) - Mailer.send_mail(mail_cfg.smtp, message) - - -class HTMLMessage: - bck_colors = ["#00ded6", "#acfcfa"] - borderless_table = 'role="presentation" style="border:0;border-spacing:0;"' - bordered_table = ( - 'role="presentation" style="background-repeat:no-repeat; margin:0;" cellpadding="1" cellspacing="1" border="1""' - ) - - @staticmethod - def _get_status_color(status: str): - if status == "Success": - return "#398f00" - elif status == "Error": - return "#ff0000" - else: - return "#ff8800" - - @staticmethod - def _make_rows(rows): - html = "" - data_options = 'align="center"' - for row, i in zip(rows, range(len(rows))): - r = f""" - - {row.job} - {row.target.split('.')[0]}.
- {row.target.split('.')[1]}.
- {row.target.split('.')[2]} - - {row.date} - {str(row.time).split(".")[0]} - {f'{row.records:,}'} - - {row.status} - - {row.reason} - - """ - html += r - return html - - @staticmethod - def _make_overview_header(): - return """ - - Job - Target - Date - Time elapsed - Rows created - Status - Reason - - """ - - @staticmethod - def _make_header(start: datetime): - return f""" -
- - - - - - - -

This is is Rialto Feature Runner report

- Jobs started {str(start).split('.')[0]} -
-
- """ - - @staticmethod - def _make_overview(records: List[Record]): - return f""" - - - - -

Overview

- - {HTMLMessage._make_overview_header()} - {HTMLMessage._make_rows(records)} -
- """ - - @staticmethod - def _head(): - return """ - - - - - - - - - - - """ - - @staticmethod - def _body_open(): - return """ - -
-
- """ - - @staticmethod - def _body_close(): - return """ -
-
- - """ - - @staticmethod - def _make_exceptions(records: List[Record]): - html = "" - for record, i in zip(records, range(len(records))): - if record.exception is not None: - r = f""" - - - - - -
{record.job}{record.date}
- - Expand -
- - - - -
{record.exception}
-
- """ - html += r - return html - - @staticmethod - def _make_insights(records: List[Record]): - return f""" - - - - -

Exceptions

- {HTMLMessage._make_exceptions(records)} - """ - - @staticmethod - def make_report(start: datetime, records: List[Record]) -> str: - """Create html email report""" - html = [ - """ - """, - HTMLMessage._head(), - HTMLMessage._body_open(), - HTMLMessage._make_header(start), - HTMLMessage._make_overview(records), - HTMLMessage._make_insights(records), - HTMLMessage._body_close(), - ] - return "\n".join(html) - - -class Mailer: - @staticmethod - def create_message(subject: str, sender: str, receiver: str, body: str) -> MIMEMultipart: - msg = MIMEMultipart() - msg["Subject"] = subject - msg["From"] = sender - msg["To"] = receiver - body = MIMEText(body, "html") - msg.attach(body) - return msg - - @staticmethod - def send_mail(smtp: str, message: MIMEMultipart): - s = smtplib.SMTP(host=smtp, port=25) - s.sendmail(from_addr=message["From"], to_addrs=message["To"], msg=message.as_string()) - s.quit() + Mailer.send_mail(self.mail_cfg.smtp, message) diff --git a/tests/runner/test_runner.py b/tests/runner/test_runner.py index e23eee8..875215b 100644 --- a/tests/runner/test_runner.py +++ b/tests/runner/test_runner.py @@ -256,7 +256,7 @@ def test_select_dates_all_done(spark, mocker): def test_op_selected(spark, mocker): - mocker.patch("rialto.runner.tracker.Tracker.report") + mocker.patch("rialto.runner.tracker.Tracker.report_by_mail") run = mocker.patch("rialto.runner.runner.Runner._run_pipeline") runner = Runner(spark, config_path="tests/runner/transformations/config.yaml", op="SimpleGroup") @@ -266,7 +266,7 @@ def test_op_selected(spark, mocker): def test_op_bad(spark, mocker): - mocker.patch("rialto.runner.tracker.Tracker.report") + mocker.patch("rialto.runner.tracker.Tracker.report_by_mail") mocker.patch("rialto.runner.runner.Runner._run_pipeline") runner = Runner(spark, config_path="tests/runner/transformations/config.yaml", op="BadOp") @@ -274,3 +274,17 @@ def test_op_bad(spark, mocker): with pytest.raises(ValueError) as exception: runner() assert str(exception.value) == "Unknown operation selected: BadOp" + + +def test_bookkeeping_active(spark, mocker): + mocker.patch("rialto.runner.runner.Runner._run_pipeline") + + runner = Runner(spark, config_path="tests/runner/transformations/config.yaml") + assert runner.config.runner.bookkeeping == "some.test.location" + + +def test_bookkeeping_inactive(spark, mocker): + mocker.patch("rialto.runner.runner.Runner._run_pipeline") + + runner = Runner(spark, config_path="tests/runner/transformations/config2.yaml") + assert runner.config.runner.bookkeeping is None diff --git a/tests/runner/transformations/config.yaml b/tests/runner/transformations/config.yaml index 3b72107..11f9fc9 100644 --- a/tests/runner/transformations/config.yaml +++ b/tests/runner/transformations/config.yaml @@ -22,6 +22,7 @@ runner: - developer@testing.org - developer2@testing.org subject: test report + bookkeeping: some.test.location pipelines: - name: SimpleGroup module: From 57d44a8397d838ef8380ad35beeebd54383d548c Mon Sep 17 00:00:00 2001 From: Marek Dobransky Date: Wed, 22 Jan 2025 13:44:09 +0100 Subject: [PATCH 2/2] bookkeeping --- rialto/runner/bookkeeper.py | 47 ++++++++++++++++++++++++++++++++ rialto/runner/record.py | 31 ++++++++++++++++++++- rialto/runner/runner.py | 4 ++- rialto/runner/tracker.py | 12 ++++++-- tests/runner/test_bookkeeping.py | 26 ++++++++++++++++++ 5 files changed, 116 insertions(+), 4 deletions(-) create mode 100644 rialto/runner/bookkeeper.py create mode 100644 tests/runner/test_bookkeeping.py diff --git a/rialto/runner/bookkeeper.py b/rialto/runner/bookkeeper.py new file mode 100644 index 0000000..93012f3 --- /dev/null +++ b/rialto/runner/bookkeeper.py @@ -0,0 +1,47 @@ +# Copyright 2022 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from pyspark.sql import DataFrame, SparkSession + +from rialto.runner.record import Record + + +class BookKeeper: + """Class to store and update records of runs in a table in the spark catalog.""" + + def __init__(self, table: str, spark: SparkSession): + self.spark = spark + self.table = table + + def _load(self) -> DataFrame | None: + if self.spark.catalog.tableExists(self.table): + return self.spark.read.table(self.table) + else: + return None + + def _write(self, df: DataFrame) -> None: + df.write.mode("overwrite").saveAsTable(self.table) + + def add(self, record: Record) -> None: + """ + Add a record to the table. + + :param record: Record to add to the table. + """ + new = self.spark.createDataFrame([record.to_spark_row()], Record.schema) + db = self._load() + if db: + db = db.unionByName(new) + self._write(db) + else: + self._write(new) diff --git a/rialto/runner/record.py b/rialto/runner/record.py index cbfa1f9..181963f 100644 --- a/rialto/runner/record.py +++ b/rialto/runner/record.py @@ -16,7 +16,10 @@ from dataclasses import dataclass from datetime import datetime, timedelta -from typing import Optional +from typing import ClassVar, Optional + +from pyspark.sql import Row +from pyspark.sql.types import DateType, IntegerType, StringType, StructField, StructType @dataclass @@ -31,3 +34,29 @@ class Record: status: str reason: str exception: Optional[str] = None + + schema: ClassVar[StructType] = StructType( + [ + StructField("job", StringType(), nullable=False), + StructField("target", StringType(), nullable=False), + StructField("date", DateType(), nullable=False), + StructField("time", StringType(), nullable=False), + StructField("records", IntegerType(), nullable=False), + StructField("status", StringType(), nullable=False), + StructField("reason", StringType(), nullable=False), + StructField("exception", StringType(), nullable=True), + ] + ) + + def to_spark_row(self) -> Row: + """Convert Record to Spark Row""" + return Row( + job=self.job, + target=self.target, + date=self.date, + time=str(self.time), + records=self.records, + status=self.status, + reason=self.reason, + exception=self.exception, + ) diff --git a/rialto/runner/runner.py b/rialto/runner/runner.py index 979ee9e..6541b1b 100644 --- a/rialto/runner/runner.py +++ b/rialto/runner/runner.py @@ -51,7 +51,9 @@ def __init__( self.rerun = rerun self.skip_dependencies = skip_dependencies self.op = op - self.tracker = Tracker(mail_cfg=self.config.runner.mail, bookkeeping=self.config.runner.bookkeeping) + self.tracker = Tracker( + mail_cfg=self.config.runner.mail, bookkeeping=self.config.runner.bookkeeping, spark=spark + ) if run_date: run_date = DateManager.str_to_date(run_date) diff --git a/rialto/runner/tracker.py b/rialto/runner/tracker.py index e0943a5..3032788 100644 --- a/rialto/runner/tracker.py +++ b/rialto/runner/tracker.py @@ -16,6 +16,9 @@ from datetime import datetime +from pyspark.sql import SparkSession + +from rialto.runner.bookkeeper import BookKeeper from rialto.runner.config_loader import MailConfig from rialto.runner.mailer import HTMLMessage, Mailer from rialto.runner.record import Record @@ -24,17 +27,22 @@ class Tracker: """Collect information about runs and sent them out via email""" - def __init__(self, mail_cfg: MailConfig, bookkeeping: str = None): + def __init__(self, mail_cfg: MailConfig, bookkeeping: str = None, spark: SparkSession = None): self.records = [] self.last_error = None self.pipeline_start = datetime.now() self.exceptions = [] self.mail_cfg = mail_cfg - self.bookkeeping = bookkeeping + self.bookkeeper = None + + if bookkeeping: + self.bookkeeper = BookKeeper(table=bookkeeping, spark=spark) def add(self, record: Record) -> None: """Add record for one run""" self.records.append(record) + if self.bookkeeper: + self.bookkeeper.add(record) def report_by_mail(self): """Create and send html report""" diff --git a/tests/runner/test_bookkeeping.py b/tests/runner/test_bookkeeping.py new file mode 100644 index 0000000..260c7e6 --- /dev/null +++ b/tests/runner/test_bookkeeping.py @@ -0,0 +1,26 @@ +from datetime import timedelta + +from rialto.runner.date_manager import DateManager +from rialto.runner.record import Record + +record = Record( + "job", + "target", + DateManager.str_to_date("2024-01-01"), + timedelta(days=0, hours=1, minutes=2, seconds=3), + 1, + "status", + "reason", +) + + +def test_record_to_spark(spark): + row = record.to_spark_row() + assert row.job == "job" + assert row.target == "target" + assert row.date == DateManager.str_to_date("2024-01-01") + assert row.time == "1:02:03" + assert row.records == 1 + assert row.status == "status" + assert row.reason == "reason" + assert row.exception is None