Commit

adding doc strings

babebe committed Dec 16, 2024
1 parent 1c45c41 commit b12032e
Showing 8 changed files with 71 additions and 42 deletions.
2 changes: 1 addition & 1 deletion analytics/src/analytics/cli.py
@@ -308,5 +308,5 @@ def transform_and_load(

@etl_app.command(name="opportunity-load")
def load_opportunity_data() -> None:
"""Grabs data from s3 bucket and loads it into opportunity tables"""
"""Grabs data from s3 bucket and loads it into opportunity tables."""
extract_copy_opportunity_data()
2 changes: 0 additions & 2 deletions analytics/src/analytics/integrations/etldb/etldb.py
@@ -2,8 +2,6 @@

import logging
from enum import Enum
from functools import wraps
from typing import Callable, Any

from sqlalchemy import Connection, text

7 changes: 6 additions & 1 deletion analytics/src/analytics/integrations/extracts/__init__.py
@@ -1 +1,6 @@
"""We use this package to extract CSV files from S3 bucket and loads them into respective Opportunity Tables"""
# pylint: disable=line-too-long
"""
We use this package to extract CSV files
from S3 bucket and loads them into respective
Opportunity Tables.
"""
2 changes: 2 additions & 0 deletions analytics/src/analytics/integrations/extracts/constants.py
@@ -4,6 +4,8 @@


class OpportunityTables(StrEnum):
"""Opportunity tables that are copied over to analytics database."""

LK_OPPORTUNITY_STATUS = "lk_opportunity_status"
LK_OPPORTUNITY_CATEGORY = "lk_opportunity_category"
OPPORTUNITY = "opportunity"
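Because OpportunityTables is a StrEnum, its members behave as plain strings, which is what lets the loader below interpolate them directly into S3 keys and SQL identifiers. A small illustrative sketch (Tables is a hypothetical stand-in, not part of this commit):

    from enum import StrEnum


    class Tables(StrEnum):
        OPPORTUNITY = "opportunity"


    # Members format as their string values and iterate in declaration order.
    assert f"{Tables.OPPORTUNITY}.csv" == "opportunity.csv"
    assert list(Tables) == ["opportunity"]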
@@ -1,49 +1,57 @@
# pylint: disable=invalid-name, line-too-long
"""Loads opportunity tables with opportunity data from S3."""

import logging
import os
from io import BytesIO
from urllib.parse import urlparse
import smart_open
from contextlib import ExitStack

import smart_open

from analytics.integrations.etldb.etldb import EtlDb
from analytics.integrations.extracts.constants import (
MAP_TABLES_TO_COLS,
OpportunityTables,
)
from analytics.integrations.extracts.s3_config import S3Config, get_s3_client
from analytics.integrations.extracts.s3_config import S3Config

logger = logging.getLogger(__name__)



def extract_copy_opportunity_data() -> None:
"""Instantiate Etldb class and calls _fetch_insert_opportunity_data with database connection object """
"""
Instantiate Etldb class and
calls _fetch_insert_opportunity_data with database connection object.
"""
etldb_conn = EtlDb()
_fetch_insert_opportunity_data(etldb_conn.connection())

logger.info("Extract opportunity data completed successfully")



def _fetch_insert_opportunity_data(conn: EtlDb.connection ) -> None:
"""Streamlines opportunity tables from S3 and insert into corresponding tables in the database."""
def _fetch_insert_opportunity_data(conn: EtlDb.connection) -> None:
"""
Streamlines opportunity tables from S3 and
insert into corresponding tables in the database.
"""
s3_config = S3Config()

with conn.begin():
cursor = conn.connection.cursor()
for table in OpportunityTables:
logger.info(f"Copying data for table: {table}")
logger.info("Copying data for table: %s", table)

columns = MAP_TABLES_TO_COLS.get(table, [])
s3_uri = f"s3://{s3_config.s3_opportunity_bucket}/{s3_config.s3_opportunity_file_path_prefix}/{table}.csv"
query = f"""
COPY {f"{os.getenv("DB_SCHEMA")}.{table} ({', '.join(columns)})"}
FROM STDIN WITH (FORMAT CSV, DELIMITER ',', QUOTE '"', HEADER)
"""
# Open the S3 object for reading
with smart_open.open(s3_uri, 'r') as file:
with cursor.copy(query) as copy:
while data := file.read():
copy.write(data)

logger.info(f"Successfully loaded data for table: {table}")
with ExitStack() as stack:
file = stack.enter_context(smart_open.open(s3_uri, "r"))
copy = stack.enter_context(cursor.copy(query))

while data := file.read():
copy.write(data)

logger.info("Successfully loaded data for table: %S", table)
5 changes: 4 additions & 1 deletion analytics/src/analytics/integrations/extracts/s3_config.py
@@ -1,11 +1,13 @@
"""Configuration for S3."""

import boto3
import botocore
from pydantic import Field
from pydantic_settings import BaseSettings


class S3Config(BaseSettings):
"""Configuration for S3"""
"""Configure S3 properties."""

s3_opportunity_bucket: str | None = Field(None, env="S3_OPPORTUNITY_BUCKET")
s3_opportunity_file_path_prefix: str | None = Field(
Expand All @@ -19,6 +21,7 @@ def get_s3_client(
session: boto3.Session | None = None,
boto_config: botocore.config.Config | None = None,
) -> botocore.client.BaseClient:
"""Instantiate S3Config class if not passed and return an S3 client."""
if s3_config is None:
s3_config = S3Config()

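A hedged usage sketch (not from this diff) of the helper above; the retry configuration is illustrative:

    import botocore.config

    client = get_s3_client(
        s3_config=S3Config(),
        boto_config=botocore.config.Config(retries={"max_attempts": 3}),
    )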
34 changes: 22 additions & 12 deletions analytics/tests/conftest.py
@@ -273,8 +273,10 @@ def issue( # pylint: disable=too-many-locals

@pytest.fixture
def reset_aws_env_vars(monkeypatch):
# Reset the env vars so you can't accidentally connect
# to a real AWS account if you were doing some local testing
"""
Reset the aws env vars so you can't accidentally connect
to a real AWS account if you were doing some local testing.
"""
monkeypatch.setenv("AWS_ACCESS_KEY_ID", "testing")
monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "testing")
monkeypatch.setenv("AWS_SECURITY_TOKEN", "testing")
@@ -283,26 +285,32 @@ def reset_aws_env_vars(monkeypatch):


@pytest.fixture
def mock_s3(reset_aws_env_vars):
def mock_s3(reset_aws_env_vars: pytest.MonkeyPatch) -> boto3.resource:
"""Instantiate an S3 bucket resource."""
# https://docs.getmoto.org/en/stable/docs/configuration/index.html#whitelist-services
with moto.mock_aws(config={"core": {"service_whitelist": ["s3"]}}):
yield boto3.resource("s3")


@pytest.fixture
def mock_s3_bucket_resource(mock_s3):
def mock_s3_bucket_resource(
mock_s3: boto3.resource,
) -> "boto3.resources.factory.s3.Bucket":
"""Create and return a mock S3 bucket resource."""
bucket = mock_s3.Bucket("test_bucket")
bucket.create()
yield bucket
return bucket

@pytest.fixture
def mock_s3_bucket(mock_s3_bucket_resource):
yield mock_s3_bucket_resource.name

@pytest.fixture
def mock_s3_bucket(mock_s3_bucket_resource: "boto3.resources.factory.s3.Bucket") -> str:
"""Return the name of the mock S3 bucket."""
return mock_s3_bucket_resource.name
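A hedged usage sketch (not part of this commit) of how these fixtures compose in a test; the object key and body are illustrative:

    def test_bucket_round_trip(mock_s3_bucket_resource, mock_s3_bucket):
        """Everything here hits moto's in-memory S3, never a real account."""
        mock_s3_bucket_resource.put_object(Key="example.csv", Body=b"a,b\n1,2\n")
        keys = [obj.key for obj in mock_s3_bucket_resource.objects.all()]
        assert keys == ["example.csv"]
        assert mock_s3_bucket == "test_bucket"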


# From https://github.com/pytest-dev/pytest/issues/363
@pytest.fixture(scope="session")
def monkeypatch_session():
def monkeypatch_session() -> pytest.MonkeyPatch:
"""
Create a monkeypatch instance that can be used to
monkeypatch global environment, objects, and attributes
@@ -312,19 +320,20 @@ def monkeypatch_session():
yield mpatch
mpatch.undo()


@pytest.fixture(scope="session")
def test_schema():
def test_schema() -> str:
"""Create a unique test schema."""
return f"test_schema_{uuid.uuid4().int}"


@pytest.fixture(scope="session")
def create_test_db(monkeypatch_session, test_schema) -> EtlDb:
"""
Creates a temporary PostgreSQL schema and creates a database engine
Create a temporary PostgreSQL schema and create a database engine
that connects to that schema. Drops the schema after the context manager
exits.
"""

etldb_conn = EtlDb()

with etldb_conn.connection() as conn:
@@ -338,6 +347,7 @@ def create_test_db(monkeypatch_session, test_schema) -> EtlDb:
finally:
_drop_schema(conn, test_schema)


def _create_schema(conn: EtlDb.connection, schema: str):
"""Create a database schema."""
db_test_user = "app"
Expand Up @@ -10,15 +10,18 @@
)
from sqlalchemy import text


test_folder_path = (
pathlib.Path(__file__).parent.resolve() / "opportunity_tables_test_files"
)


### Uploads test files
@pytest.fixture
def upload_opportunity_tables_s3(monkeypatch_session,mock_s3_bucket, mock_s3_bucket_resource):
def upload_opportunity_tables_s3(
monkeypatch_session,
mock_s3_bucket,
mock_s3_bucket_resource,
):
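"""Upload the opportunity table test CSVs to the mock S3 bucket."""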

monkeypatch_session.setenv("S3_OPPORTUNITY_BUCKET", mock_s3_bucket)

@@ -34,16 +37,16 @@ def upload_opportunity_tables_s3(monkeypatch_session,mock_s3_bucket, mock_s3_buc
mock_s3_bucket_resource.upload_fileobj(data, object_key)

s3_files = list(mock_s3_bucket_resource.objects.all())
yield len(s3_files)
return len(s3_files)


def test_extract_copy_opportunity_data(
create_test_db: EtlDb,
upload_opportunity_tables_s3,
monkeypatch_session,
test_schema
test_schema,
):
"""Should upload all test files to mock s3 and have all records inserted into test database schema."""

monkeypatch_session.setenv("DB_SCHEMA", test_schema)

extract_copy_opportunity_data()
@@ -52,16 +55,16 @@ def test_extract_copy_opportunity_data(
# Verify that the data was inserted into the database
with conn.begin():
lk_opp_sts_result = conn.execute(
text(f"SELECT COUNT(*) FROM lk_opportunity_status ;"),
text("SELECT COUNT(*) FROM lk_opportunity_status ;"),
)
lk_opp_ctgry_result = conn.execute(
text(f"SELECT COUNT(*) FROM lk_opportunity_category ;"),
text("SELECT COUNT(*) FROM lk_opportunity_category ;"),
)
opp_result = conn.execute(
text(f"SELECT COUNT(*) FROM opportunity ;"),
text("SELECT COUNT(*) FROM opportunity ;"),
)
opp_smry_result = conn.execute(
text(f"SELECT COUNT(*) FROM opportunity_summary ;"),
text("SELECT COUNT(*) FROM opportunity_summary ;"),
)

curr_opp_smry_result = conn.execute(
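A sketch of the assertions such a test typically closes with, assuming the fixture CSVs are non-empty (exact expected counts depend on the test files, which are not shown):

    # Each COPY should have inserted at least one row per table.
    assert lk_opp_sts_result.scalar_one() > 0
    assert lk_opp_ctgry_result.scalar_one() > 0
    assert opp_result.scalar_one() > 0
    assert opp_smry_result.scalar_one() > 0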
