diff --git a/.env.sample b/.env.sample
deleted file mode 100644
index 5673beb..0000000
--- a/.env.sample
+++ /dev/null
@@ -1,43 +0,0 @@
-FLASK_APP=isimip_files_api.app
-
-# FLASK_ENV=production
-# FLASK_ENV=development
-
-# LOG_LEVEL=ERROR
-# LOG_LEVEL=DEBUG
-
-# LOG_FILE=log
-# LOG_FILE=/var/log/isimip-cutout
-
-# BASE_URL=http://127.0.0.1:5000
-# OUTPUT_URL=http://127.0.0.1/api/output/
-
-# INPUT_PATH=/path/to/the/isimip/data
-# OUTPUT_PATH=/path/to/the/output/directory
-# OUTPUT_PREFIX=isimip-download-
-
-# CDO_BIN=/usr/bin/cdo
-# NCKS_BIN=/usr/bin/ncks
-
-# COUNTRYMASKS_FILE_PATH=/path/to/countrymasks.nc
-# LANDSEAMASK_FILE_PATH=/path/to/landseamask.nc
-
-# CORS=False
-# GLOBAL=_global_
-# MAX_FILES=32
-
-# WORKER_TIMEOUT=180
-# WORKER_LOG_FILE=worker.log
-# WORKER_LOG_LEVEL=DEBUG
-# WORKER_TTL=86400
-# WORKER_FAILURE_TTL=86400
-# WORKER_RESULT_TTL=604800
-
-# gunicorn configuration
-# GUNICORN_BIN=/path/to/api/env/bin/gunicorn
-# GUNICORN_WORKER=3
-# GUNICORN_PORT=9002
-# GUNICORN_TIMEOUT=120
-# GUNICORN_PID_FILE=/run/gunicorn/api/pid
-# GUNICORN_ACCESS_LOG_FILE=/var/log/gunicorn/api/access.log
-# GUNICORN_ERROR_LOG_FILE=/var/log/gunicorn/api/error.log
diff --git a/.gitignore b/.gitignore
index 6fb7823..cad719a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,8 +9,21 @@ __pycache__/
 /build
 /dist
+/volumes
+
+/log
+/input
+/tmp
+/output
+/testing/output/
+
+/*.log
+/*.toml
+!/pyproject.toml
+
 /*.egg-info
 /env
+
+/.coverage
+
 /.env
-/log
-/*.log
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 5ed7ddf..4c8271f 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -7,9 +7,8 @@ repos:
     hooks:
       - id: check-ast
       - id: end-of-file-fixer
-        exclude: \.json$
+        exclude: \.json$|\.prj$
       - id: trailing-whitespace
-        exclude: fldmean\.csv$
      - id: debug-statements
  - repo: https://github.com/charliermarsh/ruff-pre-commit
    rev: v0.0.284
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..32a6ced
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,13 @@
+FROM python:3.12-slim-bookworm
+
+RUN apt-get update -y && \
+    apt-get upgrade -y && \
+    apt-get install -y build-essential cdo nco
+
+WORKDIR /api
+
+COPY . .
+
+RUN pip3 install .
+
+CMD ["gunicorn", "-b", "0.0.0.0:5000", "isimip_files_api.app:create_app()"]
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..f4dad06
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,18 @@
+# set environment variables for `flask run` and `rq worker` for development
+# in production, the variables should be set in systemd or docker files
+export FLASK_APP=isimip_files_api.app
+export FLASK_ENV=development
+export FLASK_DEBUG=true
+export FLASK_CONFIG=config.toml
+export RQ_WORKER_CLASS=isimip_files_api.worker.Worker
+
+server:
+	flask run
+
+worker:
+	rq worker
+
+burst:
+	rq worker --burst
+
+.PHONY: server worker burst
diff --git a/README.md b/README.md
index 8b6e65d..6a01618 100644
--- a/README.md
+++ b/README.md
@@ -1,77 +1,55 @@
-isimip-files-api
+ISIMIP Files API
 ================
 
-A webservice to asynchronously mask regions from NetCDF files, using [Flask](https://palletsprojects.com/p/flask/) and [RQ](https://python-rq.org/).
+A webservice to asynchronously perform operations on NetCDF files before downloading them, using [Flask](https://palletsprojects.com/p/flask/) and [RQ](https://python-rq.org/).
 
-Setup
------
-
-The service needs [redis](https://redis.io/) to be set up and configured properly. With redit it is especially important to [guard it agains remote access](https://redis.io/topics/security).
+The service is deployed on https://files.isimip.org/api/v2 as part of the [ISIMIP Repository](https://data.isimip.org). The previous version of the API is available at https://files.isimip.org/api/v1.
 
-The python dependencies can be installed (in a virtual environment) using:
-
-```
-pip install -r requirements.txt
-```
+Setup
+-----
 
-The the `.env` file can be created from `.env.sample` and adjusted to the particular environment.
+The API makes no assumptions about the files other than that they are globally gridded NetCDF files. In particular, no ISIMIP internal conventions are used. It can therefore be reused by other archives. Setup and deployment are described in [docs/setup.md](docs/setup.md).
 
 Usage
 -----
 
-Once the application is setup, the development server can be started using:
-
-```
-flask run
-```
-
-The worker for the asynchronous jobs need to be started in a different terminal session using:
-
-```
-rq worker
-```
+The service is integrated into the [ISIMIP Repository](https://data.isimip.org) and is available from the search interface and the dataset and file pages through the "Configure downloads" link. This functionality currently uses version 1.1.1 of the API.
 
-Asynchronous jobs are created using a HTTP `POST` request to the root api entpoint. To mask everything but a bounding box in lat/lon use:
+For programmatic access, the API can be used with standard HTTP libraries (e.g. [requests](https://requests.readthedocs.io) for Python). While the following examples use the ISIMIP Repository, Python and `requests`, they should be transferable to other servers, languages or libraries.
 
-```
-POST /
-{
-  "path": "path/to/file.nc",
-  "task": "mask_bbox",
-  "bbox": [south, north, west, east]
-}
-```
+The API is used by sending an HTTP POST request to its root endpoint. The request needs to use the content type `application/json` and contain a single JSON object with a list of `paths` and a list of `operations`. While the `paths` can be obtained from the [ISIMIP Repository](https://data.isimip.org) (e.g. `ISIMIP3b/InputData/climate/atmosphere/bias-adjusted/global/daily/ssp585/GFDL-ESM4/gfdl-esm4_r1i1p1f1_w5e5_ssp585_tas_global_daily_2015_2020.nc`), the operations are described in [docs/operations.md](docs/operations.md).
 
-where `south`, `north`, `west`, `east` are floats and `path` is the path to the file on the server relative to `INPUT_PATH` given in `.env`. For a country use:
+Using Python and `requests`, requests can be performed like this:
 
-```
-POST /
-{
-  "path": "path/to/file.nc",
-  "task": "mask_country",
-  "country": "deu"
-}
-```
+```python
+import requests
 
-for, e. g. Germany. To mask out all sea and antarctica data use:
+response = requests.post('https://files.isimip.org/api/v2', json={
+    'paths': [
+        'ISIMIP3b/InputData/.../gfdl-esm4_r1i1p1f1_w5e5_ssp585_tas_global_daily_2015_2020.nc',
+        'ISIMIP3b/InputData/.../gfdl-esm4_r1i1p1f1_w5e5_ssp585_tas_global_daily_2021_2030.nc',
+        'ISIMIP3b/InputData/.../gfdl-esm4_r1i1p1f1_w5e5_ssp585_tas_global_daily_2031_2031.nc',
+        ...
+    ],
+    'operations': [
+        {
+            'operation': 'select_point',
+            'point': [52.380551, 13.064332]
+        }
+    ]
+})
 
-```
-POST /
-{
-  "path": "path/to/file.nc",
-  "task": "mask_landonly"
-}
+result = response.json()
 ```
 
-The response is a JSON like this:
+The `result` is a dictionary describing the job on the server:
 
-```
+```json
 {
-  "file_name": "isimip-download-1eff769a7edd0a8076f11dc85609f0090562a671.zip",
-  "file_url": "https://files.isimip.org/api/v1/output/isimip-download-1eff769a7edd0a8076f11dc85609f0090562a671.zip",
   "id": "5741ca0e7f824d37ef23e107f5e5261a31e974a6",
-  "job_url": "http://127.0.0.1:5000/5741ca0e7f824d37ef23e107f5e5261a31e974a6",
+  "job_url": "https://files.isimip.org/api/v2/5741ca0e7f824d37ef23e107f5e5261a31e974a6",
   "meta": {},
   "status": "queued",
   "ttl": 604800
@@ -80,188 +58,36 @@ The response is a JSON like this:
 
 Performing the initial request again, or performing a `GET` on the url given in `job_url`, will give an update on the job status, e.g.
 
-```
+```json
 {
-  "file_name": "isimip-download-1eff769a7edd0a8076f11dc85609f0090562a671.zip",
-  "file_url": "https://files.isimip.org/api/v1/output/isimip-download-1eff769a7edd0a8076f11dc85609f0090562a671.zip",
-  "id": "5741ca0e7f824d37ef23e107f5e5261a31e974a6",
-  "job_url": "http://127.0.0.1:5000/5741ca0e7f824d37ef23e107f5e5261a31e974a6",
-  "meta": {"created_files": 1, "total_files": 1},
-  "status": "finished",
-  "ttl": 604800
+  "id": "5741ca0e7f824d37ef23e107f5e5261a31e974a6",
+  "job_url": "https://files.isimip.org/api/v2/5741ca0e7f824d37ef23e107f5e5261a31e974a6",
+  "meta": {
+    "created_files": 0,
+    "total_files": 1
+  },
+  "status": "started",
+  "ttl": 604800
 }
 ```
 
-When the job is finished, the resulting file is located at `file_name` relative to the path given in `OUTPUT_PATH` in `.env`. When `OUTPUT_PATH` is made public via a web server (e.g. NGINX, see below for deployment), the file can be downloaded under the URL given by `file_url`.
-
-The following exaples can be used from the command line with [httpie](https://httpie.org/) or [curl](https://curl.haxx.se/):
-
-```
-http :5000 path=path/to/file.nc bbox=:"[0, 10, 0, 10]"
-http :5000 path=path/to/file.nc country=deu
-http :5000 path=path/to/file.nc landonly:=true
-
-curl 127.0.0.1:5000 -H "Content-Type: application/json" -d '{"path": "path/to/file.nc", "task": "mask_bbox","bbox": [south, north, west, east]}'
-curl 127.0.0.1:5000 -H "Content-Type: application/json" -d '{"path": "path/to/file.nc", "task": "mask_country", "country": "deu"}'
-curl 127.0.0.1:5000 -H "Content-Type: application/json" -d '{"path": "path/to/file.nc", "task": "mask_landonly"}'
-```
-
-Deployment
-----------
-
-When deploying to the internet, a setup of [NGINX](https://www.nginx.com/), (gunicorn)[https://gunicorn.org/], and [systemd](https://www.freedesktop.org/wiki/Software/systemd/) services is recommended, but other services can be used as well. We further assume that a user `isimip` with the group `isimip` and the home `/home/isimip` exists, and that the repository is cloned at `/home/isimip/api`.
- -After following the steps under **Setup** (as the `isimip` user), add the folowing to `.env`: - -``` -# gunicorn configuration -GUNICORN_BIN=/home/isimip/api/env/bin/gunicorn -GUNICORN_WORKER=3 -GUNICORN_PORT=9002 -GUNICORN_TIMEOUT=120 -GUNICORN_PID_FILE=/run/gunicorn/api/pid -GUNICORN_ACCESS_LOG_FILE=/var/log/gunicorn/api/access.log -GUNICORN_ERROR_LOG_FILE=/var/log/gunicorn/api/error.log -``` - -Then, as `root`, create a file `/etc/tmpfiles.d/isimip-api.conf` with the following content: - -``` -d /var/log/isimip-api 750 isimip isimip -d /var/log/gunicorn/api 750 isimip isimip -d /run/gunicorn/api 750 isimip isimip -``` - -Create temporary directories using: - -``` -systemd-tmpfiles --create -``` +When the job is completed on the server the status becomes `finished` and the result contains a `file_name` and a `file_url`. -In order to run the api service with systemd three scripts need to be added to `/etc/systemd/system` - -``` -# in /etc/systemd/system/isimip-files-api.service - -[Unit] -Description=pseudo-service to start/stop all isimip-files-api services - -[Service] -Type=oneshot -ExecStart=/bin/true -RemainAfterExit=yes - -[Install] -WantedBy=network.target -``` - -``` -# in /etc/systemd/system/isimip-files-api-app.service - -[Unit] -Description=isimip-api gunicorn daemon -PartOf=isimip-files-api.service -After=isimip-files-api.service - -[Service] -User=isimip -Group=isimip - -WorkingDirectory=/home/isimip/api -EnvironmentFile=/home/isimip/api/.env - -ExecStart=/bin/sh -c '${GUNICORN_BIN} \ - --workers ${GUNICORN_WORKER} \ - --pid ${GUNICORN_PID_FILE} \ - --bind localhost:${GUNICORN_PORT} \ - --timeout ${GUNICORN_TIMEOUT} \ - --access-logfile ${GUNICORN_ACCESS_LOG_FILE} \ - --error-logfile ${GUNICORN_ERROR_LOG_FILE} \ - "isimip_files_api:app:create_app()"' - -ExecReload=/bin/sh -c '/usr/bin/pkill -HUP -F ${GUNICORN_PID_FILE}' - -ExecStop=/bin/sh -c '/usr/bin/pkill -TERM -F ${GUNICORN_PID_FILE}' - -[Install] -WantedBy=isimip-api.target -``` - -``` -# in /etc/systemd/system/isimip-files-api-worker.service - -[Unit] -Description=RQ worker for isimip-api -PartOf=isimip-files-api.service -After=isimip-files-api.service - -[Service] -User=isimip -Group=isimip - -WorkingDirectory=/home/isimip/api -Environment=LANG=en_US.UTF-8 -Environment=LC_ALL=en_US.UTF-8 -Environment=LC_LANG=en_US.UTF-8 - -ExecStart=/home/isimip/api/env/bin/rq worker -w 'isimip_files_api.worker.LogWorker' - -ExecReload=/bin/kill -s HUP $MAINPID - -ExecStop=/bin/kill -s TERM $MAINPID - -PrivateTmp=true -Restart=always - -[Install] -WantedBy=isimip-api.target -``` - -Reload `systemd`, start and enable the service: - -``` -systemctl daemon-reload -systemctl start isimip-api-app -systemctl start isimip-api-worker - -systemctl enable isimip-api-app -systemctl enable isimip-api-worker -systemctl enable isimip-api -``` - -From now on, the services can be controlled using: - -``` -systemctl start isimip-api -systemctl stop isimip-api -systemctl restart isimip-api -``` - -If the services won't start: `journalctl -xf` might give a clue why. 
- -Lastly, add - -``` - location /api/v1 { - proxy_pass http://127.0.0.1:9002/; - proxy_redirect off; - - proxy_set_header Host $host; - proxy_set_header X-Real-IP $remote_addr; - proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; - proxy_set_header X-Forwarded-Proto $scheme; - } - - location /api/public { - alias /data/api/public; - } +```json +{ + "file_name": "download-5741ca0e7f824d37ef23e107f5e5261a31e974a6.zip", + "file_url": "https://files.isimip.org/api/v2/output/download-5741ca0e7f824d37ef23e107f5e5261a31e974a6.zip", + "id": "5741ca0e7f824d37ef23e107f5e5261a31e974a6", + "job_url": "https://files.isimip.org/api/v2/5741ca0e7f824d37ef23e107f5e5261a31e974a6", + "meta": { + "created_files": 1, + "total_files": 1 + }, + "status": "finished", + "ttl": 604800 +} ``` -to your NGINX virtual host configuration. The service should then be available at https://yourdomain/api/v1/. - -The created files can be automatically deleted using the included `isimip-files-api-clean` script. To do so, add the following to the crontab of the `isimip` user (by using `crontab -e`): +The file can be downloaded under the URL given by `file_url` (if the output directory of the API is made public via a web server). -``` -# clean files everyday at 5 a.m. -0 5 * * * cd /home/isimip/api; /home/isimip/api/env/bin/isimip-files-api-clean -``` +Please also note the examples given in the [examples](examples) directory. diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..e619b55 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,40 @@ +version: "3.7" + +services: + api: + build: . + depends_on: + - redis + volumes: + - ${DOCKER_INPUT_PATH}:/input + - ${DOCKER_OUTPUT_PATH}:/output + ports: + - "${DOCKER_API_PORT}:5000" + environment: + FLASK_REDIS_URL: redis://redis:6379 + FLASK_INPUT_PATH: /input + FLASK_OUTPUT_PATH: /output + env_file: .env + + worker: + build: . + command: rq worker + depends_on: + - redis + volumes: + - ${DOCKER_INPUT_PATH}:/input + - ${DOCKER_OUTPUT_PATH}:/output + environment: + FLASK_INPUT_PATH: /input + FLASK_OUTPUT_PATH: /output + RQ_REDIS_URL: redis://redis:6379 + RQ_WORKER_CLASS: isimip_files_api.worker.Worker + env_file: .env + + redis: + image: redis + command: redis-server --appendonly yes + restart: always + volumes: + - ${DOCKER_REDIS_PATH}:/data + env_file: .env diff --git a/docs/operations.md b/docs/operations.md new file mode 100644 index 0000000..67f514c --- /dev/null +++ b/docs/operations.md @@ -0,0 +1,288 @@ +Operations +========== + +The following operations are available. Please note that some of the operations can be chained, e.g. + +```python +data = { + 'paths': [...], + 'operations': [ + { + 'operation': 'create_mask', + ... + }, + { + 'operation': 'mask_mask', + ... + }, + { + 'operation': 'compute_mean' + }, + { + 'operation': 'output_csv' + } + ] +} +``` + +Please also note the examples given in the [examples](../examples) directory. 
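+Every request follows the same pattern: the initial POST returns a job object whose `job_url` can be polled until the `status` changes from `queued` or `started` to `finished` (see the README). The following is a minimal sketch of that cycle, assuming the public endpoint and the `requests` library; the path is the example file used throughout this documentation:
+
+```python
+import time
+
+import requests
+
+# submit a request with a chain of operations
+response = requests.post('https://files.isimip.org/api/v2', json={
+    'paths': [
+        'ISIMIP3b/InputData/climate/atmosphere/bias-adjusted/global/daily/ssp585/GFDL-ESM4/gfdl-esm4_r1i1p1f1_w5e5_ssp585_tas_global_daily_2015_2020.nc'
+    ],
+    'operations': [
+        {'operation': 'mask_landonly'},
+        {'operation': 'compute_mean'},
+        {'operation': 'output_csv'}
+    ]
+})
+job = response.json()
+
+# poll the job url until the job has left the queue
+while job['status'] in ['queued', 'started']:
+    time.sleep(2)
+    job = requests.get(job['job_url']).json()
+
+# on success, the download package is available at file_url
+if job['status'] == 'finished':
+    print(job['file_url'])
+```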
+### Select point
+
+A time series of a point can be selected using:
+
+```python
+response = requests.post('https://files.isimip.org/api/v2', json={
+    'paths': [...],
+    'operations': [
+        {
+            'operation': 'select_point',
+            'point': [52.380551, 13.064332]
+        }
+    ]
+})
+```
+
+The operation is performed using [CDO](https://code.mpimet.mpg.de/projects/cdo) using:
+
+```bash
+cdo -f nc4c -z zip_5 -L -selindexbox,IX,IX,IY,IY IFILE OFILE
+```
+
+where `IX` and `IY` are the grid indexes of the point computed from the file.
+
+### Select bounding box
+
+A rectangular bounding box can be selected using:
+
+```python
+response = requests.post('https://files.isimip.org/api/v2', json={
+    'paths': [...],
+    'operations': [
+        {
+            'operation': 'select_bbox',
+            'bbox': [-23.43651, 23.43651, -180, 180]
+        }
+    ]
+})
+```
+
+The operation is performed using [CDO](https://code.mpimet.mpg.de/projects/cdo) using:
+
+```bash
+cdo -f nc4c -z zip_5 -L -sellonlatbox,WEST,EAST,SOUTH,NORTH IFILE OFILE
+```
+
+### Mask bounding box
+
+A rectangular bounding box can be masked (everything outside is set to `missing_value`) using:
+
+```python
+response = requests.post('https://files.isimip.org/api/v2', json={
+    'paths': [...],
+    'operations': [
+        {
+            'operation': 'mask_bbox',
+            'bbox': [-23.43651, 23.43651, -180, 180]
+        }
+    ]
+})
+```
+
+The operation is performed using [CDO](https://code.mpimet.mpg.de/projects/cdo) using:
+
+```bash
+cdo -f nc4c -z zip_5 -L -masklonlatbox,WEST,EAST,SOUTH,NORTH IFILE OFILE
+```
+
+### Mask country
+
+A country can be masked (i.e. everything outside is set to `missing_value`) using:
+
+```python
+response = requests.post('https://files.isimip.org/api/v2', json={
+    'paths': [...],
+    'operations': [
+        {
+            'operation': 'mask_country',
+            'country': "bra"  # e.g. Brazil
+        }
+    ]
+})
+```
+
+The operation is performed using [CDO](https://code.mpimet.mpg.de/projects/cdo) using:
+
+```bash
+cdo -f nc4c -z zip_5 -L -ifthen -selname,m_BRA COUNTRYMASK IFILE OFILE
+```
+
+### Mask land only
+
+The landmass (without Antarctica) can be masked (i.e. the ocean is set to `missing_value`) using:
+
+```python
+response = requests.post('https://files.isimip.org/api/v2', json={
+    'paths': [...],
+    'operations': [
+        {
+            'operation': 'mask_landonly'
+        }
+    ]
+})
+```
+
+The operation is performed using [CDO](https://code.mpimet.mpg.de/projects/cdo) using:
+
+```bash
+cdo -f nc4c -z zip_5 -L -ifthen LANDSEAMASK IFILE OFILE
+```
+
+### Mask using a NetCDF mask
+
+In order to mask using a custom NetCDF file, the file needs to be uploaded together with the JSON. This is done using the content type `multipart/form-data`. Using `requests` this is done slightly differently than before:
+
+```python
+import json
+from pathlib import Path
+
+import requests
+
+mask_path = Path('path/to/mask.nc')
+
+data = {
+    'paths': [...],
+    'operations': [
+        {
+            'operation': 'mask_mask',
+            'mask': 'mask.nc',
+            'var': 'm_VAR'  # the mask variable in the NetCDF file
+        }
+    ]
+}
+
+response = requests.post(url, files={
+    'data': json.dumps(data),
+    'mask.nc': mask_path.read_bytes(),
+})
+```
+
+The operation is performed using [CDO](https://code.mpimet.mpg.de/projects/cdo) using:
+
+```bash
+cdo -f nc4c -z zip_5 -L -ifthen -selname,m_VAR mask.nc IFILE OFILE
+```
+
+### Compute mean
+
+After one of the [CDO](https://code.mpimet.mpg.de/projects/cdo) based operations (e.g. `mask_country`) a field mean can be computed using:
+
+```python
+response = requests.post('https://files.isimip.org/api/v2', json={
+    'paths': [...],
+    'operations': [
+        {
+            'operation': 'mask_country',
+            'country': "bra"  # e.g. Brazil
+        },
+        {
+            'operation': 'compute_mean'
+        }
+    ]
+})
+```
+
+The operation is performed using [CDO](https://code.mpimet.mpg.de/projects/cdo) using:
+
+```bash
+cdo -f nc4c -z zip_5 -L -fldmean -ifthen -selname,m_BRA COUNTRYMASK IFILE OFILE
+```
+
+### Output CSV
+
+After one of the other [CDO](https://code.mpimet.mpg.de/projects/cdo) based operations (e.g. `mask_country` and `compute_mean`) the output can be converted to [CSV](https://en.wikipedia.org/wiki/Comma-separated_values):
+
+```python
+response = requests.post('https://files.isimip.org/api/v2', json={
+    'paths': [...],
+    'operations': [
+        {
+            'operation': 'mask_country',
+            'country': "bra"  # e.g. Brazil
+        },
+        {
+            'operation': 'compute_mean'
+        },
+        {
+            'operation': 'output_csv'
+        }
+    ]
+})
+```
+
+The operation is performed using [CDO](https://code.mpimet.mpg.de/projects/cdo) using:
+
+```bash
+cdo -s outputtab,date,value,nohead -fldmean -ifthen -selname,m_BRA COUNTRYMASK IFILE OFILE
+```
+
+Afterwards the TAB-separated CDO output is converted to CSV.
+
+Full examples are given in [examples/time_series_bbox.py](../examples/time_series_bbox.py) and [examples/time_series_country.py](../examples/time_series_country.py).
+
+### Cutout bounding box
+
+Instead of using [CDO](https://code.mpimet.mpg.de/projects/cdo) to select a bounding box, the cut-out can also be performed using [ncks](https://nco.sourceforge.net/nco.html). This operation performs much better when applied to the high resolution data from [CHELSA-W5E5 v1.0: W5E5 v1.0 downscaled with CHELSA v2.0](https://doi.org/10.48364/ISIMIP.836809.3).
+
+```python
+response = requests.post('https://files.isimip.org/api/v2', json={
+    'paths': [...],
+    'operations': [
+        {
+            'operation': 'cutout_bbox',
+            'bbox': [47.5520, 47.6680, 12.8719, 13.1393]
+        }
+    ]
+})
+```
+
+The operation is performed using [ncks](https://nco.sourceforge.net/nco.html) using:
+
+```bash
+ncks -h -d lat,SOUTH,NORTH -d lon,WEST,EAST IFILE OFILE
+```
+
+### Create mask
+
+Masks can be created from vector-based input files, namely [Shapefiles](https://en.wikipedia.org/wiki/Shapefile) or [GeoJSON files](https://en.wikipedia.org/wiki/GeoJSON). This operation is performed once and the resulting mask can then be used in the `mask_mask` operation. As for the `mask_mask` operation, the input file needs to be uploaded together with the JSON. This is done using the content type `multipart/form-data`. For a shapefile, the different files need to be in one zip file. Using `requests` this is done like this:
+
+```python
+import json
+from pathlib import Path
+
+import requests
+
+shape_path = Path('path/to/shape.zip')
+
+data = {
+    'paths': [...],
+    'operations': [
+        {
+            'operation': 'create_mask',
+            'shape': 'shape.zip',
+            'mask': 'shape.nc'
+        },
+        {
+            'operation': 'mask_mask',
+            'mask': 'shape.nc'
+        }
+    ]
+}
+
+response = requests.post(url, files={
+    'data': json.dumps(data),
+    'shape.zip': shape_path.read_bytes(),
+})
+```
+
+Full examples are given in [examples/time_series_shapefile.py](../examples/time_series_shapefile.py) and [examples/time_series_geojson.py](../examples/time_series_geojson.py).
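+The rows of the resulting CSV files follow from the `outputtab,date,value,nohead` operator above: one `date,value` pair per line. As a sketch of how the output might be consumed, assuming the zip from `file_url` was extracted to `out_path` as in the example scripts:
+
+```python
+import csv
+from pathlib import Path
+
+# directory the download package was extracted to (see the examples)
+out_path = Path('download-5741ca0e7f824d37ef23e107f5e5261a31e974a6')
+
+# each extracted CSV file contains one date,value pair per row
+for csv_path in sorted(out_path.glob('*.csv')):
+    with csv_path.open(newline='') as fp:
+        for date, value in csv.reader(fp):
+            print(csv_path.name, date, float(value))
+```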
diff --git a/docs/setup.md b/docs/setup.md
new file mode 100644
index 0000000..98ef050
--- /dev/null
+++ b/docs/setup.md
@@ -0,0 +1,181 @@
+# Setup
+
+The service needs [redis](https://redis.io/) to be set up and configured properly. With redis it is especially important to [guard it against remote access](https://redis.io/topics/security).
+
+The package and its dependencies can be installed (in a virtual environment) using:
+
+```
+pip install -e .
+```
+
+The service can be configured using a `config.toml` located at the root of the repository. Please refer to [isimip_files_api/config.py](../isimip_files_api/config.py) for the different settings and their default values.
+
+## Usage
+
+Once the application is set up, the development setup can be controlled using the provided `Makefile`, which sets some environment variables and wraps `flask run`
+and `rq worker`.
+
+The development server can be started using:
+
+```
+make server
+```
+
+The worker for the asynchronous jobs needs to be started in a different terminal using:
+
+```
+make worker
+```
+
+The API is then available at http://127.0.0.1:5000.
+
+## Deployment
+
+When deploying to the internet, a setup of [NGINX](https://www.nginx.com/), [gunicorn](https://gunicorn.org/), and [systemd](https://www.freedesktop.org/wiki/Software/systemd/) is recommended, but other services can be used as well. We further assume that a user `isimip` with the group `isimip` and the home `/home/isimip` exists, and that the repository is cloned at `/home/isimip/api`.
+
+Then, as `root`, create a file `/etc/tmpfiles.d/api.conf` with the following content:
+
+```
+d /var/log/gunicorn/api-v2 750 isimip isimip
+d /var/log/flask/api 750 isimip isimip
+d /run/gunicorn/api-v2 750 isimip isimip
+```
+
+Create temporary directories using:
+
+```
+systemd-tmpfiles --create
+```
+
+In order to run the api service with systemd, two unit files need to be added to `/etc/systemd/system`:
+
+```
+# in /etc/systemd/system/api.service
+
+[Unit]
+Description=isimip-files-api v2 gunicorn daemon
+After=network.target
+
+[Service]
+User=isimip
+Group=isimip
+
+WorkingDirectory=/home/isimip/api
+
+Environment=FLASK_APP=isimip_files_api.app
+Environment=FLASK_ENV=production
+Environment=FLASK_CONFIG=config.toml
+Environment=FLASK_REDIS_URL=redis://localhost:6379
+
+Environment=GUNICORN_BIN=env/bin/gunicorn
+Environment=GUNICORN_WORKER=3
+Environment=GUNICORN_PORT=9001
+Environment=GUNICORN_TIMEOUT=120
+Environment=GUNICORN_PID_FILE=/run/gunicorn/api-v2/pid
+Environment=GUNICORN_ACCESS_LOG_FILE=/var/log/gunicorn/api-v2/access.log
+Environment=GUNICORN_ERROR_LOG_FILE=/var/log/gunicorn/api-v2/error.log
+
+ExecStart=/bin/sh -c '${GUNICORN_BIN} \
+    --workers ${GUNICORN_WORKER} \
+    --pid ${GUNICORN_PID_FILE} \
+    --bind localhost:${GUNICORN_PORT} \
+    --timeout ${GUNICORN_TIMEOUT} \
+    --access-logfile ${GUNICORN_ACCESS_LOG_FILE} \
+    --error-logfile ${GUNICORN_ERROR_LOG_FILE} \
+    "isimip_files_api.app:create_app()"'
+
+ExecReload=/bin/sh -c '/usr/bin/pkill -HUP -F ${GUNICORN_PID_FILE}'
+
+ExecStop=/bin/sh -c '/usr/bin/pkill -TERM -F ${GUNICORN_PID_FILE}'
+
+[Install]
+WantedBy=multi-user.target
+```
+
+```
+# in /etc/systemd/system/api-worker@.service
+
+[Unit]
+Description=RQ worker for isimip-files-api v2 (#%i)
+After=network.target
+
+[Service]
+Type=simple
+User=isimip
+Group=isimip
+
+WorkingDirectory=/home/isimip/api
+
+Environment=LANG=en_US.UTF-8
+Environment=LC_ALL=en_US.UTF-8
+Environment=LC_LANG=en_US.UTF-8
+
+Environment=FLASK_APP=isimip_files_api.app
+Environment=FLASK_ENV=production
+Environment=FLASK_CONFIG=config.toml
+
+Environment=RQ_BIN=env/bin/rq
+Environment=RQ_WORKER_CLASS=isimip_files_api.worker.Worker
+Environment=RQ_REDIS_URL=redis://localhost:6379
+
+ExecStart=/bin/sh -c '${RQ_BIN} worker'
+ExecReload=/bin/kill -s HUP $MAINPID
+ExecStop=/bin/kill -s TERM $MAINPID
+
+PrivateTmp=true
+Restart=always
+RestartSec=5
+
+[Install]
+WantedBy=multi-user.target
+```
+
+Reload `systemd`, start and enable the service:
+
+```
+systemctl daemon-reload
+systemctl start api
+systemctl start api-worker@1
+systemctl start api-worker@2  # more workers can be created
+
+systemctl enable api
+systemctl enable api-worker@1
+systemctl enable api-worker@2
+```
+
+From now on, the services can be controlled using:
+
+```
+systemctl start api
+systemctl stop api
+systemctl restart api
+```
+
+If the services won't start: `journalctl -xf` might give a clue why.
+
+Lastly, add
+
+```
+    location /api/v2 {
+        proxy_pass http://127.0.0.1:9001/;
+        proxy_redirect off;
+
+        proxy_set_header X-Real-IP $remote_addr;
+        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
+        proxy_set_header X-Forwarded-Host $host;
+        proxy_set_header X-Forwarded-Proto $scheme;
+    }
+
+    location /api/v2/public {
+        alias /data/api/public;
+    }
+```
+
+to your NGINX virtual host configuration. The service should then be available at https://yourdomain/api/v2/.
+
+The created files can be automatically deleted using the included `isimip-files-api-clean` script. To do so, add the following to the crontab of the `isimip` user (by using `crontab -e`):
+
+```
+# clean files everyday at 5 a.m.
+0 5 * * * cd /home/isimip/api; /home/isimip/api/env/bin/isimip-files-api-clean
+```
diff --git a/examples/cutout_bbox.py b/examples/cutout_bbox.py
new file mode 100644
index 0000000..8f0b77e
--- /dev/null
+++ b/examples/cutout_bbox.py
@@ -0,0 +1,50 @@
+import json
+import time
+import zipfile
+from pathlib import Path
+
+import requests
+
+url = 'http://localhost:5000'
+
+paths = [
+    'ISIMIP3b/InputData/climate/atmosphere/bias-adjusted/global/daily/ssp585/GFDL-ESM4/gfdl-esm4_r1i1p1f1_w5e5_ssp585_tas_global_daily_2015_2020.nc'
+]
+
+data = {
+    'paths': paths,
+    'operations': [
+        {
+            'operation': 'cutout_bbox',
+            'bbox': [-23.43651, 23.43651, -180, 180]
+        }
+    ]
+}
+
+response = requests.post(url, json=data)
+
+job = response.json()
+print(json.dumps(job, indent=2))
+
+for i in range(100):
+    job = requests.get(job['job_url']).json()
+    print(json.dumps(job, indent=2))
+
+    if job['status'] in ['queued', 'started']:
+        time.sleep(2)
+    else:
+        break
+
+if job['status'] == 'finished':
+    # download file
+    zip_path = Path(job['file_name'])
+    with requests.get(job['file_url'], stream=True) as response:
+        with zip_path.open('wb') as fp:
+            for chunk in response.iter_content(chunk_size=8192):
+                fp.write(chunk)
+
+    # extract zip file
+    out_path = Path(job['file_name']).with_suffix('')
+    out_path.mkdir(exist_ok=True)
+    with zipfile.ZipFile(zip_path, 'r') as zf:
+        zf.extractall(out_path)
diff --git a/examples/mask_bbox.py b/examples/mask_bbox.py
new file mode 100644
index 0000000..151df9a
--- /dev/null
+++ b/examples/mask_bbox.py
@@ -0,0 +1,50 @@
+import json
+import time
+import zipfile
+from pathlib import Path
+
+import requests
+
+url = 'http://localhost:5000'
+
+paths = [
+    'ISIMIP3b/InputData/climate/atmosphere/bias-adjusted/global/daily/ssp585/GFDL-ESM4/gfdl-esm4_r1i1p1f1_w5e5_ssp585_tas_global_daily_2015_2020.nc'
+]
+
+data = {
+    'paths': paths,
+    'operations': [
+        {
+            'operation': 'mask_bbox',
+            'bbox': [-23.43651, 23.43651, -180, 180]
+ } + ] +} + +response = requests.post(url, json=data) + +job = response.json() +print(json.dumps(job, indent=2)) + +for i in range(100): + job = requests.get(job['job_url']).json() + print(json.dumps(job, indent=2)) + + if job['status'] in ['queued', 'started']: + time.sleep(2) + else: + break + +if job['status'] == 'finished': + # download file + zip_path = Path(job['file_name']) + with requests.get(job['file_url'], stream=True) as response: + with zip_path.open('wb') as fp: + for chunk in response.iter_content(chunk_size=8192): + fp.write(chunk) + + # extract zip file + out_path = Path(job['file_name']).with_suffix('') + out_path.mkdir(exist_ok=True) + with zipfile.ZipFile(zip_path, 'r') as zf: + zf.extractall(out_path) diff --git a/examples/mask_country.py b/examples/mask_country.py new file mode 100644 index 0000000..3f885c0 --- /dev/null +++ b/examples/mask_country.py @@ -0,0 +1,50 @@ +import json +import time +import zipfile +from pathlib import Path + +import requests + +url = 'http://localhost:5000' + +paths = [ + 'ISIMIP3b/InputData/climate/atmosphere/bias-adjusted/global/daily/ssp585/GFDL-ESM4/gfdl-esm4_r1i1p1f1_w5e5_ssp585_tas_global_daily_2015_2020.nc' +] + +data = { + 'paths': paths, + 'operations': [ + { + 'operation': 'mask_country', + 'country': 'aus' + } + ] +} + +response = requests.post(url, json=data) + +job = response.json() +print(json.dumps(job, indent=2)) + +for i in range(100): + job = requests.get(job['job_url']).json() + print(json.dumps(job, indent=2)) + + if job['status'] in ['queued', 'started']: + time.sleep(2) + else: + break + +if job['status'] == 'finished': + # download file + zip_path = Path(job['file_name']) + with requests.get(job['file_url'], stream=True) as response: + with zip_path.open('wb') as fp: + for chunk in response.iter_content(chunk_size=8192): + fp.write(chunk) + + # extract zip file + out_path = Path(job['file_name']).with_suffix('') + out_path.mkdir(exist_ok=True) + with zipfile.ZipFile(zip_path, 'r') as zf: + zf.extractall(out_path) diff --git a/examples/mask_landonly.py b/examples/mask_landonly.py new file mode 100644 index 0000000..4ad8ace --- /dev/null +++ b/examples/mask_landonly.py @@ -0,0 +1,49 @@ +import json +import time +import zipfile +from pathlib import Path + +import requests + +url = 'http://localhost:5000' + +paths = [ + 'ISIMIP3b/InputData/climate/atmosphere/bias-adjusted/global/daily/ssp585/GFDL-ESM4/gfdl-esm4_r1i1p1f1_w5e5_ssp585_tas_global_daily_2015_2020.nc' +] + +data = { + 'paths': paths, + 'operations': [ + { + 'operation': 'mask_landonly' + } + ] +} + +response = requests.post(url, json=data) + +job = response.json() +print(json.dumps(job, indent=2)) + +for i in range(100): + job = requests.get(job['job_url']).json() + print(json.dumps(job, indent=2)) + + if job['status'] in ['queued', 'started']: + time.sleep(2) + else: + break + +if job['status'] == 'finished': + # download file + zip_path = Path(job['file_name']) + with requests.get(job['file_url'], stream=True) as response: + with zip_path.open('wb') as fp: + for chunk in response.iter_content(chunk_size=8192): + fp.write(chunk) + + # extract zip file + out_path = Path(job['file_name']).with_suffix('') + out_path.mkdir(exist_ok=True) + with zipfile.ZipFile(zip_path, 'r') as zf: + zf.extractall(out_path) diff --git a/examples/time_series_bbox.py b/examples/time_series_bbox.py new file mode 100644 index 0000000..6f3aaf4 --- /dev/null +++ b/examples/time_series_bbox.py @@ -0,0 +1,56 @@ +import json +import time +import zipfile +from pathlib import Path + 
+import requests + +url = 'http://localhost:5000' + +paths = [ + 'ISIMIP3b/InputData/climate/atmosphere/bias-adjusted/global/daily/ssp585/GFDL-ESM4/gfdl-esm4_r1i1p1f1_w5e5_ssp585_tas_global_daily_2015_2020.nc' +] + +data = { + 'paths': paths, + 'operations': [ + { + 'operation': 'select_bbox', + 'bbox': [-23.43651, 23.43651, -180, 180] + }, + { + 'operation': 'compute_mean', + }, + { + 'operation': 'output_csv' + } + ] +} + +response = requests.post(url, json=data) + +job = response.json() +print(json.dumps(job, indent=2)) + +for i in range(100): + job = requests.get(job['job_url']).json() + print(json.dumps(job, indent=2)) + + if job['status'] in ['queued', 'started']: + time.sleep(2) + else: + break + +if job['status'] == 'finished': + # download file + zip_path = Path(job['file_name']) + with requests.get(job['file_url'], stream=True) as response: + with zip_path.open('wb') as fp: + for chunk in response.iter_content(chunk_size=8192): + fp.write(chunk) + + # extract zip file + out_path = Path(job['file_name']).with_suffix('') + out_path.mkdir(exist_ok=True) + with zipfile.ZipFile(zip_path, 'r') as zf: + zf.extractall(out_path) diff --git a/examples/time_series_country.py b/examples/time_series_country.py new file mode 100644 index 0000000..1bfdde0 --- /dev/null +++ b/examples/time_series_country.py @@ -0,0 +1,56 @@ +import json +import time +import zipfile +from pathlib import Path + +import requests + +url = 'http://localhost:5000' + +paths = [ + 'ISIMIP3b/InputData/climate/atmosphere/bias-adjusted/global/daily/ssp585/GFDL-ESM4/gfdl-esm4_r1i1p1f1_w5e5_ssp585_tas_global_daily_2015_2020.nc' +] + +data = { + 'paths': paths, + 'operations': [ + { + 'operation': 'mask_country', + 'country': 'bra' + }, + { + 'operation': 'compute_mean', + }, + { + 'operation': 'output_csv' + } + ] +} + +response = requests.post(url, json=data) + +job = response.json() +print(json.dumps(job, indent=2)) + +for i in range(100): + job = requests.get(job['job_url']).json() + print(json.dumps(job, indent=2)) + + if job['status'] in ['queued', 'started']: + time.sleep(2) + else: + break + +if job['status'] == 'finished': + # download file + zip_path = Path(job['file_name']) + with requests.get(job['file_url'], stream=True) as response: + with zip_path.open('wb') as fp: + for chunk in response.iter_content(chunk_size=8192): + fp.write(chunk) + + # extract zip file + out_path = Path(job['file_name']).with_suffix('') + out_path.mkdir(exist_ok=True) + with zipfile.ZipFile(zip_path, 'r') as zf: + zf.extractall(out_path) diff --git a/examples/time_series_geojson.py b/examples/time_series_geojson.py new file mode 100644 index 0000000..42c9a05 --- /dev/null +++ b/examples/time_series_geojson.py @@ -0,0 +1,66 @@ +import json +import time +import zipfile +from pathlib import Path + +import requests + +url = 'http://localhost:5000/' + +paths = [ + 'ISIMIP3b/InputData/climate/atmosphere/bias-adjusted/global/daily/ssp585/GFDL-ESM4/gfdl-esm4_r1i1p1f1_w5e5_ssp585_tas_global_daily_2015_2020.nc' +] + +shape_path = Path('testing') / 'shapes' / 'pm.json' + +data = { + 'paths': paths, + 'operations': [ + { + 'operation': 'create_mask', + 'shape': 'pm.json', + 'mask': 'pm.nc' + }, + { + 'operation': 'mask_mask', + 'mask': 'pm.nc', + }, + { + 'operation': 'compute_mean', + }, + { + 'operation': 'output_csv' + } + ] +} + +response = requests.post(url, files={ + 'data': json.dumps(data), + 'pm.json': shape_path.read_bytes(), +}) + +job = response.json() +print(json.dumps(job, indent=2)) + +for i in range(100): + job = 
requests.get(job['job_url']).json() + print(json.dumps(job, indent=2)) + + if job['status'] in ['queued', 'started']: + time.sleep(2) + else: + break + +if job['status'] == 'finished': + # download file + zip_path = Path(job['file_name']) + with requests.get(job['file_url'], stream=True) as response: + with zip_path.open('wb') as fp: + for chunk in response.iter_content(chunk_size=8192): + fp.write(chunk) + + # extract zip file + out_path = Path(job['file_name']).with_suffix('') + out_path.mkdir(exist_ok=True) + with zipfile.ZipFile(zip_path, 'r') as zf: + zf.extractall(out_path) diff --git a/examples/time_series_point.py b/examples/time_series_point.py new file mode 100644 index 0000000..21fb5a7 --- /dev/null +++ b/examples/time_series_point.py @@ -0,0 +1,56 @@ +import json +import time +import zipfile +from pathlib import Path + +import requests + +url = 'http://localhost:5000' + +paths = [ + 'ISIMIP3b/InputData/climate/atmosphere/bias-adjusted/global/daily/ssp585/GFDL-ESM4/gfdl-esm4_r1i1p1f1_w5e5_ssp585_tas_global_daily_2015_2020.nc' +] + +data = { + 'paths': paths, + 'operations': [ + { + 'operation': 'select_point', + 'point': [52.380551, 13.064332] + }, + { + 'operation': 'compute_mean', + }, + { + 'operation': 'output_csv' + } + ] +} + +response = requests.post(url, json=data) + +job = response.json() +print(json.dumps(job, indent=2)) + +for i in range(100): + job = requests.get(job['job_url']).json() + print(json.dumps(job, indent=2)) + + if job['status'] in ['queued', 'started']: + time.sleep(2) + else: + break + +if job['status'] == 'finished': + # download file + zip_path = Path(job['file_name']) + with requests.get(job['file_url'], stream=True) as response: + with zip_path.open('wb') as fp: + for chunk in response.iter_content(chunk_size=8192): + fp.write(chunk) + + # extract zip file + out_path = Path(job['file_name']).with_suffix('') + out_path.mkdir(exist_ok=True) + with zipfile.ZipFile(zip_path, 'r') as zf: + zf.extractall(out_path) diff --git a/examples/time_series_shapefile.py b/examples/time_series_shapefile.py new file mode 100644 index 0000000..29333fd --- /dev/null +++ b/examples/time_series_shapefile.py @@ -0,0 +1,66 @@ +import json +import time +import zipfile +from pathlib import Path + +import requests + +url = 'http://localhost:5000/' + +paths = [ + 'ISIMIP3b/InputData/climate/atmosphere/bias-adjusted/global/daily/ssp585/GFDL-ESM4/gfdl-esm4_r1i1p1f1_w5e5_ssp585_tas_global_daily_2015_2020.nc' +] + +shape_path = Path('testing') / 'shapes' / 'pm.zip' + +data = { + 'paths': paths, + 'operations': [ + { + 'operation': 'create_mask', + 'shape': 'pm.zip', + 'mask': 'pm.nc' + }, + { + 'operation': 'mask_mask', + 'mask': 'pm.nc' + }, + # { + # 'operation': 'compute_mean', + # }, + # { + # 'operation': 'output_csv' + # } + ] +} + +response = requests.post(url, files={ + 'data': json.dumps(data), + 'pm.zip': shape_path.read_bytes(), +}) + +job = response.json() +print(json.dumps(job, indent=2)) + +for i in range(100): + job = requests.get(job['job_url']).json() + print(json.dumps(job, indent=2)) + + if job['status'] in ['queued', 'started']: + time.sleep(2) + else: + break + +if job['status'] == 'finished': + # download file + zip_path = Path(job['file_name']) + with requests.get(job['file_url'], stream=True) as response: + with zip_path.open('wb') as fp: + for chunk in response.iter_content(chunk_size=8192): + fp.write(chunk) + + # extract zip file + out_path = Path(job['file_name']).with_suffix('') + out_path.mkdir(exist_ok=True) + with zipfile.ZipFile(zip_path, 'r') as 
zf:
+        zf.extractall(out_path)
diff --git a/isimip_files_api/__init__.py b/isimip_files_api/__init__.py
index e5ea9e1..568587b 100644
--- a/isimip_files_api/__init__.py
+++ b/isimip_files_api/__init__.py
@@ -1 +1 @@
-VERSION = __version__ = '1.1.0'
+VERSION = __version__ = '2.0.0dev1'
diff --git a/isimip_files_api/app.py b/isimip_files_api/app.py
index 67df03b..341dc68 100644
--- a/isimip_files_api/app.py
+++ b/isimip_files_api/app.py
@@ -1,43 +1,69 @@
-import logging
-from collections import defaultdict
-
 from flask import Flask, request
-from flask_cors import CORS as FlaskCORS
 
-from .jobs import create_job, delete_job, fetch_job
-from .settings import CORS, LOG_FILE, LOG_LEVEL
-from .utils import get_errors_response
-from .validators import validate_data, validate_datasets
+import tomli
+from flask_cors import CORS as FlaskCORS
 
-logging.basicConfig(level=LOG_LEVEL, filename=LOG_FILE)
+from .commands import CommandRegistry
+from .jobs import count_jobs, create_job, delete_job, fetch_job
+from .logging import configure_logging
+from .operations import OperationRegistry
+from .responses import get_errors_response
+from .utils import get_config_path, handle_post_request
+from .validators import validate_data, validate_operations, validate_paths, validate_uploads
 
 
 def create_app():
     # create and configure the app
     app = Flask(__name__)
+    app.config.from_object('isimip_files_api.config')
+    app.config.from_prefixed_env()
+
+    config_path = get_config_path(app.config.get('CONFIG'))
+    if config_path:
+        app.config.from_file(get_config_path(config_path), load=tomli.load, text=False)
 
-    if CORS:
+    # configure logging
+    configure_logging(app)
+
+    # enable CORS
+    if app.config['CORS']:
+        FlaskCORS(app)
 
     @app.route('/', methods=['GET'])
     def index():
         return {
-            'status': 'ok'
+            'status': 'ok',
+            'jobs': count_jobs(),
+            'commands': list(CommandRegistry().commands.keys()),
+            'operations': list(OperationRegistry().operations.keys()),
         }, 200
 
     @app.route('/', methods=['POST'])
     def create():
-        errors = defaultdict(list)
+        data, uploads = handle_post_request(request)
+        app.logger.debug('data = %s', data)
+        app.logger.debug('files = %s', uploads.keys())
+
+        # validation step 1: check data
+        errors = validate_data(data)
+        if errors:
+            app.logger.debug('errors = %s', errors)
+            return get_errors_response(errors)
 
-        cleaned_data = validate_data(request.json, errors)
+        # validation step 2: check paths and operations
+        errors = dict(**validate_paths(data),
+                      **validate_operations(data))
         if errors:
+            app.logger.debug('errors = %s', errors)
             return get_errors_response(errors)
 
-        validate_datasets(*cleaned_data, errors)
+        # validation step 3: check uploads
+        errors = validate_uploads(data, uploads)
         if errors:
+            app.logger.debug('errors = %s', errors)
             return get_errors_response(errors)
 
-        return create_job(*cleaned_data)
+        return create_job(data, uploads)
 
     @app.route('/<job_id>', methods=['GET'])
     def detail(job_id):
diff --git a/isimip_files_api/cdo.py b/isimip_files_api/cdo.py
deleted file mode 100644
index 35f57ba..0000000
--- a/isimip_files_api/cdo.py
+++ /dev/null
@@ -1,96 +0,0 @@
-import csv
-import logging
-import subprocess
-
-from .netcdf import get_index
-from .settings import CDO_BIN, COUNTRYMASKS_FILE_PATH, LANDSEAMASK_FILE_PATH
-from .utils import mask_cmd
-
-
-def mask_bbox(dataset_path, output_path, bbox):
-    # cdo -f nc4c -z zip_5 -masklonlatbox,WEST,EAST,SOUTH,NORTH IFILE OFILE
-    south, north, west, east = bbox
-    return cdo('-f', 'nc4c',
-               '-z', 'zip_5',
-               f'-masklonlatbox,{west:f},{east:f},{south:f},{north:f}',
-               str(dataset_path),
-               str(output_path))
-
-
-def mask_country(dataset_path, output_path, country):
-    # cdo -f nc4c -z zip_5 -ifthen -selname,m_COUNTRY COUNTRYMASK IFILE OFILE
-    return cdo('-f', 'nc4c',
-               '-z', 'zip_5',
-               '-ifthen',
-               f'-selname,m_{country.upper():3.3}',
-               str(COUNTRYMASKS_FILE_PATH),
-               str(dataset_path),
-               str(output_path))
-
-
-def mask_landonly(dataset_path, output_path):
-    # cdo -f nc4c -z zip_5 -ifthen LANDSEAMASK IFILE OFILE
-    return cdo('-f', 'nc4c',
-               '-z', 'zip_5',
-               '-ifthen',
-               str(LANDSEAMASK_FILE_PATH),
-               str(dataset_path),
-               str(output_path))
-
-
-def select_point(dataset_path, output_path, point):
-    # cdo -s outputtab,date,value,nohead -selindexbox,IX,IX,IY,IY IFILE
-    ix, iy = get_index(dataset_path, point)
-
-    # add one since cdo is counting from 1!
-    ix, iy = ix + 1, iy + 1
-
-    return cdo('-s',
-               'outputtab,date,value,nohead',
-               f'-selindexbox,{ix:d},{ix:d},{iy:d},{iy:d}',
-               str(dataset_path),
-               output_path=output_path)
-
-
-def select_bbox(dataset_path, output_path, bbox):
-    # cdo -s outputtab,date,value,nohead -fldmean -sellonlatbox,WEST,EAST,SOUTH,NORTH IFILE
-    south, north, west, east = bbox
-    return cdo('-s',
-               'outputtab,date,value,nohead',
-               '-fldmean',
-               f'-sellonlatbox,{west:f},{east:f},{south:f},{north:f}',
-               str(dataset_path),
-               output_path=output_path)
-
-
-def select_country(dataset_path, output_path, country):
-    # cdo -s outputtab,date,value,nohead -fldmean -ifthen -selname,m_COUNTRY COUNTRYMASK IFILE
-    return cdo('-s',
-               'outputtab,date,value,nohead',
-               '-fldmean',
-               '-ifthen',
-               f'-selname,m_{country.upper():3.3}',
-               str(COUNTRYMASKS_FILE_PATH),
-               str(dataset_path),
-               output_path=output_path)
-
-
-def cdo(*args, output_path=None):
-    cmd_args = [CDO_BIN, *list(args)]
-    cmd = ' '.join(cmd_args)
-    env = {
-        'CDI_VERSION_INFO': '0',
-        'CDO_VERSION_INFO': '0',
-        'CDO_HISTORY_INFO': '0'
-    }
-
-    logging.debug(cmd)
-    output = subprocess.check_output(cmd_args, env=env)
-
-    if output_path:
-        with open(output_path, 'w', newline='') as fp:
-            writer = csv.writer(fp, delimiter=',')
-            for line in output.splitlines():
-                writer.writerow(line.decode().strip().split())
-
-    return mask_cmd(cmd)
diff --git a/isimip_files_api/commands/__init__.py b/isimip_files_api/commands/__init__.py
new file mode 100644
index 0000000..137a9c2
--- /dev/null
+++ b/isimip_files_api/commands/__init__.py
@@ -0,0 +1,48 @@
+from flask import current_app as app
+
+from ..utils import import_class
+
+
+class CommandRegistry:
+
+    def __init__(self):
+        self.commands = {}
+        for python_path in app.config['COMMANDS']:
+            command_class = import_class(python_path)
+            self.commands[command_class.command] = command_class
+
+    def get(self, command):
+        if command in self.commands:
+            return self.commands[command]()
+        else:
+            raise RuntimeError(f'Command "{command}" not found in CommandRegistry.')
+
+
+class BaseCommand:
+
+    perform_once = False
+    max_operations = None
+
+    def __init__(self):
+        self.operations = []
+        self.outputs = []
+        self.artefacts = []
+
+    def execute(self, job_path, input_path, output_path):
+        raise NotImplementedError
+
+    def get_suffix(self):
+        # loop over the operations and take the first suffix
+        for operation in self.operations:
+            suffix = operation.get_suffix()
+            if suffix is not None:
+                return suffix
+
+    def get_region(self):
+        # loop over the operations and concatenate the regions with a hyphen
+        regions = []
+        for operation in self.operations:
+            region = operation.get_region()
+            if region is not None:
+                regions.append(region)
+        return '-'.join(regions)
diff --git a/isimip_files_api/commands/cdo.py b/isimip_files_api/commands/cdo.py
new file mode 100644
index 0000000..5ee289d
--- /dev/null
+++ b/isimip_files_api/commands/cdo.py
@@ -0,0 +1,53 @@
+import csv
+import subprocess
+
+from flask import current_app as app
+
+from . import BaseCommand
+
+
+class CdoCommand(BaseCommand):
+
+    command = 'cdo'
+
+    def execute(self, job_path, input_path, output_path):
+        write_csv = (output_path.suffix == '.csv')
+
+        # use the cdo bin from the config, NETCDF4_CLASSIC and compression
+        cmd_args = [app.config['CDO_BIN'], '-f', 'nc4c', '-z', 'zip_5', '-L']
+
+        # collect args from operations
+        for operation in reversed(self.operations):
+            operation.input_path = input_path
+            operation.output_path = output_path
+            cmd_args += operation.get_args()
+
+        # add the input file
+        cmd_args += [str(input_path)]
+
+        # add the output file
+        if not write_csv:
+            cmd_args += [str(output_path)]
+
+        # join the cmd_args and execute the command
+        cmd = ' '.join(cmd_args)
+        app.logger.debug(cmd)
+
+        output = subprocess.check_output(cmd_args, env={
+            'CDI_VERSION_INFO': '0',
+            'CDO_VERSION_INFO': '0',
+            'CDO_HISTORY_INFO': '0'
+        }, cwd=job_path)
+
+        # write the subprocess output into a csv file
+        if write_csv:
+            with open(job_path / output_path, 'w', newline='') as fp:
+                writer = csv.writer(fp, delimiter=',')
+                for line in output.splitlines():
+                    writer.writerow(line.decode().strip().split())
+
+        # add the output path to the command's outputs
+        self.outputs = [output_path]
+
+        # return the command without the paths
+        return cmd
diff --git a/isimip_files_api/commands/ncks.py b/isimip_files_api/commands/ncks.py
new file mode 100644
index 0000000..4fa10b0
--- /dev/null
+++ b/isimip_files_api/commands/ncks.py
@@ -0,0 +1,34 @@
+import subprocess
+
+from flask import current_app as app
+
+from . import BaseCommand
+
+
+class NcksCommand(BaseCommand):
+
+    command = 'ncks'
+
+    def execute(self, job_path, input_path, output_path):
+        # use the ncks bin from the config
+        cmd_args = [app.config['NCKS_BIN']]
+
+        # add the arguments from the operations
+        for operation in self.operations:
+            operation.input_path = input_path
+            operation.output_path = output_path
+            cmd_args += operation.get_args()
+
+        # add the input file and output file
+        cmd_args += [str(input_path), str(output_path)]
+
+        # join the cmd_args and execute the command
+        cmd = ' '.join(cmd_args)
+        app.logger.debug(cmd)
+        subprocess.check_call(cmd_args, cwd=job_path)
+
+        # add the output path to the command's outputs
+        self.outputs = [output_path]
+
+        # return the command without the paths
+        return cmd
diff --git a/isimip_files_api/commands/python/__init__.py b/isimip_files_api/commands/python/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/isimip_files_api/commands/python/create_mask.py b/isimip_files_api/commands/python/create_mask.py
new file mode 100644
index 0000000..817852a
--- /dev/null
+++ b/isimip_files_api/commands/python/create_mask.py
@@ -0,0 +1,35 @@
+import subprocess
+from pathlib import Path
+
+from flask import current_app as app
+
+from isimip_files_api.commands import BaseCommand
+
+
+class CreateMaskCommand(BaseCommand):
+
+    command = 'create_mask'
+
+    perform_once = True
+    max_operations = 1
+
+    def execute(self, job_path, input_path, output_path):
+        # use the create-mask bin from the config
+        cmd_args = [app.config['CREATE_MASK_BIN']]
+
+        # add the arguments from the first operation
+        shape_file, mask_file = self.operations[0].get_args()
+
+        # add the arguments to cmd_args
+        cmd_args += [shape_file, mask_file]
+
+        # join the cmd_args and execute the command
+        cmd = ' '.join(cmd_args)
+        app.logger.debug(cmd)
+        subprocess.check_call(cmd_args, cwd=job_path)
+
+        # add the input and the output path of the command to the command's artefacts
+        self.artefacts = [Path(shape_file), Path(mask_file)]
+
+        # return the command without the paths
+        return cmd
diff --git a/isimip_files_api/config.py b/isimip_files_api/config.py
new file mode 100644
index 0000000..7c8d4d9
--- /dev/null
+++ b/isimip_files_api/config.py
@@ -0,0 +1,119 @@
+'''
+this file contains the default configuration, which can be overridden by
+
+* environment variables prefixed with 'FLASK_', either in the environment or a .env file
+* a config.toml file at the root of the repository or at a location given by FLASK_CONFIG
+'''
+
+# flask environment
+ENV = 'production'  # choose from 'production', 'development', 'testing'
+
+# enable Cross-Origin Resource Sharing (CORS)
+CORS = True
+
+# log level and (optional) path to flask.log
+LOG_LEVEL = 'ERROR'
+LOG_PATH = None
+
+# the base url the api is running on, in production this will be something like https://api.example.com/api/v2
+BASE_URL = 'http://127.0.0.1:5000'
+
+# the output url the download packages will be available on
+OUTPUT_URL = 'http://127.0.0.1/api/output/'
+
+# input path to the NetCDF files to process
+INPUT_PATH = 'input'
+
+# temporary path to store interim files
+TMP_PATH = 'tmp'
+
+# output path to store the created download packages, this directory should be exposed on OUTPUT_URL
+OUTPUT_PATH = 'output'
+
+# output prefix to be prepended to the job ID to create the filename for the download package
+OUTPUT_PREFIX = 'download-'
+
+# maximum number of files to process in one job
+MAX_FILES = 32
+
+# list of commands which can be executed
+COMMANDS = [
'isimip_files_api.commands.cdo.CdoCommand', + 'isimip_files_api.commands.python.create_mask.CreateMaskCommand', + 'isimip_files_api.commands.ncks.NcksCommand' +] + +# maximum number of commands which can be performed +MAX_COMMANDS = 8 + +# list of operations which can be performed +OPERATIONS = [ + 'isimip_files_api.operations.cdo.SelectBBoxOperation', + 'isimip_files_api.operations.cdo.SelectPointOperation', + 'isimip_files_api.operations.cdo.MaskBBoxOperation', + 'isimip_files_api.operations.cdo.MaskMaskOperation', + 'isimip_files_api.operations.cdo.MaskCountryOperation', + 'isimip_files_api.operations.cdo.MaskLandonlyOperation', + 'isimip_files_api.operations.cdo.ComputeMeanOperation', + 'isimip_files_api.operations.cdo.OutputCsvOperation', + 'isimip_files_api.operations.python.create_mask.CreateMaskOperation', + 'isimip_files_api.operations.ncks.CutOutBBoxOperation' +] + +# maximum number of operations which can be performed +MAX_OPERATIONS = 16 + +# the tag which designates global files, this tag will be replaced by the region +# specifier of the operations, if set to None, the region will be appended +GLOBAL_TAG = '_global_' + +# the cdo binary on the system, e.g. /usr/bin/cdo +CDO_BIN = 'cdo' + +# the ncks binary on the system, e.g. /usr/bin/ncks +NCKS_BIN = 'ncks' + +# the binary used to create masks from geojson and shapefiles, +# shipped with this software and located in scripts/create_mask.py +CREATE_MASK_BIN = 'create-mask' + +# special settings for the countries +COUNTRYMASKS_FILE_PATH = 'countrymasks.nc' +COUNTRYMASKS_COUNTRIES = [ + 'AFG', 'ALB', 'DZA', 'AND', 'AGO', 'ATG', 'ARG', 'ARM', 'AUS', 'AUT', + 'AZE', 'BHS', 'BHR', 'BGD', 'BRB', 'BLR', 'BEL', 'BLZ', 'BEN', 'BTN', + 'BOL', 'BIH', 'BWA', 'BRA', 'BRN', 'BGR', 'BFA', 'BDI', 'KHM', 'CMR', + 'CAN', 'CPV', 'CSID', 'CYM', 'CAF', 'TCD', 'CHL', 'CHN', 'COL', 'COM', + 'COG', 'CRI', 'HRV', 'CUB', 'CYP', 'CZE', 'CIV', 'PRK', 'COD', 'DNK', + 'DJI', 'DMA', 'DOM', 'ECU', 'EGY', 'SLV', 'GNQ', 'ERI', 'EST', 'ETH', + 'FLK', 'FRO', 'FJI', 'FIN', 'FRA', 'GUF', 'PYF', 'ATF', 'GAB', 'GMB', + 'GEO', 'DEU', 'GHA', 'GRC', 'GRL', 'GRD', 'GLP', 'GUM', 'GTM', 'GIN', + 'GNB', 'GUY', 'HTI', 'HMD', 'HND', 'HKG', 'HUN', 'ISL', 'IND', 'IOSID', + 'IDN', 'IRN', 'IRQ', 'IRL', 'IMN', 'ISR', 'ITA', 'JAM', 'JKX', 'JPN', + 'JOR', 'KAZ', 'KEN', 'KIR', 'KWT', 'KGZ', 'LAO', 'LVA', 'LBN', 'LSO', + 'LBR', 'LBY', 'LTU', 'LUX', 'MDG', 'MWI', 'MYS', 'MLI', 'MLT', 'MTQ', + 'MRT', 'MUS', 'MYT', 'MEX', 'FSM', 'MDA', 'MNG', 'MNE', 'MAR', 'MOZ', + 'MMR', 'NAM', 'NPL', 'NLD', 'ANT', 'NCL', 'NZL', 'NIC', 'NER', 'NGA', + 'NIU', 'NOR', 'OMN', 'PSID', 'PAK', 'PLW', 'PSE', 'PAN', 'PNG', 'PRY', + 'PER', 'PHL', 'POL', 'PRT', 'PRI', 'QAT', 'KOR', 'ROU', 'RUS', 'RWA', + 'REU', 'LCA', 'SPM', 'VCT', 'WSM', 'STP', 'SAU', 'SEN', 'SRB', 'SLE', + 'SGP', 'SVK', 'SVN', 'SLB', 'SOM', 'ZAF', 'SGS', 'SSD', 'ESP', 'LKA', + 'SDN', 'SUR', 'SJM', 'SWZ', 'SWE', 'CHE', 'SYR', 'TWN', 'TJK', 'THA', + 'MKD', 'TLS', 'TGO', 'TON', 'TTO', 'TUN', 'TUR', 'TKM', 'GBR', 'UGA', + 'UKR', 'ARE', 'TZA', 'VIR', 'USA', 'URY', 'UZB', 'VUT', 'VEN', 'VNM', + 'ESH', 'YEM', 'ZMB', 'ZWE' +] + +# special settings for the land sea mask +LANDSEAMASK_FILE_PATH = 'landseamask.nc' + +# redis configuration +REDIS_URL = 'redis://localhost:6379' + +# configuration for the worker +WORKER_TIMEOUT = 180 +WORKER_LOG_FILE = None +WORKER_LOG_LEVEL = 'ERROR' +WORKER_TTL = 86400 # one day +WORKER_FAILURE_TTL = 86400 # one day +WORKER_RESULT_TTL = 604800 # one week diff --git a/isimip_files_api/jobs.py b/isimip_files_api/jobs.py index 
2490b76..85a595f 100644
--- a/isimip_files_api/jobs.py
+++ b/isimip_files_api/jobs.py
@@ -1,24 +1,46 @@
-from redis import Redis
+from flask import current_app as app
+
 from rq import Queue
 from rq.exceptions import NoSuchJobError
 from rq.job import Job
 
-from .settings import WORKER_FAILURE_TTL, WORKER_RESULT_TTL, WORKER_TIMEOUT, WORKER_TTL
+from redis import Redis
+
+from .responses import get_response
 from .tasks import run_task
-from .utils import get_hash, get_response
+from .utils import get_hash, store_uploads
+
 
-redis = Redis()
+def count_jobs():
+    redis = Redis.from_url(app.config['REDIS_URL'])
+    queue = Queue(connection=redis)
+    return {
+        'started': queue.started_job_registry.count,
+        'deferred': queue.deferred_job_registry.count,
+        'finished': queue.finished_job_registry.count,
+        'failed': queue.failed_job_registry.count,
+        'scheduled': queue.scheduled_job_registry.count
+    }
+
+
+def create_job(data, uploads):
+    redis = Redis.from_url(app.config['REDIS_URL'])
+
+    job_id = get_hash(data, uploads)
 
-def create_job(paths, args):
-    job_id = get_hash(paths, args)
     try:
         job = Job.fetch(job_id, connection=redis)
         return get_response(job, 200)
     except NoSuchJobError:
-        job = Job.create(run_task, id=job_id, args=[paths, args],
-                         timeout=WORKER_TIMEOUT, ttl=WORKER_TTL,
-                         result_ttl=WORKER_RESULT_TTL, failure_ttl=WORKER_FAILURE_TTL,
+        # create tmp dir and store uploaded files
+        store_uploads(job_id, uploads)
+
+        # create and enqueue the asynchronous job
+        job = Job.create(run_task, id=job_id, args=[data['paths'], data['operations']],
+                         timeout=app.config['WORKER_TIMEOUT'],
+                         ttl=app.config['WORKER_TTL'],
+                         result_ttl=app.config['WORKER_RESULT_TTL'],
+                         failure_ttl=app.config['WORKER_FAILURE_TTL'],
                          connection=redis)
         queue = Queue(connection=redis)
         queue.enqueue_job(job)
@@ -26,6 +48,8 @@ def create_job(paths, args):
 
 
 def fetch_job(job_id):
+    redis = Redis.from_url(app.config['REDIS_URL'])
+
     try:
         job = Job.fetch(job_id, connection=redis)
         return get_response(job, 200)
@@ -37,6 +61,8 @@ def fetch_job(job_id):
 
 
 def delete_job(job_id):
+    redis = Redis.from_url(app.config['REDIS_URL'])
+
     try:
         job = Job.fetch(job_id, connection=redis)
         job.delete()
diff --git a/isimip_files_api/logging.py b/isimip_files_api/logging.py
new file mode 100644
index 0000000..e20a1ec
--- /dev/null
+++ b/isimip_files_api/logging.py
@@ -0,0 +1,40 @@
+import logging
+from pathlib import Path
+
+from flask.logging import default_handler
+
+import colorlog
+
+
+def configure_logging(app):
+    app.logger.removeHandler(default_handler)
+    app.logger.setLevel(app.config['LOG_LEVEL'].upper())
+
+    # log to the console in development
+    if app.config['ENV'] == 'development':
+        formatter = colorlog.ColoredFormatter('%(log_color)s[%(asctime)s] %(levelname)s'
+                                              ' %(filename)s:%(funcName)s %(message)s')
+
+        handler = colorlog.StreamHandler()
+        handler.setLevel(logging.DEBUG)
+        handler.setFormatter(formatter)
+
+        app.logger.addHandler(handler)
+
+    # log to a file
+    if app.config['LOG_PATH']:
+        log_path = Path(app.config['LOG_PATH'])
+        if log_path.exists():
+            formatter = logging.Formatter('[%(asctime)s] %(levelname)s: %(message)s')
+
+            handler = logging.FileHandler(log_path / 'app.log', 'a')
+            handler.setLevel(logging.DEBUG)
+            handler.setFormatter(formatter)
+
+            app.logger.addHandler(handler)
+        else:
+            raise RuntimeError('LOG_PATH does not exist')
+
+    # disable the logger if no handlers are set
+    if not app.logger.handlers:
+        app.logger.disabled = True
diff --git a/isimip_files_api/nco.py b/isimip_files_api/nco.py
deleted file mode 100644
index
diff --git a/isimip_files_api/nco.py b/isimip_files_api/nco.py
deleted file mode 100644
index ca97e12..0000000
--- a/isimip_files_api/nco.py
+++ /dev/null
@@ -1,28 +0,0 @@
-import logging
-import subprocess
-
-from .settings import NCKS_BIN
-from .utils import mask_cmd
-
-
-def cutout_bbox(dataset_path, output_path, bbox):
-    # ncks -O -h -d lat,SOUTH,NORTH -d lon,WEST,EAST IFILE OFILE
-    south, north, west, east = bbox
-    return ncks(
-        '-O',  # overwrite
-        '-h',  # omit history
-        '-d', f'lat,{south:f},{north:f}',  # longitude
-        '-d', f'lon,{west:f},{east:f}',  # latitude
-        str(dataset_path),  # input
-        str(output_path)  # output
-    )
-
-
-def ncks(*args):
-    cmd_args = [NCKS_BIN, *list(args)]
-    cmd = ' '.join(cmd_args)
-
-    logging.debug(cmd)
-    subprocess.check_output(cmd_args)
-
-    return mask_cmd(cmd)
diff --git a/isimip_files_api/netcdf.py b/isimip_files_api/netcdf.py
index d873031..775c0c5 100644
--- a/isimip_files_api/netcdf.py
+++ b/isimip_files_api/netcdf.py
@@ -1,6 +1,6 @@
-from netCDF4 import Dataset
+from flask import current_app as app
 
-from .settings import RESOLUTIONS
+from netCDF4 import Dataset
 
 
 def open_dataset(path):
@@ -9,7 +9,7 @@ def open_dataset(path):
 def check_resolution(ds, resolution):
     try:
-        lat_size, lon_size = RESOLUTIONS[resolution]
+        lat_size, lon_size = app.config['RESOLUTIONS'][resolution]
         return ds.dimensions['lat'].size == lat_size or ds.dimensions['lon'].size == lon_size
     except KeyError:
         return False
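`check_resolution()` now reads the `RESOLUTIONS` mapping from the application config instead of `settings.py`. For reference, the mapping as defined in the removed `settings.py` further down in this diff, which presumably carries over to the new config unchanged:

```python
# (lat_size, lon_size) per grid resolution, as in the removed settings.py
RESOLUTIONS = {
    '30arcsec': (20880, 43200),
    '90arcsec': (6960, 14400),
    '300arcsec': (2088, 4320),
    '1800arcsec': (348, 720),
    '15arcmin': (720, 1440),
    '30arcmin': (360, 720),
    '60arcmin': (180, 360),
    '120arcmin': (90, 180)
}
```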
diff --git a/isimip_files_api/operations/__init__.py b/isimip_files_api/operations/__init__.py
new file mode 100644
index 0000000..ff4c061
--- /dev/null
+++ b/isimip_files_api/operations/__init__.py
@@ -0,0 +1,146 @@
+import re
+from pathlib import Path
+
+from flask import current_app as app
+
+from ..commands import CommandRegistry
+from ..utils import import_class
+
+
+class OperationRegistry:
+
+    def __init__(self):
+        self.operations = {}
+        for python_path in app.config['OPERATIONS']:
+            operation_class = import_class(python_path)
+            self.operations[operation_class.operation] = operation_class
+
+    def get(self, config):
+        if 'operation' in config and config['operation'] in self.operations:
+            return self.operations[config['operation']](config)
+
+    def get_command_list(self, operations):
+        commands = []
+
+        command_registry = CommandRegistry()
+        current_command = None
+        for index, operation_config in enumerate(operations):
+            operation = self.get(operation_config)
+
+            # add a new command, if
+            # * it's the first operation
+            # * the operation has a different command than the previous one
+            # * the command reached its limit of operations
+            if (
+                current_command is None or
+                current_command.command != operation.command or
+                (
+                    current_command.max_operations is not None and
+                    len(current_command.operations) >= current_command.max_operations
+                )
+            ):
+                current_command = command_registry.get(operation.command)
+                commands.append(current_command)
+
+            current_command.operations.append(operation)
+
+        return commands
+
+
+class BaseOperation:
+
+    def __init__(self, config):
+        self.config = config
+
+    def validate(self):
+        raise NotImplementedError
+
+    def validate_uploads(self, uploads):
+        pass
+
+    def get_args(self):
+        raise NotImplementedError
+
+    def get_suffix(self):
+        return None
+
+    def get_region(self):
+        return None
+
+
+class BBoxOperationMixin:
+
+    def get_bbox(self):
+        return (
+            float(self.config['bbox'][0]),
+            float(self.config['bbox'][1]),
+            float(self.config['bbox'][2]),
+            float(self.config['bbox'][3])
+        )
+
+    def validate_bbox(self):
+        if 'bbox' in self.config:
+            try:
+                self.get_bbox()
+            except (ValueError, IndexError):
+                return [f'bbox is not of the form [%f, %f, %f, %f] for operation "{self.operation}"']
+        else:
+            return [f'bbox is missing for operation "{self.operation}"']
+
+
+class PointOperationMixin:
+
+    def get_point(self):
+        return (
+            float(self.config['point'][0]),
+            float(self.config['point'][1])
+        )
+
+    def validate_point(self):
+        if 'point' in self.config:
+            try:
+                self.get_point()
+            except (ValueError, IndexError):
+                return [f'point is not of the form [%f, %f] for operation "{self.operation}"']
+        else:
+            return [f'point is missing for operation "{self.operation}"']
+
+
+class CountryOperationMixin:
+
+    def get_country(self):
+        return self.config['country'].upper()
+
+    def get_mask_path(self):
+        return Path(app.config['COUNTRYMASKS_FILE_PATH']).expanduser().resolve()
+
+    def validate_country(self):
+        if 'country' in self.config:
+            if self.get_country() not in app.config['COUNTRYMASKS_COUNTRIES']:
+                return [f'country not in the list of supported countries (e.g. deu) for operation "{self.operation}"']
+        else:
+            return [f'country is missing for operation "{self.operation}"']
+
+
+class MaskOperationMixin:
+
+    def get_var(self):
+        return self.config.get('var', 'm_0')
+
+    def get_mask_path(self):
+        return Path(self.config.get('mask'))
+
+    def validate_var(self):
+        if 'var' in self.config:
+            if not re.match(r'^[A-Za-z0-9_]*$', self.config['var']):
+                return [f'only letters, numbers, underscores are permitted in "var" for operation "{self.operation}"']
+
+    def validate_mask(self):
+        if 'mask' in self.config:
+            if not re.match(r'^[A-Za-z0-9-.]*$', self.config['mask']):
+                return ['only letters, numbers, hyphens, and periods are permitted in "mask"'
+                        f' for operation "{self.operation}"']
+            elif re.search(r'\.{2}', self.config['mask']):
+                return [f'consecutive periods are not permitted in "mask" for operation "{self.operation}"']
+        else:
+            return [f'mask is missing for operation "{self.operation}"']
diff --git a/isimip_files_api/operations/cdo.py b/isimip_files_api/operations/cdo.py
new file mode 100644
index 0000000..4750eac
--- /dev/null
+++ b/isimip_files_api/operations/cdo.py
@@ -0,0 +1,140 @@
+from pathlib import Path
+
+from flask import current_app as app
+
+from ..netcdf import get_index
+from . import BaseOperation, BBoxOperationMixin, CountryOperationMixin, MaskOperationMixin, PointOperationMixin
+
+
+class CdoOperation(BaseOperation):
+
+    command = 'cdo'
+
+
+class SelectBBoxOperation(BBoxOperationMixin, CdoOperation):
+
+    operation = 'select_bbox'
+
+    def validate(self):
+        return self.validate_bbox()
+
+    def get_args(self):
+        south, north, west, east = self.get_bbox()
+        return [f'-sellonlatbox,{west:f},{east:f},{south:f},{north:f}']
+
+    def get_region(self):
+        south, north, west, east = self.get_bbox()
+        return f'lat{south}to{north}lon{west}to{east}'
+
+
+class SelectPointOperation(PointOperationMixin, CdoOperation):
+
+    operation = 'select_point'
+
+    def validate(self):
+        return self.validate_point()
+
+    def get_args(self):
+        point = self.get_point()
+        ix, iy = get_index(self.input_path, point)
+
+        # add one since cdo is counting from 1!
+        ix, iy = ix + 1, iy + 1
+
+        return [f'-selindexbox,{ix:d},{ix:d},{iy:d},{iy:d}']
+
+    def get_region(self):
+        lat, lon = self.get_point()
+        return f'lat{lat}lon{lon}'
+
+
+class MaskBBoxOperation(BBoxOperationMixin, CdoOperation):
+
+    operation = 'mask_bbox'
+
+    def validate(self):
+        return self.validate_bbox()
+
+    def get_args(self):
+        south, north, west, east = self.get_bbox()
+        return [f'-masklonlatbox,{west:f},{east:f},{south:f},{north:f}']
+
+    def get_region(self):
+        south, north, west, east = self.get_bbox()
+        return f'lat{south}to{north}lon{west}to{east}'
+
+
+class MaskMaskOperation(MaskOperationMixin, CdoOperation):
+
+    operation = 'mask_mask'
+
+    def validate(self):
+        errors = []
+        errors += self.validate_var() or []
+        errors += self.validate_mask() or []
+        return errors
+
+    def get_args(self):
+        var = self.get_var()
+        mask_path = self.get_mask_path()
+        return ['-ifthen', f'-selname,{var}', str(mask_path)]
+
+    def get_region(self):
+        mask_path = self.get_mask_path()
+        return mask_path.stem
+
+
+class MaskCountryOperation(CountryOperationMixin, CdoOperation):
+
+    operation = 'mask_country'
+
+    def validate(self):
+        return self.validate_country()
+
+    def get_args(self):
+        country = self.get_country()
+        mask_path = str(Path(app.config['COUNTRYMASKS_FILE_PATH']).expanduser())
+        return ['-ifthen', f'-selname,m_{country:3.3}', mask_path]
+
+    def get_region(self):
+        return self.get_country().lower()
+
+
+class MaskLandonlyOperation(CdoOperation):
+
+    operation = 'mask_landonly'
+
+    def validate(self):
+        pass
+
+    def get_args(self):
+        mask_path = str(Path(app.config['LANDSEAMASK_FILE_PATH']).expanduser())
+        return ['-ifthen', mask_path]
+
+    def get_region(self):
+        return 'landonly'
+
+
+class ComputeMeanOperation(CdoOperation):
+
+    operation = 'compute_mean'
+
+    def validate(self):
+        pass
+
+    def get_args(self):
+        return ['-fldmean']
+
+
+class OutputCsvOperation(CdoOperation):
+
+    operation = 'output_csv'
+
+    def validate(self):
+        pass
+
+    def get_args(self):
+        return ['-s', 'outputtab,date,value,nohead']
+
+    def get_suffix(self):
+        return '.csv'
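Consecutive operations that share the `cdo` command attribute are grouped by `get_command_list()` into a single cdo invocation (up to the command's `max_operations`). For example, masking Germany, averaging over the field, and writing CSV can be requested together; the resulting call below is an assumption pieced together from the `get_args()` methods above, not the verbatim worker output:

```python
import requests

response = requests.post('https://files.isimip.org/api/v2', json={
    'paths': ['ISIMIP3b/InputData/.../gfdl-esm4_..._daily_2015_2020.nc'],
    'operations': [
        {'operation': 'mask_country', 'country': 'deu'},
        {'operation': 'compute_mean'},
        {'operation': 'output_csv'}
    ]
})

# the worker then runs roughly (operator order assumed):
# cdo -s outputtab,date,value,nohead -fldmean -ifthen -selname,m_DEU countrymasks.nc IFILE OFILE.csv
```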
diff --git a/isimip_files_api/operations/ncks.py b/isimip_files_api/operations/ncks.py
new file mode 100644
index 0000000..40bb1f4
--- /dev/null
+++ b/isimip_files_api/operations/ncks.py
@@ -0,0 +1,26 @@
+from . import BaseOperation, BBoxOperationMixin
+
+
+class NcksOperation(BaseOperation):
+
+    command = 'ncks'
+
+
+class CutOutBBoxOperation(BBoxOperationMixin, NcksOperation):
+
+    operation = 'cutout_bbox'
+
+    def validate(self):
+        return self.validate_bbox()
+
+    def get_args(self):
+        south, north, west, east = self.get_bbox()
+        return [
+            '-h',  # omit history
+            '-d', f'lat,{south:f},{north:f}',  # latitude
+            '-d', f'lon,{west:f},{east:f}',  # longitude
+        ]
+
+    def get_region(self):
+        south, north, west, east = self.get_bbox()
+        return f'lat{south}to{north}lon{west}to{east}'
diff --git a/isimip_files_api/operations/python/__init__.py b/isimip_files_api/operations/python/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/isimip_files_api/operations/python/create_mask.py b/isimip_files_api/operations/python/create_mask.py
new file mode 100644
index 0000000..00c32bd
--- /dev/null
+++ b/isimip_files_api/operations/python/create_mask.py
@@ -0,0 +1,29 @@
+from isimip_files_api.operations import BaseOperation, MaskOperationMixin
+
+
+class CreateMaskOperation(MaskOperationMixin, BaseOperation):
+
+    command = 'create_mask'
+    operation = 'create_mask'
+
+    def validate(self):
+        errors = []
+        errors += self.validate_shape() or []
+        errors += self.validate_mask() or []
+        return errors
+
+    def validate_shape(self):
+        if 'shape' not in self.config:
+            return [f'shape is missing for operation "{self.operation}"']
+
+    def validate_uploads(self, uploads):
+        if 'shape' in self.config:
+            shape = self.config['shape']
+            if not uploads.get(shape):
+                return [f'File "{shape}" for operation "{self.operation}" is not part of the uploads']
+
+    def get_args(self):
+        return [
+            self.config.get('shape'),
+            self.config.get('mask')
+        ]
diff --git a/isimip_files_api/responses.py b/isimip_files_api/responses.py
new file mode 100644
index 0000000..b557118
--- /dev/null
+++ b/isimip_files_api/responses.py
@@ -0,0 +1,31 @@
+from flask import current_app as app
+
+from .utils import get_zip_file_name
+
+
+def get_response(job, http_status):
+    file_name = get_zip_file_name(job.id)
+    status = job.get_status()
+
+    response = {
+        'id': job.id,
+        'job_url': app.config['BASE_URL'] + '/' + job.id,
+        'meta': job.meta,
+        'ttl': app.config['WORKER_RESULT_TTL'],
+        'status': status
+    }
+
+    if status == 'finished':
+        response.update({
+            'file_name': file_name,
+            'file_url': app.config['OUTPUT_URL'] + '/' + file_name,
+        })
+
+    return response, http_status
+
+
+def get_errors_response(errors):
+    return {
+        'status': 'error',
+        'errors': errors
+    }, 400
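For illustration, a response as assembled by `get_response()` for a finished job; the id, URLs, prefix, and meta values here are made up:

```json
{
    "id": "5b0177348eb65d9866a7b55dbb8063fc3cd1dad1",
    "job_url": "https://files.isimip.org/api/v2/5b0177348eb65d9866a7b55dbb8063fc3cd1dad1",
    "meta": {"created_files": 3, "total_files": 3},
    "ttl": 604800,
    "status": "finished",
    "file_name": "isimip-download-5b0177348eb65d9866a7b55dbb8063fc3cd1dad1.zip",
    "file_url": "https://files.isimip.org/api/output/isimip-download-5b0177348eb65d9866a7b55dbb8063fc3cd1dad1.zip"
}
```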
diff --git a/isimip_files_api/scripts.py b/isimip_files_api/scripts.py
deleted file mode 100644
index 2429ae6..0000000
--- a/isimip_files_api/scripts.py
+++ /dev/null
@@ -1,124 +0,0 @@
-#!/usr/bin/env python
-import argparse
-import logging
-import os
-from pathlib import Path
-
-from dotenv import load_dotenv
-from redis import Redis
-from rq.exceptions import NoSuchJobError
-from rq.job import Job
-
-from .cdo import mask_bbox, mask_country, mask_landonly, select_bbox, select_country, select_point
-from .nco import cutout_bbox
-from .settings import LOG_FILE, LOG_LEVEL, OUTPUT_PATH
-from .utils import get_output_name
-
-logging.basicConfig(level=LOG_LEVEL, filename=LOG_FILE)
-
-redis = Redis()
-
-
-class FloatListAction(argparse.Action):
-
-    def __call__(self, parser, namespace, values, option_string=None):
-        setattr(namespace, self.dest, [float(c) for c in values.split(',')])
-
-
-def parse_floats(string):
-    return [float(c) for c in string.bbox.split(',')] if string else None
-
-
-def select():
-    parser = argparse.ArgumentParser()
-    parser.add_argument('paths', nargs='+', help='List of files to mask')
-    parser.add_argument('--point', help='Select by point, e.g. "52.39,13.06"', action=FloatListAction)
-    parser.add_argument('--country', help='Select by country, e.g. "deu"')
-    parser.add_argument('--bbox', help='Select by bounding box, e.g. "-23.43651,23.43651,-180,180"',
-                        action=FloatListAction)
-    parser.add_argument('--output', help='Output directory, default: .', default='.')
-    args = parser.parse_args()
-
-    if not any([args.country, args.bbox, args.point]):
-        parser.error('Please provide at least --country, --bbox, or --point.')
-
-    for path in args.paths:
-        input_path = Path(path)
-        output_path = Path(args.output).expanduser() / get_output_name(path, vars(args), suffix='.csv')
-
-        if args.bbox:
-            select_bbox(input_path, output_path, args.bbox)
-        elif args.country:
-            select_country(input_path, output_path, args.country)
-        elif args.point:
-            select_point(input_path, output_path, args.point)
-
-
-def mask():
-    parser = argparse.ArgumentParser()
-    parser.add_argument('paths', nargs='+', help='List of files to mask')
-    parser.add_argument('--country', help='Mask by country, e.g. "deu"')
-    parser.add_argument('--bbox', help='Mask by bounding box, e.g. "-23.43651,23.43651,-180,180"',
-                        action=FloatListAction)
-    parser.add_argument('--landonly', action='store_true', help='Mask only land data')
-    parser.add_argument('--output', help='Output directory, default: .', default='.')
-    args = parser.parse_args()
-
-    if not any([args.country, args.bbox, args.landonly]):
-        parser.error('Please provide at least --country, --bbox, or --landonly.')
-
-    for path in args.paths:
-        input_path = Path(path)
-        output_path = Path(args.output).expanduser() / get_output_name(path, vars(args))
-
-        if args.bbox:
-            mask_bbox(input_path, output_path, args.bbox)
-        elif args.country:
-            mask_country(input_path, output_path, args.country)
-        elif args.landonly:
-            mask_landonly(input_path, output_path)
-
-
-def cutout():
-    parser = argparse.ArgumentParser()
-    parser.add_argument('paths', nargs='+', help='List of files to mask')
-    parser.add_argument('--bbox', help='Mask by bounding box (south, north, west, east),'
-                                       ' e.g. "-23.43,23.43,-180,180"',
-                        action=FloatListAction)
-    parser.add_argument('--output', help='Output directory, default: .', default='.')
-    args = parser.parse_args()
-
-    if not any([args.bbox]):
-        parser.error('Please provide at least --bbox.')
-
-    for path in args.paths:
-        input_path = Path(path)
-        output_path = Path(args.output).expanduser() / get_output_name(path, vars(args))
-
-        cutout_bbox(input_path, output_path, args.bbox)
-
-
-def clean():
-    load_dotenv(Path().cwd() / '.env')
-
-    for root, dirs, files in os.walk(OUTPUT_PATH, topdown=False):
-        root_path = Path(root)
-
-        for file_name in files:
-            file_path = root_path / file_name
-
-            # construct relative path and job_id
-            path = root_path.relative_to(OUTPUT_PATH) / file_name
-            job_id = path.stem.split('-')[-1]
-
-            # check if there is a job for this
-            try:
-                Job.fetch(job_id, connection=redis)
-            except NoSuchJobError:
-                os.remove(file_path)
-
-        # remove empty directories
-        for dir_name in dirs:
-            dir_path = root_path / dir_name
-            if not os.listdir(dir_path):
-                os.rmdir(dir_path)
"-23.43,23.43,-180,180"', - action=FloatListAction) - parser.add_argument('--output', help='Output directory, default: .', default='.') - args = parser.parse_args() - - if not any([args.bbox]): - parser.error('Please provide at least --bbox.') - - for path in args.paths: - input_path = Path(path) - output_path = Path(args.output).expanduser() / get_output_name(path, vars(args)) - - cutout_bbox(input_path, output_path, args.bbox) - - -def clean(): - load_dotenv(Path().cwd() / '.env') - - for root, dirs, files in os.walk(OUTPUT_PATH, topdown=False): - root_path = Path(root) - - for file_name in files: - file_path = root_path / file_name - - # construct relative path and job_id - path = root_path.relative_to(OUTPUT_PATH) / file_name - job_id = path.stem.split('-')[-1] - - # check if there is a job for this - try: - Job.fetch(job_id, connection=redis) - except NoSuchJobError: - os.remove(file_path) - - # remove empty directories - for dir_name in dirs: - dir_path = root_path / dir_name - if not os.listdir(dir_path): - os.rmdir(dir_path) diff --git a/isimip_files_api/scripts/__init__.py b/isimip_files_api/scripts/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/isimip_files_api/scripts/create_mask.py b/isimip_files_api/scripts/create_mask.py new file mode 100644 index 0000000..3eb36f7 --- /dev/null +++ b/isimip_files_api/scripts/create_mask.py @@ -0,0 +1,75 @@ +import argparse + +import geopandas +import netCDF4 as nc +import numpy as np +import rioxarray # noqa: F401 +import shapely +import xarray as xr + +FILL_VALUE_FLOAT = 1e+20 +FILL_VALUE_BOOL = -128 + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument('input_path', help='path to the input file') + parser.add_argument('output_path', help='path to the output file') + parser.add_argument('-g|--grid', dest='grid', help='grid spacing in arcsec', + type=int, default=1800) + + args = parser.parse_args() + + n_lat = int((360 * 60 * 60) / args.grid) + n_lon = int((180 * 60 * 60) / args.grid) + delta = args.grid / 3600.0 + + df = geopandas.read_file(args.input_path) + + # create a diskless netcdf file using python-netCDF4 + ds = nc.Dataset(args.output_path, 'w', format='NETCDF4_CLASSIC', diskless=True) + ds.createDimension('lon', n_lat) + ds.createDimension('lat', n_lon) + + lon = ds.createVariable('lon', 'f8', ('lon',), fill_value=FILL_VALUE_FLOAT) + lon.standard_name = 'longitude' + lon.long_name = 'Longitude' + lon.units = 'degrees_east' + lon.axis = 'X' + lon[:] = np.arange(-180 + 0.5 * delta, 180, delta) + + lat = ds.createVariable('lat', 'f8', ('lat',), fill_value=FILL_VALUE_FLOAT) + lat.standard_name = 'latitude' + lat.long_name = 'Latitude' + lat.units = 'degrees_north' + lat.axis = 'Y' + lat[:] = np.arange(90 - 0.5 * delta, -90 - 0.5 * delta, -delta) + + for index, row in df.iterrows(): + variable_name = f'm_{index}' + variable = ds.createVariable(variable_name, 'b', ('lat', 'lon'), + fill_value=FILL_VALUE_BOOL, compression='zlib') + + for key, value in row.items(): + if isinstance(value, (str, int, float)): + setattr(variable, key.lower(), value) + + variable[:, :] = np.ones((n_lon, n_lat)) + + # convert to a crs-aware xarray dataset + ds = xr.open_dataset(xr.backends.NetCDF4DataStore(ds)) + ds.rio.write_crs(df.crs, inplace=True) + + for index, row in df.iterrows(): + variable_name = f'm_{index}' + variable = ds[variable_name] + + geometry = shapely.geometry.mapping(row['geometry']) + + mask = variable.rio.clip([geometry], drop=False) + variable[:, :] = mask[:, :] + + ds.to_netcdf(args.output_path) 
diff --git a/isimip_files_api/settings.py b/isimip_files_api/settings.py
deleted file mode 100644
index f7b82a4..0000000
--- a/isimip_files_api/settings.py
+++ /dev/null
@@ -1,80 +0,0 @@
-import os
-from pathlib import Path
-
-from dotenv import load_dotenv
-
-load_dotenv(Path().cwd() / '.env')
-
-LOG_FILE = os.getenv('LOG_FILE')
-LOG_LEVEL = os.getenv('LOG_LEVEL', 'ERROR')
-
-BASE_URL = os.getenv('BASE_URL', 'http://127.0.0.1:5000').rstrip('/')
-OUTPUT_URL = os.getenv('OUTPUT_URL', 'http://127.0.0.1/api/output/').rstrip('/')
-
-INPUT_PATH = Path(os.getenv('INPUT_PATH', 'input'))
-OUTPUT_PATH = Path(os.getenv('OUTPUT_PATH', 'output'))
-OUTPUT_PREFIX = os.getenv('OUTPUT_PREFIX', 'isimip-files-api-')
-
-CDO_BIN = os.getenv('CDO_BIN', 'cdo')
-NCKS_BIN = os.getenv('NCKS_BIN', 'ncks')
-
-CORS = os.getenv('CORS', '').upper() in ['TRUE', 1]
-GLOBAL = os.getenv('GLOBAL', '_global_')
-MAX_FILES = int(os.getenv('MAX_FILES', '32'))
-
-WORKER_TIMEOUT = int(os.getenv('WORKER_TIMEOUT', '180'))
-WORKER_LOG_FILE = os.getenv('WORKER_LOG_FILE')
-WORKER_LOG_LEVEL = os.getenv('WORKER_LOG_LEVEL', 'ERROR')
-WORKER_TTL = int(os.getenv('RESULT_TTL', '86400'))  # one day
-WORKER_FAILURE_TTL = int(os.getenv('WORKER_FAILURE_TTL', '86400'))  # one day
-WORKER_RESULT_TTL = int(os.getenv('WORKER_RESULT_TTL', '604800'))  # one week
-
-RESOLUTIONS = {
-    '30arcsec': (20880, 43200),
-    '90arcsec': (6960, 14400),
-    '300arcsec': (2088, 4320),
-    '1800arcsec': (348, 720),
-    '15arcmin': (720, 1440),
-    '30arcmin': (360, 720),
-    '60arcmin': (180, 360),
-    '120arcmin': (90, 180)
-}
-
-TASKS = {
-    'cutout_bbox': ['30arcsec', '90arcsec', '300arcsec', '1800arcsec',
-                    '15arcmin', '30arcmin', '60arcmin', '120arcmin'],
-    'mask_bbox': ['15arcmin', '30arcmin', '60arcmin', '120arcmin'],
-    'mask_country': ['30arcmin'],
-    'mask_landonly': ['30arcmin'],
-    'select_bbox': ['15arcmin', '30arcmin', '60arcmin', '120arcmin'],
-    'select_country': ['30arcmin'],
-    'select_point': ['15arcmin', '30arcmin', '60arcmin', '120arcmin']
-}
-
-COUNTRYMASKS_FILE_PATH = Path(os.getenv('COUNTRYMASKS_FILE_PATH', 'countrymasks.nc'))
-COUNTRYMASKS_COUNTRIES = [
-    'AFG', 'ALB', 'DZA', 'AND', 'AGO', 'ATG', 'ARG', 'ARM', 'AUS', 'AUT',
-    'AZE', 'BHS', 'BHR', 'BGD', 'BRB', 'BLR', 'BEL', 'BLZ', 'BEN', 'BTN',
-    'BOL', 'BIH', 'BWA', 'BRA', 'BRN', 'BGR', 'BFA', 'BDI', 'KHM', 'CMR',
-    'CAN', 'CPV', 'CSID', 'CYM', 'CAF', 'TCD', 'CHL', 'CHN', 'COL', 'COM',
-    'COG', 'CRI', 'HRV', 'CUB', 'CYP', 'CZE', 'CIV', 'PRK', 'COD', 'DNK',
-    'DJI', 'DMA', 'DOM', 'ECU', 'EGY', 'SLV', 'GNQ', 'ERI', 'EST', 'ETH',
-    'FLK', 'FRO', 'FJI', 'FIN', 'FRA', 'GUF', 'PYF', 'ATF', 'GAB', 'GMB',
-    'GEO', 'DEU', 'GHA', 'GRC', 'GRL', 'GRD', 'GLP', 'GUM', 'GTM', 'GIN',
-    'GNB', 'GUY', 'HTI', 'HMD', 'HND', 'HKG', 'HUN', 'ISL', 'IND', 'IOSID',
-    'IDN', 'IRN', 'IRQ', 'IRL', 'IMN', 'ISR', 'ITA', 'JAM', 'JKX', 'JPN',
-    'JOR', 'KAZ', 'KEN', 'KIR', 'KWT', 'KGZ', 'LAO', 'LVA', 'LBN', 'LSO',
-    'LBR', 'LBY', 'LTU', 'LUX', 'MDG', 'MWI', 'MYS', 'MLI', 'MLT', 'MTQ',
-    'MRT', 'MUS', 'MYT', 'MEX', 'FSM', 'MDA', 'MNG', 'MNE', 'MAR', 'MOZ',
-    'MMR', 'NAM', 'NPL', 'NLD', 'ANT', 'NCL', 'NZL', 'NIC', 'NER', 'NGA',
-    'NIU', 'NOR', 'OMN', 'PSID', 'PAK', 'PLW', 'PSE', 'PAN', 'PNG', 'PRY',
-    'PER', 'PHL', 'POL', 'PRT', 'PRI', 'QAT', 'KOR', 'ROU', 'RUS', 'RWA',
-    'REU', 'LCA', 'SPM', 'VCT', 'WSM', 'STP', 'SAU', 'SEN', 'SRB', 'SLE',
-    'SGP', 'SVK', 'SVN', 'SLB', 'SOM', 'ZAF', 'SGS', 'SSD', 'ESP', 'LKA',
-    'SDN', 'SUR', 'SJM', 'SWZ', 'SWE', 'CHE', 'SYR', 'TWN', 'TJK', 'THA',
-    'MKD', 'TLS', 'TGO', 'TON', 'TTO', 'TUN', 'TUR', 'TKM', 'GBR', 'UGA',
-    'UKR', 'ARE', 'TZA', 'VIR', 'USA', 'URY', 'UZB', 'VUT', 'VEN', 'VNM',
-    'ESH', 'YEM', 'ZMB', 'ZWE'
-]
-
-LANDSEAMASK_FILE_PATH = Path(os.getenv('LANDSEAMASK_FILE_PATH', 'landseamask.nc'))
diff --git a/isimip_files_api/tasks.py b/isimip_files_api/tasks.py
index 5e41f90..c46a9ef 100644
--- a/isimip_files_api/tasks.py
+++ b/isimip_files_api/tasks.py
@@ -1,78 +1,90 @@
-import shutil
 from pathlib import Path
-from tempfile import mkdtemp
-from zipfile import ZipFile
+
+from flask import current_app as app
 
 from rq import get_current_job
 
-from .cdo import mask_bbox, mask_country, mask_landonly, select_bbox, select_country, select_point
-from .nco import cutout_bbox
-from .settings import INPUT_PATH, OUTPUT_PATH, OUTPUT_PREFIX
-from .utils import get_output_name, get_zip_file_name
+from .operations import OperationRegistry
+from .utils import get_input_path, get_job_path, get_zip_file, mask_paths, remove_job_path
 
 
-def run_task(paths, args):
+def run_task(paths, operations):
     # get current job and init metadata
     job = get_current_job()
     job.meta['created_files'] = 0
     job.meta['total_files'] = len(paths)
     job.save_meta()
 
-    # create output paths
-    output_path = OUTPUT_PATH / get_zip_file_name(job.id)
-    output_path.parent.mkdir(parents=True, exist_ok=True)
-
-    # create a temporary directory
-    tmp = Path(mkdtemp(prefix=OUTPUT_PREFIX))
+    # get the temporary directory
+    job_path = get_job_path(job.id)
 
-    # open zipfile
-    z = ZipFile(output_path, 'w')
+    # create the output zip file
+    zip_file = get_zip_file(job.id)
 
     # open readme
-    readme_path = tmp / 'README.txt'
+    readme_path = job_path / 'README.txt'
     readme = readme_path.open('w')
     readme.write('The following commands were used to create the files in this container:\n\n')
+
+    # construct command list from the operations
+    command_list = OperationRegistry().get_command_list(operations)
+
-    for path in paths:
-        input_path = INPUT_PATH / path
-        if args['task'] in ['select_country', 'select_bbox', 'select_point']:
-            tmp_name = get_output_name(path, args, suffix='.csv')
-        else:
-            tmp_name = get_output_name(path, args)
-
-        tmp_path = tmp / tmp_name
-
-        if args['task'] == 'cutout_bbox':
-            cmd = cutout_bbox(input_path, tmp_path, args['bbox'])
-
-        elif args['task'] == 'mask_country':
-            cmd = mask_country(input_path, tmp_path, args['country'])
-
-        elif args['task'] == 'mask_bbox':
-            cmd = mask_bbox(input_path, tmp_path, args['bbox'])
-
-        elif args['task'] == 'mask_landonly':
-            cmd = mask_landonly(input_path, tmp_path)
-
-        elif args['task'] == 'select_country':
-            cmd = select_country(input_path, tmp_path, args['country'])
-
-        elif args['task'] == 'select_bbox':
-            cmd = select_bbox(input_path, tmp_path, args['bbox'])
-
-        elif args['task'] == 'select_point':
-            cmd = select_point(input_path, tmp_path, args['point'])
-
-        # write cmd into readme file
-        readme.write(cmd + '\n')
-
-        if tmp_path.is_file():
-            z.write(tmp_path, tmp_name)
-        else:
-            error_path = Path(tmp_path).with_suffix('.txt')
-            error_path.write_text('Something went wrong with processing the input file.'
-                                  ' Probably it is not using a global grid.')
-            z.write(error_path, error_path.name)
+    for path in paths:
+        input_path = get_input_path() / path
+        output_path = Path(input_path.name)
+        output_region = None
+
+        for command in command_list:
+            if command.perform_once and path != paths[0]:
+                continue
+
+            # update region tag in output_path
+            region = command.get_region()
+            if region:
+                if output_region is None:
+                    if app.config['GLOBAL_TAG'] in output_path.name:
+                        # replace the _global_ specifier
+                        output_name = output_path.name.replace(app.config['GLOBAL_TAG'], f'_{region}_')
+                    else:
+                        output_name = output_path.stem + f'_{region}' + output_path.suffix
+                else:
+                    region = f'{output_region}+{region}'
+                    output_name = output_path.name.replace(output_region, region)
+
+                output_region = region
+                output_path = output_path.with_name(output_name)
+
+            # update suffix in output_path
+            suffix = command.get_suffix()
+            if suffix:
+                output_path = output_path.with_suffix(suffix)
+
+            # execute the command and obtain the command_string
+            command_string = command.execute(job_path, input_path, output_path)
+
+            # write the command_string into readme file
+            readme.write(mask_paths(command_string) + '\n')
+
+            # write the artefacts into the zipfile
+            if command.artefacts:
+                for artefact_path in command.artefacts:
+                    if (job_path / artefact_path).is_file():
+                        zip_file.write(job_path / artefact_path, artefact_path.name)
+
+            # write the outputs into the zipfile and set the new input path
+            if command.outputs:
+                for output_path in command.outputs:
+                    # set the new input path to the output path
+                    input_path = output_path
+
+                    if (job_path / output_path).is_file():
+                        # write the output into the zipfile
+                        zip_file.write(job_path / output_path, output_path.name)
+                    else:
+                        error_path = output_path.with_suffix('.txt')
+                        error_path.write_text('Something went wrong with processing the input file.'
+                                              ' Probably it is not using a global grid.')
+                        zip_file.write(error_path, error_path.name)
 
         # update the current job and store progress
         job.meta['created_files'] += 1
@@ -80,13 +92,13 @@
 
     # close and write readme file
     readme.close()
-    z.write(readme_path, readme_path.name)
+    zip_file.write(readme_path, readme_path.name)
 
     # close zip file
-    z.close()
+    zip_file.close()
 
     # delete temporary directory
-    shutil.rmtree(tmp)
+    remove_job_path(job.id)
 
     # return True to indicate success
     return True
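The region-tag renaming above is easiest to see on an example. A standalone sketch of the same transformation (file name taken from the usage examples; the helper name `region_name` is mine):

```python
from pathlib import Path

GLOBAL_TAG = '_global_'

def region_name(output_path, region):
    # mirror of the renaming logic in run_task(), illustration only
    if GLOBAL_TAG in output_path.name:
        return output_path.with_name(output_path.name.replace(GLOBAL_TAG, f'_{region}_'))
    return output_path.with_name(output_path.stem + f'_{region}' + output_path.suffix)

path = Path('gfdl-esm4_r1i1p1f1_w5e5_ssp585_tas_global_daily_2015_2020.nc')
print(region_name(path, 'deu'))
# gfdl-esm4_r1i1p1f1_w5e5_ssp585_tas_deu_daily_2015_2020.nc
```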
diff --git a/isimip_files_api/tests/__init__.py b/isimip_files_api/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/isimip_files_api/tests/conftest.py b/isimip_files_api/tests/conftest.py
new file mode 100644
index 0000000..9af8c47
--- /dev/null
+++ b/isimip_files_api/tests/conftest.py
@@ -0,0 +1,30 @@
+import pytest
+
+from redis import Redis
+
+from ..app import create_app
+
+
+@pytest.fixture()
+def app():
+    app = create_app()
+    app.config.update({
+        'TESTING': True,
+        'INPUT_PATH': 'testing/input',
+        'OUTPUT_PATH': 'testing/output',
+        'MAX_FILES': 8,
+        'MAX_COMMANDS': 2,
+        'MAX_OPERATIONS': 8
+    })
+
+    yield app
+
+
+@pytest.fixture()
+def client(app):
+    return app.test_client()
+
+
+@pytest.fixture()
+def redis(app):
+    return Redis.from_url(app.config['REDIS_URL'])
diff --git a/isimip_files_api/tests/test_create.py b/isimip_files_api/tests/test_create.py
new file mode 100644
index 0000000..3566e41
--- /dev/null
+++ b/isimip_files_api/tests/test_create.py
@@ -0,0 +1,136 @@
+def test_empty(client):
+    response = client.post('/', json={})
+    assert response.status_code == 400
+    assert response.json.get('status') == 'error'
+    assert response.json.get('errors') == {'data': ['No json data provided with POST']}
+
+
+def test_list(client):
+    response = client.post('/', json=[1, 2, 3])
+    assert response.status_code == 400
+    assert response.json.get('status') == 'error'
+    assert response.json.get('errors') == {'data': ['Provided json data is malformatted']}
+
+
+def test_missing(client):
+    response = client.post('/', json={'foo': 'bar'})
+    assert response.status_code == 400
+    assert response.json.get('status') == 'error'
+    assert response.json.get('errors') == {
+        'paths': ['This field is required.'],
+        'operations': ['This field is required.']
+    }
+
+
+def test_malformatted(client):
+    response = client.post('/', json={'paths': {'foo': 'bar'}, 'operations': {'foo': 'bar'}})
+    assert response.status_code == 400
+    assert response.json.get('status') == 'error'
+    assert response.json.get('errors') == {
+        'paths': ['Provided json data is malformatted.'],
+        'operations': ['Provided json data is malformatted.']
+    }
+
+
+def test_paths_too_many_files(client):
+    response = client.post('/', json={'paths': [
+        'test1.nc',
+        'test2.nc',
+        'test3.nc',
+        'test4.nc',
+        'test5.nc',
+        'test6.nc',
+        'test7.nc',
+        'test8.nc',
+        'test9.nc'
+    ]})
+    assert response.status_code == 400
+    assert response.json.get('status') == 'error'
+    assert response.json.get('errors') == {
+        'paths': ['Too many files match that dataset (max: 8).'],
+        'operations': ['This field is required.']
+    }
+
+
+def test_paths_below_root(client):
+    response = client.post('/', json={'paths': [
+        '../test.nc'
+    ]})
+    assert response.status_code == 400
+    assert response.json.get('status') == 'error'
+    assert response.json.get('errors') == {
+        'paths': ['../test.nc is below the root path.'],
+        'operations': ['This field is required.']
+    }
+
+
+def test_paths_not_netcdf(client):
+    response = client.post('/', json={'paths': [
+        'test.txt'
+    ]})
+    assert response.status_code == 400
+    assert response.json.get('status') == 'error'
+    assert response.json.get('errors') == {
+        'paths': ['test.txt is not a NetCDF file.'],
+        'operations': ['This field is required.']
+    }
+
+
+def test_paths_not_found(client):
+    response = client.post('/', json={'paths': [
+        'test.nc'
+    ]})
+    assert response.status_code == 400
+    assert response.json.get('status') == 'error'
+    assert response.json.get('errors') == {
+        'paths': ['test.nc was not found on the server.'],
+        'operations': ['This field is required.']
+    }
+
+
+def test_operations_not_found(client):
+    response = client.post('/', json={'paths': ['constant.nc'], 'operations': [
+        {
+            'operation': 'invalid'
+        }
+    ]})
+    assert response.status_code == 400
+    assert response.json.get('status') == 'error'
+    assert response.json.get('errors') == {
+        'operations': ['operation "invalid" was not found']
+    }
+
+
+def test_operations_too_many_commands(client):
+    response = client.post('/', json={'paths': ['constant.nc'], 'operations': [
+        {
+            'operation': 'cutout_bbox',
+            'bbox': [-10, 10, -10, 10]
+        },
+        {
+            'operation': 'mask_landonly'
+        },
+        {
+            'operation': 'cutout_bbox',
+            'bbox': [-23.43651, 23.43651, -180, 180]
+        }
+    ]})
+    assert response.status_code == 400
+    assert response.json.get('status') == 'error'
+    assert response.json.get('errors') == {
+        'operations': ['Operations result in too many commands (max: 2).']
+    }
+
+
+def test_operations_too_many_operations(client):
+    response = client.post('/', json={'paths': ['constant.nc'], 'operations': [
+        {
+            'operation': 'mask_landonly'
+        } for i in range(10)
+    ]})
+    assert response.status_code == 400
+    assert response.json.get('status') == 'error'
+    assert response.json.get('errors') == {
+        'operations': ['Too many operations provided (max: 8).']
+    }
diff --git a/isimip_files_api/tests/test_create_compute_mean.py b/isimip_files_api/tests/test_create_compute_mean.py
new file mode 100644
index 0000000..6757c9d
--- /dev/null
+++ b/isimip_files_api/tests/test_create_compute_mean.py
@@ -0,0 +1,11 @@
+def test_success(client, mocker):
+    mocker.patch('isimip_files_api.app.create_job', mocker.Mock(return_value=({}, 201)))
+
+    response = client.post('/', json={'paths': ['constant.nc'], 'operations': [
+        {
+            'operation': 'compute_mean'
+        }
+    ]})
+
+    assert response.status_code == 201
+    assert response.json.get('errors') is None
diff --git a/isimip_files_api/tests/test_create_create_mask.py b/isimip_files_api/tests/test_create_create_mask.py
new file mode 100644
index 0000000..a51df9c
--- /dev/null
+++ b/isimip_files_api/tests/test_create_create_mask.py
@@ -0,0 +1,114 @@
+import io
+import json
+from pathlib import Path
+
+shapefile_path = Path('testing') / 'shapes' / 'pm.zip'
+wrong_path = Path('testing') / 'shapes' / 'wrong.zip'
+geojson_path = Path('testing') / 'shapes' / 'pm.json'
+
+
+def test_shape(client, mocker):
+    mocker.patch('isimip_files_api.app.create_job', mocker.Mock(return_value=({}, 201)))
+
+    data = {
+        'paths': ['constant.nc'],
+        'operations': [
+            {
+                'operation': 'create_mask',
+                'shape': 'pm.zip',
+                'mask': 'pm.nc'
+            }
+        ]
+    }
+
+    response = client.post('/', data={
+        'data': (io.BytesIO(json.dumps(data).encode()), 'data', 'application/json'),
+        'pm.zip': (shapefile_path.open('rb'), 'pm.zip', 'application/zip')
+    })
+
+    assert response.status_code == 201, response.text
+    assert response.json.get('errors') is None
+
+
+def test_missing_shape(client):
+    response = client.post('/', json={'paths': ['constant.nc'], 'operations': [
+        {
+            'operation': 'create_mask',
+            'mask': 'pm.nc'
+        }
+    ]})
+    assert response.status_code == 400
+    assert response.json.get('status') == 'error'
+    assert response.json.get('errors') == {
+        'operations': ['shape is missing for operation "create_mask"']
+    }
+
+
+def test_missing_mask(client):
+    response = client.post('/', json={'paths': ['constant.nc'], 'operations': [
+        {
+            'operation': 'create_mask',
+            'shape': 'pm.zip'
+        }
+    ]})
+    assert response.status_code == 400
+    assert response.json.get('status') == 'error'
+    assert response.json.get('errors') == {
+        'operations': ['mask is missing for operation "create_mask"']
+    }
+
+
+def test_invalid_mask1(client):
+    response = client.post('/', json={'paths': ['constant.nc'], 'operations': [
+        {
+            'operation': 'create_mask',
+            'shape': 'pm.zip',
+            'mask': 'pm.nc ; wrong'
+        }
+    ]})
+    assert response.status_code == 400
+    assert response.json.get('status') == 'error'
+    assert response.json.get('errors') == {
+        'operations': ['only letters, numbers, hyphens, and periods are permitted in "mask"'
+                       ' for operation "create_mask"']
+    }
+
+
+def test_invalid_mask2(client):
+    response = client.post('/', json={'paths': ['constant.nc'], 'operations': [
+        {
+            'operation': 'create_mask',
+            'shape': 'pm.zip',
+            'mask': '..pm.nc'
+        }
+    ]})
+    assert response.status_code == 400
+    assert response.json.get('status') == 'error'
+    assert response.json.get('errors') == {
+        'operations': ['consecutive periods are not permitted in "mask" for operation "create_mask"']
+    }
+
+
+def test_missing_file(client, mocker):
+    mocker.patch('isimip_files_api.app.create_job', mocker.Mock(return_value=({}, 201)))
+
+    data = {
+        'paths': ['constant.nc'],
+        'operations': [
+            {
+                'operation': 'create_mask',
+                'shape': 'pm.zip',
+                'mask': 'pm.nc'
+            }
+        ]
+    }
+
+    response = client.post('/', data={
+        'data': (io.BytesIO(json.dumps(data).encode()), 'data', 'application/json')
+    })
+
+    assert response.status_code == 400
+    assert response.json.get('status') == 'error'
+    assert response.json.get('errors') == {
+        'operations': ['File "pm.zip" for operation "create_mask" is not part of the uploads']
+    }
diff --git a/isimip_files_api/tests/test_create_cutout_bbox.py b/isimip_files_api/tests/test_create_cutout_bbox.py
new file mode 100644
index 0000000..196e963
--- /dev/null
+++ b/isimip_files_api/tests/test_create_cutout_bbox.py
@@ -0,0 +1,39 @@
+def test_success(client, mocker):
+    mocker.patch('isimip_files_api.app.create_job', mocker.Mock(return_value=({}, 201)))
+
+    response = client.post('/', json={'paths': ['constant.nc'], 'operations': [
+        {
+            'operation': 'cutout_bbox',
+            'bbox': [-23.43651, 23.43651, -180, 180]
+        }
+    ]})
+
+    assert response.status_code == 201
+    assert response.json.get('errors') is None
+
+
+def test_missing_bbox(client):
+    response = client.post('/', json={'paths': ['constant.nc'], 'operations': [
+        {
+            'operation': 'cutout_bbox'
+        }
+    ]})
+    assert response.status_code == 400
+    assert response.json.get('status') == 'error'
+    assert response.json.get('errors') == {
+        'operations': ['bbox is missing for operation "cutout_bbox"']
+    }
+
+
+def test_wrong_bbox(client):
+    response = client.post('/', json={'paths': ['constant.nc'], 'operations': [
+        {
+            'operation': 'cutout_bbox',
+            'bbox': [-23.43651, 23.43651, -180, 'wrong']
+        }
+    ]})
+    assert response.status_code == 400
+    assert response.json.get('status') == 'error'
+    assert response.json.get('errors') == {
+        'operations': ['bbox is not of the form [%f, %f, %f, %f] for operation "cutout_bbox"']
+    }
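The `test_shape` test above shows the multipart layout the API expects when files are uploaded: a part named `data` carrying the JSON payload, and one part per uploaded file (cf. `handle_post_request()` in `utils.py` below). A client-side sketch with `requests`; the server URL and file names are examples, and combining `create_mask` with `mask_mask` is my assumption about the intended usage:

```python
import json

import requests

data = {
    'paths': ['ISIMIP3b/InputData/.../gfdl-esm4_..._daily_2015_2020.nc'],
    'operations': [
        {'operation': 'create_mask', 'shape': 'pm.zip', 'mask': 'pm.nc'},
        {'operation': 'mask_mask', 'mask': 'pm.nc'}
    ]
}

response = requests.post('https://files.isimip.org/api/v2', files={
    'data': ('data', json.dumps(data), 'application/json'),
    'pm.zip': ('pm.zip', open('pm.zip', 'rb'), 'application/zip')
})
```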
diff --git a/isimip_files_api/tests/test_create_mask_bbox.py b/isimip_files_api/tests/test_create_mask_bbox.py
new file mode 100644
index 0000000..bf5fcf0
--- /dev/null
+++ b/isimip_files_api/tests/test_create_mask_bbox.py
@@ -0,0 +1,39 @@
+def test_success(client, mocker):
+    mocker.patch('isimip_files_api.app.create_job', mocker.Mock(return_value=({}, 201)))
+
+    response = client.post('/', json={'paths': ['constant.nc'], 'operations': [
+        {
+            'operation': 'mask_bbox',
+            'bbox': [-23.43651, 23.43651, -180, 180]
+        }
+    ]})
+
+    assert response.status_code == 201
+    assert response.json.get('errors') is None
+
+
+def test_missing_bbox(client):
+    response = client.post('/', json={'paths': ['constant.nc'], 'operations': [
+        {
+            'operation': 'mask_bbox'
+        }
+    ]})
+    assert response.status_code == 400
+    assert response.json.get('status') == 'error'
+    assert response.json.get('errors') == {
+        'operations': ['bbox is missing for operation "mask_bbox"']
+    }
+
+
+def test_wrong_bbox(client):
+    response = client.post('/', json={'paths': ['constant.nc'], 'operations': [
+        {
+            'operation': 'mask_bbox',
+            'bbox': [-23.43651, 23.43651, -180, 'wrong']
+        }
+    ]})
+    assert response.status_code == 400
+    assert response.json.get('status') == 'error'
+    assert response.json.get('errors') == {
+        'operations': ['bbox is not of the form [%f, %f, %f, %f] for operation "mask_bbox"']
+    }
diff --git a/isimip_files_api/tests/test_create_mask_country.py b/isimip_files_api/tests/test_create_mask_country.py
new file mode 100644
index 0000000..3914275
--- /dev/null
+++ b/isimip_files_api/tests/test_create_mask_country.py
@@ -0,0 +1,39 @@
+def test_success(client, mocker):
+    mocker.patch('isimip_files_api.app.create_job', mocker.Mock(return_value=({}, 201)))
+
+    response = client.post('/', json={'paths': ['constant.nc'], 'operations': [
+        {
+            'operation': 'mask_country',
+            'country': 'deu'
+        }
+    ]})
+
+    assert response.status_code == 201
+    assert response.json.get('errors') is None
+
+
+def test_missing_country(client):
+    response = client.post('/', json={'paths': ['constant.nc'], 'operations': [
+        {
+            'operation': 'mask_country'
+        }
+    ]})
+    assert response.status_code == 400
+    assert response.json.get('status') == 'error'
+    assert response.json.get('errors') == {
+        'operations': ['country is missing for operation "mask_country"']
+    }
+
+
+def test_wrong_country(client):
+    response = client.post('/', json={'paths': ['constant.nc'], 'operations': [
+        {
+            'operation': 'mask_country',
+            'country': 'wrong'
+        }
+    ]})
+    assert response.status_code == 400
+    assert response.json.get('status') == 'error'
+    assert response.json.get('errors') == {
+        'operations': ['country not in the list of supported countries (e.g. deu) for operation "mask_country"']
+    }
diff --git a/isimip_files_api/tests/test_create_mask_landonly.py b/isimip_files_api/tests/test_create_mask_landonly.py
new file mode 100644
index 0000000..e3e78b6
--- /dev/null
+++ b/isimip_files_api/tests/test_create_mask_landonly.py
@@ -0,0 +1,11 @@
+def test_success(client, mocker):
+    mocker.patch('isimip_files_api.app.create_job', mocker.Mock(return_value=({}, 201)))
+
+    response = client.post('/', json={'paths': ['constant.nc'], 'operations': [
+        {
+            'operation': 'mask_landonly'
+        }
+    ]})
+
+    assert response.status_code == 201
+    assert response.json.get('errors') is None
diff --git a/isimip_files_api/tests/test_create_mask_mask.py b/isimip_files_api/tests/test_create_mask_mask.py
new file mode 100644
index 0000000..52070a5
--- /dev/null
+++ b/isimip_files_api/tests/test_create_mask_mask.py
@@ -0,0 +1,73 @@
+def test_success(client, mocker):
+    mocker.patch('isimip_files_api.app.create_job', mocker.Mock(return_value=({}, 201)))
+
+    response = client.post('/', json={'paths': ['constant.nc'], 'operations': [
+        {
+            'operation': 'mask_mask',
+            'mask': 'pm.nc'
+        }
+    ]})
+
+    assert response.status_code == 201
+    assert response.json.get('errors') is None
+
+
+def test_missing_mask(client):
+    response = client.post('/', json={'paths': ['constant.nc'], 'operations': [
+        {
+            'operation': 'mask_mask'
+        }
+    ]})
+    assert response.status_code == 400
+    assert response.json.get('status') == 'error'
+    assert response.json.get('errors') == {
+        'operations': ['mask is missing for operation "mask_mask"']
+    }
+
+
+def test_invalid_mask1(client):
+    response = client.post('/', json={'paths': ['constant.nc'], 'operations': [
+        {
+            'operation': 'mask_mask',
+            'shape': 'pm.zip',
+            'mask': 'pm.nc ; wrong'
+        }
+    ]})
+    assert response.status_code == 400
+    assert response.json.get('status') == 'error'
+    assert response.json.get('errors') == {
+        'operations': ['only letters, numbers, hyphens, and periods are permitted in "mask"'
+                       ' for operation "mask_mask"']
+    }
+
+
+def test_invalid_mask2(client):
+    response = client.post('/', json={'paths': ['constant.nc'], 'operations': [
+        {
+            'operation': 'mask_mask',
+            'shape': 'pm.zip',
+            'mask': '..pm.nc'
+        }
+    ]})
+    assert response.status_code == 400
+    assert response.json.get('status') == 'error'
+    assert response.json.get('errors') == {
+        'operations': ['consecutive periods are not permitted in "mask" for operation "mask_mask"']
+    }
+
+
+def test_invalid_var(client):
+    response = client.post('/', json={'paths': ['constant.nc'], 'operations': [
+        {
+            'operation': 'mask_mask',
+            'shape': 'pm.zip',
+            'mask': 'pm.nc',
+            'var': 'm_0 ; wrong'
+        }
+    ]})
+    assert response.status_code == 400
+    assert response.json.get('status') == 'error'
+    assert response.json.get('errors') == {
+        'operations': ['only letters, numbers, underscores are permitted in "var"'
+                       ' for operation "mask_mask"']
+    }
diff --git a/isimip_files_api/tests/test_create_output_csv.py b/isimip_files_api/tests/test_create_output_csv.py
new file mode 100644
index 0000000..46cc16e
--- /dev/null
+++ b/isimip_files_api/tests/test_create_output_csv.py
@@ -0,0 +1,11 @@
+def test_success(client, mocker):
+    mocker.patch('isimip_files_api.app.create_job', mocker.Mock(return_value=({}, 201)))
+
+    response = client.post('/', json={'paths': ['constant.nc'], 'operations': [
+        {
+            'operation': 'output_csv'
+        }
+    ]})
+
+    assert response.status_code == 201
+    assert response.json.get('errors') is None
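A typical use for `output_csv` is extracting a time series for a single grid cell. Both operations below map to the `cdo` command, so they should be grouped into at most one invocation, with `get_suffix()` switching the output file to `.csv`; the server URL and path are the usual shortened examples:

```python
import requests

response = requests.post('https://files.isimip.org/api/v2', json={
    'paths': ['ISIMIP3b/InputData/.../gfdl-esm4_..._daily_2015_2020.nc'],
    'operations': [
        {'operation': 'select_point', 'point': [52.38, 13.06]},
        {'operation': 'output_csv'}
    ]
})
```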
diff --git a/isimip_files_api/tests/test_create_select_bbox.py b/isimip_files_api/tests/test_create_select_bbox.py
new file mode 100644
index 0000000..541f092
--- /dev/null
+++ b/isimip_files_api/tests/test_create_select_bbox.py
@@ -0,0 +1,39 @@
+def test_success(client, mocker):
+    mocker.patch('isimip_files_api.app.create_job', mocker.Mock(return_value=({}, 201)))
+
+    response = client.post('/', json={'paths': ['constant.nc'], 'operations': [
+        {
+            'operation': 'select_bbox',
+            'bbox': [-23.43651, 23.43651, -180, 180]
+        }
+    ]})
+
+    assert response.status_code == 201
+    assert response.json.get('errors') is None
+
+
+def test_missing_bbox(client):
+    response = client.post('/', json={'paths': ['constant.nc'], 'operations': [
+        {
+            'operation': 'select_bbox'
+        }
+    ]})
+    assert response.status_code == 400
+    assert response.json.get('status') == 'error'
+    assert response.json.get('errors') == {
+        'operations': ['bbox is missing for operation "select_bbox"']
+    }
+
+
+def test_wrong_bbox(client):
+    response = client.post('/', json={'paths': ['constant.nc'], 'operations': [
+        {
+            'operation': 'select_bbox',
+            'bbox': [-23.43651, 23.43651, -180, 'wrong']
+        }
+    ]})
+    assert response.status_code == 400
+    assert response.json.get('status') == 'error'
+    assert response.json.get('errors') == {
+        'operations': ['bbox is not of the form [%f, %f, %f, %f] for operation "select_bbox"']
+    }
diff --git a/isimip_files_api/tests/test_create_select_point.py b/isimip_files_api/tests/test_create_select_point.py
new file mode 100644
index 0000000..15fbeb6
--- /dev/null
+++ b/isimip_files_api/tests/test_create_select_point.py
@@ -0,0 +1,39 @@
+def test_success(client, mocker):
+    mocker.patch('isimip_files_api.app.create_job', mocker.Mock(return_value=({}, 201)))
+
+    response = client.post('/', json={'paths': ['constant.nc'], 'operations': [
+        {
+            'operation': 'select_point',
+            'point': [52.380551, 13.064332]
+        }
+    ]})
+
+    assert response.status_code == 201
+    assert response.json.get('errors') is None
+
+
+def test_missing_point(client):
+    response = client.post('/', json={'paths': ['constant.nc'], 'operations': [
+        {
+            'operation': 'select_point'
+        }
+    ]})
+    assert response.status_code == 400
+    assert response.json.get('status') == 'error'
+    assert response.json.get('errors') == {
+        'operations': ['point is missing for operation "select_point"']
+    }
+
+
+def test_wrong_point(client):
+    response = client.post('/', json={'paths': ['constant.nc'], 'operations': [
+        {
+            'operation': 'select_point',
+            'point': [52.380551, 'wrong']
+        }
+    ]})
+    assert response.status_code == 400
+    assert response.json.get('status') == 'error'
+    assert response.json.get('errors') == {
+        'operations': ['point is not of the form [%f, %f] for operation "select_point"']
+    }
diff --git a/isimip_files_api/tests/test_delete.py b/isimip_files_api/tests/test_delete.py
new file mode 100644
index 0000000..f598ece
--- /dev/null
+++ b/isimip_files_api/tests/test_delete.py
@@ -0,0 +1,21 @@
+def mocked_delete_job(job_id):
+    if job_id == 'test':
+        return {}, 204
+    else:
+        return {}, 404
+
+
+def test_success(client, mocker):
+    mocker.patch('isimip_files_api.app.delete_job', mocked_delete_job)
+
+    response = client.delete('/test')
+
+    assert response.status_code == 204
+
+
+def test_wrong_id(client, mocker):
+    mocker.patch('isimip_files_api.app.delete_job', mocked_delete_job)
+
+    response = client.delete('/wrong')
+
+    assert response.status_code == 404
diff --git a/isimip_files_api/tests/test_detail.py b/isimip_files_api/tests/test_detail.py
new file mode 100644
index 0000000..d8b53de
--- /dev/null
+++ b/isimip_files_api/tests/test_detail.py
@@ -0,0 +1,21 @@
+def mocked_fetch_job(job_id):
+    if job_id == 'test':
+        return {}, 200
+    else:
+        return {}, 404
+
+
+def test_success(client, mocker):
+    mocker.patch('isimip_files_api.app.fetch_job', mocked_fetch_job)
+
+    response = client.get('/test')
+
+    assert response.status_code == 200
+
+
+def test_wrong_id(client, mocker):
+    mocker.patch('isimip_files_api.app.fetch_job', mocked_fetch_job)
+
+    response = client.get('/wrong')
+
+    assert response.status_code == 404
diff --git a/isimip_files_api/tests/test_root.py b/isimip_files_api/tests/test_root.py
new file mode 100644
index 0000000..a090c34
--- /dev/null
+++ b/isimip_files_api/tests/test_root.py
@@ -0,0 +1,24 @@
+def test_success(client, mocker):
+    mocker.patch('isimip_files_api.app.count_jobs', mocker.Mock(return_value={}))
+
+    response = client.get('/')
+
+    assert response.status_code == 200
+    assert response.json.get('status') == 'ok'
+    assert response.json.get('commands') == [
+        'cdo',
+        'create_mask',
+        'ncks'
+    ]
+    assert response.json.get('operations') == [
+        'select_bbox',
+        'select_point',
+        'mask_bbox',
+        'mask_mask',
+        'mask_country',
+        'mask_landonly',
+        'compute_mean',
+        'output_csv',
+        'create_mask',
+        'cutout_bbox'
+    ]
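As `test_root.py` shows, a plain GET on the root endpoint reports the available commands and operations (plus, presumably, the job counts from `count_jobs`), so clients can discover the server's capabilities before submitting a request:

```python
import requests

response = requests.get('https://files.isimip.org/api/v2')
capabilities = response.json()

print(capabilities['status'])      # 'ok'
print(capabilities['operations'])  # ['select_bbox', 'select_point', ...]
```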
diff --git a/isimip_files_api/utils.py b/isimip_files_api/utils.py
index 1d9f9b8..f016e4a 100644
--- a/isimip_files_api/utils.py
+++ b/isimip_files_api/utils.py
@@ -1,66 +1,99 @@
 import hashlib
+import importlib
+import json
 import re
+import shutil
 from pathlib import Path
+from zipfile import ZipFile
 
-from .settings import BASE_URL, GLOBAL, OUTPUT_PREFIX, OUTPUT_URL, WORKER_RESULT_TTL
+from flask import current_app as app
 
 
-def get_response(job, http_status):
-    file_name = get_zip_file_name(job.id)
+def get_config_path(config_file):
+    if config_file is not None:
+        config_path = Path(config_file)
+        if not config_path.is_absolute():
+            config_path = Path().cwd() / config_path
 
-    return {
-        'id': job.id,
-        'job_url': BASE_URL + '/' + job.id,
-        'file_name': file_name,
-        'file_url': OUTPUT_URL + '/' + file_name,
-        'meta': job.meta,
-        'ttl': WORKER_RESULT_TTL,
-        'status': job.get_status(),
-    }, http_status
+        if config_path.is_file():
+            return config_path
 
 
-def get_errors_response(errors):
-    return {
-        'status': 'error',
-        'errors': errors
-    }, 400
+def handle_post_request(request):
+    data = {}
+    files = {}
+    if request.content_type.startswith('multipart/form-data'):
+        for file_name, file_storage in request.files.items():
+            if file_name == 'data':
+                data = json.loads(file_storage.read())
+            else:
+                files[file_name] = file_storage
+    else:
+        data = request.json
+
+    return data, files
 
 
-def get_output_name(path, args, suffix=None):
-    if args.get('bbox'):
-        south, north, west, east = args['bbox']
-        region = f'lat{south}to{north}lon{west}to{east}'
-    elif args.get('country'):
-        region = args['country'].lower()
+def get_hash(data, uploads):
+    m = hashlib.sha1()
+    m.update(str(data).encode())
+    for file_name, file_storage in uploads.items():
+        m.update(file_storage.read())
+    return m.hexdigest()
 
-    elif args.get('point'):
-        lat, lon = args['point']
-        region = f'lat{lat}lon{lon}'
-    else:
-        region = 'landonly'
+
+def get_input_path():
+    return Path(app.config['INPUT_PATH']).expanduser().resolve()
 
-    path = Path(path)
-    suffix = suffix if suffix else path.suffix
-    if GLOBAL in path.name:
-        # replace the _global_ specifier
-        return path.with_suffix(suffix).name.replace(GLOBAL, f'_{region}_')
-    else:
-        # append region specifier
-        return path.stem + f'_{region}' + suffix
+
+def get_tmp_path():
+    return Path(app.config['TMP_PATH']).expanduser().resolve()
+
+
+def get_output_path():
+    return Path(app.config['OUTPUT_PATH']).expanduser().resolve()
+
+
+def get_job_path(job_id):
+    job_path = get_tmp_path().joinpath(app.config['OUTPUT_PREFIX'] + job_id)
+    job_path.mkdir(parents=True, exist_ok=True)
+    return job_path
 
 
 def get_zip_file_name(job_id):
-    return Path(OUTPUT_PREFIX + job_id).with_suffix('.zip').as_posix()
+    return Path(app.config['OUTPUT_PREFIX'] + job_id).with_suffix('.zip').as_posix()
 
 
-def get_hash(paths, args):
-    m = hashlib.sha1()
-    m.update(str(paths).encode())
-    m.update(str(args).encode())
-    return m.hexdigest()
+def get_zip_path(job_id):
+    zip_path = get_output_path() / get_zip_file_name(job_id)
+    zip_path.parent.mkdir(parents=True, exist_ok=True)
+    return zip_path
+
+
+def get_zip_file(job_id):
+    zip_path = get_zip_path(job_id)
+    return ZipFile(zip_path.as_posix(), 'w')
+
+
+def store_uploads(job_id, uploads):
+    job_path = get_job_path(job_id)
+
+    for file_name, file_storage in uploads.items():
+        with open(job_path / file_name, 'wb') as fp:
+            file_storage.seek(0)
+            file_storage.save(fp)
+
+
+def remove_job_path(job_id):
+    job_path = get_job_path(job_id)
+    shutil.rmtree(job_path)
+
+
+def mask_paths(string):
+    return re.sub(r'\/\S+\/', '', string)
 
 
-def mask_cmd(cmd):
-    return re.sub(r'\/\S+\/', '', cmd)
+def import_class(string):
+    module_name, class_name = string.rsplit('.', 1)
+    return getattr(importlib.import_module(module_name), class_name)
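`mask_paths()` (the renamed `mask_cmd()`) strips absolute directories from the command lines before they are written into the README of the zip file, so server paths are never exposed. A quick illustration with a hypothetical command string:

```python
import re

def mask_paths(string):
    # same regex as in utils.py: remove every /dir/.../ prefix from tokens
    return re.sub(r'\/\S+\/', '', string)

cmd = 'cdo -fldmean /data/isimip/tas_global_daily.nc /tmp/isimip-files-api-abc123/tas_global_daily.nc'
print(mask_paths(cmd))
# cdo -fldmean tas_global_daily.nc tas_global_daily.nc
```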
diff --git a/isimip_files_api/validators.py b/isimip_files_api/validators.py
index d6ca97f..4628327 100644
--- a/isimip_files_api/validators.py
+++ b/isimip_files_api/validators.py
@@ -1,108 +1,92 @@
-from .netcdf import check_resolution, open_dataset
-from .settings import COUNTRYMASKS_COUNTRIES, INPUT_PATH, MAX_FILES, TASKS
+from collections import defaultdict
+from pathlib import Path
+
+from flask import current_app as app
 
+from .operations import OperationRegistry
 
-def validate_data(data, errors):
-    # check if any data is provided
-    if (not data) or (data is None):
-        errors['data'] = 'No json data provided with POST'
-    elif not isinstance(data, dict):
-        errors['data'] = 'Provided json data is malformatted'
-    else:
-        paths = validate_paths(data, errors)
-        args = validate_args(data, errors)
-        return paths, args
-
-
-def validate_paths(data, errors):
-    # check if path is given
-    if 'paths' not in data:
-        errors['paths'].append('This field is required.')
-        return None
-
-    if len(data['paths']) > MAX_FILES:
-        errors['paths'].append(f'To many files match that dataset (max: {MAX_FILES}).')
-        return None
-
-    for path in data['paths']:
-        # prevent tree traversal
-        try:
-            absolute_path = INPUT_PATH / path
-            absolute_path.parent.resolve().relative_to(INPUT_PATH.resolve())
-        except ValueError:
-            errors['paths'].append(f'{path} is below the root path.')
-
-        # check if the file exists
-        if not absolute_path.is_file():
-            errors['paths'].append(f'{path} was not found on the server.')
-
-        # check if the file exists
-        if absolute_path.suffix not in ['.nc', '.nc4']:
-            errors['paths'].append(f'{path} is not a NetCDF file..')
-
-    return data['paths']
-
-
-def validate_args(data, errors):
-    args = {
-        'task': validate_task(data, errors),
-        'bbox': validate_bbox(data, errors),
-        'point': validate_point(data, errors),
-        'country': validate_country(data, errors)
-    }
-    if args['task'] in ['select_country', 'mask_country'] and not args['country']:
-        errors['args'] = 'country needs to be provided'
-    elif args['task'] in ['cutout_bbox', 'mask_bbox', 'select_bbox'] and not args['bbox']:
-        errors['args'] = 'bbox needs to be provided'
-    elif args['task'] in ['select_point'] and not args['point']:
-        errors['args'] = 'point needs to be provided'
-    else:
-        return args
-
-
-def validate_task(data, errors):
-    if 'task' not in data or data['task'] not in TASKS.keys():
-        errors['task'] = 'task needs to be provided'
-    else:
-        return data['task']
-
-
-def validate_bbox(data, errors):
-    if 'bbox' in data:
-        try:
-            return [float(data['bbox'][0]), float(data['bbox'][1]), float(data['bbox'][2]), float(data['bbox'][3])]
-        except (ValueError, IndexError):
-            errors['bbox'] = 'bbox is not of the form [%f, %f, %f, %f]'
-    else:
-        return None
-
-
-def validate_point(data, errors):
-    if 'point' in data:
-        try:
-            return [float(data['point'][0]), float(data['point'][1])]
-        except (ValueError, IndexError):
-            errors['bbox'] = 'bbox is not of the form [%f, %f]'
-    else:
-        return None
-
-
-def validate_country(data, errors):
-    if 'country' in data:
-        country = data['country'].lower()
-
-        if country.upper() not in COUNTRYMASKS_COUNTRIES:
-            errors['country'] = 'country not in the list of supported countries (e.g. DEU)'
-
-        return country
-    else:
-        return None
-
-
-def validate_datasets(paths, args, errors):
-    for path in paths:
-        absolute_path = INPUT_PATH / path
-        with open_dataset(absolute_path) as ds:
-            resolutions = TASKS[args.get('task')]
-            if not any(check_resolution(ds, resolution) for resolution in resolutions):
-                errors['paths'].append(f'{path} is not using the correct grid: {resolutions}.')
+def validate_data(data):
+    errors = defaultdict(list)
+
+    if (not data) or (data is None):
+        errors['data'].append('No json data provided with POST')
+    elif not isinstance(data, dict):
+        errors['data'].append('Provided json data is malformatted')
+
+    return errors
+
+
+def validate_paths(data):
+    errors = defaultdict(list)
+
+    if not data.get('paths'):
+        errors['paths'].append('This field is required.')
+    elif not isinstance(data['paths'], list):
+        errors['paths'].append('Provided json data is malformatted.')
+    else:
+        if len(data['paths']) > app.config['MAX_FILES']:
+            errors['paths'].append('Too many files match that dataset (max: {MAX_FILES}).'.format(**app.config))
+        else:
+            for path in data['paths']:
+                # prevent tree traversal
+                try:
+                    input_path = Path(app.config['INPUT_PATH']).expanduser()
+                    absolute_path = input_path / path
+                    absolute_path.parent.resolve().relative_to(input_path.resolve())
+                except ValueError:
+                    errors['paths'].append(f'{path} is below the root path.')
+                else:
+                    # check if the file is a NetCDF file
+                    if absolute_path.suffix not in ['.nc', '.nc4']:
+                        errors['paths'].append(f'{path} is not a NetCDF file.')
+                    # check if the file exists
+                    elif not absolute_path.is_file():
+                        errors['paths'].append(f'{path} was not found on the server.')
+
+    return errors
+
+
+def validate_operations(data):
+    errors = defaultdict(list)
+
+    if not data.get('operations'):
+        errors['operations'].append('This field is required.')
+    elif not isinstance(data['operations'], list):
+        errors['operations'].append('Provided json data is malformatted.')
+    elif len(data['operations']) > app.config['MAX_OPERATIONS']:
+        errors['operations'].append('Too many operations provided (max: {MAX_OPERATIONS}).'.format(**app.config))
+    else:
+        operation_registry = OperationRegistry()
+        for index, config in enumerate(data['operations']):
+            if 'operation' in config:
+                operation = operation_registry.get(config)
+                if operation is None:
+                    errors['operations'].append('operation "{operation}" was not found'.format(**config))
+                else:
operation_errors = operation.validate() + if operation_errors: + errors['operations'] += operation_errors + else: + errors['operations'].append(f'operation [{index}] does not have a "operation" key') + + if not errors and len(operation_registry.get_command_list(data['operations'])) > app.config['MAX_COMMANDS']: + errors['operations'].append('Operations result in to many commands (max: {MAX_COMMANDS}).'.format( + **app.config + )) + + return errors + + +def validate_uploads(data, uploads): + errors = defaultdict(list) + + operation_registry = OperationRegistry() + for index, config in enumerate(data['operations']): + if 'operation' in config: + operation = operation_registry.get(config) + operation_errors = operation.validate_uploads(uploads) + if operation_errors: + errors['operations'] += operation_errors + + return errors diff --git a/isimip_files_api/worker.py b/isimip_files_api/worker.py index 7001b0d..7d7d3d3 100644 --- a/isimip_files_api/worker.py +++ b/isimip_files_api/worker.py @@ -1,12 +1,16 @@ -import logging +from rq import Worker as RQWorker -from rq import Worker as Worker +from dotenv import load_dotenv -from .settings import WORKER_LOG_FILE, WORKER_LOG_LEVEL +from .app import create_app -logging.basicConfig(level=WORKER_LOG_LEVEL, filename=WORKER_LOG_FILE, - format='[%(asctime)s] %(levelname)s %(name)s: %(message)s') +class Worker(RQWorker): -class LogWorker(Worker): - pass + def work(self, *args, **kwargs): + load_dotenv() + + app = create_app() + + with app.app_context(): + super().work(*args, **kwargs) diff --git a/pyproject.toml b/pyproject.toml index c31cab2..af6b88c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,13 +1,7 @@ [build-system] -requires = ["setuptools", "setuptools-scm"] +requires = ["setuptools", "wheel"] build-backend = "setuptools.build_meta" -__author__ = 'Jochen Klar' -__email__ = 'jochen.klar@pik-potsdam.de' -__license__ = 'MIT' -__copyright__ = 'Copyright (c) 2020 Potsdam Institute for Climate Impact Research' - - [project] name = "isimip-files-api" authors = [ @@ -31,37 +25,43 @@ classifiers = [ 'Programming Language :: Python :: 3.12' ] dependencies = [ - "Flask~=1.1.1", - "Flask-Cors~=3.0.8", - "gunicorn~=20.0.4", - "netCDF4~=1.5.3", - "numpy~=1.18.2", - "python-dotenv~=0.12.0", - "rq~=1.3.0" + "colorlog~=6.8.2", + "Flask~=3.0.0", + "Flask-Cors~=4.0.0", + "geopandas~=0.14.3", + "gunicorn~=21.2.0", + "netCDF4~=1.6.5", + "numpy~=1.26.3", + "python-dotenv~=1.0.0", + "tomli", + "rioxarray~=0.15.0", + "rq~=1.15.1", ] dynamic = ["version"] +[project.scripts] +create-mask = "isimip_files_api.scripts.create_mask:main" + [project.urls] Repository = "https://github.com/ISI-MIP/isimip-files-api" -[project.scripts] -isimip-files-api-select = "isimip_files_api.scripts:select" -isimip-files-api-mask = "isimip_files_api.scripts:mask" -isimip-files-api-cutout = "isimip_files_api.scripts:cutout" -isimip-files-api-clean = "isimip_files_api.scripts:clean" - [project.optional-dependencies] dev = [ "build", "pre-commit", "pytest", "pytest-cov", + "pytest-mock", "ruff", "twine" ] -[tool.setuptools] -packages = ["isimip_files_api"] +[tool.setuptools.packages.find] +include = ["isimip_files_api*"] +exclude = ["*tests*"] + +[tool.setuptools.package-data] +"*" = ["*"] [tool.setuptools.dynamic] version = { attr = "isimip_files_api.__version__" } @@ -95,11 +95,19 @@ known-first-party = [ section-order = [ "future", "standard-library", + "pytest", + "flask", + "rq", "third-party", "first-party", "local-folder" ] +[tool.ruff.isort.sections] +pytest = ["pytest"] +flask = 
["flask"] +rq = ["rq"] + [tool.pytest.ini_options] testpaths = ["isimip_files_api"] diff --git a/testing/input/constant.nc b/testing/input/constant.nc new file mode 100644 index 0000000..5e4a231 Binary files /dev/null and b/testing/input/constant.nc differ diff --git a/testing/shapes/pm.json b/testing/shapes/pm.json new file mode 100644 index 0000000..ce9943b --- /dev/null +++ b/testing/shapes/pm.json @@ -0,0 +1,105 @@ +{ + "type": "FeatureCollection", + "features": [ + { + "type": "Feature", + "geometry": { + "type": "Polygon", + "coordinates": [ + [ + [ + 12.89353, + 52.46076 + ], + [ + 13.01813, + 52.35544 + ], + [ + 13.1659, + 52.39428 + ], + [ + 13.31193, + 52.39919 + ], + [ + 12.98098, + 52.1479 + ], + [ + 12.96593, + 52.04049 + ], + [ + 12.76978, + 51.97927 + ], + [ + 12.37612, + 52.04512 + ], + [ + 12.27672, + 52.10402 + ], + [ + 12.23625, + 52.17559 + ], + [ + 12.31718, + 52.4541 + ], + [ + 12.36118, + 52.44277 + ], + [ + 12.45535, + 52.34313 + ], + [ + 12.55783, + 52.39808 + ], + [ + 12.54333, + 52.42865 + ], + [ + 12.42503, + 52.45544 + ], + [ + 12.46984, + 52.54388 + ], + [ + 12.89353, + 52.46076 + ] + ] + ] + }, + "properties": { + "NUTS_ID": "DE40E", + "LEVL_CODE": 3, + "CNTR_CODE": "DE", + "NAME_LATN": "Potsdam-Mittelmark", + "NUTS_NAME": "Potsdam-Mittelmark", + "MOUNT_TYPE": 4, + "URBN_TYPE": 2, + "COAST_TYPE": 3, + "FID": "DE40E" + }, + "id": "DE40E" + } + ], + "crs": { + "type": "name", + "properties": { + "name": "urn:ogc:def:crs:EPSG::4326" + } + } +} \ No newline at end of file diff --git a/testing/shapes/pm.nc b/testing/shapes/pm.nc new file mode 100644 index 0000000..2a90ca7 Binary files /dev/null and b/testing/shapes/pm.nc differ diff --git a/testing/shapes/pm.zip b/testing/shapes/pm.zip new file mode 100644 index 0000000..f75c011 Binary files /dev/null and b/testing/shapes/pm.zip differ diff --git a/testing/shapes/wrong.txt b/testing/shapes/wrong.txt new file mode 100644 index 0000000..e69de29 diff --git a/testing/shapes/wrong.zip b/testing/shapes/wrong.zip new file mode 100644 index 0000000..041a244 Binary files /dev/null and b/testing/shapes/wrong.zip differ