[Issue #4004] Fix transform problem with simultaneous delete + insert (#4024)

## Summary
Fixes #4004

### Time to review: __15 mins__

## Changes proposed
Modify the applicant type, funding category, and funding instrument transformations to process all deletes separately from inserts/updates.

## Context for reviewers
This fixes a bug where, if we received both a delete and an insert for one of
these tables in the same batch, we'd process the delete but then be unable to
insert. This happens any time an opportunity summary is updated in the old
system, due to how it handles versioning.

For a much more in-depth description of the bug, see
#4004 (comment)

By processing the deletes separately from (and before) the inserts/updates, we
avoid the caching issue where a not-yet-processed delete blocks the matching insert.
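
In pseudocode, the new flow per table looks like this (a minimal sketch; `fetch_records` and `process_group` are illustrative stand-ins for the real helpers):

```python
# Pass 1: fetch and process only the rows flagged as deletes, so every
# stale link row is removed before any insert for the same value runs.
delete_records = fetch_records(is_delete=True)
process_group(delete_records)

# Pass 2: fetch and process the inserts/updates in a separate query, so
# a not-yet-processed delete can never block the matching insert.
insert_update_records = fetch_records(is_delete=False)
process_group(insert_update_records)
```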
chouinar authored Feb 27, 2025
1 parent 5ed0231 commit 1ac1a2c
Showing 7 changed files with 443 additions and 10 deletions.
@@ -66,7 +66,7 @@ def run_subtask(self) -> None:

@abc.abstractmethod
def transform_records(self) -> None:
"""Abstract method implemented by derived, returns True when done processing"""
"""Abstract method implemented by derived classes"""
pass

def _handle_delete(
@@ -155,19 +155,18 @@ def fetch_with_opportunity_summary(
destination_model: Type[transform_constants.D],
join_clause: Sequence,
is_forecast: bool,
is_delete: bool,
relationship_load_value: Any,
) -> list[
Tuple[transform_constants.S, transform_constants.D | None, OpportunitySummary | None]
]:
# setup the join clause for getting the opportunity summary

opportunity_summary_join_clause = [
source_model.opportunity_id == OpportunitySummary.opportunity_id, # type: ignore[attr-defined]
OpportunitySummary.is_forecast.is_(is_forecast),
OpportunitySummary.revision_number.is_(None),
]

opportunity_summary_join_clause.append(OpportunitySummary.revision_number.is_(None))

return cast(
list[
Tuple[
@@ -179,6 +178,7 @@ def fetch_with_opportunity_summary(
.join(OpportunitySummary, and_(*opportunity_summary_join_clause), isouter=True)
.join(destination_model, and_(*join_clause), isouter=True)
.where(source_model.transformed_at.is_(None))
.where(source_model.is_deleted.is_(is_delete))
.options(selectinload(relationship_load_value))
.execution_options(yield_per=5000, populate_existing=True)
),
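
Each transformer below now calls this helper twice per source table, first with `is_delete=True`, then with `is_delete=False`. A condensed sketch of that calling pattern, using the applicant type transformer's names (the join clause, identical in both passes, is elided):

```python
# First pass: only the staging rows marked as deletes.
delete_records = self.fetch_with_opportunity_summary(
    TapplicanttypesForecast,
    link_table,
    join_clause,  # same join clause in both passes (elided here)
    is_forecast=True,
    is_delete=True,
    relationship_load_value=relationship_load_value,
)
self.process_link_applicant_types_group(delete_records)

# Second pass: the remaining inserts/updates.
insert_update_records = self.fetch_with_opportunity_summary(
    TapplicanttypesForecast,
    link_table,
    join_clause,
    is_forecast=True,
    is_delete=False,
    relationship_load_value=relationship_load_value,
)
self.process_link_applicant_types_group(insert_update_records)
```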
@@ -18,7 +18,23 @@ def transform_records(self) -> None:
link_table = LinkOpportunitySummaryApplicantType
relationship_load_value = OpportunitySummary.link_applicant_types

logger.info("Processing forecast applicant types")
logger.info("Processing deletes for forecast applicant types")
delete_forecast_applicant_type_records = self.fetch_with_opportunity_summary(
TapplicanttypesForecast,
link_table,
[
TapplicanttypesForecast.at_frcst_id
== LinkOpportunitySummaryApplicantType.legacy_applicant_type_id,
OpportunitySummary.opportunity_summary_id
== LinkOpportunitySummaryApplicantType.opportunity_summary_id,
],
is_forecast=True,
is_delete=True,
relationship_load_value=relationship_load_value,
)
self.process_link_applicant_types_group(delete_forecast_applicant_type_records)

logger.info("Processing inserts/updates for forecast applicant types")
forecast_applicant_type_records = self.fetch_with_opportunity_summary(
TapplicanttypesForecast,
link_table,
@@ -29,11 +45,28 @@
== LinkOpportunitySummaryApplicantType.opportunity_summary_id,
],
is_forecast=True,
is_delete=False,
relationship_load_value=relationship_load_value,
)
self.process_link_applicant_types_group(forecast_applicant_type_records)

logger.info("Processing synopsis applicant types")
logger.info("Processing deletes for synopsis applicant types")
delete_synopsis_applicant_type_records = self.fetch_with_opportunity_summary(
TapplicanttypesSynopsis,
link_table,
[
TapplicanttypesSynopsis.at_syn_id
== LinkOpportunitySummaryApplicantType.legacy_applicant_type_id,
OpportunitySummary.opportunity_summary_id
== LinkOpportunitySummaryApplicantType.opportunity_summary_id,
],
is_forecast=False,
is_delete=True,
relationship_load_value=relationship_load_value,
)
self.process_link_applicant_types_group(delete_synopsis_applicant_type_records)

logger.info("Processing inserts/updates for synopsis applicant types")
synopsis_applicant_type_records = self.fetch_with_opportunity_summary(
TapplicanttypesSynopsis,
link_table,
@@ -44,6 +77,7 @@
== LinkOpportunitySummaryApplicantType.opportunity_summary_id,
],
is_forecast=False,
is_delete=False,
relationship_load_value=relationship_load_value,
)
self.process_link_applicant_types_group(synopsis_applicant_type_records)
@@ -21,7 +21,23 @@ def transform_records(self) -> None:
link_table = LinkOpportunitySummaryFundingCategory
relationship_load_value = OpportunitySummary.link_funding_categories

logger.info("Processing forecast funding categories")
logger.info("Processing deletes for forecast funding categories")
delete_forecast_funding_category_records = self.fetch_with_opportunity_summary(
TfundactcatForecast,
link_table,
[
TfundactcatForecast.fac_frcst_id
== LinkOpportunitySummaryFundingCategory.legacy_funding_category_id,
OpportunitySummary.opportunity_summary_id
== LinkOpportunitySummaryFundingCategory.opportunity_summary_id,
],
is_forecast=True,
is_delete=True,
relationship_load_value=relationship_load_value,
)
self.process_link_funding_categories_group(delete_forecast_funding_category_records)

logger.info("Processing inserts/updates for forecast funding categories")
forecast_funding_category_records = self.fetch_with_opportunity_summary(
TfundactcatForecast,
link_table,
@@ -32,11 +48,28 @@
== LinkOpportunitySummaryFundingCategory.opportunity_summary_id,
],
is_forecast=True,
is_delete=False,
relationship_load_value=relationship_load_value,
)
self.process_link_funding_categories_group(forecast_funding_category_records)

logger.info("Processing synopsis funding categories")
logger.info("Processing deletes for synopsis funding categories")
delete_synopsis_funding_category_records = self.fetch_with_opportunity_summary(
TfundactcatSynopsis,
link_table,
[
TfundactcatSynopsis.fac_syn_id
== LinkOpportunitySummaryFundingCategory.legacy_funding_category_id,
OpportunitySummary.opportunity_summary_id
== LinkOpportunitySummaryFundingCategory.opportunity_summary_id,
],
is_forecast=False,
is_delete=True,
relationship_load_value=relationship_load_value,
)
self.process_link_funding_categories_group(delete_synopsis_funding_category_records)

logger.info("Processing inserts/updates for synopsis funding categories")
synopsis_funding_category_records = self.fetch_with_opportunity_summary(
TfundactcatSynopsis,
link_table,
@@ -47,6 +80,7 @@
== LinkOpportunitySummaryFundingCategory.opportunity_summary_id,
],
is_forecast=False,
is_delete=False,
relationship_load_value=relationship_load_value,
)
self.process_link_funding_categories_group(synopsis_funding_category_records)
@@ -21,7 +21,23 @@ def transform_records(self) -> None:
link_table = LinkOpportunitySummaryFundingInstrument
relationship_load_value = OpportunitySummary.link_funding_instruments

logger.info("Processing forecast funding instruments")
logger.info("Processing deletes for forecast funding instruments")
delete_forecast_funding_instrument_records = self.fetch_with_opportunity_summary(
TfundinstrForecast,
link_table,
[
TfundinstrForecast.fi_frcst_id
== LinkOpportunitySummaryFundingInstrument.legacy_funding_instrument_id,
OpportunitySummary.opportunity_summary_id
== LinkOpportunitySummaryFundingInstrument.opportunity_summary_id,
],
is_forecast=True,
is_delete=True,
relationship_load_value=relationship_load_value,
)
self.process_link_funding_instruments_group(delete_forecast_funding_instrument_records)

logger.info("Processing inserts/updates for forecast funding instruments")
forecast_funding_instrument_records = self.fetch_with_opportunity_summary(
TfundinstrForecast,
link_table,
@@ -32,11 +48,28 @@
== LinkOpportunitySummaryFundingInstrument.opportunity_summary_id,
],
is_forecast=True,
is_delete=False,
relationship_load_value=relationship_load_value,
)
self.process_link_funding_instruments_group(forecast_funding_instrument_records)

logger.info("Processing synopsis funding instruments")
logger.info("Processing deletes for synopsis funding instruments")
delete_synopsis_funding_instrument_records = self.fetch_with_opportunity_summary(
TfundinstrSynopsis,
link_table,
[
TfundinstrSynopsis.fi_syn_id
== LinkOpportunitySummaryFundingInstrument.legacy_funding_instrument_id,
OpportunitySummary.opportunity_summary_id
== LinkOpportunitySummaryFundingInstrument.opportunity_summary_id,
],
is_forecast=False,
is_delete=True,
relationship_load_value=relationship_load_value,
)
self.process_link_funding_instruments_group(delete_synopsis_funding_instrument_records)

logger.info("Processing inserts/updates for synopsis funding instruments")
synopsis_funding_instrument_records = self.fetch_with_opportunity_summary(
TfundinstrSynopsis,
link_table,
@@ -47,6 +80,7 @@
== LinkOpportunitySummaryFundingInstrument.opportunity_summary_id,
],
is_forecast=False,
is_delete=False,
relationship_load_value=relationship_load_value,
)
self.process_link_funding_instruments_group(synopsis_funding_instrument_records)
@@ -187,6 +187,95 @@ def test_process_applicant_types(self, db_session, transform_applicant_type):
assert transform_constants.Metrics.TOTAL_ERROR_COUNT not in metrics
assert metrics[transform_constants.Metrics.TOTAL_DELETE_ORPHANS_SKIPPED] == 1

def test_process_applicant_types_delete_and_inserts(self, db_session, transform_applicant_type):
"""Test that if we receive an insert and delete of the same lookup value
in a single batch, we'll delete and then insert the record (effectively no meaningful change)
"""
opportunity_summary_forecast = f.OpportunitySummaryFactory.create(
is_forecast=True, revision_number=None, no_link_values=True
)
forecast_insert1 = setup_applicant_type(
create_existing=False,
opportunity_summary=opportunity_summary_forecast,
legacy_lookup_value="04",
)
forecast_insert2 = setup_applicant_type(
create_existing=False,
opportunity_summary=opportunity_summary_forecast,
legacy_lookup_value="05",
)
forecast_delete1 = setup_applicant_type(
create_existing=True,
is_delete=True,
opportunity_summary=opportunity_summary_forecast,
legacy_lookup_value="04",
applicant_type=ApplicantType.SPECIAL_DISTRICT_GOVERNMENTS,
)
forecast_delete2 = setup_applicant_type(
create_existing=True,
is_delete=True,
opportunity_summary=opportunity_summary_forecast,
legacy_lookup_value="05",
applicant_type=ApplicantType.INDEPENDENT_SCHOOL_DISTRICTS,
)

opportunity_summary_syn = f.OpportunitySummaryFactory.create(
is_forecast=False, revision_number=None, no_link_values=True
)
syn_insert1 = setup_applicant_type(
create_existing=False,
opportunity_summary=opportunity_summary_syn,
legacy_lookup_value="25",
)
syn_insert2 = setup_applicant_type(
create_existing=False,
opportunity_summary=opportunity_summary_syn,
legacy_lookup_value="99",
)
syn_delete1 = setup_applicant_type(
create_existing=True,
is_delete=True,
opportunity_summary=opportunity_summary_syn,
legacy_lookup_value="25",
applicant_type=ApplicantType.OTHER,
)
syn_delete2 = setup_applicant_type(
create_existing=True,
is_delete=True,
opportunity_summary=opportunity_summary_syn,
legacy_lookup_value="99",
applicant_type=ApplicantType.UNRESTRICTED,
)

transform_applicant_type.run_subtask()

validate_applicant_type(
db_session,
forecast_insert1,
expected_applicant_type=ApplicantType.SPECIAL_DISTRICT_GOVERNMENTS,
)
validate_applicant_type(
db_session,
forecast_insert2,
expected_applicant_type=ApplicantType.INDEPENDENT_SCHOOL_DISTRICTS,
)
validate_applicant_type(
db_session, syn_insert1, expected_applicant_type=ApplicantType.OTHER
)
validate_applicant_type(
db_session, syn_insert2, expected_applicant_type=ApplicantType.UNRESTRICTED
)

validate_applicant_type(db_session, forecast_delete1, expect_in_db=False)
validate_applicant_type(db_session, forecast_delete2, expect_in_db=False)
validate_applicant_type(db_session, syn_delete1, expect_in_db=False)
validate_applicant_type(db_session, syn_delete2, expect_in_db=False)

metrics = transform_applicant_type.metrics
assert metrics[transform_constants.Metrics.TOTAL_RECORDS_PROCESSED] == 8
assert metrics[transform_constants.Metrics.TOTAL_RECORDS_DELETED] == 4
assert metrics[transform_constants.Metrics.TOTAL_RECORDS_INSERTED] == 4

@pytest.mark.parametrize("is_forecast", [True, False])
def test_process_applicant_type_but_no_opportunity_summary_non_hist(
self,