Skip to content

Commit

Permalink
[Issue #3536] Add saved opportunity notifications to backend job (#3639)
Browse files Browse the repository at this point in the history
## Summary
Fixes #3536

### Time to review: 15 mins

## Changes proposed
Add `last_notified_at` to `user_saved_opportunity`
Collect opportunity notifications, log them out and test.

## Context for reviewers
Can be merged, now that #3527 is wrapped up. 

## Additional information
See attached unit tests
  • Loading branch information
mikehgrantsgov authored Jan 29, 2025
1 parent 73b32a7 commit 0dd0466
Show file tree
Hide file tree
Showing 6 changed files with 237 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""Add last_notified_at to user saved opportunity table
Revision ID: 43b179a7c92e
Revises: dc04ce955a9a
Create Date: 2025-01-24 17:15:14.064880
"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "43b179a7c92e"
down_revision = "9e7fc937646a"
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"user_saved_opportunity",
sa.Column(
"last_notified_at", sa.TIMESTAMP(timezone=True), server_default="NOW()", nullable=False
),
schema="api",
)
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column("user_saved_opportunity", "last_notified_at", schema="api")
# ### end Alembic commands ###
8 changes: 7 additions & 1 deletion api/src/db/models/user_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ class UserSavedOpportunity(ApiSchemaTable, TimestampMixin):
BigInteger, ForeignKey(Opportunity.opportunity_id), primary_key=True
)

last_notified_at: Mapped[datetime] = mapped_column(
default=datetime.utcnow, server_default="NOW()", nullable=False
)

