feat: Automate location extraction and English translation #642

Merged · 24 commits · Aug 5, 2024
5 changes: 5 additions & 0 deletions api/src/scripts/populate_db.py
@@ -117,6 +117,11 @@ def populate_location(self, feed, row, stable_id):
"""
Populate the location for the feed
"""
# TODO: validate behaviour for gtfs-rt feeds
Contributor Author: This should be validated as part of #623

if feed.locations and feed.data_type == "gtfs":
self.logger.warning(f"Location already exists for feed {stable_id}")
return

country_code = self.get_safe_value(row, "location.country_code", "")
subdivision_name = self.get_safe_value(row, "location.subdivision_name", "")
municipality = self.get_safe_value(row, "location.municipality", "")
18 changes: 0 additions & 18 deletions functions-python/extract_bb/README.md

This file was deleted.

26 changes: 26 additions & 0 deletions functions-python/extract_location/README.md
@@ -0,0 +1,26 @@
## Function Workflow

1. **Eventarc Trigger**: The original function is triggered by a `CloudEvent` indicating a GTFS dataset upload. It parses the event data to identify the dataset and calculates the bounding box and location information from the GTFS feed.

2. **Pub/Sub Triggered Function**: A new function is triggered by Pub/Sub messages. This allows for batch processing of dataset extractions, enabling multiple datasets to be processed in parallel without waiting for each one to complete sequentially.

3. **HTTP Triggered Batch Function**: Another function, triggered via HTTP request, identifies all latest datasets lacking bounding box or location information. It then publishes messages to the Pub/Sub topic to trigger the extraction process for these datasets.

4. **Data Parsing**: Extracts `stable_id`, `dataset_id`, and the GTFS feed `url` from the triggering event or message (see the example message after this list).

5. **GTFS Feed Processing**: Retrieves bounding box coordinates and other location-related information from the GTFS feed located at the provided URL.

6. **Database Update**: Updates the bounding box and location information for the dataset in the database.
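
The Pub/Sub message parsed in step 4 could, for illustration, carry a JSON payload along these lines; the field names mirror what the batch function in this pull request publishes, while the concrete values and the exact meaning of `execution_id` are assumptions:

```python
# Hypothetical payload published to the Pub/Sub topic (illustrative values).
message_payload = {
    "stable_id": "mdb-123",          # stable ID of the feed
    "dataset_id": "mdb-123-202407",  # stable ID of the dataset to process
    "url": "https://example.com/gtfs.zip",  # hosted URL of the GTFS zip
    "execution_id": "batch-2024-08-05",     # assumed batch-run identifier
}
```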

## Expected Behavior

- Bounding boxes and location information are extracted for the latest datasets that are missing them. Supporting both batch and per-dataset processing makes the extraction more efficient.

## Function Configuration

The functions rely on the following environment variables:
- `FEEDS_DATABASE_URL`: The database URL for connecting to the database containing GTFS datasets.
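
For reference, the functions in this pull request read the variable at runtime and hand it to the shared session factory; a minimal sketch of that call:

```python
# Minimal sketch of how FEEDS_DATABASE_URL is consumed (mirrors main.py).
import os

from helpers.database import start_db_session

session = start_db_session(os.getenv("FEEDS_DATABASE_URL"))
```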

## Local Development

Local development of these functions should follow standard practices for GCP serverless functions. For general instructions on setting up the development environment, refer to the main [README.md](../README.md) file.
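
As a rough, unofficial sketch of one way to exercise the Pub/Sub-triggered function locally, you could build a `CloudEvent` by hand; the Pub/Sub envelope with a base64-encoded `message.data` field and the import path are assumptions, and `FEEDS_DATABASE_URL` must point at a reachable database:

```python
# Illustrative local invocation sketch; the message envelope and import path
# are assumptions, not part of this repository's documented workflow.
import base64
import json

from cloudevents.http import CloudEvent

from main import extract_location_pubsub  # assumed module path

payload = {
    "stable_id": "mdb-123",
    "dataset_id": "mdb-123-202407",
    "url": "https://example.com/gtfs.zip",
}
attributes = {
    "type": "google.cloud.pubsub.topic.v1.messagePublished",
    "source": "//pubsub.googleapis.com/projects/local/topics/local-test",
}
data = {"message": {"data": base64.b64encode(json.dumps(payload).encode()).decode()}}

extract_location_pubsub(CloudEvent(attributes=attributes, data=data))
```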
@@ -1,7 +1,7 @@
{
"name": "extract-bounding-box",
"name": "extract-location",
"description": "Extracts the bounding box from a dataset",
"entry_point": "extract_bounding_box",
"entry_point": "extract_location",
"timeout": 540,
"memory": "8Gi",
"trigger_http": false,
@@ -0,0 +1,42 @@
import numpy
from geoalchemy2 import WKTElement

from database_gen.sqlacodegen_models import Gtfsdataset


def create_polygon_wkt_element(bounds: numpy.ndarray) -> WKTElement:
"""
Create a WKTElement polygon from bounding box coordinates.
@:param bounds (numpy.ndarray): Bounding box coordinates.
@:return WKTElement: The polygon representation of the bounding box.
"""
min_longitude, min_latitude, max_longitude, max_latitude = bounds
points = [
(min_longitude, min_latitude),
(min_longitude, max_latitude),
(max_longitude, max_latitude),
(max_longitude, min_latitude),
(min_longitude, min_latitude),
]
wkt_polygon = f"POLYGON(({', '.join(f'{lon} {lat}' for lon, lat in points)}))"
return WKTElement(wkt_polygon, srid=4326)


def update_dataset_bounding_box(session, dataset_id, geometry_polygon):
"""
Update the bounding box of a dataset in the database.
@:param session (Session): The database session.
@:param dataset_id (str): The ID of the dataset.
@:param geometry_polygon (WKTElement): The polygon representing the bounding box.
@:raises Exception: If the dataset is not found in the database.
"""
dataset: Gtfsdataset | None = (
session.query(Gtfsdataset)
.filter(Gtfsdataset.stable_id == dataset_id)
.one_or_none()
)
if dataset is None:
raise Exception(f"Dataset {dataset_id} does not exist in the database.")
dataset.bounding_box = geometry_polygon
session.add(dataset)
session.commit()
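
As an aside, here is a hedged illustration of the helper above (sample coordinates are made up): a `bounds` array of `(min_lon, min_lat, max_lon, max_lat)` yields a closed five-point polygon in WKT.

```python
# Illustrative usage of create_polygon_wkt_element; values are made up.
import numpy

bounds = numpy.array([-74.5, 40.3, -73.5, 41.0])
polygon = create_polygon_wkt_element(bounds)
# polygon.data ->
# "POLYGON((-74.5 40.3, -74.5 41.0, -73.5 41.0, -73.5 40.3, -74.5 40.3))"
# update_dataset_bounding_box(session, "mdb-123-202407", polygon) would then
# persist it, given an open SQLAlchemy session and an existing dataset row.
```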
@@ -6,21 +6,26 @@
from datetime import datetime

import functions_framework
import gtfs_kit
import numpy
from cloudevents.http import CloudEvent
from geoalchemy2 import WKTElement
from google.cloud import pubsub_v1
from sqlalchemy import or_
from sqlalchemy.orm import joinedload

from database_gen.sqlacodegen_models import Gtfsdataset
from helpers.database import start_db_session
from helpers.logger import Logger
from dataset_service.main import (
DatasetTraceService,
DatasetTrace,
Status,
PipelineStage,
)
from helpers.database import start_db_session
from helpers.logger import Logger
from .bounding_box.bounding_box_extractor import (
create_polygon_wkt_element,
update_dataset_bounding_box,
)
from .reverse_geolocation.location_extractor import update_location, reverse_coords
from .stops_utils import get_gtfs_feed_bounds_and_points

logging.basicConfig(level=logging.INFO)

@@ -40,64 +45,10 @@ def parse_resource_data(data: dict) -> tuple:
return stable_id, dataset_id, url


def get_gtfs_feed_bounds(url: str, dataset_id: str) -> numpy.ndarray:
"""
Retrieve the bounding box coordinates from the GTFS feed.
@:param url (str): URL to the GTFS feed.
@:param dataset_id (str): ID of the dataset for logs
@:return numpy.ndarray: An array containing the bounds (min_longitude, min_latitude, max_longitude, max_latitude).
@:raises Exception: If the GTFS feed is invalid
"""
try:
feed = gtfs_kit.read_feed(url, "km")
return feed.compute_bounds()
except Exception as e:
logging.error(f"[{dataset_id}] Error retrieving GTFS feed from {url}: {e}")
raise Exception(e)


def create_polygon_wkt_element(bounds: numpy.ndarray) -> WKTElement:
"""
Create a WKTElement polygon from bounding box coordinates.
@:param bounds (numpy.ndarray): Bounding box coordinates.
@:return WKTElement: The polygon representation of the bounding box.
"""
min_longitude, min_latitude, max_longitude, max_latitude = bounds
points = [
(min_longitude, min_latitude),
(min_longitude, max_latitude),
(max_longitude, max_latitude),
(max_longitude, min_latitude),
(min_longitude, min_latitude),
]
wkt_polygon = f"POLYGON(({', '.join(f'{lon} {lat}' for lon, lat in points)}))"
return WKTElement(wkt_polygon, srid=4326)


def update_dataset_bounding_box(session, dataset_id, geometry_polygon):
"""
Update the bounding box of a dataset in the database.
@:param session (Session): The database session.
@:param dataset_id (str): The ID of the dataset.
@:param geometry_polygon (WKTElement): The polygon representing the bounding box.
@:raises Exception: If the dataset is not found in the database.
"""
dataset: Gtfsdataset | None = (
session.query(Gtfsdataset)
.filter(Gtfsdataset.stable_id == dataset_id)
.one_or_none()
)
if dataset is None:
raise Exception(f"Dataset {dataset_id} does not exist in the database.")
dataset.bounding_box = geometry_polygon
session.add(dataset)
session.commit()


@functions_framework.cloud_event
def extract_bounding_box_pubsub(cloud_event: CloudEvent):
def extract_location_pubsub(cloud_event: CloudEvent):
"""
Main function triggered by a Pub/Sub message to extract and update the bounding box in the database.
Main function triggered by a Pub/Sub message to extract and update the location information in the database.
@param cloud_event: The CloudEvent containing the Pub/Sub message.
"""
Logger.init_logger()
@@ -106,6 +57,7 @@ def extract_bounding_box_pubsub(cloud_event: CloudEvent):
except ValueError:
maximum_executions = 1
data = cloud_event.data
location_extraction_n_points = os.getenv("LOCATION_EXTRACTION_N_POINTS", 5)
logging.info(f"Function triggered with Pub/Sub event data: {data}")

# Extract the Pub/Sub message data
@@ -164,7 +116,9 @@ def extract_bounding_box_pubsub(cloud_event: CloudEvent):
try:
logging.info(f"[{dataset_id}] accessing url: {url}")
try:
bounds = get_gtfs_feed_bounds(url, dataset_id)
bounds, location_geo_points = get_gtfs_feed_bounds_and_points(
url, dataset_id, location_extraction_n_points
)
except Exception as e:
error = f"Error processing GTFS feed: {e}"
raise e
@@ -176,16 +130,19 @@ def extract_bounding_box_pubsub(cloud_event: CloudEvent):
try:
session = start_db_session(os.getenv("FEEDS_DATABASE_URL"))
update_dataset_bounding_box(session, dataset_id, geometry_polygon)
update_location(reverse_coords(location_geo_points), dataset_id, session)
except Exception as e:
error = f"Error updating bounding box in database: {e}"
error = f"Error updating location information in database: {e}"
logging.error(f"[{dataset_id}] Error while processing: {e}")
if session is not None:
session.rollback()
raise e
finally:
if session is not None:
session.close()
logging.info(f"[{stable_id} - {dataset_id}] Bounding box updated successfully.")
logging.info(
f"[{stable_id} - {dataset_id}] Location information updated successfully."
)
except Exception:
pass
finally:
@@ -195,7 +152,7 @@ def extract_bounding_box_pubsub(cloud_event: CloudEvent):


@functions_framework.cloud_event
def extract_bounding_box(cloud_event: CloudEvent):
def extract_location(cloud_event: CloudEvent):
"""
Wrapper function to extract necessary data from the CloudEvent and call the core function.
@param cloud_event: The CloudEvent containing the Pub/Sub message.
@@ -232,15 +189,16 @@ def extract_bounding_box(cloud_event: CloudEvent):
new_cloud_event = CloudEvent(attributes=attributes, data=new_cloud_event_data)

# Call the pubsub function with the constructed CloudEvent
return extract_bounding_box_pubsub(new_cloud_event)
return extract_location_pubsub(new_cloud_event)


@functions_framework.http
def extract_bounding_box_batch(_):
def extract_location_batch(_):
Logger.init_logger()
logging.info("Batch function triggered.")

pubsub_topic_name = os.getenv("PUBSUB_TOPIC_NAME", None)
force_datasets_update = bool(os.getenv("FORCE_DATASETS_UPDATE", False))
if pubsub_topic_name is None:
logging.error("PUBSUB_TOPIC_NAME environment variable not set.")
return "PUBSUB_TOPIC_NAME environment variable not set.", 500
@@ -251,15 +209,22 @@ def extract_bounding_box_batch(_):
datasets_data = []
try:
session = start_db_session(os.getenv("FEEDS_DATABASE_URL"))
# Select all latest datasets with no bounding boxes or all datasets if forced
datasets = (
session.query(Gtfsdataset)
.filter(Gtfsdataset.bounding_box == None) # noqa: E711
.filter(
or_(
force_datasets_update,
Gtfsdataset.bounding_box == None, # noqa: E711
)
)
.filter(Gtfsdataset.latest)
.options(joinedload(Gtfsdataset.feed))
.all()
)
for dataset in datasets:
data = {
"stable_id": dataset.feed_id,
"stable_id": dataset.feed.stable_id,
"dataset_id": dataset.stable_id,
"url": dataset.hosted_url,
"execution_id": execution_id,
@@ -274,7 +239,7 @@ def extract_bounding_box_batch(_):
if session is not None:
session.close()

# Trigger update bounding box for each dataset by publishing to Pub/Sub
# Trigger update location for each dataset by publishing to Pub/Sub
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(os.getenv("PROJECT_ID"), pubsub_topic_name)
for data in datasets_data: