diff --git a/api/src/adapters/aws/pinpoint_adapter.py b/api/src/adapters/aws/pinpoint_adapter.py index 346429a40..3d1df2157 100644 --- a/api/src/adapters/aws/pinpoint_adapter.py +++ b/api/src/adapters/aws/pinpoint_adapter.py @@ -116,5 +116,10 @@ def _handle_mock_response(request: dict, to_address: str) -> PinpointResponse: return response +def _clear_mock_responses() -> None: + global _mock_responses + _mock_responses = [] + + def _get_mock_responses() -> list[tuple[dict, PinpointResponse]]: return _mock_responses diff --git a/api/src/db/models/user_models.py b/api/src/db/models/user_models.py index 70091869c..bf4f82f80 100644 --- a/api/src/db/models/user_models.py +++ b/api/src/db/models/user_models.py @@ -1,7 +1,7 @@ import uuid from datetime import datetime -from sqlalchemy import BigInteger, ForeignKey +from sqlalchemy import BigInteger, ForeignKey, and_ from sqlalchemy.dialects.postgresql import ARRAY, JSONB, UUID from sqlalchemy.orm import Mapped, mapped_column, relationship from sqlalchemy.sql.functions import now as sqlnow @@ -30,6 +30,22 @@ class User(ApiSchemaTable, TimestampMixin): "UserSavedSearch", back_populates="user", uselist=True, cascade="all, delete-orphan" ) + linked_login_gov_external_user: Mapped["LinkExternalUser | None"] = relationship( + "LinkExternalUser", + primaryjoin=lambda: and_( + LinkExternalUser.user_id == User.user_id, + LinkExternalUser.external_user_type == ExternalUserType.LOGIN_GOV, + ), + uselist=False, + viewonly=True, + ) + + @property + def email(self) -> str | None: + if self.linked_login_gov_external_user is not None: + return self.linked_login_gov_external_user.email + return None + class LinkExternalUser(ApiSchemaTable, TimestampMixin): __tablename__ = "link_external_user" diff --git a/api/src/task/notifications/generate_notifications.py b/api/src/task/notifications/generate_notifications.py index da08417a0..1682a751b 100644 --- a/api/src/task/notifications/generate_notifications.py +++ 
b/api/src/task/notifications/generate_notifications.py @@ -3,23 +3,36 @@ from dataclasses import dataclass, field from enum import StrEnum +import botocore.client +from pydantic import Field from sqlalchemy import select, update import src.adapters.db as db import src.adapters.db.flask_db as flask_db import src.adapters.search as search import src.adapters.search.flask_opensearch as flask_opensearch +from src.adapters.aws.pinpoint_adapter import send_pinpoint_email_raw from src.db.models.opportunity_models import OpportunityChangeAudit -from src.db.models.user_models import UserNotificationLog, UserSavedOpportunity, UserSavedSearch +from src.db.models.user_models import ( + User, + UserNotificationLog, + UserSavedOpportunity, + UserSavedSearch, +) from src.services.opportunities_v1.search_opportunities import search_opportunities_id from src.task.ecs_background_task import ecs_background_task from src.task.task import Task from src.task.task_blueprint import task_blueprint from src.util import datetime_util +from src.util.env_config import PydanticBaseEnvConfig logger = logging.getLogger(__name__) +class GenerateNotificationsConfig(PydanticBaseEnvConfig): + app_id: str = Field(alias="PINPOINT_APP_ID") + + @task_blueprint.cli.command( "generate-notifications", help="Send notifications for opportunity and search changes" ) @@ -55,10 +68,20 @@ class Metrics(StrEnum): SEARCHES_TRACKED = "searches_tracked" NOTIFICATIONS_SENT = "notifications_sent" - def __init__(self, db_session: db.Session, search_client: search.SearchClient) -> None: + def __init__( + self, + db_session: db.Session, + search_client: search.SearchClient, + pinpoint_client: botocore.client.BaseClient | None = None, + pinpoint_app_id: str | None = None, + ) -> None: super().__init__(db_session) + self.config = GenerateNotificationsConfig() + self.user_notification_map: dict[uuid.UUID, NotificationContainer] = {} self.search_client = search_client + self.pinpoint_client = pinpoint_client + self.app_id = 
pinpoint_app_id def run_task(self) -> None: """Main task logic to collect and send notifications""" @@ -150,9 +173,22 @@ def _send_notifications(self) -> None: if not container.saved_opportunities and not container.saved_searches: continue - # TODO: Implement actual notification sending in future ticket + user = self.db_session.execute( + select(User).where(User.user_id == user_id) + ).scalar_one_or_none() + + if not user or not user.email: + logger.warning("No email found for user", extra={"user_id": user_id}) + continue + + # Send email via Pinpoint + subject = "Updates to Your Saved Opportunities" + message = ( + f"You have updates to {len(container.saved_opportunities)} saved opportunities" + ) + logger.info( - "Would send notification to user", + "Sending notification to user", extra={ "user_id": user_id, "opportunity_count": len(container.saved_opportunities), @@ -160,21 +196,48 @@ def _send_notifications(self) -> None: }, ) - # Create notification log entry notification_log = UserNotificationLog( user_id=user_id, notification_reason=NotificationConstants.OPPORTUNITY_UPDATES, - notification_sent=True, + notification_sent=False, # Default to False, update on success ) self.db_session.add(notification_log) + try: + send_pinpoint_email_raw( + to_address=user.email, + subject=subject, + message=message, + pinpoint_client=self.pinpoint_client, + app_id=self.config.app_id, + ) + notification_log.notification_sent = True + logger.info( + "Successfully sent notification to user", + extra={ + "user_id": user_id, + "opportunity_count": len(container.saved_opportunities), + "search_count": len(container.saved_searches), + }, + ) + except Exception: + # Leave notification_sent as False; the failure is logged and the log row is still saved + logger.exception( + "Failed to send notification email", + extra={"user_id": user_id, "email": user.email}, + ) + + self.db_session.add(notification_log) + if container.saved_searches: - notification_log = UserNotificationLog( + search_notification_log =
UserNotificationLog( user_id=user_id, notification_reason=NotificationConstants.SEARCH_UPDATES, - notification_sent=True, + notification_sent=False, # Default to False, update if email was successful ) - self.db_session.add(notification_log) + self.db_session.add(search_notification_log) + if notification_log.notification_sent: + search_notification_log.notification_sent = True # Update last_notified_at for all opportunities we just notified about opportunity_ids = [ diff --git a/api/tests/src/task/notifications/test_generate_notifications.py b/api/tests/src/task/notifications/test_generate_notifications.py index c14fd067a..970a84d65 100644 --- a/api/tests/src/task/notifications/test_generate_notifications.py +++ b/api/tests/src/task/notifications/test_generate_notifications.py @@ -1,8 +1,10 @@ from datetime import timedelta import pytest +from sqlalchemy import select import tests.src.db.models.factories as factories +from src.adapters.aws.pinpoint_adapter import _clear_mock_responses, _get_mock_responses from src.api.opportunities_v1.opportunity_schemas import OpportunityV1Schema from src.db.models.user_models import UserNotificationLog, UserSavedOpportunity, UserSavedSearch from src.task.notifications.generate_notifications import ( @@ -14,6 +16,13 @@ from tests.src.api.opportunities_v1.test_opportunity_route_search import OPPORTUNITIES +@pytest.fixture +def user_with_email(db_session, user, monkeypatch): + monkeypatch.setenv("PINPOINT_APP_ID", "test-app-id") + factories.LinkExternalUserFactory.create(user=user, email="test@example.com") + return user + + @pytest.fixture def setup_search_data(opportunity_index, opportunity_index_alias, search_client): # Load into the search index @@ -33,14 +42,93 @@ def clear_notification_logs(db_session): db_session.query(UserSavedSearch).delete() -def test_via_cli(cli_runner, db_session, enable_factory_create, user): +def test_via_cli(cli_runner, db_session, enable_factory_create, user, user_with_email): """Simple test that 
verifies we can invoke the notification task via CLI""" result = cli_runner.invoke(args=["task", "generate-notifications"]) assert result.exit_code == 0 -def test_collect_notifications_cli(cli_runner, db_session, enable_factory_create, user, caplog): +def test_search_notifications_cli( + cli_runner, + db_session, + enable_factory_create, + user, + user_with_email, + caplog, + clear_notification_logs, + setup_search_data, +): + """Test that verifies we can collect and send search notifications via CLI""" + + # Create a saved search that needs notification + saved_search = factories.UserSavedSearchFactory.create( + user=user, + search_query={"keywords": "test"}, + name="Test Search", + last_notified_at=datetime_util.utcnow() - timedelta(days=1), + searched_opportunity_ids=[1, 2, 3], + ) + + notification_logs_count = ( + db_session.query(UserNotificationLog) + .filter(UserNotificationLog.notification_reason == NotificationConstants.SEARCH_UPDATES) + .count() + ) + + _clear_mock_responses() + + result = cli_runner.invoke(args=["task", "generate-notifications"]) + + assert result.exit_code == 0 + + # Verify expected log messages + assert "Collected search notifications" in caplog.text + assert "Sending notification to user" in caplog.text + + # Verify the log contains the correct metrics + log_records = [r for r in caplog.records if "Sending notification to user" in r.message] + assert len(log_records) == 1 + extra = log_records[0].__dict__ + assert extra["user_id"] == user.user_id + assert extra["opportunity_count"] == 0 + assert extra["search_count"] == 1 + + # Verify notification log was created + notification_logs = ( + db_session.query(UserNotificationLog) + .filter(UserNotificationLog.notification_reason == NotificationConstants.SEARCH_UPDATES) + .all() + ) + assert len(notification_logs) == notification_logs_count + 1 + + # Verify last_notified_at was updated + db_session.refresh(saved_search) + assert saved_search.last_notified_at > datetime_util.utcnow() - 
timedelta(minutes=1) + + # Verify email was sent via Pinpoint + mock_responses = _get_mock_responses() + assert len(mock_responses) == 1 + + request = mock_responses[0][0] + assert request["MessageRequest"]["Addresses"] == {"test@example.com": {"ChannelType": "EMAIL"}} + + # Verify notification log was created + notification_logs = ( + db_session.execute( + select(UserNotificationLog).where(UserNotificationLog.user_id == user.user_id) + ) + .scalars() + .all() + ) + + assert len(notification_logs) == 2 + assert notification_logs[0].notification_sent is True + + +def test_collect_notifications_cli( + cli_runner, db_session, enable_factory_create, user, user_with_email, caplog +): """Simple test that verifies we can invoke the notification task via CLI""" # Create a saved opportunity that needs notification opportunity = factories.OpportunityFactory.create() @@ -60,10 +148,10 @@ def test_collect_notifications_cli(cli_runner, db_session, enable_factory_create # Verify expected log messages assert "Collected opportunity notifications" in caplog.text - assert "Would send notification to user" in caplog.text + assert "Sending notification to user" in caplog.text # Verify the log contains the correct metrics - log_records = [r for r in caplog.records if "Would send notification to user" in r.message] + log_records = [r for r in caplog.records if "Sending notification to user" in r.message] assert len(log_records) == 1 extra = log_records[0].__dict__ assert extra["user_id"] == user.user_id @@ -71,7 +159,9 @@ def test_collect_notifications_cli(cli_runner, db_session, enable_factory_create assert extra["search_count"] == 0 -def test_last_notified_at_updates(cli_runner, db_session, enable_factory_create, user): +def test_last_notified_at_updates( + cli_runner, db_session, enable_factory_create, user, user_with_email +): """Test that last_notified_at gets updated after sending notifications""" # Create an opportunity that was updated after the last notification opportunity = 
factories.OpportunityFactory.create() @@ -101,7 +191,7 @@ def test_last_notified_at_updates(cli_runner, db_session, enable_factory_create, def test_notification_log_creation( - cli_runner, db_session, enable_factory_create, clear_notification_logs, user + cli_runner, db_session, enable_factory_create, clear_notification_logs, user, user_with_email ): """Test that notification logs are created when notifications are sent""" # Create a saved opportunity that needs notification @@ -132,7 +222,7 @@ def test_notification_log_creation( def test_no_notification_log_when_no_updates( - cli_runner, db_session, enable_factory_create, clear_notification_logs, user + cli_runner, db_session, enable_factory_create, clear_notification_logs, user, user_with_email ): """Test that no notification log is created when there are no updates""" # Create a saved opportunity that doesn't need notification @@ -152,60 +242,12 @@ def test_no_notification_log_when_no_updates( assert len(notification_logs) == 0 -def test_search_notifications_cli( - cli_runner, - db_session, - enable_factory_create, - user, - caplog, - clear_notification_logs, - setup_search_data, -): - """Test that verifies we can collect and send search notifications via CLI""" - - # Create a saved search that needs notification - saved_search = factories.UserSavedSearchFactory.create( - user=user, - search_query={"keywords": "test"}, - name="Test Search", - last_notified_at=datetime_util.utcnow() - timedelta(days=1), - searched_opportunity_ids=[1, 2, 3], - ) - - result = cli_runner.invoke(args=["task", "generate-notifications"]) - - assert result.exit_code == 0 - - # Verify expected log messages - assert "Collected search notifications" in caplog.text - assert "Would send notification to user" in caplog.text - - # Verify the log contains the correct metrics - log_records = [r for r in caplog.records if "Would send notification to user" in r.message] - assert len(log_records) == 1 - extra = log_records[0].__dict__ - assert 
extra["user_id"] == user.user_id - assert extra["opportunity_count"] == 0 - assert extra["search_count"] == 1 - - # Verify notification log was created - notification_logs = ( - db_session.query(UserNotificationLog) - .filter(UserNotificationLog.notification_reason == NotificationConstants.SEARCH_UPDATES) - .all() - ) - assert len(notification_logs) == 1 - - # Verify last_notified_at was updated - db_session.refresh(saved_search) - assert saved_search.last_notified_at > datetime_util.utcnow() - timedelta(minutes=1) - - def test_combined_notifications_cli( cli_runner, db_session, enable_factory_create, user, + user_with_email, caplog, clear_notification_logs, ): @@ -238,10 +280,10 @@ def test_combined_notifications_cli( # Verify expected log messages assert "Collected opportunity notifications" in caplog.text assert "Collected search notifications" in caplog.text - assert "Would send notification to user" in caplog.text + assert "Sending notification to user" in caplog.text # Verify the log contains the correct metrics - log_records = [r for r in caplog.records if "Would send notification to user" in r.message] + log_records = [r for r in caplog.records if "Sending notification to user" in r.message] assert len(log_records) == 1 extra = log_records[0].__dict__ assert extra["user_id"] == user.user_id @@ -270,11 +312,15 @@ def test_grouped_search_queries_cli( db_session, enable_factory_create, clear_notification_logs, + user, + user_with_email, ): """Test that verifies we properly handle multiple users with the same search query""" # Create two users with the same search query user1 = factories.UserFactory.create() user2 = factories.UserFactory.create() + factories.LinkExternalUserFactory.create(user=user1, email="user1@example.com") + factories.LinkExternalUserFactory.create(user=user2, email="user2@example.com") same_search_query = {"keywords": "shared search"} @@ -325,6 +371,7 @@ def test_search_notifications_on_index_change( db_session, enable_factory_create, 
user, + user_with_email, opportunity_index, search_client, clear_notification_logs,