new option: compression (#160)
cayetanobv authored Jan 9, 2025
1 parent e4cb051 commit 6b60ac8
Showing 10 changed files with 216 additions and 4 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -1,4 +1,4 @@
VENV=env
VENV ?= env
DIST=dist
BUILD=build
BIN=$(VENV)/bin
25 changes: 25 additions & 0 deletions docs/source/user_guide/cli.rst
@@ -152,6 +152,31 @@ Or, with band names:
--band_name red \
--band_name green
You can enable compression of the band data using the ``--compress`` flag. This uses gzip compression, which can significantly reduce storage size:

.. code-block:: bash
carto bigquery upload \
--file_path /path/to/my/raster/file.tif \
--project my-gcp-project \
--dataset my-bigquery-dataset \
--table my-bigquery-table \
--compress

The same works for Snowflake:

.. code-block:: bash
carto snowflake upload \
--file_path /path/to/my/raster/file.tif \
--database my-snowflake-database \
--schema my-snowflake-schema \
--table my-snowflake-table \
--account my-snowflake-account \
--username my-snowflake-user \
--password my-snowflake-password \
--compress

.. seealso::
See the :ref:`cli_details` for a full list of options.

12 changes: 12 additions & 0 deletions docs/source/user_guide/use_with_python.rst
@@ -83,3 +83,15 @@ This function returns a DataFrame with some samples from the raster table on BigQuery
See the :ref:`api_reference` for more details.

.. _`GCP documentation`: https://cloud.google.com/docs/authentication/provide-credentials-adc#local-key

To enable compression of the band data, which can significantly reduce storage size, use the ``compress`` parameter:

.. code-block:: python
connector.upload_raster(
file_path = 'path/to/raster.tif',
fqn = 'database.schema.tablename',
compress = True # Enable gzip compression of band data
)

The compression information will be stored in the metadata of the table, and the data will be automatically decompressed when reading it back.
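
As an illustration of that read-back step, here is a minimal sketch of manually decompressing a band value queried directly from the table. It assumes the ``compression: gzip`` metadata entry means a gzip-format stream (zlib ``wbits=31``); the ``stored_value`` below is only a stand-in for a real query result, and the dtype and shape would come from the band metadata:

.. code-block:: python

    import gzip
    import zlib

    import numpy as np

    # Stand-in for one compressed band value read straight from the table
    stored_value = gzip.compress(np.zeros((4, 4), dtype='uint8').tobytes())

    # wbits=31 tells zlib to expect a gzip container
    band_bytes = zlib.decompress(stored_value, wbits=31)
    block = np.frombuffer(band_bytes, dtype='uint8').reshape(4, 4)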
9 changes: 9 additions & 0 deletions raster_loader/cli/bigquery.py
@@ -59,6 +59,12 @@ def bigquery(args=None):
@click.option(
"--chunk_size", help="The number of blocks to upload in each chunk.", default=10000
)
@click.option(
"--compress",
help="Compress band data using zlib.",
is_flag=True,
default=False,
)
@click.option(
"--overwrite",
help="Overwrite existing data in the table if it already exists.",
@@ -100,6 +106,7 @@ def upload(
band,
band_name,
chunk_size,
compress,
overwrite=False,
append=False,
cleanup_on_failure=False,
@@ -163,6 +170,7 @@ def upload(
click.echo("Dataset: {}".format(dataset))
click.echo("Table: {}".format(table))
click.echo("Number of Records Per BigQuery Append: {}".format(chunk_size))
click.echo("Compress: {}".format(compress))

click.echo("Uploading Raster to BigQuery")

@@ -177,6 +185,7 @@
cleanup_on_failure=cleanup_on_failure,
exact_stats=exact_stats,
basic_stats=basic_stats,
compress=compress,
)

click.echo("Raster file uploaded to Google BigQuery")
9 changes: 9 additions & 0 deletions raster_loader/cli/snowflake.py
@@ -98,6 +98,12 @@ def snowflake(args=None):
required=False,
is_flag=True,
)
@click.option(
"--compress",
help="Compress band data using zlib.",
is_flag=True,
default=False,
)
@catch_exception()
def upload(
account,
@@ -113,6 +119,7 @@ def upload(
band,
band_name,
chunk_size,
compress,
overwrite=False,
append=False,
cleanup_on_failure=False,
@@ -187,6 +194,7 @@ def upload(
click.echo("Schema: {}".format(schema))
click.echo("Table: {}".format(table))
click.echo("Number of Records Per Snowflake Append: {}".format(chunk_size))
click.echo("Compress: {}".format(compress))

click.echo("Uploading Raster to Snowflake")

@@ -201,6 +209,7 @@ def upload(
cleanup_on_failure=cleanup_on_failure,
exact_stats=exact_stats,
basic_stats=basic_stats,
compress=compress,
)

click.echo("Raster file uploaded to Snowflake")
13 changes: 11 additions & 2 deletions raster_loader/io/bigquery.py
@@ -109,6 +109,7 @@ def upload_raster(
cleanup_on_failure: bool = False,
exact_stats: bool = False,
basic_stats: bool = False,
compress: bool = False,
):
"""Write a raster file to a BigQuery table."""
print("Loading raster file to BigQuery...")
@@ -131,19 +132,26 @@
exit()

metadata = rasterio_metadata(
file_path, bands_info, self.band_rename_function, exact_stats, basic_stats
file_path,
bands_info,
self.band_rename_function,
exact_stats,
basic_stats,
compress=compress,
)

overviews_records_gen = rasterio_overview_to_records(
file_path,
self.band_rename_function,
bands_info
bands_info,
compress=compress,
)

windows_records_gen = rasterio_windows_to_records(
file_path,
self.band_rename_function,
bands_info,
compress=compress,
)
records_gen = chain(overviews_records_gen, windows_records_gen)

@@ -244,6 +252,7 @@ def done_callback(job):
self.delete_table(fqn)

import traceback

print(traceback.print_exc())
raise IOError("Error uploading to BigQuery: {}".format(e))

29 changes: 28 additions & 1 deletion raster_loader/io/common.py
@@ -3,6 +3,7 @@
import pyproj
import shapely
import sys
import zlib

from raster_loader._version import __version__
from collections import Counter
@@ -96,6 +97,21 @@ def get_default_nodata_value(dtype: str) -> float:
raise ValueError(f"Unsupported data type: {dtype}")


# For Python <=3.10 compatibility (handling wbits parameter)
# TODO: Remove this once we drop support for Python < 3.11
if sys.version_info < (3, 11):

def compress_bytes(arr_bytes):
compressed = zlib.compress(arr_bytes, level=6)
# Add gzip header corresponding to wbits=31
return b"\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x03" + compressed

else:

def compress_bytes(arr_bytes):
return zlib.compress(arr_bytes, level=6, wbits=31)


def array_to_record(
arr: np.ndarray,
value_field: str,
@@ -105,6 +121,7 @@ def array_to_record(
resolution: int,
window: rasterio.windows.Window,
no_data_value: float = None,
compress: bool = False,
) -> dict:
row_off = window.row_off
col_off = window.col_off
@@ -126,6 +143,9 @@
else:
arr_bytes = np.ascontiguousarray(arr).tobytes()

# Apply compression if requested
arr_bytes = compress_bytes(arr_bytes) if compress else arr_bytes

record = {
band_rename_function("block"): block,
band_rename_function("metadata"): None,
@@ -185,13 +205,13 @@ def get_color_table(raster_dataset: rasterio.io.DatasetReader, band: int):
return None



def rasterio_metadata(
file_path: str,
bands_info: List[Tuple[int, str]],
band_rename_function: Callable,
exact_stats: bool = False,
basic_stats: bool = False,
compress: bool = False,
):
"""Open a raster file with rasterio."""
raster_info = rio_cogeo.cog_info(file_path).dict()
@@ -206,6 +226,9 @@
width = raster_info["Profile"]["Width"]
height = raster_info["Profile"]["Height"]

# Add compression info to metadata
metadata["compression"] = "gzip" if compress else None

with rasterio.open(file_path) as raster_dataset:
raster_crs = raster_dataset.crs.to_string()

@@ -677,6 +700,7 @@ def rasterio_overview_to_records(
file_path: str,
band_rename_function: Callable,
bands_info: List[Tuple[int, str]],
compress: bool = False,
) -> Iterable:
raster_info = rio_cogeo.cog_info(file_path).dict()
with rasterio.open(file_path) as raster_dataset:
@@ -783,6 +807,7 @@ def rasterio_overview_to_records(
resolution - overview_index - 1,
tile_window,
no_data_value,
compress=compress,
)
if newrecord:
record.update(newrecord)
@@ -795,6 +820,7 @@ def rasterio_windows_to_records(
file_path: str,
band_rename_function: Callable,
bands_info: List[Tuple[int, str]],
compress: bool = False,
) -> Iterable:
invalid_names = [
name for _, name in bands_info if name and name.lower() in ["block", "metadata"]
@@ -841,6 +867,7 @@
resolution,
window,
no_data_value,
compress=compress,
)

# add the new columns generated by array_t
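To make the storage saving concrete, here is a small, self-contained sketch of the per-block flow that ``array_to_record`` follows when ``compress`` is set: serialize the block to contiguous bytes, then gzip-compress them. It uses ``zlib.compressobj`` rather than the ``compress_bytes`` helper above so it runs on any supported Python version; the block contents and sizes are illustrative only:

    import zlib

    import numpy as np

    # A mostly-uniform 256x256 uint8 block, the kind of data that compresses well
    block = np.zeros((256, 256), dtype="uint8")
    arr_bytes = np.ascontiguousarray(block).tobytes()

    # gzip-format output (equivalent to wbits=31), matching the "gzip" value
    # recorded in the table metadata
    compressor = zlib.compressobj(level=6, wbits=31)
    compressed = compressor.compress(arr_bytes) + compressor.flush()

    print(len(arr_bytes), "->", len(compressed))  # 65536 -> roughly a hundred bytes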
3 changes: 3 additions & 0 deletions raster_loader/io/snowflake.py
@@ -180,6 +180,7 @@ def upload_raster(
cleanup_on_failure: bool = False,
exact_stats: bool = False,
basic_stats: bool = False,
compress: bool = False,
) -> bool:
def band_rename_function(x):
return x.upper()
@@ -214,11 +215,13 @@ def band_rename_function(x):
file_path,
band_rename_function,
bands_info,
compress=compress,
)
windows_records_gen = rasterio_windows_to_records(
file_path,
band_rename_function,
bands_info,
compress=compress,
)

records_gen = chain(overviews_records_gen, windows_records_gen)
58 changes: 58 additions & 0 deletions raster_loader/tests/bigquery/test_io.py
@@ -701,3 +701,61 @@ def test_get_labels(*args, **kwargs):
}
for version, expected_labels in cases.items():
assert connector.get_labels(version) == expected_labels


@patch(
"raster_loader.io.bigquery.BigQueryConnection.check_if_table_exists",
return_value=True,
)
@patch("raster_loader.io.bigquery.BigQueryConnection.delete_table", return_value=None)
@patch(
"raster_loader.io.bigquery.BigQueryConnection.check_if_table_is_empty",
return_value=False,
)
@patch("raster_loader.io.bigquery.ask_yes_no_question", return_value=True)
@patch("raster_loader.io.bigquery.BigQueryConnection.delete_table", return_value=None)
@patch("raster_loader.io.bigquery.BigQueryConnection.write_metadata", return_value=None)
@patch("raster_loader.io.bigquery.BigQueryConnection.update_labels", return_value=None)
@patch(
"raster_loader.io.bigquery.BigQueryConnection.get_metadata",
return_value={
"bounds": [0, 0, 0, 0],
"block_resolution": 5,
"nodata": 0,
"block_width": 256,
"block_height": 256,
"compression": "gzip",
"bands": [
{
"type": "uint8",
"name": "band_1",
"colorinterp": "red",
"stats": {
"min": 0.0,
"max": 255.0,
"mean": 28.66073989868164,
"stddev": 41.5693439511935,
"count": 100000,
"sum": 2866073.989868164,
"sum_squares": 1e15,
"approximated_stats": False,
"top_values": [1, 2, 3],
"version": "0.0.3",
},
"nodata": "0",
"colortable": None,
}
],
"num_blocks": 1,
"num_pixels": 1,
},
)
def test_rasterio_to_bigquery_with_compression(*args, **kwargs):
table_name = "test_mosaic_compressed"
connector = mocks.MockBigQueryConnection()
success = connector.upload_raster(
os.path.join(fixtures_dir, "mosaic_cog.tif"),
f"{BQ_PROJECT_ID}.{BQ_DATASET_ID}.{table_name}",
compress=True,
)
assert success