diff --git a/examples/ml_pipeline.ipynb b/examples/ml_pipeline.ipynb new file mode 100644 index 00000000..d6bebdf9 --- /dev/null +++ b/examples/ml_pipeline.ipynb @@ -0,0 +1,207 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# ML Pipeline" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "import geoengine as ge\n", + "from geoengine.ml import MlModelConfig\n", + "\n", + "from geoengine_openapi_client.models import MlModelMetadata, RasterDataType\n", + "\n", + "from sklearn.tree import DecisionTreeClassifier\n", + "import numpy as np\n", + "from skl2onnx import to_onnx\n" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "ge.initialize(\"http://localhost:3030/api\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Train a dummy model (TODO: feed with data from Geo Engine)" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Predictions: [33 42]\n" + ] + } + ], + "source": [ + "np.random.seed(0) \n", + "X = np.random.rand(100, 2).astype(np.float32) # 100 instances, 2 features\n", + "y = np.where(X[:, 0] > X[:, 1], 42, 33) # 1 if feature 0 > feature 42, else 33\n", + "\n", + "clf = DecisionTreeClassifier()\n", + "clf.fit(X, y)\n", + "\n", + "test_samples = np.array([[0.1, 0.2], [0.2, 0.1]])\n", + "predictions = clf.predict(test_samples)\n", + "print(\"Predictions:\", predictions)\n", + "\n", + "# Convert into ONNX format.\n", + "from skl2onnx import to_onnx\n", + "\n", + "onx = to_onnx(clf, X[:1], target_opset=9) # target_opset is the ONNX version to use" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Register it with Geo Engine" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "\n", + "model_name = f\"{ge.get_session().user_id}:decision_tree\"\n", + "\n", + "metadata = MlModelMetadata(\n", + " file_name=\"model.onnx\",\n", + " input_type=RasterDataType.F32,\n", + " num_input_bands=2,\n", + " output_type=RasterDataType.I64,\n", + ")\n", + "\n", + "model_config = MlModelConfig(\n", + " name=model_name,\n", + " metadata=metadata,\n", + " display_name=\"Decision Tree\",\n", + " description=\"A simple decision tree model\",\n", + ")\n", + "\n", + "ge.register_ml_model(onnx_model=onx, model_config=model_config)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Apply model using the ONNX operator\n", + "\n", + "The image shows rise and fall in ndvi in 2014-04 with respect to the previous month." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/home/michael/git/geoengine-python/env/lib/python3.10/site-packages/owslib/coverage/wcs110.py:85: FutureWarning: The behavior of this method will change in future versions. Use specific 'len(elem)' or 'elem is not None' test instead.\n", + " elem = self._capabilities.find(self.ns.OWS('ServiceProvider')) or self._capabilities.find(self.ns.OWS('ServiceProvider')) # noqa\n" + ] + }, + { + "data": { + "text/plain": [ + "" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + }, + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "# create raster with two bands, one ndvi of current month and one ndvi of the previous month\n", + "bands = [ge.workflow_builder.operators.GdalSource(\"ndvi\"),\n", + " ge.workflow_builder.operators.TimeShift(source=ge.workflow_builder.operators.GdalSource(\"ndvi\"), \n", + " shift_type=\"relative\", \n", + " granularity=\"months\", \n", + " value=-1)]\n", + "stack = ge.workflow_builder.operators.RasterStacker(sources = bands)\n", + "\n", + "# normalize the input to 0-1 and convert to float32\n", + "normalized = ge.workflow_builder.operators.BandwiseExpression(expression=\"x/255\", source=stack, output_type=\"F32\")\n", + "\n", + "# use the registered ml model for prediction\n", + "onnx = ge.workflow_builder.operators.Onnx(source=normalized, model=model_name)\n", + "\n", + "# convert the predictions to U8 because ONNX outputs I64 which our Gdal version output currently\n", + "converted_output = ge.workflow_builder.operators.RasterTypeConversion(source=onnx, output_data_type=\"U8\")\n", + "\n", + "workflow_dict = converted_output.to_workflow_dict()\n", + "\n", + "workflow = ge.register_workflow(workflow_dict)\n", + "\n", + "query = ge.QueryRectangle(\n", + " ge.BoundingBox2D(-180, -90, 180, 90),\n", + " ge.TimeInterval(np.datetime64('2014-04-01')),\n", + " ge.SpatialResolution(0.1, 0.1)\n", + ")\n", + "\n", + "data = workflow.get_xarray(\n", + " query\n", + ")\n", + "\n", + "data.plot()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "env", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.12" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/geoengine/__init__.py b/geoengine/__init__.py index bce610d9..a9bee083 100644 --- a/geoengine/__init__.py +++ b/geoengine/__init__.py @@ -18,6 +18,7 @@ from .layers import Layer, LayerCollection, LayerListing, LayerCollectionListing, \ LayerId, LayerCollectionId, LayerProviderId, \ layer_collection, layer +from .ml import register_ml_model, MlModelConfig, SerializableModel from .permissions import add_permission, remove_permission, add_role, remove_role, assign_role, revoke_role, \ ADMIN_ROLE_ID, REGISTERED_USER_ROLE_ID, ANONYMOUS_USER_ROLE_ID, Permission, Resource, UserId, RoleId from .tasks import Task, TaskId diff --git a/geoengine/ml.py b/geoengine/ml.py new file mode 100644 index 00000000..57252ec9 --- /dev/null +++ b/geoengine/ml.py @@ -0,0 +1,58 @@ +''' +Util functions for machine learning +''' + +from pathlib import Path +import tempfile +from typing import Protocol +from dataclasses import dataclass +from geoengine_openapi_client.models import MlModelMetadata, MlModel +import geoengine_openapi_client +from geoengine.auth import get_session +from geoengine.datasets import UploadId + + +# pylint: disable=invalid-name +class SerializableModel(Protocol): + '''A protocol for serializable models''' + + def SerializeToString(self) -> bytes: + ... + + +@dataclass +class MlModelConfig: + '''Configuration for an ml model''' + name: str + metadata: MlModelMetadata + file_name: str = "model.onnx" + display_name: str = "My Ml Model" + description: str = "My Ml Model Description" + + +def register_ml_model(onnx_model: SerializableModel, + model_config: MlModelConfig, + upload_timeout: int = 3600, + register_timeout: int = 60): + '''Uploads an onnx file and registers it as an ml model''' + + session = get_session() + + with geoengine_openapi_client.ApiClient(session.configuration) as api_client: + with tempfile.TemporaryDirectory() as temp_dir: + file_name = Path(temp_dir) / model_config.file_name + + with open(file_name, 'wb') as file: + file.write(onnx_model.SerializeToString()) + + uploads_api = geoengine_openapi_client.UploadsApi(api_client) + response = uploads_api.upload_handler([str(file_name)], + _request_timeout=upload_timeout) + + upload_id = UploadId.from_response(response) + + ml_api = geoengine_openapi_client.MLApi(api_client) + + model = MlModel(name=model_config.name, upload=str(upload_id), metadata=model_config.metadata, + display_name=model_config.display_name, description=model_config.description) + ml_api.add_ml_model(model, _request_timeout=register_timeout) diff --git a/geoengine/workflow_builder/operators.py b/geoengine/workflow_builder/operators.py index 0d01090d..aea279fd 100644 --- a/geoengine/workflow_builder/operators.py +++ b/geoengine/workflow_builder/operators.py @@ -728,7 +728,7 @@ def __init__(self, self.map_no_data = map_no_data def name(self) -> str: - return 'Expression' + return 'BandwiseExpression' def to_dict(self) -> Dict[str, Any]: params = { @@ -1157,7 +1157,7 @@ def __init__(self, source: RasterOperator, aggregate: BandNeighborhoodAggregateParams ): - '''Creates a new RasterStacker operator.''' + '''Creates a new BandNeighborhoodAggregate operator.''' self.source = source self.aggregate = aggregate @@ -1250,3 +1250,46 @@ def to_dict(self) -> Dict[str, Any]: "type": "average", "windowSize": self.window_size } + + +class Onnx(RasterOperator): + '''Onnx ML operator.''' + + source: RasterOperator + model: str + + # pylint: disable=too-many-arguments + def __init__(self, + source: RasterOperator, + model: str + ): + '''Creates a new Onnx operator.''' + self.source = source + self.model = model + + def name(self) -> str: + return 'Onnx' + + def to_dict(self) -> Dict[str, Any]: + return { + "type": self.name(), + "params": { + "model": self.model + }, + "sources": { + "raster": self.source.to_dict() + } + } + + @classmethod + def from_operator_dict(cls, operator_dict: Dict[str, Any]) -> 'Onnx': + if operator_dict["type"] != "Onnx": + raise ValueError("Invalid operator type") + + source = RasterOperator.from_operator_dict(operator_dict["sources"]["raster"]) + model = operator_dict["params"]["model"] + + return Onnx( + source=source, + model=model + ) diff --git a/setup.cfg b/setup.cfg index 25c4cfc3..2cafd8a4 100644 --- a/setup.cfg +++ b/setup.cfg @@ -18,7 +18,7 @@ package_dir = packages = find: python_requires = >=3.8 install_requires = - geoengine-openapi-client == 0.0.11 + geoengine-openapi-client == 0.0.12 geopandas >=0.9,<0.15 matplotlib >=3.5,<3.8 numpy >=1.21,<2 @@ -35,6 +35,7 @@ install_requires = xarray >=0.19,<2024.3 urllib3 >= 2.0, < 2.3 pydantic >= 1.10.5, < 2 + skl2onnx >=1.17,<2 [options.extras_require] dev =