diff --git a/analytics/src/analytics/cli.py b/analytics/src/analytics/cli.py
index f589818d6..d40bc5a9f 100644
--- a/analytics/src/analytics/cli.py
+++ b/analytics/src/analytics/cli.py
@@ -308,5 +308,5 @@ def transform_and_load(
 
 @etl_app.command(name="opportunity-load")
 def load_opportunity_data() -> None:
-    """Grabs data from s3 bucket and loads it into opportunity tables"""
+    """Grab data from the S3 bucket and load it into the opportunity tables."""
     extract_copy_opportunity_data()
diff --git a/analytics/src/analytics/integrations/etldb/etldb.py b/analytics/src/analytics/integrations/etldb/etldb.py
index 0626de00f..1ccc3efe4 100644
--- a/analytics/src/analytics/integrations/etldb/etldb.py
+++ b/analytics/src/analytics/integrations/etldb/etldb.py
@@ -2,8 +2,6 @@
 
 import logging
 from enum import Enum
-from functools import wraps
-from typing import Callable, Any
 
 from sqlalchemy import Connection, text
 
diff --git a/analytics/src/analytics/integrations/extracts/__init__.py b/analytics/src/analytics/integrations/extracts/__init__.py
index 4b677f8bf..9611aa0a5 100644
--- a/analytics/src/analytics/integrations/extracts/__init__.py
+++ b/analytics/src/analytics/integrations/extracts/__init__.py
@@ -1 +1,6 @@
-"""We use this package to extract CSV files from S3 bucket and loads them into respective Opportunity Tables"""
+# pylint: disable=line-too-long
+"""
+We use this package to extract CSV files
+from an S3 bucket and load them into the
+respective opportunity tables.
+"""
diff --git a/analytics/src/analytics/integrations/extracts/constants.py b/analytics/src/analytics/integrations/extracts/constants.py
index 9bbb25378..7ffcbb12a 100644
--- a/analytics/src/analytics/integrations/extracts/constants.py
+++ b/analytics/src/analytics/integrations/extracts/constants.py
@@ -4,6 +4,8 @@
 
 
 class OpportunityTables(StrEnum):
+    """Opportunity tables that are copied over to the analytics database."""
+
     LK_OPPORTUNITY_STATUS = "lk_opportunity_status"
     LK_OPPORTUNITY_CATEGORY = "lk_opportunity_category"
     OPPORTUNITY = "opportunity"
diff --git a/analytics/src/analytics/integrations/extracts/load_opportunity_data.py b/analytics/src/analytics/integrations/extracts/load_opportunity_data.py
index 4c16e751d..5493c70e7 100644
--- a/analytics/src/analytics/integrations/extracts/load_opportunity_data.py
+++ b/analytics/src/analytics/integrations/extracts/load_opportunity_data.py
@@ -1,38 +1,44 @@
+# pylint: disable=invalid-name, line-too-long
+"""Loads opportunity tables with opportunity data from S3."""
+
 import logging
 import os
-from io import BytesIO
-from urllib.parse import urlparse
 
-import smart_open
+from contextlib import ExitStack
 
+import smart_open
 from analytics.integrations.etldb.etldb import EtlDb
 from analytics.integrations.extracts.constants import (
     MAP_TABLES_TO_COLS,
     OpportunityTables,
 )
-from analytics.integrations.extracts.s3_config import S3Config, get_s3_client
+from analytics.integrations.extracts.s3_config import S3Config
 
 logger = logging.getLogger(__name__)
 
-
 def extract_copy_opportunity_data() -> None:
-    """Instantiate Etldb class and calls _fetch_insert_opportunity_data with database connection object """
+    """
+    Instantiate the EtlDb class and call
+    _fetch_insert_opportunity_data with a database connection object.
+    """
     etldb_conn = EtlDb()
 
     _fetch_insert_opportunity_data(etldb_conn.connection())
     logger.info("Extract opportunity data completed successfully")
 
-
-def _fetch_insert_opportunity_data(conn: EtlDb.connection ) -> None:
-    """Streamlines opportunity tables from S3 and insert into corresponding tables in the database."""
+def _fetch_insert_opportunity_data(conn: EtlDb.connection) -> None:
+    """
+    Stream opportunity tables from S3 and
+    insert them into the corresponding database tables.
+    """
 
     s3_config = S3Config()
 
     with conn.begin():
         cursor = conn.connection.cursor()
         for table in OpportunityTables:
-            logger.info(f"Copying data for table: {table}")
+            logger.info("Copying data for table: %s", table)
 
             columns = MAP_TABLES_TO_COLS.get(table, [])
             s3_uri = f"s3://{s3_config.s3_opportunity_bucket}/{s3_config.s3_opportunity_file_path_prefix}/{table}.csv"
 
@@ -40,10 +46,12 @@ def _fetch_insert_opportunity_data(conn: EtlDb.connection ) -> None:
             COPY {f"{os.getenv("DB_SCHEMA")}.{table} ({', '.join(columns)})"}
             FROM STDIN WITH (FORMAT CSV, DELIMITER ',', QUOTE '"', HEADER)
         """
-            # Open the S3 object for reading
-            with smart_open.open(s3_uri, 'r') as file:
-                with cursor.copy(query) as copy:
-                    while data := file.read():
-                        copy.write(data)
 
-            logger.info(f"Successfully loaded data for table: {table}")
+            with ExitStack() as stack:
+                file = stack.enter_context(smart_open.open(s3_uri, "r"))
+                copy = stack.enter_context(cursor.copy(query))
+
+                while data := file.read():
+                    copy.write(data)
+
+                logger.info("Successfully loaded data for table: %s", table)
diff --git a/analytics/src/analytics/integrations/extracts/s3_config.py b/analytics/src/analytics/integrations/extracts/s3_config.py
index 15e9291ae..8f2b8266c 100644
--- a/analytics/src/analytics/integrations/extracts/s3_config.py
+++ b/analytics/src/analytics/integrations/extracts/s3_config.py
@@ -1,3 +1,5 @@
+"""Configuration for S3."""
+
 import boto3
 import botocore
 from pydantic import Field
@@ -5,7 +7,7 @@
 
 
 class S3Config(BaseSettings):
-    """Configuration for S3"""
+    """Configure S3 properties."""
 
     s3_opportunity_bucket: str | None = Field(None, env="S3_OPPORTUNITY_BUCKET")
     s3_opportunity_file_path_prefix: str | None = Field(
@@ -19,6 +21,7 @@ def get_s3_client(
     session: boto3.Session | None = None,
     boto_config: botocore.config.Config | None = None,
 ) -> botocore.client.BaseClient:
+    """Instantiate an S3Config if none is passed and return an S3 client."""
     if s3_config is None:
         S3Config()
 
diff --git a/analytics/tests/conftest.py b/analytics/tests/conftest.py
index e5b71952b..0d26e0e04 100644
--- a/analytics/tests/conftest.py
+++ b/analytics/tests/conftest.py
@@ -273,8 +273,10 @@ def issue(  # pylint: disable=too-many-locals
 
 @pytest.fixture
 def reset_aws_env_vars(monkeypatch):
-    # Reset the env vars so you can't accidentally connect
-    # to a real AWS account if you were doing some local testing
+    """
+    Reset the AWS env vars so you can't accidentally connect
+    to a real AWS account when testing locally.
+    """
     monkeypatch.setenv("AWS_ACCESS_KEY_ID", "testing")
     monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "testing")
     monkeypatch.setenv("AWS_SECURITY_TOKEN", "testing")
@@ -283,26 +285,32 @@
 
 
 @pytest.fixture
-def mock_s3(reset_aws_env_vars):
+def mock_s3(reset_aws_env_vars: None) -> boto3.resource:
+    """Instantiate a mock S3 service resource."""
    # https://docs.getmoto.org/en/stable/docs/configuration/index.html#whitelist-services
    with moto.mock_aws(config={"core": {"service_whitelist": ["s3"]}}):
        yield boto3.resource("s3")
 
+
 @pytest.fixture
-def mock_s3_bucket_resource(mock_s3):
+def mock_s3_bucket_resource(
+    mock_s3: boto3.resource,
+) -> "boto3.resources.factory.s3.Bucket":
+    """Create and return a mock S3 bucket resource."""
     bucket = mock_s3.Bucket("test_bucket")
     bucket.create()
-    yield bucket
+    return bucket
 
 
-@pytest.fixture
-def mock_s3_bucket(mock_s3_bucket_resource):
-    yield mock_s3_bucket_resource.name
+@pytest.fixture
+def mock_s3_bucket(mock_s3_bucket_resource: "boto3.resources.factory.s3.Bucket") -> str:
+    """Return the name of the mock S3 bucket."""
+    return mock_s3_bucket_resource.name
 
 
 # From https://github.com/pytest-dev/pytest/issues/363
 @pytest.fixture(scope="session")
-def monkeypatch_session():
+def monkeypatch_session() -> pytest.MonkeyPatch:
     """
     Create a monkeypatch instance that can be used to
     monkeypatch global environment, objects, and attributes
@@ -312,19 +320,20 @@
     yield mpatch
     mpatch.undo()
 
+
 @pytest.fixture(scope="session")
-def test_schema():
+def test_schema() -> str:
+    """Create a unique test schema name."""
     return f"test_schema_{uuid.uuid4().int}"
 
 
 @pytest.fixture(scope="session")
 def create_test_db(monkeypatch_session, test_schema) -> EtlDb:
     """
-    Creates a temporary PostgreSQL schema and creates a database engine
+    Create a temporary PostgreSQL schema and create a database engine
     that connects to that schema. Drops the schema after the context
     manager exits.
""" - etldb_conn = EtlDb() with etldb_conn.connection() as conn: @@ -338,6 +347,7 @@ def create_test_db(monkeypatch_session, test_schema) -> EtlDb: finally: _drop_schema(conn, test_schema) + def _create_schema(conn: EtlDb.connection, schema: str): """Create a database schema.""" db_test_user = "app" diff --git a/analytics/tests/integrations/extracts/test_load_opportunity_data.py b/analytics/tests/integrations/extracts/test_load_opportunity_data.py index 4195c05a9..69a45633f 100644 --- a/analytics/tests/integrations/extracts/test_load_opportunity_data.py +++ b/analytics/tests/integrations/extracts/test_load_opportunity_data.py @@ -10,7 +10,6 @@ ) from sqlalchemy import text - test_folder_path = ( pathlib.Path(__file__).parent.resolve() / "opportunity_tables_test_files" ) @@ -18,7 +17,11 @@ ### Uploads test files @pytest.fixture -def upload_opportunity_tables_s3(monkeypatch_session,mock_s3_bucket, mock_s3_bucket_resource): +def upload_opportunity_tables_s3( + monkeypatch_session, + mock_s3_bucket, + mock_s3_bucket_resource, +): monkeypatch_session.setenv("S3_OPPORTUNITY_BUCKET", mock_s3_bucket) @@ -34,16 +37,16 @@ def upload_opportunity_tables_s3(monkeypatch_session,mock_s3_bucket, mock_s3_buc mock_s3_bucket_resource.upload_fileobj(data, object_key) s3_files = list(mock_s3_bucket_resource.objects.all()) - yield len(s3_files) + return len(s3_files) + def test_extract_copy_opportunity_data( create_test_db: EtlDb, upload_opportunity_tables_s3, monkeypatch_session, - test_schema + test_schema, ): """Should upload all test files to mock s3 and have all records inserted into test database schema.""" - monkeypatch_session.setenv("DB_SCHEMA", test_schema) extract_copy_opportunity_data() @@ -52,16 +55,16 @@ def test_extract_copy_opportunity_data( # Verify that the data was inserted into the database with conn.begin(): lk_opp_sts_result = conn.execute( - text(f"SELECT COUNT(*) FROM lk_opportunity_status ;"), + text("SELECT COUNT(*) FROM lk_opportunity_status ;"), ) lk_opp_ctgry_result = conn.execute( - text(f"SELECT COUNT(*) FROM lk_opportunity_category ;"), + text("SELECT COUNT(*) FROM lk_opportunity_category ;"), ) opp_result = conn.execute( - text(f"SELECT COUNT(*) FROM opportunity ;"), + text("SELECT COUNT(*) FROM opportunity ;"), ) opp_smry_result = conn.execute( - text(f"SELECT COUNT(*) FROM opportunity_summary ;"), + text("SELECT COUNT(*) FROM opportunity_summary ;"), ) curr_opp_smry_result = conn.execute(