Commit

Bug/sc 425841/internal team raster loader killed when using (#153)
Co-authored-by: cayetanobv <[email protected]>
rantolin and cayetanobv authored Oct 31, 2024
1 parent 18aed4d commit 167c3d6
Showing 10 changed files with 754 additions and 143 deletions.
16 changes: 16 additions & 0 deletions raster_loader/cli/bigquery.py
@@ -77,6 +77,18 @@ def bigquery(args=None):
default=False,
is_flag=True,
)
@click.option(
"--exact_stats",
help="Compute exact statistics for the raster bands.",
default=False,
is_flag=True,
)
@click.option(
"--all_stats",
help="Compute all statistics including quantiles and most frequent values.",
required=False,
is_flag=True,
)
@catch_exception()
def upload(
file_path,
@@ -91,6 +103,8 @@ def upload(
overwrite=False,
append=False,
cleanup_on_failure=False,
exact_stats=False,
all_stats=False,
):
from raster_loader.io.common import (
get_number_of_blocks,
@@ -161,6 +175,8 @@ def upload(
overwrite=overwrite,
append=append,
cleanup_on_failure=cleanup_on_failure,
exact_stats=exact_stats,
all_stats=all_stats,
)

click.echo("Raster file uploaded to Google BigQuery")
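For reference, a minimal, self-contained sketch of the flag wiring added above, assuming only the click package; the --file_path option and the echo body are illustrative placeholders rather than the real upload command, which forwards both values to the connector's upload_raster call.

import click

@click.command()
@click.option("--file_path", required=True, help="Path to the raster file.")
@click.option(
    "--exact_stats",
    help="Compute exact statistics for the raster bands.",
    default=False,
    is_flag=True,
)
@click.option(
    "--all_stats",
    help="Compute all statistics including quantiles and most frequent values.",
    required=False,
    is_flag=True,
)
def upload(file_path, exact_stats=False, all_stats=False):
    # In the real command these values are passed straight through to the
    # connector, e.g. upload_raster(..., exact_stats=exact_stats, all_stats=all_stats).
    click.echo(f"exact_stats={exact_stats}, all_stats={all_stats}")

if __name__ == "__main__":
    upload()

Invoked as python upload_sketch.py --file_path raster.tif --exact_stats, this prints exact_stats=True, all_stats=False.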
16 changes: 16 additions & 0 deletions raster_loader/cli/snowflake.py
@@ -86,6 +86,18 @@ def snowflake(args=None):
default=False,
is_flag=True,
)
@click.option(
"--exact_stats",
help="Compute exact statistics for the raster bands.",
default=False,
is_flag=True,
)
@click.option(
"--all_stats",
help="Compute all statistics including quantiles and most frequent values.",
required=False,
is_flag=True,
)
@catch_exception()
def upload(
account,
@@ -104,6 +116,8 @@ def upload(
overwrite=False,
append=False,
cleanup_on_failure=False,
exact_stats=False,
all_stats=False,
):
from raster_loader.io.common import (
get_number_of_blocks,
@@ -185,6 +199,8 @@ def upload(
overwrite=overwrite,
append=append,
cleanup_on_failure=cleanup_on_failure,
exact_stats=exact_stats,
all_stats=all_stats,
)

click.echo("Raster file uploaded to Snowflake")
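The same two flags are added to the Snowflake command. As a hedged sketch (not part of this commit), click's test runner can confirm that both boolean flags parse and default to False; the tiny command below is a stand-in for the real upload commands above.

import click
from click.testing import CliRunner

@click.command()
@click.option("--exact_stats", default=False, is_flag=True)
@click.option("--all_stats", required=False, is_flag=True)
def upload(exact_stats, all_stats):
    # Echo the parsed values so the runner can assert on them.
    click.echo(f"{exact_stats} {all_stats}")

runner = CliRunner()
assert runner.invoke(upload, []).output.strip() == "False False"
assert runner.invoke(upload, ["--exact_stats", "--all_stats"]).output.strip() == "True True"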
39 changes: 32 additions & 7 deletions raster_loader/io/bigquery.py
@@ -4,14 +4,17 @@
import rasterio
import re

from itertools import chain
from raster_loader import __version__
from raster_loader.errors import import_error_bigquery, IncompatibleRasterException
from raster_loader.utils import ask_yes_no_question, batched
from raster_loader.io.common import (
check_metadata_is_compatible,
get_number_of_blocks,
get_number_of_overviews_blocks,
rasterio_metadata,
rasterio_overview_to_records,
rasterio_windows_to_records,
get_number_of_blocks,
check_metadata_is_compatible,
update_metadata,
)

@@ -104,6 +107,8 @@ def upload_raster(
overwrite: bool = False,
append: bool = False,
cleanup_on_failure: bool = False,
exact_stats: bool = False,
all_stats: bool = False,
):
"""Write a raster file to a BigQuery table."""
print("Loading raster file to BigQuery...")
@@ -126,21 +131,31 @@
exit()

metadata = rasterio_metadata(
file_path, bands_info, self.band_rename_function
file_path, bands_info, self.band_rename_function, exact_stats, all_stats
)

records_gen = rasterio_windows_to_records(
overviews_records_gen = rasterio_overview_to_records(
file_path,
self.band_rename_function,
bands_info
)

windows_records_gen = rasterio_windows_to_records(
file_path,
self.band_rename_function,
bands_info,
)
records_gen = chain(overviews_records_gen, windows_records_gen)

if append_records:
old_metadata = self.get_metadata(fqn)
check_metadata_is_compatible(metadata, old_metadata)
update_metadata(metadata, old_metadata)

total_blocks = get_number_of_blocks(file_path)
number_of_blocks = get_number_of_blocks(file_path)
number_of_overview_tiles = get_number_of_overviews_blocks(file_path)
total_blocks = number_of_blocks + number_of_overview_tiles

if chunk_size is None:
job = self.upload_records(records_gen, fqn)
# raise error if job went wrong (blocking call)
@@ -150,7 +165,10 @@

jobs = []
errors = []
print(f"Writing {total_blocks} blocks to BigQuery...")
print(
f"Writing {number_of_blocks} blocks and {number_of_overview_tiles} "
"overview tiles to BigQuery..."
)
with tqdm(total=total_blocks) as pbar:
if total_blocks < chunk_size:
chunk_size = total_blocks
@@ -167,9 +185,11 @@ def done_callback(job):
# job already removed because failed
pass

processed_blocks = 0
for records in batched(records_gen, chunk_size):
job = self.upload_records(records, fqn)
job.num_records = len(records)
processed_blocks += len(records)

job.add_done_callback(partial(lambda job: done_callback(job)))
jobs.append(job)
@@ -185,7 +205,10 @@ def done_callback(job):
if len(errors):
raise Exception(errors)

pbar.update(1)
empty_blocks = total_blocks - processed_blocks
pbar.update(empty_blocks)

print("Number of empty blocks: ", empty_blocks)

print("Writing metadata to BigQuery...")
self.write_metadata(metadata, append_records, fqn)
@@ -220,6 +243,8 @@ def done_callback(job):
if delete:
self.delete_table(fqn)

import traceback
print(traceback.print_exc())
raise IOError("Error uploading to BigQuery: {}".format(e))

print("Done.")
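To summarize the new upload flow in io/bigquery.py, a minimal sketch follows, assuming nothing from raster_loader itself: overview records and window records are chained into one generator, consumed in fixed-size batches, and the progress bar is topped up at the end with the blocks that produced no records. make_records and send are placeholders for the rasterio record generators and upload_records, and the block counts are made-up numbers.

from itertools import chain, islice

def batched(iterable, n):
    # Same idea as raster_loader.utils.batched: yield lists of up to n items.
    it = iter(iterable)
    while batch := list(islice(it, n)):
        yield batch

def make_records(kind, count):
    # Placeholder generator standing in for the rasterio record generators.
    for i in range(count):
        yield {"kind": kind, "index": i}

def send(records):
    # Placeholder for upload_records(); returns the number of rows written.
    return len(records)

number_of_blocks = 95          # e.g. get_number_of_blocks(file_path)
number_of_overview_tiles = 12  # e.g. get_number_of_overviews_blocks(file_path)
total_blocks = number_of_blocks + number_of_overview_tiles

records_gen = chain(
    make_records("overview", number_of_overview_tiles),
    make_records("window", 90),  # some windows are empty and yield no records
)

processed_blocks = 0
for batch in batched(records_gen, 25):
    processed_blocks += send(batch)

empty_blocks = total_blocks - processed_blocks
print("Number of empty blocks:", empty_blocks)  # 107 - 102 = 5 here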