Feat: transitFeedSyncProcessing implementation (#819)
* feat: Add Transitland feed sync processor

This commit:
- Implements feed sync processing for Pub/Sub messages
- Ensures database consistency during sync operations
- Adds configuration files for feed sync settings
- Includes comprehensive test coverage
- Documents sync process and configuration options

* lint fix

* Refactor to use SQLAlchemy models for database operations

Replaced raw SQL queries with SQLAlchemy ORM models for handling database operations in feed processing. Enhanced test coverage and updated mock configurations to align with the new ORM-based approach.

* Remove unused freeze_time import from tests

* Update functions-python/feed_sync_process_transitland/src/main.py

Co-authored-by: cka-y <[email protected]>

* Refactor FeedProcessor for enhanced logging and error handling

Replaced custom logger setup with unified Logger class. Improved error handling and rollback in database transactions. Added location support and refined feed ID management. Updated test cases to reflect these changes.

* Update logging and refactor feed processing

Replaced direct logger calls with a unified log_message function to support both local and GCP logging. Refactored the test cases to mock enhanced logging and implemented new test scenarios to cover additional edge cases, ensuring robustness in feed processing.

* lint fix

* added pycountry to requirements.txt

* added additional test cases & included pycountry in requirements.txt

* added additional test cases & included pycountry in requirements.txt

* fix

* Add detailed error handling and checks for feed creation

Refactored test coverage for feed processing, publish to batch topic, and event processing scenarios.

* Refactor mocking of PublisherClient in test setup.

* Update requirements: move pycountry to helpers

* Update requirements: pycountry

* Handle empty country name in get_country_code function

* Update test log message for empty country code

* fix: last test

---------

Co-authored-by: cka-y <[email protected]>
Co-authored-by: cka-y <[email protected]>
3 people authored Nov 28, 2024
1 parent 70f6dda commit a18227e
Showing 16 changed files with 1,994 additions and 31 deletions.
2 changes: 1 addition & 1 deletion functions-python/batch_process_dataset/requirements.txt
@@ -21,4 +21,4 @@ google-api-core
google-cloud-firestore
google-cloud-datastore
google-cloud-bigquery
cloudevents~=1.10.1
cloudevents~=1.10.1
9 changes: 9 additions & 0 deletions functions-python/feed_sync_process_transitland/.coveragerc
@@ -0,0 +1,9 @@
[run]
omit =
    */test*/*
    */dataset_service/*
    */helpers/*

[report]
exclude_lines =
    if __name__ == .__main__.:
5 changes: 5 additions & 0 deletions functions-python/feed_sync_process_transitland/.env.rename_me
@@ -0,0 +1,5 @@
# Environment variables for the function to run locally. Delete this line after renaming the file.
FEEDS_DATABASE_URL=postgresql://postgres:postgres@localhost:54320/MobilityDatabase
PROJECT_ID=mobility-feeds-dev
PUBSUB_TOPIC_NAME=my-topic
DATASET_BATCH_TOPIC_NAME=dataset_batch_topic_{env}_
107 changes: 107 additions & 0 deletions functions-python/feed_sync_process_transitland/README.md
@@ -0,0 +1,107 @@
# TLD Feed Sync Process

Subscribed to the topic set in the `feed-sync-dispatcher` function, `feed-sync-process` is triggered for each message published. It handles the processing of feed updates, ensuring data consistency and integrity. The function performs the following operations:

1. **Feed Status Check**: Verifies the current state of the feed in the database using `external_id` and `source`.
2. **URL Validation**: Checks whether the feed URL already exists in the database.
3. **Feed Processing**: Based on the current state (see the sketch below):
   - If no existing feed is found, creates a new feed entry
   - If a feed exists with a different URL, creates a new feed and deprecates the old one
   - If a feed exists with the same URL, takes no action
4. **Batch Processing Trigger**: For non-authenticated feeds, publishes events to the dataset batch topic for further processing.

The function maintains feed history through the `redirectingid` table and ensures proper status tracking with 'active' and 'deprecated' states.
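
The three outcomes in step 3 reduce to a small decision rule. A minimal, runnable sketch of that rule (the names `FeedRow` and `decide_action` are illustrative, not the actual implementation):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FeedRow:
    """Stand-in for the active feed row matched on (external_id, source)."""

    producer_url: str


def decide_action(existing: Optional[FeedRow], incoming_url: str) -> str:
    if existing is None:
        return "create"  # no existing feed: create a new entry
    if existing.producer_url != incoming_url:
        # new feed entry; old one becomes 'deprecated', linked via redirectingid
        return "create_and_deprecate"
    return "no_action"  # same URL: nothing to do


assert decide_action(None, "http://a/feed") == "create"
assert decide_action(FeedRow("http://a/feed"), "http://b/feed") == "create_and_deprecate"
assert decide_action(FeedRow("http://a/feed"), "http://a/feed") == "no_action"
```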

# Message Format
The function expects a Pub/Sub message with the following format:
```json
{
  "message": {
    "data": {
      "external_id": "feed-identifier",
      "feed_id": "unique-feed-id",
      "feed_url": "http://example.com/feed",
      "execution_id": "execution-identifier",
      "spec": "gtfs",
      "auth_info_url": null,
      "auth_param_name": null,
      "type": null,
      "operator_name": "Transit Agency Name",
      "country": "Country Name",
      "state_province": "State/Province",
      "city_name": "City Name",
      "source": "TLD",
      "payload_type": "new|update"
    }
  }
}
```
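
On the wire, the `data` field arrives base64-encoded inside the Pub/Sub envelope, so it must be decoded before use. A sketch of that step, assuming the CloudEvents wrapping exercised in `main_local_debug.py` (the helper name is hypothetical):

```python
import base64
import json


def parse_feed_payload(cloud_event) -> dict:
    """Decode the base64-encoded JSON payload from the Pub/Sub envelope."""
    raw = cloud_event.data["message"]["data"]
    return json.loads(base64.b64decode(raw).decode("utf-8"))
```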

# Function Configuration
The function is configured using the following environment variables:
- `PROJECT_ID`: The Google Cloud project ID
- `DATASET_BATCH_TOPIC_NAME`: The name of the topic for batch processing triggers
- `FEEDS_DATABASE_URL`: The URL of the feeds database
- `ENV`: [Optional] Environment identifier (e.g., 'dev', 'prod')
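
A minimal sketch of reading these at startup (the `"dev"` fallback for `ENV` is an assumption, not documented behavior):

```python
import os

PROJECT_ID = os.getenv("PROJECT_ID")
DATASET_BATCH_TOPIC_NAME = os.getenv("DATASET_BATCH_TOPIC_NAME")
FEEDS_DATABASE_URL = os.getenv("FEEDS_DATABASE_URL")
ENV = os.getenv("ENV", "dev")  # optional; default here is an assumption
```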

# Database Schema
The function interacts with the following tables:
1. `feed`: Stores feed information
   - Contains fields such as `id`, `data_type`, `feed_name`, and `producer_url`
   - Tracks feed status ('active' or 'deprecated')
   - Uses `CURRENT_TIMESTAMP` for `created_at`

2. `externalid`: Maps external identifiers to feed IDs
   - Links `external_id` and `source` to feed entries
   - Maintains source tracking

3. `redirectingid`: Tracks feed updates
   - Maps old feed IDs to new ones
   - Maintains update history
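
Per the commit, database access goes through SQLAlchemy ORM models rather than raw SQL. A minimal declarative sketch of these tables (the real models are generated under `database_gen/`; column names and types here are assumptions):

```python
from sqlalchemy import TIMESTAMP, Column, ForeignKey, String, func
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Feed(Base):
    __tablename__ = "feed"

    id = Column(String, primary_key=True)
    data_type = Column(String)  # e.g. 'gtfs'
    feed_name = Column(String)
    producer_url = Column(String)
    status = Column(String)  # 'active' or 'deprecated'
    created_at = Column(TIMESTAMP, server_default=func.current_timestamp())


class ExternalId(Base):
    __tablename__ = "externalid"

    feed_id = Column(String, ForeignKey("feed.id"), primary_key=True)
    associated_id = Column(String, primary_key=True)  # the external identifier
    source = Column(String)  # e.g. 'TLD'


class RedirectingId(Base):
    __tablename__ = "redirectingid"

    source_id = Column(String, ForeignKey("feed.id"), primary_key=True)  # old feed
    target_id = Column(String, ForeignKey("feed.id"), primary_key=True)  # new feed
```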

# Local development
The local development of this function follows the same steps as the other functions.

To install the Google Pub/Sub emulator, refer to the [README.md](../README.md) file for more information.

## Python requirements

- Install the requirements
```bash
pip install -r ./functions-python/feed_sync_process_transitland/requirements.txt
```

## Test locally with Google Cloud Emulators

- Execute the following commands to start the emulators:
```bash
gcloud beta emulators pubsub start --project=test-project --host-port='localhost:8043'
```

- Create a Pub/Sub topic in the emulator:
```bash
curl -X PUT "http://localhost:8043/v1/projects/test-project/topics/feed-sync-transitland"
```

- Start function
```bash
export PUBSUB_EMULATOR_HOST=localhost:8043 && ./scripts/function-python-run.sh --function_name feed_sync_process_transitland
```

- [Optional]: Create a local subscription to print published messages:
```bash
./scripts/pubsub_message_print.sh feed-sync-process-transitland
```

- Execute function
```bash
curl http://localhost:8080
```
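
- [Optional]: Publish a test message straight to the emulator topic via the standard Pub/Sub REST `publish` endpoint (the payload below is trimmed for brevity; the full field list is shown under Message Format):
```bash
PAYLOAD=$(echo -n '{"external_id": "test-feed-1", "feed_url": "http://example.com/feed", "spec": "gtfs", "source": "TLD", "payload_type": "new"}' | base64 | tr -d '\n')
curl -X POST "http://localhost:8043/v1/projects/test-project/topics/feed-sync-transitland:publish" \
  -H "Content-Type: application/json" \
  -d "{\"messages\": [{\"data\": \"$PAYLOAD\"}]}"
```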

- To run/debug from your IDE, use the file `main_local_debug.py`

# Test
- Run the tests
```bash
./scripts/api-tests.sh --folder functions-python/feed_sync_process_transitland
```
@@ -0,0 +1,19 @@
{
  "name": "feed-sync-process-transitland",
  "description": "Feed Sync process for Transitland feeds",
  "entry_point": "process_feed_event",
  "timeout": 540,
  "memory": "512Mi",
  "trigger_http": true,
  "include_folders": ["database_gen", "helpers"],
  "secret_environment_variables": [
    {
      "key": "FEEDS_DATABASE_URL"
    }
  ],
  "ingress_settings": "ALLOW_INTERNAL_AND_GCLB",
  "max_instance_request_concurrency": 20,
  "max_instance_count": 10,
  "min_instance_count": 0,
  "available_cpu": 1
}
173 changes: 173 additions & 0 deletions functions-python/feed_sync_process_transitland/main_local_debug.py
@@ -0,0 +1,173 @@
"""
Code to be able to debug locally without affecting the runtime cloud function.
Requirements:
- Google Cloud SDK installed
- Make sure to have the following environment variables set in your .env.rename_me file:
- PROJECT_ID
- DATASET_BATCH_TOPIC_NAME
- FEEDS_DATABASE_URL
- Local database in running state
Usage:
- python feed_sync_process_transitland/main_local_debug.py
"""

import base64
import json
import os
from unittest.mock import MagicMock, patch
import logging
import sys

import pytest
from dotenv import load_dotenv

# Configure local logging first
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    stream=sys.stdout,
)

logger = logging.getLogger("feed_processor")

# Mock the Google Cloud Logger
class MockLogger:
    """Mock logger class"""

    @staticmethod
    def init_logger():
        return MagicMock()

    def __init__(self, name):
        self.name = name

    def get_logger(self):
        return logger

    def addFilter(self, filter):
        pass


with patch("helpers.logger.Logger", MockLogger):
    from feed_sync_process_transitland.src.main import process_feed_event

# Load environment variables
load_dotenv(dotenv_path=".env.rename_me")


class CloudEvent:
    """Cloud Event data structure."""

    def __init__(self, attributes: dict, data: dict):
        self.attributes = attributes
        self.data = data


@pytest.fixture
def mock_pubsub():
    """Fixture to mock PubSub client"""
    with patch("google.cloud.pubsub_v1.PublisherClient") as mock_publisher:
        publisher_instance = MagicMock()

        def mock_topic_path(project_id, topic_id):
            return f"projects/{project_id}/topics/{topic_id}"

        def mock_publish(topic_path, data):
            logger.info(
                f"[LOCAL DEBUG] Would publish to {topic_path}: {data.decode('utf-8')}"
            )
            future = MagicMock()
            future.result.return_value = "message_id"
            return future

        publisher_instance.topic_path.side_effect = mock_topic_path
        publisher_instance.publish.side_effect = mock_publish
        mock_publisher.return_value = publisher_instance

        yield mock_publisher


def process_event_safely(cloud_event, description=""):
    """Process event with error handling."""
    try:
        logger.info(f"\nProcessing {description}:")
        logger.info("-" * 50)
        result = process_feed_event(cloud_event)
        logger.info(f"Process result: {result}")
        return True
    except Exception as e:
        logger.error(f"Error processing {description}: {str(e)}")
        return False


def main():
    """Main function to run local debug tests"""
    logger.info("Starting local debug session...")

    # Define test event data
    test_payload = {
        "external_id": "test-feed-1",
        "feed_id": "feed1",
        "feed_url": "https://example.com/test-feed-2",
        "execution_id": "local-debug-123",
        "spec": "gtfs",
        "auth_info_url": None,
        "auth_param_name": None,
        "type": None,
        "operator_name": "Test Operator",
        "country": "USA",
        "state_province": "CA",
        "city_name": "Test City",
        "source": "TLD",
        "payload_type": "new",
    }

    # Create cloud event
    cloud_event = CloudEvent(
        attributes={
            "type": "com.google.cloud.pubsub.topic.publish",
            "source": f"//pubsub.googleapis.com/projects/{os.getenv('PROJECT_ID')}/topics/test-topic",
        },
        data={
            "message": {
                "data": base64.b64encode(
                    json.dumps(test_payload).encode("utf-8")
                ).decode("utf-8")
            }
        },
    )

    # Set up mocks
    with patch(
        "google.cloud.pubsub_v1.PublisherClient", new_callable=MagicMock
    ) as mock_publisher, patch("google.cloud.logging.Client", MagicMock()):
        publisher_instance = MagicMock()

        def mock_topic_path(project_id, topic_id):
            return f"projects/{project_id}/topics/{topic_id}"

        def mock_publish(topic_path, data):
            logger.info(
                f"[LOCAL DEBUG] Would publish to {topic_path}: {data.decode('utf-8')}"
            )
            future = MagicMock()
            future.result.return_value = "message_id"
            return future

        publisher_instance.topic_path.side_effect = mock_topic_path
        publisher_instance.publish.side_effect = mock_publish
        mock_publisher.return_value = publisher_instance

        # Process test event
        process_event_safely(cloud_event, "test feed event")

    logger.info("Local debug session completed.")


if __name__ == "__main__":
    main()
23 changes: 23 additions & 0 deletions functions-python/feed_sync_process_transitland/requirements.txt
@@ -0,0 +1,23 @@
# Common packages
functions-framework==3.*
google-cloud-logging
psycopg2-binary==2.9.6
aiohttp~=3.10.5
asyncio~=3.4.3
urllib3~=2.2.2
requests~=2.32.3
attrs~=23.1.0
pluggy~=1.3.0
certifi~=2024.8.30

# SQL Alchemy and Geo Alchemy
SQLAlchemy==2.0.23
geoalchemy2==0.14.7

# Google specific packages for this function
google-cloud-pubsub
cloudevents~=1.10.1

# Additional packages for this function
pandas
pycountry
@@ -0,0 +1,2 @@
Faker
pytest~=7.4.3