Merge pull request #6 from cal-itp/load-rt
specify output files by feed/url and suffix
atvaccaro authored Feb 23, 2022
2 parents 9e7fed3 + 738c743 commit a6a9d1e
Showing 3 changed files with 31 additions and 19 deletions.
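Net effect of the commit: instead of flat, numbered result files, each feed's validation results now land at a path keyed by ITP ID, URL number, and a per-feed suffix taken from the parameter CSV. A sketch of the layout (bucket name illustrative only):

    # before: one flat, numbered file per feed
    {results_bucket}/result_0.parquet
    {results_bucket}/result_1.parquet
    # after: nested by feed identity plus a per-feed suffix
    {results_bucket}/validation_results/{calitp_itp_id}/{calitp_url_number}/{output_filename}.parquet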
34 changes: 22 additions & 12 deletions gtfs_rt_validator_api.py
@@ -16,6 +16,7 @@
 import pendulum
 import structlog as structlog
 import typer
+from calitp.config import get_bucket
 from structlog import configure
 from structlog.threadlocal import bind_threadlocal, clear_threadlocal, merge_threadlocal

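The new import underpins the changed defaults in the next hunk. The assumption here is that calitp.config.get_bucket() resolves the environment-appropriate bucket prefix, replacing the hard-coded gs:// literals; a minimal sketch of how it would be used:

    from calitp.config import get_bucket

    # Assumption: get_bucket() returns the configured bucket prefix (e.g. the
    # "gs://gtfs-data-test" value the old defaults hard-coded); the actual
    # resolution logic lives in the calitp package.
    print(get_bucket())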
@@ -238,21 +239,21 @@ def validate_gcs_bucket(
 def validate_gcs_bucket_many(
     project_id: str = "cal-itp-data-infra",
     token: str = None,  # "cloud",
-    param_csv: str = f"gs://gtfs-data-test/rt-processed/calitp_validation_params/{pendulum.today().to_date_string()}.csv",
-    results_bucket: str = "gs://calitp-py-ci/gtfs-rt-validator-api/test-pipeline",
+    param_csv: str = f"{get_bucket()}/rt-processed/calitp_validation_params/{pendulum.today().to_date_string()}.csv",
+    results_bucket: str = f"{get_bucket()}/rt-processed/validation/{pendulum.today().to_date_string()}",
     verbose: bool = True,
     aggregate_counts: bool = True,
-    status_result_path: str = "gs://calitp-py-ci/gtfs-rt-validator-api/test-pipeline/status.json",
+    summary_path: str = f"{get_bucket()}/rt-processed/validation/{pendulum.today().to_date_string()}/summary.json",
     strict: bool = False,
-    result_name_prefix: str = "result_",
+    result_name_prefix: str = "validation_results",
     threads: int = 1,
     limit: int = None,
 ):
     """Validate many gcs buckets using a parameter file.

     Additional Arguments:
         strict: whether to raise an error when a validation fails
-        status_result_path: directory for saving the status of validations
+        summary_path: directory for saving the status of validations
         result_name_prefix: a name to prefix to each result file name. File names
             will be numbered. E.g. result_0.parquet, result_1.parquet for two feeds.
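For orientation, a hedged sketch of invoking the updated signature directly; the gs:// paths are placeholders, not the project's real defaults:

    import gtfs_rt_validator_api as api

    api.validate_gcs_bucket_many(
        param_csv="gs://my-bucket/rt-processed/calitp_validation_params/2022-02-23.csv",
        results_bucket="gs://my-bucket/rt-processed/validation/2022-02-23",
        summary_path="gs://my-bucket/rt-processed/validation/2022-02-23/summary.json",
        result_name_prefix="validation_results",
        threads=4,
        strict=False,
    )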
@@ -268,7 +269,13 @@ def validate_gcs_bucket_many(

     import gcsfs

-    required_cols = ["gtfs_schedule_path", "gtfs_rt_glob_path"]
+    required_cols = [
+        "calitp_itp_id",
+        "calitp_url_number",
+        "gtfs_schedule_path",
+        "gtfs_rt_glob_path",
+        "output_filename",
+    ]

     logger.info(f"reading params from {param_csv}")
     fs = gcsfs.GCSFileSystem(project_id, token=token)
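A hypothetical parameter CSV row satisfying the expanded required_cols; the 126/0/"everything" values mirror the test fixtures below, while the gs:// paths are invented:

    import pandas as pd

    # one row per (feed, url) pair to validate
    params = pd.DataFrame(
        [
            {
                "calitp_itp_id": 126,
                "calitp_url_number": 0,
                "gtfs_schedule_path": "gs://my-bucket/schedule/126/0",
                "gtfs_rt_glob_path": "gs://my-bucket/rt/126/0/*",
                "output_filename": "everything",
            }
        ]
    )
    params.to_csv("validation_params.csv", index=False)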
@@ -300,10 +307,13 @@ def validate_gcs_bucket_many(
             project_id,
             token,
             verbose=verbose,
-            results_bucket=results_bucket + f"/{result_name_prefix}{idx}.parquet",
+            # TODO: os.path.join() would be better probably
+            results_bucket=results_bucket
+            + f"/{result_name_prefix}/{row['calitp_itp_id']}/{row['calitp_url_number']}/{row['output_filename']}.parquet",
             aggregate_counts=aggregate_counts,
             idx=idx,
-            **row[required_cols],
+            gtfs_schedule_path=row["gtfs_schedule_path"],
+            gtfs_rt_glob_path=row["gtfs_rt_glob_path"],
         ): row
         for idx, row in params.iterrows()
     }
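Spelled out, the per-feed result path is assembled like this (values reuse the hypothetical row above):

    results_bucket = "gs://my-bucket/rt-processed/validation/2022-02-23"
    result_name_prefix = "validation_results"
    row = {"calitp_itp_id": 126, "calitp_url_number": 0, "output_filename": "everything"}

    path = (
        results_bucket
        + f"/{result_name_prefix}/{row['calitp_itp_id']}/{row['calitp_url_number']}/{row['output_filename']}.parquet"
    )
    # gs://my-bucket/rt-processed/validation/2022-02-23/validation_results/126/0/everything.parquet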
@@ -317,18 +327,18 @@ def validate_gcs_bucket_many(
         except Exception as e:
             if strict:
                 raise e
-            statuses.append({**row, "is_success": False})
+            statuses.append({**row, "is_success": False, "exc": str(e)})
         else:
             statuses.append({**row, "is_success": True})

     successes = sum(s["is_success"] for s in statuses)

     logger.info(f"finished multiprocessing; {successes} successful of {len(statuses)}")

-    status_newline_json = "\n".join([json.dumps(record) for record in statuses])
+    summary_ndjson = "\n".join([json.dumps(record) for record in statuses])

-    if status_result_path:
-        fs.pipe(status_result_path, status_newline_json.encode())
+    if summary_path:
+        fs.pipe(summary_path, summary_ndjson.encode())


 def download_gtfs_schedule_zip(gtfs_schedule_path, dst_path, fs):
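Because the summary is written as newline-delimited JSON with one record per feed, it reads back directly with pandas, as the tests below do with lines=True; the path here is hypothetical:

    import pandas as pd

    summary = pd.read_json(
        "gs://my-bucket/rt-processed/validation/2022-02-23/summary.json", lines=True
    )
    # failed feeds now carry the stringified exception in the new "exc" field
    print(summary.loc[~summary.is_success, "exc"])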
2 changes: 1 addition & 1 deletion requirements.txt
@@ -8,4 +8,4 @@ black==19.10b0
 typer==0.4.0
 pendulum==2.1.2
 structlog==21.5.0
-
+calitp==0.0.8
14 changes: 8 additions & 6 deletions tests/test_download.py
@@ -121,10 +121,11 @@ def test_validate_gcs_bucket_many(tmp_gcs_dir):
         results_bucket=tmp_gcs_dir,
         verbose=True,
         aggregate_counts=True,
-        status_result_path=tmp_gcs_dir + "/status.json",
+        summary_path=tmp_gcs_dir + "/status.json",
+        strict=True,
     )

-    fname = f"{tmp_gcs_dir}/result_0.parquet"
+    fname = f"{tmp_gcs_dir}/validation_results/126/0/everything.parquet"
     df = pd.read_parquet(fs.open(fname))

     assert (df.calitp_itp_id == 126).all()
@@ -146,22 +147,23 @@ def test_validate_gcs_bucket_many_25(tmp_gcs_dir):
         results_bucket=tmp_gcs_dir,
         verbose=True,
         aggregate_counts=True,
-        status_result_path=tmp_gcs_dir + "/status.json",
+        summary_path=tmp_gcs_dir + "/status.json",
         threads=4,
     )

     fs = get_fs()
-    gcs_files = fs.ls(tmp_gcs_dir)
+    gcs_files = fs.glob(tmp_gcs_dir + "/validation_results/*/*/everything.parquet")

     # check that bucket contains the 24 rollups and a status.json
-    assert len([x for x in gcs_files if "result" in x]) == 24
+    assert len([x for x in gcs_files if "everything" in x]) == 24

     # check that 1 failed feed is in status
     status = pd.read_json(tmp_gcs_dir + "/status.json", lines=True)
     assert len(status) == 25
     assert len(status[~status.is_success]) == 1

     # check 1 result file
-    fname = f"{tmp_gcs_dir}/result_0.parquet"
+    fname = f"{tmp_gcs_dir}/validation_results/106/0/everything.parquet"
     df = pd.read_parquet(fs.open(fname))

     assert (df.calitp_itp_id == 106).all()
