Commit

Merge branch 'main' into issue-2130-update-analytics-docs

DavidDudas-Intuitial authored Jan 17, 2025
2 parents 7e1b372 + abd1111 commit 9e86430
Showing 20 changed files with 758 additions and 100 deletions.
23 changes: 15 additions & 8 deletions api/bin/setup_localstack.py
@@ -6,6 +6,7 @@

import src.logging
from src.adapters.aws import S3Config, get_s3_client
from src.util import file_util
from src.util.local import error_if_not_local

logger = logging.getLogger(__name__)
@@ -26,6 +27,14 @@ def does_s3_bucket_exist(s3_client: botocore.client.BaseClient, bucket_name: str
return False


def create_bucket_if_not_exists(s3_client: botocore.client.BaseClient, bucket_name: str) -> None:
if not does_s3_bucket_exist(s3_client, bucket_name):
logger.info("Creating S3 bucket %s", bucket_name)
s3_client.create_bucket(Bucket=bucket_name)
else:
logger.info("S3 bucket %s already exists - skipping creation", bucket_name)


def setup_s3() -> None:
s3_config = S3Config()
# This is only used locally - to avoid any accidental running of commands here
@@ -35,14 +44,12 @@ def setup_s3() -> None:
s3_config, boto3.Session(aws_access_key_id="NO_CREDS", aws_secret_access_key="NO_CREDS")
)

if s3_config.s3_opportunity_bucket is None:
raise Exception("S3_OPPORTUNITY_BUCKET env var must be set")

if not does_s3_bucket_exist(s3_client, s3_config.s3_opportunity_bucket):
logger.info("Creating S3 bucket %s", s3_config.s3_opportunity_bucket)
s3_client.create_bucket(Bucket=s3_config.s3_opportunity_bucket)
else:
logger.info("S3 bucket %s already exists - skipping", s3_config.s3_opportunity_bucket)
create_bucket_if_not_exists(
s3_client, file_util.get_s3_bucket(s3_config.public_files_bucket_path)
)
create_bucket_if_not_exists(
s3_client, file_util.get_s3_bucket(s3_config.draft_files_bucket_path)
)


def main() -> None:
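The setup script above now derives plain bucket names from the full s3:// paths in S3Config via file_util.get_s3_bucket before creating them in localstack. A minimal sketch of what that helper is assumed to do (the repo's actual implementation may differ):

# Hedged sketch only - the real file_util.get_s3_bucket may differ.
# Strip the "s3://" scheme and return just the bucket portion of the path.
def get_s3_bucket(path: str) -> str:
    return path.removeprefix("s3://").split("/", 1)[0]

print(get_s3_bucket("s3://local-mock-public-bucket"))  # local-mock-public-bucket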
4 changes: 3 additions & 1 deletion api/local.env
@@ -99,7 +99,9 @@ S3_ENDPOINT_URL=http://localstack:4566
# S3
############################

S3_OPPORTUNITY_BUCKET=local-opportunities
# Our terraform sets these as s3 paths, so include the s3:// prefix on the bucket names
PUBLIC_FILES_BUCKET=s3://local-mock-public-bucket
DRAFT_FILES_BUCKET=s3://local-mock-draft-bucket

# This env var is used to set local AWS credentials
IS_LOCAL_AWS=1
7 changes: 4 additions & 3 deletions api/src/adapters/aws/s3_adapter.py
@@ -1,6 +1,7 @@
import boto3
import botocore.client
import botocore.config
from pydantic import Field

from src.util.env_config import PydanticBaseEnvConfig

@@ -16,9 +17,9 @@ class S3Config(PydanticBaseEnvConfig):
# so that we don't need to set all of these for every
# process that uses S3

# TODO - I'm not sure how we want to organize our
# s3 buckets so this will likely change in the future
s3_opportunity_bucket: str | None = None
# Note these env vars get set as "s3://..."
public_files_bucket_path: str = Field(alias="PUBLIC_FILES_BUCKET")
draft_files_bucket_path: str = Field(alias="DRAFT_FILES_BUCKET")


def get_s3_client(
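For context, the new S3Config fields read their values from the PUBLIC_FILES_BUCKET and DRAFT_FILES_BUCKET environment variables through pydantic Field aliases. A small, self-contained sketch of that pattern, using pydantic-settings directly in place of the repo's PydanticBaseEnvConfig (an assumption):

import os

from pydantic import Field
from pydantic_settings import BaseSettings

# Stand-in for PydanticBaseEnvConfig, which is assumed to wrap BaseSettings.
class S3Config(BaseSettings):
    public_files_bucket_path: str = Field(alias="PUBLIC_FILES_BUCKET")
    draft_files_bucket_path: str = Field(alias="DRAFT_FILES_BUCKET")

# Using the values from api/local.env:
os.environ["PUBLIC_FILES_BUCKET"] = "s3://local-mock-public-bucket"
os.environ["DRAFT_FILES_BUCKET"] = "s3://local-mock-draft-bucket"
print(S3Config().public_files_bucket_path)  # s3://local-mock-public-bucket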
@@ -1,18 +1,31 @@
import logging
from typing import Tuple
from typing import Tuple, cast

import src.data_migration.transformation.transform_constants as transform_constants
import src.data_migration.transformation.transform_util as transform_util
from src.adapters.aws import S3Config
from src.data_migration.transformation.subtask.abstract_transform_subtask import (
AbstractTransformSubTask,
)
from src.db.models.opportunity_models import Opportunity
from src.db.models.staging.opportunity import Topportunity
from src.services.opportunity_attachments import attachment_util
from src.task.task import Task
from src.util import file_util

logger = logging.getLogger(__name__)


class TransformOpportunity(AbstractTransformSubTask):

def __init__(self, task: Task, s3_config: S3Config | None = None):
super().__init__(task)

if s3_config is None:
s3_config = S3Config()

self.s3_config = s3_config

def transform_records(self) -> None:
# Fetch all opportunities that were modified
# Alongside that, grab the existing opportunity record
@@ -53,11 +66,18 @@ def process_opportunity(
extra,
)

# Cleanup the attachments from s3
if target_opportunity is not None:
for attachment in target_opportunity.opportunity_attachments:
file_util.delete_file(attachment.file_location)

else:
# To avoid incrementing metrics for records we fail to transform, record
# here whether it's an insert/update and we'll increment after transforming
is_insert = target_opportunity is None

was_draft = target_opportunity.is_draft if target_opportunity else None

logger.info("Transforming and upserting opportunity", extra=extra)
transformed_opportunity = transform_util.transform_opportunity(
source_opportunity, target_opportunity
@@ -76,5 +96,23 @@
)
self.db_session.merge(transformed_opportunity)

# If an opportunity went from being a draft to not a draft (published)
# then we need to move all of its attachments to the public bucket
# from the draft s3 bucket.
if was_draft and transformed_opportunity.is_draft is False:
for attachment in cast(Opportunity, target_opportunity).opportunity_attachments:
# Determine the new path
file_name = attachment_util.adjust_legacy_file_name(attachment.file_name)
s3_path = attachment_util.get_s3_attachment_path(
file_name,
attachment.attachment_id,
transformed_opportunity,
self.s3_config,
)

# Move the file
file_util.move_file(attachment.file_location, s3_path)
attachment.file_location = s3_path

logger.info("Processed opportunity", extra=extra)
source_opportunity.transformed_at = self.transform_time
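The subtask above (and the attachment subtask below) builds destination paths with attachment_util.get_s3_attachment_path, which presumably chooses the draft or public bucket from the opportunity's draft flag. A rough sketch of that assumed behavior; the exact key layout is illustrative and not confirmed by this diff:

# Rough sketch; the key layout here is an assumption, not the repo's exact logic.
def get_s3_attachment_path(file_name, attachment_id, opportunity, s3_config):
    # Draft opportunities keep attachments in the draft bucket, published ones
    # in the public bucket (both are full "s3://..." paths from S3Config).
    base_path = (
        s3_config.draft_files_bucket_path
        if opportunity.is_draft
        else s3_config.public_files_bucket_path
    )
    return f"{base_path}/opportunities/{opportunity.opportunity_id}/attachments/{attachment_id}/{file_name}"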
@@ -3,18 +3,30 @@

import src.data_migration.transformation.transform_constants as transform_constants
import src.data_migration.transformation.transform_util as transform_util
from src.adapters.aws import S3Config
from src.constants.lookup_constants import OpportunityAttachmentType
from src.data_migration.transformation.subtask.abstract_transform_subtask import (
AbstractTransformSubTask,
)
from src.db.models.opportunity_models import Opportunity, OpportunityAttachment
from src.db.models.staging.attachment import TsynopsisAttachment
from src.services.opportunity_attachments import attachment_util
from src.task.task import Task
from src.util import file_util

logger = logging.getLogger(__name__)


class TransformOpportunityAttachment(AbstractTransformSubTask):

def __init__(self, task: Task, s3_config: S3Config | None = None):
super().__init__(task)

if s3_config is None:
s3_config = S3Config()

self.s3_config = s3_config

def transform_records(self) -> None:

# Fetch staging attachment / our attachment / opportunity groups
@@ -63,16 +75,17 @@ def process_opportunity_attachment(
logger.info("Processing opportunity attachment", extra=extra)

if source_attachment.is_deleted:
# TODO - https://github.com/HHS/simpler-grants-gov/issues/3322
# deletes are more complex because of s3
# this just handles deleting the DB record at the moment
self._handle_delete(
source=source_attachment,
target=target_attachment,
record_type=transform_constants.OPPORTUNITY_ATTACHMENT,
extra=extra,
)

# Delete the file from s3 as well
if target_attachment is not None:
file_util.delete_file(target_attachment.file_location)

elif opportunity is None:
# This shouldn't be possible as the incoming data has foreign keys, but as a safety net
# we'll make sure the opportunity actually exists
@@ -85,13 +98,29 @@
# here whether it's an insert/update and we'll increment after transforming
is_insert = target_attachment is None

prior_attachment_location = (
target_attachment.file_location if target_attachment else None
)

logger.info("Transforming and upserting opportunity attachment", extra=extra)

transformed_opportunity_attachment = transform_opportunity_attachment(
source_attachment, target_attachment
source_attachment, target_attachment, opportunity, self.s3_config
)

# TODO - we'll need to handle more with the s3 files here
# Write the file to s3
write_file(source_attachment, transformed_opportunity_attachment)

# If this was an update, and the file name changed
# Cleanup the old file from s3.
if (
prior_attachment_location is not None
and prior_attachment_location != transformed_opportunity_attachment.file_location
):
file_util.delete_file(prior_attachment_location)

logger.info("Transforming and upserting opportunity attachment", extra=extra)

if is_insert:
self.increment(
transform_constants.Metrics.TOTAL_RECORDS_INSERTED,
@@ -110,26 +139,38 @@


def transform_opportunity_attachment(
source_attachment: TsynopsisAttachment, incoming_attachment: OpportunityAttachment | None
source_attachment: TsynopsisAttachment,
incoming_attachment: OpportunityAttachment | None,
opportunity: Opportunity,
s3_config: S3Config,
) -> OpportunityAttachment:

log_extra = transform_util.get_log_extra_opportunity_attachment(source_attachment)

if incoming_attachment is None:
logger.info("Creating new opportunity attachment record", extra=log_extra)

# Adjust the file_name to remove characters that are clunky in URLs
if source_attachment.file_name is None:
raise ValueError("Opportunity attachment does not have a file name, cannot process.")
file_name = attachment_util.adjust_legacy_file_name(source_attachment.file_name)

file_location = attachment_util.get_s3_attachment_path(
file_name, source_attachment.syn_att_id, opportunity, s3_config
)

# We always create a new record here and merge it in the calling function
# this way if there is any error doing the transformation, we don't modify the existing one.
target_attachment = OpportunityAttachment(
attachment_id=source_attachment.syn_att_id,
opportunity_id=source_attachment.opportunity_id,
# TODO - we'll eventually remove attachment type, for now just arbitrarily set the value
opportunity_attachment_type=OpportunityAttachmentType.OTHER,
# TODO - in https://github.com/HHS/simpler-grants-gov/issues/3322
# we'll actually handle the file location logic with s3
file_location="TODO", # TODO - next PR
# Note we calculate the file location here, but haven't yet done anything
# with s3; the calling function will handle writing the file to s3.
file_location=file_location,
mime_type=source_attachment.mime_type,
file_name=source_attachment.file_name,
file_name=file_name,
file_description=source_attachment.file_desc,
file_size_bytes=source_attachment.file_lob_size,
created_by=source_attachment.creator_id,
@@ -142,3 +183,10 @@
)

return target_attachment


def write_file(
source_attachment: TsynopsisAttachment, destination_attachment: OpportunityAttachment
) -> None:
with file_util.open_stream(destination_attachment.file_location, "wb") as outfile:
outfile.write(source_attachment.file_lob)
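write_file above, and the search indexer below, go through file_util.open_stream rather than calling smart_open directly, presumably a thin wrapper so local paths and s3:// URLs behave the same way. A hedged sketch of that assumption:

import contextlib

import smart_open

# Assumption: file_util.open_stream delegates to smart_open so callers can
# treat local file paths and s3:// URLs identically.
@contextlib.contextmanager
def open_stream(path: str, mode: str = "r", **kwargs):
    with smart_open.open(path, mode, **kwargs) as stream:
        yield stream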
37 changes: 21 additions & 16 deletions api/src/search/backend/load_opportunities_to_index.py
@@ -3,7 +3,6 @@
from enum import StrEnum
from typing import Iterator, Sequence

import smart_open
from opensearchpy.exceptions import ConnectionTimeout, TransportError
from pydantic import Field
from pydantic_settings import SettingsConfigDict
@@ -22,11 +21,16 @@
OpportunitySearchIndexQueue,
)
from src.task.task import Task
from src.util import file_util
from src.util.datetime_util import get_now_us_eastern_datetime
from src.util.env_config import PydanticBaseEnvConfig

logger = logging.getLogger(__name__)

ALLOWED_ATTACHMENT_SUFFIXES = set(
["txt", "pdf", "docx", "doc", "xlsx", "xlsm", "html", "htm", "pptx", "ppt", "rtf"]
)


class LoadOpportunitiesToIndexConfig(PydanticBaseEnvConfig):
model_config = SettingsConfigDict(env_prefix="LOAD_OPP_SEARCH_")
@@ -97,6 +101,7 @@ def _create_multi_attachment_pipeline(self) -> None:
"field": "_ingest._value.data",
}
},
"ignore_missing": True,
}
}
],
@@ -275,28 +280,28 @@ def fetch_existing_opportunity_ids_in_index(self) -> set[int]:

return opportunity_ids

def filter_attachments(
self, attachments: list[OpportunityAttachment]
) -> list[OpportunityAttachment]:
return [attachment for attachment in attachments]
def filter_attachment(self, attachment: OpportunityAttachment) -> bool:
file_suffix = attachment.file_name.lower().split(".")[-1]
return file_suffix in ALLOWED_ATTACHMENT_SUFFIXES

def get_attachment_json_for_opportunity(
self, opp_attachments: list[OpportunityAttachment]
) -> list[dict]:

attachments = []
for att in opp_attachments:
with smart_open.open(
att.file_location,
"rb",
) as file:
file_content = file.read()
attachments.append(
{
"filename": att.file_name,
"data": base64.b64encode(file_content).decode("utf-8"),
}
)
if self.filter_attachment(att):
with file_util.open_stream(
att.file_location,
"rb",
) as file:
file_content = file.read()
attachments.append(
{
"filename": att.file_name,
"data": base64.b64encode(file_content).decode("utf-8"),
}
)

return attachments
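Each allowed attachment is base64-encoded because the OpenSearch attachment ingest pipeline configured earlier (the processor reading _ingest._value.data, now with ignore_missing enabled) expects base64 text. An illustration of the per-attachment entry this loop builds; the file name is a made-up example:

import base64

# Illustrative payload shape only; "filename" and "data" are the keys used above.
file_content = b"example attachment bytes"
attachment_entry = {
    "filename": "NOFO_instructions.pdf",
    "data": base64.b64encode(file_content).decode("utf-8"),
}
print(attachment_entry["data"][:16])  # ZXhhbXBsZSBhdHRh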

Empty file.