feat: support temporal raster for snowflake summit #141

Draft
wants to merge 2 commits into base: main
11 changes: 9 additions & 2 deletions raster_loader/cli/bigquery.py
@@ -119,7 +119,9 @@ def upload(

# create default table name if not provided
if table is None:
table = get_default_table_name(file_path if is_local_file else urlparse(file_url).path, band)
table = get_default_table_name(
file_path if is_local_file else urlparse(file_url).path, band
)

credentials = None
if token is not None:
@@ -170,7 +172,12 @@ def upload(
@click.option("--dataset", help="The name of the dataset.", required=True)
@click.option("--table", help="The name of the table.", required=True)
@click.option("--limit", help="Limit number of rows returned", default=10)
@click.option("--token", help="An access token to authenticate with.", required=False, default=None)
@click.option(
"--token",
help="An access token to authenticate with.",
required=False,
default=None,
)
def describe(project, dataset, table, limit, token):
credentials = None
if token is not None:
48 changes: 39 additions & 9 deletions raster_loader/cli/snowflake.py
@@ -34,10 +34,19 @@ def snowflake(args=None):
@click.option("--account", help="The Swnoflake account.", required=True)
@click.option("--username", help="The username.", required=False, default=None)
@click.option("--password", help="The password.", required=False, default=None)
@click.option("--token", help="An access token to authenticate with.", required=False, default=None)
@click.option(
"--token",
help="An access token to authenticate with.",
required=False,
default=None,
)
@click.option("--role", help="The role to use for the file upload.", default=None)
@click.option("--file_path", help="The path to the raster file.", required=False, default=None)
@click.option("--file_url", help="The path to the raster file.", required=False, default=None)
@click.option(
"--file_path", help="The path to the raster file.", required=False, default=None
)
@click.option(
"--file_url", help="The path to the raster file.", required=False, default=None
)
@click.option("--database", help="The name of the database.", required=True)
@click.option("--schema", help="The name of the schema.", required=True)
@click.option("--table", help="The name of the table.", default=None)
@@ -77,6 +86,9 @@ def snowflake(args=None):
default=False,
is_flag=True,
)
@click.option(
"--timestamp", help="The timestamp value to attach as a column.", default=None
)
@catch_exception()
def upload(
account,
@@ -95,15 +107,20 @@ def upload(
overwrite=False,
append=False,
cleanup_on_failure=False,
timestamp=None,
):
from raster_loader.io.common import (
get_number_of_blocks,
print_band_information,
get_block_dims,
)

if (token is None and (username is None or password is None)) or all(v is not None for v in [token, username, password]):
raise ValueError("Either --token or --username and --password must be provided.")
if (token is None and (username is None or password is None)) or all(
v is not None for v in [token, username, password]
):
raise ValueError(
"Either --token or --username and --password must be provided."
)

if file_path is None and file_url is None:
raise ValueError("Either --file_path or --file_url must be provided.")
@@ -126,7 +143,9 @@

# create default table name if not provided
if table is None:
table = get_default_table_name(file_path if is_local_file else urlparse(file_url).path, band)
table = get_default_table_name(
file_path if is_local_file else urlparse(file_url).path, band
)

connector = SnowflakeConnection(
username=username,
@@ -158,6 +177,7 @@ def upload(
click.echo("Schema: {}".format(schema))
click.echo("Table: {}".format(table))
click.echo("Number of Records Per Snowflake Append: {}".format(chunk_size))
click.echo("Timestamp: {}".format(timestamp))

click.echo("Uploading Raster to Snowflake")

@@ -170,6 +190,7 @@
overwrite=overwrite,
append=append,
cleanup_on_failure=cleanup_on_failure,
timestamp=timestamp,
)

click.echo("Raster file uploaded to Snowflake")
@@ -180,16 +201,25 @@ def upload(
@click.option("--account", help="The Swnoflake account.", required=True)
@click.option("--username", help="The username.", required=False, default=None)
@click.option("--password", help="The password.", required=False, default=None)
@click.option("--token", help="An access token to authenticate with.", required=False, default=None)
@click.option(
"--token",
help="An access token to authenticate with.",
required=False,
default=None,
)
@click.option("--role", help="The role to use for the file upload.", default=None)
@click.option("--database", help="The name of the database.", required=True)
@click.option("--schema", help="The name of the schema.", required=True)
@click.option("--table", help="The name of the table.", required=True)
@click.option("--limit", help="Limit number of rows returned", default=10)
def describe(account, username, password, token, role, database, schema, table, limit):

if (token is None and (username is None or password is None)) or all(v is not None for v in [token, username, password]):
raise ValueError("Either --token or --username and --password must be provided.")
if (token is None and (username is None or password is None)) or all(
v is not None for v in [token, username, password]
):
raise ValueError(
"Either --token or --username and --password must be provided."
)

fqn = f"{database}.{schema}.{table}"
connector = SnowflakeConnection(
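Note on the credential check added in this file: the combined condition reads more clearly as an exclusive-or, since exactly one of an access token or a username/password pair must be supplied. A minimal equivalent sketch (the helper name is hypothetical, not part of this PR):

def validate_credentials(token, username, password):
    has_token = token is not None
    has_userpass = username is not None and password is not None
    if has_token == has_userpass:  # both supplied, or neither
        raise ValueError(
            "Either --token or --username and --password must be provided."
        )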
22 changes: 20 additions & 2 deletions raster_loader/io/snowflake.py
@@ -4,6 +4,8 @@

from typing import Iterable, List, Tuple

from datetime import datetime

from raster_loader.errors import (
IncompatibleRasterException,
import_error_snowflake,
@@ -28,6 +30,17 @@
else:
_has_snowflake = True


def parse_timestamp_to_int(timestamp_str):
# Parse the timestamp string into a datetime object
dt_obj = datetime.fromisoformat(timestamp_str)

# Convert the datetime object to a Unix timestamp
unix_timestamp = int(dt_obj.timestamp())

return unix_timestamp


class SnowflakeConnection(DataWarehouseConnection):
def __init__(self, username, password, account, database, schema, token, role):
if not _has_snowflake:
@@ -112,10 +125,14 @@ def upload_records(
records: Iterable,
fqn: str,
overwrite: bool,
timestamp: str = None,
):
records_list = []
for record in records:
del record["METADATA"]
if timestamp is not None:
# parse timestamp from date string to int
record["TIMESTAMP"] = parse_timestamp_to_int(timestamp)
records_list.append(record)

data_df = pd.DataFrame(records_list)
@@ -174,6 +191,7 @@ def upload_raster(
overwrite: bool = False,
append: bool = False,
cleanup_on_failure: bool = False,
timestamp: str = None,
) -> bool:
print("Loading raster file to Snowflake...")

@@ -208,7 +226,7 @@ def upload_raster(
total_blocks = get_number_of_blocks(file_path)

if chunk_size is None:
ret = self.upload_records(records_gen, fqn, overwrite)
ret = self.upload_records(records_gen, fqn, overwrite, timestamp)
if not ret:
raise IOError("Error uploading to Snowflake.")
else:
@@ -221,7 +239,7 @@
isFirstBatch = True
for records in batched(records_gen, chunk_size):
ret = self.upload_records(
records, fqn, overwrite and isFirstBatch
records, fqn, overwrite and isFirstBatch, timestamp
)
pbar.update(chunk_size)
if not ret:
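A quick sanity check of the new timestamp conversion. One caveat worth flagging: datetime.fromisoformat on a naive string such as cdo's 2015-01-01T00:00:00 returns a naive datetime, and .timestamp() then interprets it in the machine's local timezone; only UTC-qualified inputs are unambiguous. A minimal sketch of the helper added above:

from datetime import datetime

def parse_timestamp_to_int(timestamp_str):
    # Same logic as the helper added in raster_loader/io/snowflake.py.
    return int(datetime.fromisoformat(timestamp_str).timestamp())

print(parse_timestamp_to_int("2015-01-01T00:00:00+00:00"))  # 1420070400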
1 change: 1 addition & 0 deletions raster_loader/utils.py
@@ -29,6 +29,7 @@ def batched(iterable, n):
while batch := tuple(islice(it, n)): # noqa
yield batch


def get_default_table_name(base_path: str, band):
table = os.path.basename(base_path).split(".")[0]
table = "_".join([table, "band", str(band), str(uuid.uuid4())])
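For reference, the helper above builds default table names as <file basename>_band_<band>_<uuid4>. A runnable illustration (mirrors the function shown above; the UUID suffix differs on every call):

import os
import uuid

def get_default_table_name(base_path: str, band):
    table = os.path.basename(base_path).split(".")[0]
    return "_".join([table, "band", str(band), str(uuid.uuid4())])

print(get_default_table_name("/tmp/precip_2015.nc", 1))
# e.g. precip_2015_band_1_<random-uuid4>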
80 changes: 80 additions & 0 deletions temporal_raster/entrypoint_WS_loop_netcdf_w_time.sh
@@ -0,0 +1,80 @@
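# Per-timestep pipeline: split a NetCDF time series into one GeoTIFF per
# timestamp (cdo + gdal_translate), reproject to Web Mercator, rewrite as a
# GoogleMapsCompatible (quadbin-aligned) COG, then upload each slice to
# BigQuery and/or Snowflake, passing the slice's timestamp via the new
# --timestamp flag.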
# INPUT_PATH='./temporal_raster/data/climate_data_Weather_source_era5_precip_201401010000_201501010000.nc'
INPUT_PATH='./temporal_raster/data/climate_data_Weather_source_era5_precip_201501010000_201601010000.nc'
BQ_UPLOAD=0
SF_UPLOAD=1
# cdo remapbil,global_0.25 $INPUT_PATH "${INPUT_PATH/.nc/}_regridded.nc"
# INPUT_PATH="${INPUT_PATH/.nc/}_regridded.nc"

timestamp=1
for idate in $(cdo showtimestamp $INPUT_PATH)
do
echo 'Processing idate: ' $idate
ymd="${idate:0:10}" # Extract year, month, and day
d="${idate:8:2}" # Extract day
hour="${idate:11:2}" # Extract hour
gdal_translate -ot Float64 NETCDF:$INPUT_PATH:precipitation_in -b $timestamp "${INPUT_PATH/.nc/}_${d}_${hour}.tif"
((timestamp++))

TIF_PATH="${INPUT_PATH/.nc/}_${d}_${hour}.tif"
filename=$(basename "$TIF_PATH")
echo $filename
gdalwarp -s_srs EPSG:4326 -t_srs EPSG:3857 $TIF_PATH "${TIF_PATH/.tif/_webmercator.tif}"
rm $TIF_PATH

WEBMERCATOR_PATH="${TIF_PATH/.tif/_webmercator.tif}"
OUTPUT_PATH="${WEBMERCATOR_PATH/_webmercator.tif/_quadbin.tif}"
gdalwarp "$WEBMERCATOR_PATH" \
-of COG \
-co TILING_SCHEME=GoogleMapsCompatible \
-co COMPRESS=DEFLATE -co OVERVIEWS=NONE -co ADD_ALPHA=NO -co RESAMPLING=NEAREST "$OUTPUT_PATH"
rm $WEBMERCATOR_PATH

# TABLE="${filename/.tif/_quadbin}"


# Get the number of bands in the GeoTIFF file
N_BANDS=$(gdalinfo "$OUTPUT_PATH" | grep "Band " | wc -l)

if [ $BQ_UPLOAD -eq 1 ]; then
# GCP_PROJECT="cartodb-data-engineering-team" GCP_DATASET="vdelacruz_carto" GCP_TABLE="climate_data_weather_source_era5_precip_201401010000_201601010000" . ./temporal_raster/entrypoint_WS_loop_netcdf_w_time.sh

COMMAND="echo \"yes\" | carto bigquery upload"
for ((band=1; band<=$N_BANDS; band++)); do
COMMAND+=" --band $band"
done
COMMAND+=" --file_path \"$OUTPUT_PATH\""
COMMAND+=" --project \"$GCP_PROJECT\""
COMMAND+=" --dataset \"$GCP_DATASET\""
COMMAND+=" --table \"$GCP_TABLE\""
COMMAND+=" --append"

eval "$COMMAND"
fi

if [ $SF_UPLOAD -eq 1 ]; then

# SF_DATABASE="CARTO_DATA_ENGINEERING_TEAM" SF_SCHEMA="vdelacruz_carto" SF_TABLE="climate_data_weather_source_era5_precip_201401010000_201601010000" SF_ACCOUNT="sxa81489.us-east-1" SF_USERNAME="SUPERUSER_DATA_ENG_TEAM" SF_PASSWORD="XXXXX" . ./temporal_raster/entrypoint_WS_loop_netcdf_w_time.sh

COMMAND="echo \"yes\" | carto snowflake upload"
# for ((band=1; band<=$N_BANDS; band++)); do
# COMMAND+=" --band $band"
# done
COMMAND+=" --band 1 --band_name precipitation"
COMMAND+=" --file_path \"$OUTPUT_PATH\""
COMMAND+=" --database \"$SF_DATABASE\""
COMMAND+=" --schema \"$SF_SCHEMA\""
COMMAND+=" --table \"$SF_TABLE\""
COMMAND+=" --account \"$SF_ACCOUNT\""
COMMAND+=" --username \"$SF_USERNAME\""
COMMAND+=" --password \"$SF_PASSWORD\""
COMMAND+=" --append"
COMMAND+=" --timestamp $idate"

eval "$COMMAND"
fi
rm $OUTPUT_PATH
# Uncomment to stop early, e.g. after the first 10 timesteps:
# if [ $timestamp -gt 10 ]; then
#     break # Remove this block to process all the files
# fi
done
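For readers more at home in Python than in raw gdalwarp flags, the two warp steps inside the loop above correspond roughly to this sketch using the GDAL Python bindings (file names are illustrative; this is not part of the PR):

from osgeo import gdal

# Reproject the per-timestep slice from EPSG:4326 to Web Mercator.
gdal.Warp("slice_webmercator.tif", "slice.tif",
          srcSRS="EPSG:4326", dstSRS="EPSG:3857")

# Rewrite as a Cloud Optimized GeoTIFF on the GoogleMapsCompatible tiling
# scheme so that blocks align with quadbin cells.
gdal.Warp("slice_quadbin.tif", "slice_webmercator.tif",
          format="COG",
          creationOptions=["TILING_SCHEME=GoogleMapsCompatible",
                           "COMPRESS=DEFLATE", "OVERVIEWS=NONE",
                           "ADD_ALPHA=NO", "RESAMPLING=NEAREST"])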
82 changes: 82 additions & 0 deletions temporal_raster/entrypoint_WS_loop_netcdf_w_time_1day.sh
@@ -0,0 +1,82 @@
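# Same per-timestep pipeline as entrypoint_WS_loop_netcdf_w_time.sh, except
# output names also include year and month, and the loop stops after the
# first 24 hourly timesteps (one day of data).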
INPUT_PATH='./temporal_raster/data2/climate_data_Weather_source_era5_precip_201401010000_201501010000.nc'
# INPUT_PATH='./temporal_raster/data/climate_data_Weather_source_era5_precip_201501010000_201601010000.nc'
BQ_UPLOAD=0
SF_UPLOAD=1
# cdo remapbil,global_0.25 $INPUT_PATH "${INPUT_PATH/.nc/}_regridded.nc"
# INPUT_PATH="${INPUT_PATH/.nc/}_regridded.nc"

timestamp=1
for idate in $(cdo showtimestamp $INPUT_PATH)
do
echo 'Processing idate: ' $idate
year="${idate:0:4}" # Extract year
month="${idate:5:2}" # Extract month
d="${idate:8:2}" # Extract day
hour="${idate:11:2}" # Extract hour

TIF_PATH="${INPUT_PATH/.nc/}_${year}_${month}_${d}_${hour}.tif"
WEBMERCATOR_PATH="${TIF_PATH/.tif/_webmercator.tif}"
OUTPUT_PATH="${WEBMERCATOR_PATH/_webmercator.tif/_quadbin.tif}"

gdal_translate -ot Float64 NETCDF:$INPUT_PATH:precipitation_in -b $timestamp "${TIF_PATH}"
((timestamp++))

filename=$(basename "$TIF_PATH")
echo $filename
gdalwarp -s_srs EPSG:4326 -t_srs EPSG:3857 $TIF_PATH "${WEBMERCATOR_PATH}"
rm $TIF_PATH

gdalwarp "$WEBMERCATOR_PATH" \
-of COG \
-co TILING_SCHEME=GoogleMapsCompatible \
-co COMPRESS=DEFLATE -co OVERVIEWS=NONE -co ADD_ALPHA=NO -co RESAMPLING=NEAREST "$OUTPUT_PATH"
rm $WEBMERCATOR_PATH

# TABLE="${filename/.tif/_quadbin}"


# Get the number of bands in the GeoTIFF file
N_BANDS=$(gdalinfo "$OUTPUT_PATH" | grep "Band " | wc -l)

if [ $BQ_UPLOAD -eq 1 ]; then
# GCP_PROJECT="cartodb-data-engineering-team" GCP_DATASET="vdelacruz_carto" GCP_TABLE="climate_data_weather_source_era5_precip_201401010000_201601010000_24" . ./temporal_raster/entrypoint_WS_loop_netcdf_w_time_1day.sh

COMMAND="echo \"yes\" | carto bigquery upload"
for ((band=1; band<=$N_BANDS; band++)); do
COMMAND+=" --band $band"
done
COMMAND+=" --file_path \"$OUTPUT_PATH\""
COMMAND+=" --project \"$GCP_PROJECT\""
COMMAND+=" --dataset \"$GCP_DATASET\""
COMMAND+=" --table \"$GCP_TABLE\""
COMMAND+=" --append"

eval "$COMMAND"
fi

if [ $SF_UPLOAD -eq 1 ]; then

# SF_DATABASE="CARTO_DATA_ENGINEERING_TEAM" SF_SCHEMA="vdelacruz_carto" SF_TABLE="climate_data_weather_source_era5_precip_201401010000_201601010000_24" SF_ACCOUNT="sxa81489.us-east-1" SF_USERNAME="SUPERUSER_DATA_ENG_TEAM" SF_PASSWORD="XXXX" . ./temporal_raster/entrypoint_WS_loop_netcdf_w_time_1day.sh

COMMAND="echo \"yes\" | carto snowflake upload"
# for ((band=1; band<=$N_BANDS; band++)); do
# COMMAND+=" --band $band"
# done
COMMAND+=" --band 1 --band_name precipitation"
COMMAND+=" --file_path \"$OUTPUT_PATH\""
COMMAND+=" --database \"$SF_DATABASE\""
COMMAND+=" --schema \"$SF_SCHEMA\""
COMMAND+=" --table \"$SF_TABLE\""
COMMAND+=" --account \"$SF_ACCOUNT\""
COMMAND+=" --username \"$SF_USERNAME\""
COMMAND+=" --password \"$SF_PASSWORD\""
COMMAND+=" --append"
COMMAND+=" --timestamp $idate"

eval "$COMMAND"
fi
rm $OUTPUT_PATH
if [ $timestamp -gt 24 ]; then
break # Remove this line to process all the files
fi
done