user: Mapped[User] = relationship(User, back_populates="saved_opportunities")
opportunity: Mapped[Opportunity] = relationship(
"Opportunity", back_populates="saved_opportunities_by_users"
Expand Down Expand Up @@ -115,7 +119,9 @@ class UserSavedSearch(ApiSchemaTable, TimestampMixin):
class UserNotificationLog(ApiSchemaTable, TimestampMixin):
__tablename__ = "user_notification_log"

user_notification_log_id: Mapped[uuid.UUID] = mapped_column(UUID, primary_key=True)
user_notification_log_id: Mapped[uuid.UUID] = mapped_column(
UUID, primary_key=True, default=uuid.uuid4
)

user_id: Mapped[uuid.UUID] = mapped_column(ForeignKey(User.user_id), index=True)
user: Mapped[User] = relationship(User)
Expand Down
74 changes: 62 additions & 12 deletions api/src/task/notifications/generate_notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@
from dataclasses import dataclass, field
from enum import StrEnum

from sqlalchemy import select, update

import src.adapters.db as db
import src.adapters.db.flask_db as flask_db
from src.db.models.user_models import User
from src.db.models.opportunity_models import OpportunityChangeAudit
from src.db.models.user_models import UserNotificationLog, UserSavedOpportunity
from src.task.ecs_background_task import ecs_background_task
from src.task.task import Task
from src.task.task_blueprint import task_blueprint
from src.util import datetime_util

logger = logging.getLogger(__name__)

Expand All @@ -24,12 +28,15 @@ def run_notification_task(db_session: db.Session) -> None:
task.run()


class NotificationConstants:
OPPORTUNITY_UPDATES = "opportunity_updates"


@dataclass
class NotificationContainer:
"""Container for collecting notifications for a single user"""

user: User
updated_opportunity_ids: list[int] = field(default_factory=list)
saved_opportunities: list[UserSavedOpportunity] = field(default_factory=list)
# TODO: Change from str to something else
updated_searches: list[str] = field(default_factory=list)

Expand All @@ -55,11 +62,35 @@ def run_task(self) -> None:
self._send_notifications()

def _collect_opportunity_notifications(self) -> None:
"""Collect notifications for changed opportunities
To be implemented in future ticket
"""
logger.info("Opportunity notification collection not yet implemented")
pass
"""Collect notifications for changed opportunities that users are tracking"""
stmt = (
select(UserSavedOpportunity)
.join(
OpportunityChangeAudit,
OpportunityChangeAudit.opportunity_id == UserSavedOpportunity.opportunity_id,
)
.where(OpportunityChangeAudit.updated_at > UserSavedOpportunity.last_notified_at)
.distinct()
)

results = self.db_session.execute(stmt)

for row in results.scalars():
user_id = row.user_id
if user_id not in self.user_notification_map:
self.user_notification_map[user_id] = NotificationContainer()
self.user_notification_map[user_id].saved_opportunities.append(row)

logger.info(
"Collected opportunity notifications",
extra={
"user_count": len(self.user_notification_map),
"total_notifications": sum(
len(container.saved_opportunities)
for container in self.user_notification_map.values()
),
},
)

def _collect_search_notifications(self) -> None:
"""Collect notifications for changed saved searches
Expand All @@ -71,22 +102,41 @@ def _collect_search_notifications(self) -> None:
def _send_notifications(self) -> None:
"""Send collected notifications to users"""
for user_id, container in self.user_notification_map.items():
if not container.updated_opportunity_ids and not container.updated_searches:
if not container.saved_opportunities and not container.updated_searches:
continue

# TODO: Implement actual notification sending in future ticket
logger.info(
"Would send notification to user",
extra={
"user_id": user_id,
"opportunity_count": len(container.updated_opportunity_ids),
"opportunity_count": len(container.saved_opportunities),
"search_count": len(container.updated_searches),
},
)

self.increment(
self.Metrics.OPPORTUNITIES_TRACKED, len(container.updated_opportunity_ids)
# Create notification log entry
notification_log = UserNotificationLog(
user_id=user_id,
notification_reason=NotificationConstants.OPPORTUNITY_UPDATES,
notification_sent=True,
)
self.db_session.add(notification_log)

# Update last_notified_at for all opportunities we just notified about
opportunity_ids = [
saved_opp.opportunity_id for saved_opp in container.saved_opportunities
]
self.db_session.execute(
update(UserSavedOpportunity)
.where(
UserSavedOpportunity.user_id == user_id,
UserSavedOpportunity.opportunity_id.in_(opportunity_ids),
)
.values(last_notified_at=datetime_util.utcnow())
)

self.increment(self.Metrics.OPPORTUNITIES_TRACKED, len(container.saved_opportunities))
self.increment(self.Metrics.SEARCHES_TRACKED, len(container.updated_searches))
self.increment(self.Metrics.NOTIFICATIONS_SENT)
self.increment(self.Metrics.USERS_NOTIFIED)
9 changes: 7 additions & 2 deletions api/src/task/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ def run(self) -> None:
job_succeeded = True

try:
logger.info("Starting %s", self.cls_name())
start = time.perf_counter()
# Create initial job record
self.job = JobLog(job_type=self.cls_name(), job_status=JobStatus.STARTED)
self.db_session.add(self.job)
self.db_session.commit()

# Create initial job record
self.job = JobLog(job_type=self.cls_name(), job_status=JobStatus.STARTED)
Expand All @@ -49,6 +51,9 @@ def run(self) -> None:
# Run the actual task
self.run_task()

logger.info("Starting %s", self.cls_name())
start = time.perf_counter()

# Calculate and set a duration
end = time.perf_counter()
duration = round((end - start), 3)
Expand Down
127 changes: 127 additions & 0 deletions api/tests/src/task/notifications/test_generate_notifications.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,132 @@
from datetime import timedelta

import pytest

import tests.src.db.models.factories as factories
from src.db.models.user_models import UserNotificationLog
from src.task.notifications.generate_notifications import NotificationConstants


@pytest.fixture
def clear_notification_logs(db_session):
"""Clear all notification logs"""
db_session.query(UserNotificationLog).delete()


def test_via_cli(cli_runner, db_session, enable_factory_create, user):
"""Simple test that verifies we can invoke the notification task via CLI"""
result = cli_runner.invoke(args=["task", "generate-notifications"])

assert result.exit_code == 0


def test_collect_notifications_cli(cli_runner, db_session, enable_factory_create, user, caplog):
"""Simple test that verifies we can invoke the notification task via CLI"""
# Create a saved opportunity that needs notification
opportunity = factories.OpportunityFactory.create()
saved_opportunity = factories.UserSavedOpportunityFactory.create(
user=user,
opportunity=opportunity,
last_notified_at=opportunity.updated_at - timedelta(days=1),
)
factories.OpportunityChangeAuditFactory.create(
opportunity=opportunity,
updated_at=saved_opportunity.last_notified_at + timedelta(minutes=1),
)

result = cli_runner.invoke(args=["task", "generate-notifications"])

assert result.exit_code == 0

# Verify expected log messages
assert "Collected opportunity notifications" in caplog.text
assert "Would send notification to user" in caplog.text

# Verify the log contains the correct metrics
log_records = [r for r in caplog.records if "Would send notification to user" in r.message]
assert len(log_records) == 1
extra = log_records[0].__dict__
assert extra["user_id"] == user.user_id
assert extra["opportunity_count"] == 1
assert extra["search_count"] == 0


def test_last_notified_at_updates(cli_runner, db_session, enable_factory_create, user):
"""Test that last_notified_at gets updated after sending notifications"""
# Create an opportunity that was updated after the last notification
opportunity = factories.OpportunityFactory.create()
saved_opp = factories.UserSavedOpportunityFactory.create(
user=user,
opportunity=opportunity,
last_notified_at=opportunity.updated_at - timedelta(days=1),
)
factories.OpportunityChangeAuditFactory.create(
opportunity=opportunity,
updated_at=saved_opp.last_notified_at + timedelta(minutes=1),
)
# Store the original notification time
original_notification_time = saved_opp.last_notified_at

# Run the notification task
result = cli_runner.invoke(args=["task", "generate-notifications"])
assert result.exit_code == 0

# Refresh the saved opportunity from the database
db_session.refresh(saved_opp)

# Verify last_notified_at was updated
assert saved_opp.last_notified_at > original_notification_time
# Verify last_notified_at is now after the opportunity's updated_at
assert saved_opp.last_notified_at > opportunity.updated_at


def test_notification_log_creation(
cli_runner, db_session, enable_factory_create, clear_notification_logs, user
):
"""Test that notification logs are created when notifications are sent"""
# Create a saved opportunity that needs notification
opportunity = factories.OpportunityFactory.create()
saved_opportunity = factories.UserSavedOpportunityFactory.create(
user=user,
opportunity=opportunity,
last_notified_at=opportunity.updated_at - timedelta(days=1),
)

factories.OpportunityChangeAuditFactory.create(
opportunity=opportunity,
updated_at=saved_opportunity.last_notified_at + timedelta(minutes=1),
)

# Run the notification task
result = cli_runner.invoke(args=["task", "generate-notifications"])
assert result.exit_code == 0

# Verify notification log was created
notification_logs = db_session.query(UserNotificationLog).all()
assert len(notification_logs) == 1

log = notification_logs[0]
assert log.user_id == user.user_id
assert log.notification_reason == NotificationConstants.OPPORTUNITY_UPDATES
assert log.notification_sent is True


def test_no_notification_log_when_no_updates(
cli_runner, db_session, enable_factory_create, clear_notification_logs, user
):
"""Test that no notification log is created when there are no updates"""
# Create a saved opportunity that doesn't need notification
opportunity = factories.OpportunityFactory.create()
factories.UserSavedOpportunityFactory.create(
user=user,
opportunity=opportunity,
last_notified_at=opportunity.updated_at + timedelta(minutes=1), # After the update
)

# Run the notification task
result = cli_runner.invoke(args=["task", "generate-notifications"])
assert result.exit_code == 0

# Verify no notification log was created
notification_logs = db_session.query(UserNotificationLog).all()
assert len(notification_logs) == 0
Binary file modified documentation/api/database/erds/api-schema.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit 0dd0466

Please sign in to comment.