diff --git a/examples/pyfunc/Pyfunc.ipynb b/examples/pyfunc/Pyfunc.ipynb
index 3419bb40e..ae0efbed0 100644
--- a/examples/pyfunc/Pyfunc.ipynb
+++ b/examples/pyfunc/Pyfunc.ipynb
@@ -35,7 +35,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": null,
+   "execution_count": 6,
    "metadata": {},
    "outputs": [],
    "source": [
@@ -142,9 +142,24 @@
   },
   {
    "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
+   "execution_count": 7,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "[12:10:08] WARNING: /Users/runner/work/xgboost/xgboost/python-package/build/temp.macosx-10.9-x86_64-cpython-37/xgboost/src/learner.cc:627: \n",
+      "Parameters: { \"silent\" } might not be used.\n",
+      "\n",
+      " This could be a false alarm, with some parameters getting used by language bindings but\n",
+      " then being mistakenly passed down to XGBoost core, or some parameter actually being used\n",
+      " but getting flagged wrongly here. Please open an issue if you find any such cases.\n",
+      "\n",
+      "\n"
+     ]
+    }
+   ],
    "source": [
     "model_1_dir = \"xgboost-model\"\n",
     "BST_FILE = \"model_1.bst\"\n",
@@ -174,9 +189,20 @@
   },
   {
    "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
+   "execution_count": 8,
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "['sklearn-model/model_2.joblib']"
+      ]
+     },
+     "execution_count": 8,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
    "source": [
     "model_2_dir = \"sklearn-model\"\n",
     "MODEL_FILE = \"model_2.joblib\"\n",
@@ -209,7 +235,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": null,
+   "execution_count": 9,
    "metadata": {},
    "outputs": [],
    "source": [
@@ -240,15 +266,110 @@
   },
   {
    "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
+   "execution_count": 10,
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "{'predictions': [[0.22366186074437142, 0.3047382124338789, 0.4715999044700078],\n",
+       " [0.43930651004457333, 0.2117506019399637, 0.3489429066419144]]}"
+      ]
+     },
+     "execution_count": 10,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
    "source": [
     "m = EnsembleModel()\n",
     "m.initialize({\"xgb_model\": model_1_path, \"sklearn_model\": model_2_path})\n",
     "m.infer({\"instances\": [[1,2,3,4], [2,1,2,4]] })"
    ]
   },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### 2.4 Run PyFunc Model Server Locally (Optional)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "For faster development iteration, you can also simulate running the PyFunc model server on your local machine by calling the `merlin.run_pyfunc_model` function. `run_pyfunc_model` is a blocking function that will start a PyFunc model server on port 8080 by default.\n",
+    "\n",
+    "As a prerequisite, you need to have Docker installed on your machine."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stderr",
+     "output_type": "stream",
+     "text": [
+      "2024/01/15 12:10:13 INFO mlflow.tracking.fluent: Experiment with name 'ensemblemodel' does not exist. Creating a new experiment.\n"
+     ]
+    },
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "Logging model to local MLflow\n",
+      "Building Docker image ensemblemodel-dev\n",
+      "Running PyFunc local server\n",
+      "b'/opt/conda/envs/merlin-model/lib/python3.8/site-packages/sklearn/base.py:348: InconsistentVersionWarning: Trying to unpickle estimator SVC from version 1.0.2 when using version 1.3.2. This might lead to breaking code or invalid results. Use at your own risk. For more info please refer to:\\n'\n",
+      "b'https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations\\n'\n",
+      "b' warnings.warn(\\n'\n",
+      "b'INFO:root:Registering model: name: ensemblemodel, version: dev, fullname: ensemblemodel-dev\\n'\n",
+      "b'INFO:root:Listening on port 8080\\n'\n",
+      "b'INFO:root:Will fork 1 workers\\n'\n"
+     ]
+    }
+   ],
+   "source": [
+    "merlin.run_pyfunc_model(\n",
+    " model_instance=EnsembleModel(), \n",
+    " conda_env=\"env.yaml\", \n",
+    " artifacts={\"xgb_model\": model_1_path, \"sklearn_model\": model_2_path},\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Once the PyFunc model server is running, you will see a log like this:\n",
+    "\n",
+    "```\n",
+    "Running PyFunc local server\n",
+    "b'INFO:root:Registering model: name: ensemblemodel, version: dev, fullname: ensemblemodel-dev\\n'\n",
+    "b'INFO:root:Listening on port 8080\\n'\n",
+    "b'INFO:root:Will fork 1 workers\\n'\n",
+    "```\n",
+    "\n",
+    "Now, you can send the request using curl from your terminal:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%bash\n",
+    "curl -X POST \"http://localhost:8080/v1/models/ensemblemodel-dev:predict\" -d '{\n",
+    " \"instances\": [\n",
+    " [2.8, 1.0, 6.8, 0.4],\n",
+    " [3.1, 1.4, 4.5, 1.6]\n",
+    " ]\n",
+    "}'"
+   ]
+  },
   {
    "cell_type": "markdown",
    "metadata": {},
@@ -304,9 +425,11 @@
    "outputs": [],
    "source": [
     "with merlin.new_model_version() as v: \n",
-    " merlin.log_pyfunc_model(model_instance=EnsembleModel(), \n",
-    " conda_env=\"env.yaml\", \n",
-    " artifacts={\"xgb_model\": model_1_path, \"sklearn_model\": model_2_path})"
+    " merlin.log_pyfunc_model(\n",
+    " model_instance=EnsembleModel(), \n",
+    " conda_env=\"env.yaml\", \n",
+    " artifacts={\"xgb_model\": model_1_path, \"sklearn_model\": model_2_path},\n",
+    " )"
    ]
   },
   {
    "cell_type": "markdown",
    "metadata": {},
@@ -386,17 +509,91 @@
     "merlin.undeploy(v)"
    ]
   },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# 4. Testing PyFunc Model Server Locally"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Up until this point, you have successfully built and run your PyFunc model on the Merlin server. For faster development iteration, you can also simulate running the PyFunc model server on your local machine by calling the `merlin.run_pyfunc_model` function."
+ ] + }, { "cell_type": "code", - "execution_count": null, + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Logging model to local MLflow\n", + "Building Docker image ensemblemodel-dev\n", + "Running PyFunc local server\n", + "b'2024/01/09 06:58:58 WARNING mlflow.pyfunc: The version of Python that the model was saved in, `Python 3.10.13`, differs from the version of Python that is currently running, `Python 3.8.18`, and may be incompatible\\n'\n", + "b'INFO:root:Registering model: name: ensemblemodel, version: dev, fullname: ensemblemodel-dev\\n'\n", + "b'INFO:root:Listening on port 8080\\n'\n", + "b'INFO:root:Will fork 1 workers\\n'\n" + ] + }, + { + "ename": "KeyboardInterrupt", + "evalue": "", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mKeyboardInterrupt\u001b[0m Traceback (most recent call last)", + "Cell \u001b[0;32mIn[5], line 1\u001b[0m\n\u001b[0;32m----> 1\u001b[0m \u001b[43mmerlin\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mrun_pyfunc_model\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 2\u001b[0m \u001b[43m \u001b[49m\u001b[43mmodel_instance\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mEnsembleModel\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\n\u001b[1;32m 3\u001b[0m \u001b[43m \u001b[49m\u001b[43mconda_env\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43menv.yaml\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\n\u001b[1;32m 4\u001b[0m \u001b[43m \u001b[49m\u001b[43martifacts\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43m{\u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mxgb_model\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m:\u001b[49m\u001b[43m \u001b[49m\u001b[43mmodel_1_path\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43msklearn_model\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m:\u001b[49m\u001b[43m \u001b[49m\u001b[43mmodel_2_path\u001b[49m\u001b[43m}\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 5\u001b[0m \u001b[43m)\u001b[49m\n", + "File \u001b[0;32m~/.pyenv/versions/3.10.13/lib/python3.10/site-packages/merlin/pyfunc.py:448\u001b[0m, in \u001b[0;36mrun_pyfunc_model\u001b[0;34m(model_instance, conda_env, code_dir, artifacts, pyfunc_base_image, port, env_vars, protocol, debug)\u001b[0m\n\u001b[1;32m 444\u001b[0m shutil\u001b[38;5;241m.\u001b[39mcopy(conda_env, dependencies_path)\n\u001b[1;32m 446\u001b[0m artifact_path \u001b[38;5;241m=\u001b[39m \u001b[38;5;124mf\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;132;01m{\u001b[39;00mmodel_info\u001b[38;5;241m.\u001b[39mrun_id\u001b[38;5;132;01m}\u001b[39;00m\u001b[38;5;124m/artifacts/model\u001b[39m\u001b[38;5;124m\"\u001b[39m\n\u001b[0;32m--> 448\u001b[0m \u001b[43mrun_pyfunc_local_server\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 449\u001b[0m \u001b[43m \u001b[49m\u001b[43mcontext_path\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mcontext_path\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 450\u001b[0m \u001b[43m \u001b[49m\u001b[43mdependencies_path\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mdependencies_path\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 451\u001b[0m \u001b[43m 
\u001b[49m\u001b[43martifact_path\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43martifact_path\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 452\u001b[0m \u001b[43m \u001b[49m\u001b[43mmodel_name\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mmodel_name\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 453\u001b[0m \u001b[43m \u001b[49m\u001b[43mmodel_version\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mdev\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m,\u001b[49m\n\u001b[1;32m 454\u001b[0m \u001b[43m \u001b[49m\u001b[43mpyfunc_base_image\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mpyfunc_base_image\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 455\u001b[0m \u001b[43m \u001b[49m\u001b[43mport\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mport\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 456\u001b[0m \u001b[43m \u001b[49m\u001b[43menv_vars\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43menv_vars\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 457\u001b[0m \u001b[43m \u001b[49m\u001b[43mprotocol\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mprotocol\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 458\u001b[0m \u001b[43m \u001b[49m\u001b[43mdebug\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mdebug\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 459\u001b[0m \u001b[43m\u001b[49m\u001b[43m)\u001b[49m\n", + "File \u001b[0;32m~/.pyenv/versions/3.10.13/lib/python3.10/site-packages/merlin/pyfunc.py:506\u001b[0m, in \u001b[0;36mrun_pyfunc_local_server\u001b[0;34m(context_path, dependencies_path, artifact_path, model_name, model_version, pyfunc_base_image, port, env_vars, protocol, debug)\u001b[0m\n\u001b[1;32m 495\u001b[0m _build_image(\n\u001b[1;32m 496\u001b[0m image_tag\u001b[38;5;241m=\u001b[39mimage_tag,\n\u001b[1;32m 497\u001b[0m context_path\u001b[38;5;241m=\u001b[39mcontext_path,\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 502\u001b[0m debug\u001b[38;5;241m=\u001b[39mdebug,\n\u001b[1;32m 503\u001b[0m )\n\u001b[1;32m 505\u001b[0m \u001b[38;5;28mprint\u001b[39m(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mRunning PyFunc local server\u001b[39m\u001b[38;5;124m\"\u001b[39m)\n\u001b[0;32m--> 506\u001b[0m \u001b[43m_run_container\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 507\u001b[0m \u001b[43m \u001b[49m\u001b[43mimage_tag\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mimage_tag\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 508\u001b[0m \u001b[43m \u001b[49m\u001b[43mmodel_name\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mmodel_name\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 509\u001b[0m \u001b[43m \u001b[49m\u001b[43mmodel_version\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mmodel_version\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 510\u001b[0m \u001b[43m \u001b[49m\u001b[43mmodel_full_name\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[38;5;124;43mf\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;132;43;01m{\u001b[39;49;00m\u001b[43mmodel_name\u001b[49m\u001b[38;5;132;43;01m}\u001b[39;49;00m\u001b[38;5;124;43m-\u001b[39;49m\u001b[38;5;132;43;01m{\u001b[39;49;00m\u001b[43mmodel_version\u001b[49m\u001b[38;5;132;43;01m}\u001b[39;49;00m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m,\u001b[49m\n\u001b[1;32m 511\u001b[0m \u001b[43m \u001b[49m\u001b[43mport\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mport\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 512\u001b[0m \u001b[43m 
\u001b[49m\u001b[43menv_vars\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43menv_vars\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 513\u001b[0m \u001b[43m \u001b[49m\u001b[43mprotocol\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mprotocol\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 514\u001b[0m \u001b[43m\u001b[49m\u001b[43m)\u001b[49m\n", + "File \u001b[0;32m~/.pyenv/versions/3.10.13/lib/python3.10/site-packages/merlin/pyfunc.py:597\u001b[0m, in \u001b[0;36m_run_container\u001b[0;34m(image_tag, model_name, model_version, model_full_name, port, env_vars, protocol)\u001b[0m\n\u001b[1;32m 586\u001b[0m container \u001b[38;5;241m=\u001b[39m docker_client\u001b[38;5;241m.\u001b[39mcontainers\u001b[38;5;241m.\u001b[39mrun(\n\u001b[1;32m 587\u001b[0m image\u001b[38;5;241m=\u001b[39mimage_tag,\n\u001b[1;32m 588\u001b[0m name\u001b[38;5;241m=\u001b[39mmodel_name,\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 593\u001b[0m remove\u001b[38;5;241m=\u001b[39m\u001b[38;5;28;01mTrue\u001b[39;00m,\n\u001b[1;32m 594\u001b[0m )\n\u001b[1;32m 596\u001b[0m \u001b[38;5;66;03m# continously print docker log until the process is interrupted\u001b[39;00m\n\u001b[0;32m--> 597\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m log \u001b[38;5;129;01min\u001b[39;00m container\u001b[38;5;241m.\u001b[39mlogs(stream\u001b[38;5;241m=\u001b[39m\u001b[38;5;28;01mTrue\u001b[39;00m):\n\u001b[1;32m 598\u001b[0m \u001b[38;5;28mprint\u001b[39m(log)\n\u001b[1;32m 599\u001b[0m \u001b[38;5;28;01mfinally\u001b[39;00m:\n", + "File \u001b[0;32m~/.pyenv/versions/3.10.13/lib/python3.10/site-packages/docker/types/daemon.py:29\u001b[0m, in \u001b[0;36mCancellableStream.__next__\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 27\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21m__next__\u001b[39m(\u001b[38;5;28mself\u001b[39m):\n\u001b[1;32m 28\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m---> 29\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mnext\u001b[39;49m\u001b[43m(\u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_stream\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 30\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m urllib3\u001b[38;5;241m.\u001b[39mexceptions\u001b[38;5;241m.\u001b[39mProtocolError:\n\u001b[1;32m 31\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mStopIteration\u001b[39;00m\n", + "File \u001b[0;32m~/.pyenv/versions/3.10.13/lib/python3.10/site-packages/docker/api/client.py:386\u001b[0m, in \u001b[0;36mAPIClient._multiplexed_response_stream_helper\u001b[0;34m(self, response)\u001b[0m\n\u001b[1;32m 383\u001b[0m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_disable_socket_timeout(socket)\n\u001b[1;32m 385\u001b[0m \u001b[38;5;28;01mwhile\u001b[39;00m \u001b[38;5;28;01mTrue\u001b[39;00m:\n\u001b[0;32m--> 386\u001b[0m header \u001b[38;5;241m=\u001b[39m \u001b[43mresponse\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mraw\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mread\u001b[49m\u001b[43m(\u001b[49m\u001b[43mSTREAM_HEADER_SIZE_BYTES\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 387\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m header:\n\u001b[1;32m 388\u001b[0m \u001b[38;5;28;01mbreak\u001b[39;00m\n", + "File \u001b[0;32m~/.pyenv/versions/3.10.13/lib/python3.10/site-packages/urllib3/response.py:879\u001b[0m, in \u001b[0;36mHTTPResponse.read\u001b[0;34m(self, amt, decode_content, cache_content)\u001b[0m\n\u001b[1;32m 876\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m 
\u001b[38;5;28mlen\u001b[39m(\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_decoded_buffer) \u001b[38;5;241m>\u001b[39m\u001b[38;5;241m=\u001b[39m amt:\n\u001b[1;32m 877\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_decoded_buffer\u001b[38;5;241m.\u001b[39mget(amt)\n\u001b[0;32m--> 879\u001b[0m data \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_raw_read\u001b[49m\u001b[43m(\u001b[49m\u001b[43mamt\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 881\u001b[0m flush_decoder \u001b[38;5;241m=\u001b[39m amt \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m \u001b[38;5;129;01mor\u001b[39;00m (amt \u001b[38;5;241m!=\u001b[39m \u001b[38;5;241m0\u001b[39m \u001b[38;5;129;01mand\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m data)\n\u001b[1;32m 883\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m data \u001b[38;5;129;01mand\u001b[39;00m \u001b[38;5;28mlen\u001b[39m(\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_decoded_buffer) \u001b[38;5;241m==\u001b[39m \u001b[38;5;241m0\u001b[39m:\n", + "File \u001b[0;32m~/.pyenv/versions/3.10.13/lib/python3.10/site-packages/urllib3/response.py:814\u001b[0m, in \u001b[0;36mHTTPResponse._raw_read\u001b[0;34m(self, amt)\u001b[0m\n\u001b[1;32m 811\u001b[0m fp_closed \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mgetattr\u001b[39m(\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_fp, \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mclosed\u001b[39m\u001b[38;5;124m\"\u001b[39m, \u001b[38;5;28;01mFalse\u001b[39;00m)\n\u001b[1;32m 813\u001b[0m \u001b[38;5;28;01mwith\u001b[39;00m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_error_catcher():\n\u001b[0;32m--> 814\u001b[0m data \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_fp_read\u001b[49m\u001b[43m(\u001b[49m\u001b[43mamt\u001b[49m\u001b[43m)\u001b[49m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m fp_closed \u001b[38;5;28;01melse\u001b[39;00m \u001b[38;5;124mb\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m\"\u001b[39m\n\u001b[1;32m 815\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m amt \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m \u001b[38;5;129;01mand\u001b[39;00m amt \u001b[38;5;241m!=\u001b[39m \u001b[38;5;241m0\u001b[39m \u001b[38;5;129;01mand\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m data:\n\u001b[1;32m 816\u001b[0m \u001b[38;5;66;03m# Platform-specific: Buggy versions of Python.\u001b[39;00m\n\u001b[1;32m 817\u001b[0m \u001b[38;5;66;03m# Close the connection when no data is returned\u001b[39;00m\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 822\u001b[0m \u001b[38;5;66;03m# not properly close the connection in all cases. 
There is\u001b[39;00m\n\u001b[1;32m 823\u001b[0m \u001b[38;5;66;03m# no harm in redundantly calling close.\u001b[39;00m\n\u001b[1;32m 824\u001b[0m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_fp\u001b[38;5;241m.\u001b[39mclose()\n", + "File \u001b[0;32m~/.pyenv/versions/3.10.13/lib/python3.10/site-packages/urllib3/response.py:799\u001b[0m, in \u001b[0;36mHTTPResponse._fp_read\u001b[0;34m(self, amt)\u001b[0m\n\u001b[1;32m 796\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m buffer\u001b[38;5;241m.\u001b[39mgetvalue()\n\u001b[1;32m 797\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 798\u001b[0m \u001b[38;5;66;03m# StringIO doesn't like amt=None\u001b[39;00m\n\u001b[0;32m--> 799\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_fp\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mread\u001b[49m\u001b[43m(\u001b[49m\u001b[43mamt\u001b[49m\u001b[43m)\u001b[49m \u001b[38;5;28;01mif\u001b[39;00m amt \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m \u001b[38;5;28;01melse\u001b[39;00m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_fp\u001b[38;5;241m.\u001b[39mread()\n", + "File \u001b[0;32m~/.pyenv/versions/3.10.13/lib/python3.10/http/client.py:460\u001b[0m, in \u001b[0;36mHTTPResponse.read\u001b[0;34m(self, amt)\u001b[0m\n\u001b[1;32m 457\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;124mb\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m\"\u001b[39m\n\u001b[1;32m 459\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mchunked:\n\u001b[0;32m--> 460\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_read_chunked\u001b[49m\u001b[43m(\u001b[49m\u001b[43mamt\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 462\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m amt \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m:\n\u001b[1;32m 463\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mlength \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m \u001b[38;5;129;01mand\u001b[39;00m amt \u001b[38;5;241m>\u001b[39m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mlength:\n\u001b[1;32m 464\u001b[0m \u001b[38;5;66;03m# clip the read to the \"end of response\"\u001b[39;00m\n", + "File \u001b[0;32m~/.pyenv/versions/3.10.13/lib/python3.10/http/client.py:583\u001b[0m, in \u001b[0;36mHTTPResponse._read_chunked\u001b[0;34m(self, amt)\u001b[0m\n\u001b[1;32m 581\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[1;32m 582\u001b[0m \u001b[38;5;28;01mwhile\u001b[39;00m \u001b[38;5;28;01mTrue\u001b[39;00m:\n\u001b[0;32m--> 583\u001b[0m chunk_left \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_get_chunk_left\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 584\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m chunk_left \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m:\n\u001b[1;32m 585\u001b[0m \u001b[38;5;28;01mbreak\u001b[39;00m\n", + "File \u001b[0;32m~/.pyenv/versions/3.10.13/lib/python3.10/http/client.py:566\u001b[0m, in \u001b[0;36mHTTPResponse._get_chunk_left\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 564\u001b[0m 
\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_safe_read(\u001b[38;5;241m2\u001b[39m) \u001b[38;5;66;03m# toss the CRLF at the end of the chunk\u001b[39;00m\n\u001b[1;32m 565\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m--> 566\u001b[0m chunk_left \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_read_next_chunk_size\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 567\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mValueError\u001b[39;00m:\n\u001b[1;32m 568\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m IncompleteRead(\u001b[38;5;124mb\u001b[39m\u001b[38;5;124m'\u001b[39m\u001b[38;5;124m'\u001b[39m)\n", + "File \u001b[0;32m~/.pyenv/versions/3.10.13/lib/python3.10/http/client.py:526\u001b[0m, in \u001b[0;36mHTTPResponse._read_next_chunk_size\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 524\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21m_read_next_chunk_size\u001b[39m(\u001b[38;5;28mself\u001b[39m):\n\u001b[1;32m 525\u001b[0m \u001b[38;5;66;03m# Read the next chunk size from the file\u001b[39;00m\n\u001b[0;32m--> 526\u001b[0m line \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mfp\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mreadline\u001b[49m\u001b[43m(\u001b[49m\u001b[43m_MAXLINE\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m+\u001b[39;49m\u001b[43m \u001b[49m\u001b[38;5;241;43m1\u001b[39;49m\u001b[43m)\u001b[49m\n\u001b[1;32m 527\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mlen\u001b[39m(line) \u001b[38;5;241m>\u001b[39m _MAXLINE:\n\u001b[1;32m 528\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m LineTooLong(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mchunk size\u001b[39m\u001b[38;5;124m\"\u001b[39m)\n", + "File \u001b[0;32m~/.pyenv/versions/3.10.13/lib/python3.10/socket.py:705\u001b[0m, in \u001b[0;36mSocketIO.readinto\u001b[0;34m(self, b)\u001b[0m\n\u001b[1;32m 703\u001b[0m \u001b[38;5;28;01mwhile\u001b[39;00m \u001b[38;5;28;01mTrue\u001b[39;00m:\n\u001b[1;32m 704\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m--> 705\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_sock\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mrecv_into\u001b[49m\u001b[43m(\u001b[49m\u001b[43mb\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 706\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m timeout:\n\u001b[1;32m 707\u001b[0m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_timeout_occurred \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mTrue\u001b[39;00m\n", + "\u001b[0;31mKeyboardInterrupt\u001b[0m: " + ] + } + ], + "source": [ + "merlin.run_pyfunc_model(\n", + " model_instance=EnsembleModel(), \n", + " conda_env=\"env.yaml\", \n", + " artifacts={\"xgb_model\": model_1_path, \"sklearn_model\": model_2_path},\n", + ")" + ] + }, + { + "cell_type": "markdown", "metadata": {}, - "outputs": [], - "source": [] + "source": [ + "Once the PyFunc model server is running, you can send the requests using curl from your terminal:\n", + "\n", + "```\n", + "curl -X POST \"http://localhost:8080/v1/models/ensemblemodel-dev:predict\" -d '{\n", + " \"instances\": [\n", + " [2.8, 1.0, 6.8, 0.4],\n", + " [3.1, 1.4, 4.5, 1.6]\n", + " ]\n", + "}'\n", + "```" + ] } ], "metadata": { "kernelspec": { - "display_name": "merlin-sdk", + "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, diff --git 
a/examples/pyfunc/sklearn-model/model_2.joblib b/examples/pyfunc/sklearn-model/model_2.joblib index 22280815b..5754bd6a0 100644 Binary files a/examples/pyfunc/sklearn-model/model_2.joblib and b/examples/pyfunc/sklearn-model/model_2.joblib differ diff --git a/examples/pyfunc/xgboost-model/model_1.bst b/examples/pyfunc/xgboost-model/model_1.bst index 302803341..2a59ef64d 100644 Binary files a/examples/pyfunc/xgboost-model/model_1.bst and b/examples/pyfunc/xgboost-model/model_1.bst differ diff --git a/python/pyfunc-server/README.md b/python/pyfunc-server/README.md index 713f004de..6dc4cacf3 100644 --- a/python/pyfunc-server/README.md +++ b/python/pyfunc-server/README.md @@ -5,15 +5,17 @@ It leverages mlflow.pyfunc model for model loading. ## Usage -### HTTP Server +### HTTP Server Run following command to load sample `echo-model` model and start HTTP server: + ```bash PROMETHEUS_MULTIPROC_DIR=prometheus \ python -m pyfuncserver --model_dir echo-model/model ``` This will start http server at port 8080 which you can test using curl command + ```bash curl localhost:8080/v1/models/model-1:predict -H "Content-Type: application/json" -d '{}' ``` @@ -21,19 +23,19 @@ curl localhost:8080/v1/models/model-1:predict -H "Content-Type: application/json ### UPI V1 Server Run following command to load sample `echo-model` model and start UPI v1 server: + ```bash PROMETHEUS_MULTIPROC_DIR=prometheus \ CARAML_PROTOCOL=UPI_V1 \ WORKERS=2 python -m pyfuncserver --model_dir echo-model/model ``` - Since UPI v1 interface is gRPC then you can use grpcurl to send request + ```bash grpcurl -plaintext -d '{}' localhost:9000 caraml.upi.v1.UniversalPredictionService/PredictValues ``` - ## Development Requirements: @@ -54,48 +56,29 @@ make test ``` To run benchmark -```bash -make benchmark -``` - -## Building Docker Image - -To create docker image locally you'll need to first download model artifact. ```bash -gsutil cp -r gs://bucket-name/mlflow/11/68eb8538374c4053b3ecad99a44170bd/artifacts/model . -``` - -Build the docker image - -```bash -docker build -t mymodel:latest -f docker/local.Dockerfile . -``` - -And run the model service - -```bash -docker run -e MODEL_NAME=model -p 8080:8080 mymodel:latest +make benchmark ``` ## Configuration Pyfunc server can be configured via following environment variables -| Environment Variable | Description | -| ------------------------- |--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| CARAML_PROTOCOL | Protocol to be used, the valid values are `HTTP_JSON` and `UPI_V1` | -| CARAML_HTTP_PORT | Pyfunc server will start http server listening to this port when `CARAML_PROTOCOL` = `HTTP_JSON` | -| CARAML_GRPC_PORT | Pyfunc server will start grpc server listening to this port when `CARAML_PROTOCOL` = `UPI_V1` | -| CARAML_MODEL_NAME | Model name | -| CARAML_MODEL_VERSION | Model version | -| CARAML_MODEL_FULL_NAME | Model full name in the format of `${CARAML_MODEL_NAME}-${CARAML_MODEL_FULL_NAME}` | -| WORKERS | Number of Python processes that will be created to allow multi processing (default = 1) | -| LOG_LEVEL | Log level, valid values are `INFO`, `ERROR`, `DEBUG`, `WARN`, `CRITICAL` (default='INFO') | -| GRPC_OPTIONS | GRPC options to configure UPI server as json string. The possible options can be found in [grpc_types.h](https://github.com/grpc/grpc/blob/v1.46.x/include/grpc/impl/codegen/grpc_types.h). 
Example: '{"grpc.max_concurrent_streams":100}' | -| GRPC_CONCURRENCY | Size of grpc handler threadpool per worker (default = 10) | -| PUSHGATEWAY_ENABLED | Enable pushing metrics to prometheus push gateway, only available when `CARAML_PROTOCOL` is set to `UPI_V1` (default = false) | -| PUSHGATEWAY_URL | Url of the prometheus push gateway (default = localhost:9091) | +| Environment Variable | Description | +| ----------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | +| CARAML_PROTOCOL | Protocol to be used, the valid values are `HTTP_JSON` and `UPI_V1` | +| CARAML_HTTP_PORT | Pyfunc server will start http server listening to this port when `CARAML_PROTOCOL` = `HTTP_JSON` | +| CARAML_GRPC_PORT | Pyfunc server will start grpc server listening to this port when `CARAML_PROTOCOL` = `UPI_V1` | +| CARAML_MODEL_NAME | Model name | +| CARAML_MODEL_VERSION | Model version | +| CARAML_MODEL_FULL_NAME | Model full name in the format of `${CARAML_MODEL_NAME}-${CARAML_MODEL_FULL_NAME}` | +| WORKERS | Number of Python processes that will be created to allow multi processing (default = 1) | +| LOG_LEVEL | Log level, valid values are `INFO`, `ERROR`, `DEBUG`, `WARN`, `CRITICAL` (default='INFO') | +| GRPC_OPTIONS | GRPC options to configure UPI server as json string. The possible options can be found in [grpc_types.h](https://github.com/grpc/grpc/blob/v1.46.x/include/grpc/impl/codegen/grpc_types.h). Example: '{"grpc.max_concurrent_streams":100}' | +| GRPC_CONCURRENCY | Size of grpc handler threadpool per worker (default = 10) | +| PUSHGATEWAY_ENABLED | Enable pushing metrics to prometheus push gateway, only available when `CARAML_PROTOCOL` is set to `UPI_V1` (default = false) | +| PUSHGATEWAY_URL | Url of the prometheus push gateway (default = localhost:9091) | | PUSHGATEWAY_PUSH_INTERVAL_SEC | Interval in seconds for pushing metrics to prometheus push gateway (default = 30) | ## Directory Structure @@ -104,10 +87,8 @@ Pyfunc server can be configured via following environment variables ├── benchmark <- Benchmarking artifacts ├── docker <- Dockerfiles and environment files ├── Dockerfile <- Dockerfile that will be used by kaniko to build user image in the cluster - ├── base.Dockerfile <- Base docker image that will be used by `Dockerfile` and `local.Dockerfile` - ├── local.Dockerfile <- Dockerfile that can be used to perform local testing - ├── envXY.yaml <- Conda environment for python version X.Y that will be created within `base.Dockerfile` -├── echo-model <- Simple model for testing + ├── base.Dockerfile <- Base docker image that will be used by `Dockerfile` +├── examples <- Examples of PyFunc models implementation ├── test <- Test package ├── pyfuncserver <- Source code of this workflow │ ├── __main__.py <- Entry point of pyfuncserver @@ -120,10 +101,10 @@ Pyfunc server can be configured via following environment variables │ └── rest <- Server implementation for HTTP_JSON protocol │ └── upi <- Server implementation for UPI_V1 protocol ├── .gitignore -├── Makefile <- Makefile +├── Makefile <- Makefile ├── README.md <- The top-level README for developers using this project. 
├── requirements.txt <- pyfuncserver dependencies ├── setup.py <- setup.py ├── run.sh <- Script to activate `merlin-model` environment and run pyfuncserver when `docker run` is invoked -``` \ No newline at end of file +``` diff --git a/python/pyfunc-server/docker/local.Dockerfile b/python/pyfunc-server/docker/local.Dockerfile deleted file mode 100644 index f91f8b8ea..000000000 --- a/python/pyfunc-server/docker/local.Dockerfile +++ /dev/null @@ -1,26 +0,0 @@ -# Copyright 2020 The Merlin Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -ARG BASE_IMAGE - -FROM ${BASE_IMAGE} - -WORKDIR /pyfunc-server -COPY pyfunc-server/echo-model/model model -RUN /bin/bash -c ". activate merlin-model && \ - sed -i 's/pip$/pip=20.2.4/' model/conda.yaml && \ - conda env update --name merlin-model --file model/conda.yaml && \ - python -m pyfuncserver --model_dir model --dry_run" - -CMD ["/bin/bash", "./run.sh"] diff --git a/python/pyfunc-server/echo-model/model/MLmodel b/python/pyfunc-server/echo-model/model/MLmodel deleted file mode 100644 index a93443686..000000000 --- a/python/pyfunc-server/echo-model/model/MLmodel +++ /dev/null @@ -1,10 +0,0 @@ -artifact_path: model -flavors: - python_function: - cloudpickle_version: 2.0.0 - env: conda.yaml - loader_module: mlflow.pyfunc.model - python_model: python_model.pkl - python_version: 3.7.9 -run_id: 0937fb78c9b2477ab49625eb2b6b02c4 -utc_time_created: '2022-09-14 06:27:38.786852' diff --git a/python/pyfunc-server/echo-model/model/conda.yaml b/python/pyfunc-server/echo-model/model/conda.yaml deleted file mode 100644 index 043c4e983..000000000 --- a/python/pyfunc-server/echo-model/model/conda.yaml +++ /dev/null @@ -1,8 +0,0 @@ -channels: -- defaults -dependencies: -- python=3.7.9 -- pip: - - mlflow - - cloudpickle==2.0.0 -name: mlflow-env diff --git a/python/pyfunc-server/echo-model/model/python_model.pkl b/python/pyfunc-server/echo-model/model/python_model.pkl deleted file mode 100644 index d599e7b0e..000000000 Binary files a/python/pyfunc-server/echo-model/model/python_model.pkl and /dev/null differ diff --git a/python/pyfunc-server/examples/__init__.py b/python/pyfunc-server/examples/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/pyfunc-server/examples/echo_http/__init__.py b/python/pyfunc-server/examples/echo_http/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/pyfunc-server/examples/echo_http/echo_http.py b/python/pyfunc-server/examples/echo_http/echo_http.py new file mode 100644 index 000000000..83a455474 --- /dev/null +++ b/python/pyfunc-server/examples/echo_http/echo_http.py @@ -0,0 +1,21 @@ +import logging + +import merlin +from merlin.model import PyFuncModel + + +class EchoModel(PyFuncModel): + def initialize(self, artifacts): + pass + + def infer(self, request): + logging.info("request: %s", request) + return request + + +if __name__ == "__main__": + # Run pyfunc model locally without uploading to Merlin server + merlin.run_pyfunc_model( + model_instance=EchoModel(), + 
conda_env="env.yaml", + ) diff --git a/python/pyfunc-server/examples/echo_http/env.yaml b/python/pyfunc-server/examples/echo_http/env.yaml new file mode 100644 index 000000000..f035843cc --- /dev/null +++ b/python/pyfunc-server/examples/echo_http/env.yaml @@ -0,0 +1,2 @@ +dependencies: + - python=3.10 diff --git a/python/pyfunc-server/examples/echo_upi/README.md b/python/pyfunc-server/examples/echo_upi/README.md new file mode 100644 index 000000000..e0cf3ac34 --- /dev/null +++ b/python/pyfunc-server/examples/echo_upi/README.md @@ -0,0 +1,13 @@ +# Echo UPI Model Examples + +Run the server locally: + +``` +python upi_server.py +``` + +In different terminal session, run the client: + +``` +python upi_client.py +``` diff --git a/python/pyfunc-server/examples/echo_upi/__init__.py b/python/pyfunc-server/examples/echo_upi/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/pyfunc-server/examples/echo_upi/env.yaml b/python/pyfunc-server/examples/echo_upi/env.yaml new file mode 100644 index 000000000..f035843cc --- /dev/null +++ b/python/pyfunc-server/examples/echo_upi/env.yaml @@ -0,0 +1,2 @@ +dependencies: + - python=3.10 diff --git a/python/pyfunc-server/examples/echo_upi/upi_client.py b/python/pyfunc-server/examples/echo_upi/upi_client.py new file mode 100644 index 000000000..1d55effa3 --- /dev/null +++ b/python/pyfunc-server/examples/echo_upi/upi_client.py @@ -0,0 +1,29 @@ +import grpc +import pandas as pd +from caraml.upi.utils import df_to_table +from caraml.upi.v1 import upi_pb2, upi_pb2_grpc + + +def create_upi_request() -> upi_pb2.PredictValuesRequest: + target_name = "echo" + df = pd.DataFrame( + [[4, 1, "hi"]] * 3, + columns=["int_value", "int_value_2", "string_value"], + index=["0000", "1111", "2222"], + ) + prediction_id = "12345" + + return upi_pb2.PredictValuesRequest( + target_name=target_name, + prediction_table=df_to_table(df, "predict"), + metadata=upi_pb2.RequestMetadata(prediction_id=prediction_id), + ) + + +if __name__ == "__main__": + channel = grpc.insecure_channel(f"localhost:8080") + stub = upi_pb2_grpc.UniversalPredictionServiceStub(channel) + + request = create_upi_request() + response = stub.PredictValues(request=request) + print(response) diff --git a/python/pyfunc-server/echo-model/__init__.py b/python/pyfunc-server/examples/echo_upi/upi_server.py similarity index 66% rename from python/pyfunc-server/echo-model/__init__.py rename to python/pyfunc-server/examples/echo_upi/upi_server.py index 39138bd53..57b320fd5 100644 --- a/python/pyfunc-server/echo-model/__init__.py +++ b/python/pyfunc-server/examples/echo_upi/upi_server.py @@ -1,11 +1,11 @@ +import logging import os -from typing import List import grpc -import mlflow -import logging +import merlin from caraml.upi.v1 import upi_pb2 from merlin.model import PyFuncModel +from merlin.protocol import Protocol from prometheus_client import Counter, Gauge @@ -20,8 +20,9 @@ def initialize(self, artifacts: dict): self._req_count = Counter("request_count", "Number of incoming request") self._temp = Gauge("some_gauge", "Number of incoming request") - def upiv1_infer(self, request: upi_pb2.PredictValuesRequest, - context: grpc.ServicerContext) -> upi_pb2.PredictValuesResponse: + def upiv1_infer( + self, request: upi_pb2.PredictValuesRequest, context: grpc.ServicerContext + ) -> upi_pb2.PredictValuesResponse: logging.info(f"PID: {os.getpid()}") return upi_pb2.PredictValuesResponse( prediction_result_table=request.prediction_table, @@ -30,12 +31,21 @@ def upiv1_infer(self, request: 
upi_pb2.PredictValuesRequest, metadata=upi_pb2.ResponseMetadata( prediction_id=request.metadata.prediction_id, # TODO: allow user to get model name and version from PyFuncModel - models=[upi_pb2.ModelMetadata(name=self._model_name, version=self._model_version)] - ) + models=[ + upi_pb2.ModelMetadata( + name=self._model_name, version=self._model_version + ) + ], + ), ) if __name__ == "__main__": model_name = "echo-model" model_version = "1" - mlflow.pyfunc.log_model("model", python_model=EchoUPIModel(model_name, model_version)) + + merlin.run_pyfunc_model( + model_instance=EchoUPIModel(model_name, model_version), + conda_env="env.yaml", + protocol=Protocol.UPI_V1, + ) diff --git a/python/pyfunc-server/examples/iris_http/__init__.py b/python/pyfunc-server/examples/iris_http/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/pyfunc-server/examples/iris_http/env.yaml b/python/pyfunc-server/examples/iris_http/env.yaml new file mode 100644 index 000000000..7ca8e4e89 --- /dev/null +++ b/python/pyfunc-server/examples/iris_http/env.yaml @@ -0,0 +1,7 @@ +dependencies: + - python=3.10.* + - pip: + - joblib>=0.13.0,<1.2.0 # >=1.2.0 upon upgrade of kserve's version + - numpy<=1.23.5 # Temporary pin numpy due to https://numpy.org/doc/stable/release/1.20.0-notes.html#numpy-1-20-0-release-notes + - scikit-learn>=1.1.2 + - xgboost==1.6.2 diff --git a/python/pyfunc-server/examples/iris_http/iris_http.py b/python/pyfunc-server/examples/iris_http/iris_http.py new file mode 100644 index 000000000..6b87d2f8d --- /dev/null +++ b/python/pyfunc-server/examples/iris_http/iris_http.py @@ -0,0 +1,75 @@ +import os + +import joblib +import merlin +import numpy as np +import xgboost as xgb +from joblib import dump +from merlin.model import PyFuncModel +from sklearn import svm +from sklearn.datasets import load_iris + +XGB_PATH = os.path.join("models/", "model_1.bst") +SKLEARN_PATH = os.path.join("models/", "model_2.joblib") + + +class IrisModel(PyFuncModel): + def initialize(self, artifacts): + self._model_1 = xgb.Booster(model_file=artifacts["xgb_model"]) + self._model_2 = joblib.load(artifacts["sklearn_model"]) + + def infer(self, model_input): + inputs = np.array(model_input["instances"]) + dmatrix = xgb.DMatrix(model_input["instances"]) + result_1 = self._model_1.predict(dmatrix) + result_2 = self._model_2.predict_proba(inputs) + return {"predictions": ((result_1 + result_2) / 2).tolist()} + + +def train_models(): + iris = load_iris() + y = iris["target"] + X = iris["data"] + + # train xgboost model + dtrain = xgb.DMatrix(X, label=y) + param = { + "max_depth": 6, + "eta": 0.1, + "silent": 1, + "nthread": 4, + "num_class": 3, + "objective": "multi:softprob", + } + xgb_model = xgb.train(params=param, dtrain=dtrain) + xgb_model.save_model(XGB_PATH) + + # train sklearn model + clf = svm.SVC(gamma="scale", probability=True) + clf.fit(X, y) + dump(clf, SKLEARN_PATH) + + +if __name__ == "__main__": + train_models() + + # test pyfunc model locally + iris_model = IrisModel() + iris_model.initialize( + artifacts={ + "xgb_model": XGB_PATH, + "sklearn_model": SKLEARN_PATH, + } + ) + pred = iris_model.infer({"instances": [[2.8, 1.0, 6.8, 0.4], [3.1, 1.4, 4.5, 1.6]]}) + print(pred) + + # run pyfunc model locally + merlin.run_pyfunc_model( + model_instance=IrisModel(), + conda_env="env.yaml", + artifacts={ + "xgb_model": XGB_PATH, + "sklearn_model": SKLEARN_PATH, + }, + ) diff --git a/python/pyfunc-server/examples/iris_http/models/model_1.bst b/python/pyfunc-server/examples/iris_http/models/model_1.bst 
new file mode 100644 index 000000000..2a59ef64d Binary files /dev/null and b/python/pyfunc-server/examples/iris_http/models/model_1.bst differ diff --git a/python/pyfunc-server/examples/iris_http/models/model_2.joblib b/python/pyfunc-server/examples/iris_http/models/model_2.joblib new file mode 100644 index 000000000..65c3c078c Binary files /dev/null and b/python/pyfunc-server/examples/iris_http/models/model_2.joblib differ diff --git a/python/pyfunc-server/pyfuncserver/protocol/rest/server.py b/python/pyfunc-server/pyfuncserver/protocol/rest/server.py index 247461089..0b974e4c4 100644 --- a/python/pyfunc-server/pyfuncserver/protocol/rest/server.py +++ b/python/pyfunc-server/pyfuncserver/protocol/rest/server.py @@ -1,16 +1,14 @@ import asyncio import logging -import os import signal -import sys import tornado from prometheus_client import CollectorRegistry - from pyfuncserver.config import Config from pyfuncserver.metrics.handler import MetricsHandler from pyfuncserver.model.model import PyFuncModel -from pyfuncserver.protocol.rest.handler import HealthHandler, LivenessHandler, PredictHandler +from pyfuncserver.protocol.rest.handler import (HealthHandler, LivenessHandler, + PredictHandler) from pyfuncserver.publisher.kafka import KafkaProducer from pyfuncserver.publisher.publisher import Publisher from pyfuncserver.sampler.sampler import RatioSampling @@ -26,24 +24,37 @@ async def sig_handler(server): class Application(tornado.web.Application): - def __init__(self, config: Config, metrics_registry: CollectorRegistry, registered_models: dict): + def __init__( + self, + config: Config, + metrics_registry: CollectorRegistry, + registered_models: dict, + ): self.publisher = None self.model_manifest = config.model_manifest handlers = [ # Server Liveness API returns 200 if server is alive. (r"/", LivenessHandler), # Model Health API returns 200 if model is ready to serve. 
- (r"/v1/models/([a-zA-Z0-9_-]+)", - HealthHandler, dict(models=registered_models)), - (r"/v1/models/([a-zA-Z0-9_-]+):predict", - PredictHandler, dict(models=registered_models)), - (r"/metrics", MetricsHandler, dict(metrics_registry=metrics_registry)) + ( + r"/v1/models/([a-zA-Z0-9_-]+)", + HealthHandler, + dict(models=registered_models), + ), + ( + r"/v1/models/([a-zA-Z0-9_-]+):predict", + PredictHandler, + dict(models=registered_models), + ), + (r"/metrics", MetricsHandler, dict(metrics_registry=metrics_registry)), ] - super().__init__(handlers) # type: ignore # noqa + super().__init__(handlers) # type: ignore # noqa class HTTPServer: - def __init__(self, model: PyFuncModel, config: Config, metrics_registry: CollectorRegistry): + def __init__( + self, model: PyFuncModel, config: Config, metrics_registry: CollectorRegistry + ): self.config = config self.workers = config.workers self.model_manifest = config.model_manifest @@ -65,17 +76,26 @@ def start(self): # kafka producer must be initialize after fork the process if self.config.publisher is not None: - kafka_producer = KafkaProducer(self.config.publisher, self.config.model_manifest) + kafka_producer = KafkaProducer( + self.config.publisher, self.config.model_manifest + ) sampler = RatioSampling(self.config.publisher.sampling_ratio) application.publisher = Publisher(kafka_producer, sampler) - for signame in ('SIGINT', 'SIGTERM'): - asyncio.get_event_loop().add_signal_handler(getattr(signal, signame), - lambda: asyncio.create_task(sig_handler(self._http_server))) + for signame in ("SIGINT", "SIGTERM"): + asyncio.get_event_loop().add_signal_handler( + getattr(signal, signame), + lambda: asyncio.create_task(sig_handler(self._http_server)), + ) tornado.ioloop.IOLoop.current().start() def register_model(self, model: PyFuncModel): self.registered_models[model.full_name] = model - logging.info("Registering model: name: %s, version: %s, fullname: %s", model.name, model.version, - model.full_name) + logging.info( + "Registering model: name: %s, version: %s, fullname: %s, predict endpoint: /v1/models/%s:predict", + model.name, + model.version, + model.full_name, + model.full_name, + ) diff --git a/python/pyfunc-server/setup.py b/python/pyfunc-server/setup.py index ac1a4fe07..07cee86cf 100644 --- a/python/pyfunc-server/setup.py +++ b/python/pyfunc-server/setup.py @@ -13,19 +13,23 @@ # limitations under the License. 
import os -from setuptools import setup, find_packages + +from setuptools import find_packages, setup tests_require = [ - 'pytest', - 'pytest-tornasync', - 'requests', - 'types-requests', - 'types-protobuf', - 'mypy', - 'pytest-benchmark', + "joblib>=0.13.0,<1.2.0", # >=1.2.0 upon upgrade of kserve's version + "mypy", + "pytest-benchmark", + "pytest-tornasync", + "pytest", + "requests", + "scikit-learn>=1.1.2", + "types-protobuf", + "types-requests", + "xgboost==1.6.2", ] -with open('requirements.txt') as f: +with open("requirements.txt") as f: REQUIRE = f.read().splitlines() @@ -36,18 +40,20 @@ merlin_sdk_package = "merlin-sdk" for index, item in enumerate(REQUIRE): if merlin_sdk_package in item: - REQUIRE[index] = f"{merlin_sdk_package} @ file://localhost/{merlin_path}#egg={merlin_sdk_package}" + REQUIRE[ + index + ] = f"{merlin_sdk_package} @ file://localhost/{merlin_path}#egg={merlin_sdk_package}" setup( - name='pyfuncserver', - version='0.6.0', - author_email='merlin-dev@gojek.com', - description='Model Server implementation for mlflow pyfunc model', - long_description=open('README.md').read(), - long_description_content_type='text/markdown', - python_requires='>=3.8,<3.11', + name="pyfuncserver", + version="0.6.0", + author_email="merlin-dev@gojek.com", + description="Model Server implementation for mlflow pyfunc model", + long_description=open("README.md").read(), + long_description_content_type="text/markdown", + python_requires=">=3.8,<3.11", packages=find_packages(exclude=["test"]), install_requires=REQUIRE, tests_require=tests_require, - extras_require={'test': tests_require} + extras_require={"test": tests_require}, ) diff --git a/python/pyfunc-server/test/test_examples.py b/python/pyfunc-server/test/test_examples.py new file mode 100644 index 000000000..2408acf29 --- /dev/null +++ b/python/pyfunc-server/test/test_examples.py @@ -0,0 +1,90 @@ +import os +import random +import socket +import time +from multiprocessing import Process + +import merlin +import pytest +import requests +from examples.iris_http.iris_http import IrisModel + +request_json = {"instances": [[2.8, 1.0, 6.8, 0.4], [3.1, 1.4, 4.5, 1.6]]} + +if os.environ.get("CI_SERVER"): + host = "172.17.0.1" +else: + host = "localhost" + + +def _get_free_port(): + sock = None + try: + while True: + port = random.randint(8000, 9000) + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + result = sock.connect_ex(("127.0.0.1", port)) + if result != 0: + return port + finally: + if sock is not None: + sock.close() + + +def _wait_server_ready(proc, url, timeout_second=600, tick_second=10): + time.sleep(5) + + ellapsed_second = 0 + while ellapsed_second < timeout_second: + if not proc.is_alive(): + if proc.exitcode is not None and proc.exitcode != 0: + raise RuntimeError("server failed to start") + + try: + resp = requests.get(url) + if resp.status_code == 200: + return + except Exception as e: + print(f"{url} is not ready: {e}") + + time.sleep(tick_second) + ellapsed_second += tick_second + + if ellapsed_second >= timeout_second: + raise TimeoutError("server is not ready within specified timeout duration") + + +def _get_local_endpoint(model_full_name, port): + return f"http://{host}:{port}/v1/models/{model_full_name}:predict" + + +@pytest.mark.local_server_test +def test_examples_iris(): + XGB_PATH = os.path.join("examples/iris_http/models/", "model_1.bst") + SKLEARN_PATH = os.path.join("examples/iris_http/models/", "model_2.joblib") + + port = _get_free_port() + + p = Process( + target=merlin.run_pyfunc_model, + kwargs={ 
+ "model_instance": IrisModel(), + "conda_env": "examples/iris_http/env.yaml", + "code_dir": ["examples"], + "artifacts": { + "xgb_model": XGB_PATH, + "sklearn_model": SKLEARN_PATH, + }, + "debug": True, + "port": port, + }, + ) + p.start() + + _wait_server_ready(p, f"http://{host}:{port}", timeout_second=600) + + resp = requests.post(_get_local_endpoint("irismodel-dev", port), json=request_json) + assert resp.status_code == 200 + assert resp.json() is not None + assert len(resp.json()["predictions"]) == len(request_json["instances"]) + p.terminate() diff --git a/python/sdk/merlin/__init__.py b/python/sdk/merlin/__init__.py index ac45482d1..27310523c 100644 --- a/python/sdk/merlin/__init__.py +++ b/python/sdk/merlin/__init__.py @@ -17,15 +17,15 @@ from __future__ import absolute_import -import sys import signal +import sys + +import merlin.autoscaling +import merlin.deployment_mode import merlin.fluent import merlin.resource_request -import merlin.deployment_mode -import merlin.autoscaling from merlin.version import VERSION as __version__ - # Merlin URL set_url = merlin.fluent.set_url get_url = merlin.fluent.get_url @@ -68,7 +68,7 @@ undeploy = merlin.fluent.undeploy # Model serving -set_traffic = merlin.fluent.set_traffic # deprecated +set_traffic = merlin.fluent.set_traffic # deprecated serve_traffic = merlin.fluent.serve_traffic stop_serving_traffic = merlin.fluent.stop_serving_traffic @@ -80,25 +80,49 @@ DeploymentMode = merlin.deployment_mode.DeploymentMode AutoscalingPolicy = merlin.autoscaling.AutoscalingPolicy MetricsType = merlin.autoscaling.MetricsType + # Batch create_prediction_job = merlin.fluent.create_prediction_job +# Run server locally +run_pyfunc_model = merlin.pyfunc.run_pyfunc_model __all__ = [ - "set_url", "get_url", - "list_project", "set_project", "active_project", - "list_environment", "get_environment", "get_default_environment", - "set_model", "active_model", - "new_model_version", "log_param", "log_metric", "set_tag", "delete_tag", - "log_artifact", "log_pyfunc_model", "log_pytorch_model", "log_model", - "deploy", "undeploy", - "set_traffic", "serve_traffic", - "ResourceRequest", "DeploymentMode", "AutoscalingPolicy", "MetricsType", - "create_prediction_job" + "set_url", + "get_url", + "list_project", + "set_project", + "active_project", + "list_environment", + "get_environment", + "get_default_environment", + "set_model", + "active_model", + "new_model_version", + "log_param", + "log_metric", + "set_tag", + "delete_tag", + "log_artifact", + "log_pyfunc_model", + "log_pytorch_model", + "log_model", + "deploy", + "undeploy", + "set_traffic", + "serve_traffic", + "ResourceRequest", + "DeploymentMode", + "AutoscalingPolicy", + "MetricsType", + "create_prediction_job", + "run_pyfunc_model", ] + def sigterm_handler(_signo, _stack_frame): # Raises SystemExit(0): sys.exit(0) -signal.signal(signal.SIGTERM, sigterm_handler) \ No newline at end of file + +signal.signal(signal.SIGTERM, sigterm_handler) diff --git a/python/sdk/merlin/docker/docker.py b/python/sdk/merlin/docker/docker.py index 298bcf249..7e7b4b333 100644 --- a/python/sdk/merlin/docker/docker.py +++ b/python/sdk/merlin/docker/docker.py @@ -13,14 +13,16 @@ # limitations under the License. import os -import shutil import pathlib +import re +import shutil + +from docker.errors import BuildError try: import importlib.resources as pkg_resources except ImportError: # Try backported to PY<37 `importlib_resources`. 
- import importlib_resources as pkg_resources # type: ignore @@ -54,3 +56,21 @@ def _copy_dockerfile(dst, dockerfile_name): full_path = os.path.join(package_path, dockerfile_name) shutil.copy(full_path, dst_dockerfile_path) return dst_dockerfile_path + + +def wait_build_complete(logs, debug: bool = False): + for chunk in logs: + if "error" in chunk: + raise BuildError(chunk["error"], logs) + if "stream" in chunk: + if debug: + print(chunk["stream"].replace("\n", "")) + + match = re.search( + r"(^Successfully built |sha256:)([0-9a-f]+)$", chunk["stream"] + ) + if match: + image_id = match.group(2) + if image_id: + return + raise BuildError("Unknown", logs) diff --git a/python/sdk/merlin/docker/pyfunc.Dockerfile b/python/sdk/merlin/docker/pyfunc.Dockerfile index bda870e25..ae49e30fa 100644 --- a/python/sdk/merlin/docker/pyfunc.Dockerfile +++ b/python/sdk/merlin/docker/pyfunc.Dockerfile @@ -12,18 +12,25 @@ # See the License for the specific language governing permissions and # limitations under the License. -ARG BASE_IMAGE -ARG MODEL_PATH - +ARG BASE_IMAGE=ghcr.io/caraml-dev/merlin/merlin-pyfunc-base:0.38.1 FROM ${BASE_IMAGE} -COPY ${MODEL_PATH} /model +# Download and install user model dependencies +ARG MODEL_DEPENDENCIES_URL +COPY ${MODEL_DEPENDENCIES_URL} conda.yaml +RUN conda env create --name merlin-model --file conda.yaml + +# Copy and install pyfunc-server and merlin-sdk dependencies +COPY merlin/python/pyfunc-server /pyfunc-server +COPY merlin/python/sdk /sdk +ENV SDK_PATH=/sdk -# Disable pip caching to minimize image size -ENV PIP_NO_CACHE_DIR=1 +WORKDIR /pyfunc-server +RUN /bin/bash -c ". activate merlin-model && pip uninstall -y merlin-sdk && pip install -r /pyfunc-server/requirements.txt" -RUN conda env create --name model_env -f ./model/conda.yaml -RUN /bin/bash -c ". activate model_env && \ - pip install -e . " +# Download and dry-run user model artifacts and code +ARG MODEL_ARTIFACTS_URL +COPY ${MODEL_ARTIFACTS_URL} model +RUN /bin/bash -c ". activate merlin-model && python -m pyfuncserver --model_dir model --dry_run" -CMD ["/bin/bash", "./run.sh"] +CMD ["/bin/bash", "/pyfunc-server/run.sh"] diff --git a/python/sdk/merlin/fluent.py b/python/sdk/merlin/fluent.py index d029ab1a2..004ae88d8 100644 --- a/python/sdk/merlin/fluent.py +++ b/python/sdk/merlin/fluent.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. + from contextlib import contextmanager from typing import Any, Dict, List, Optional @@ -297,7 +298,6 @@ def log_pyfunc_model( :param model_instance: instance of python function model :param conda_env: path to conda env.yaml file :param code_dir: additional code directory that will be loaded with ModelType.PYFUNC model - :param code_dir: additional code directory to be uploaded :param artifacts: dictionary of artifact that will be stored together with the model. This will be passed to PythonModel.initialize. Example: {"config": "config/staging.yaml"} """ _check_active_model_version() @@ -357,23 +357,23 @@ def deploy( deployment_mode: DeploymentMode = None, autoscaling_policy: AutoscalingPolicy = None, protocol: Protocol = None, - enable_model_observability: bool = False + enable_model_observability: bool = False, ) -> VersionEndpoint: """ Deploy a model version. - :param model_version: If model_version is not given it will deploy active model version - :param environment_name: target environment to which the model version will be deployed to. If left empty it will deploy to default environment. 
- :param resource_request: The resource requirement and replicas requests for model version endpoint. - :param image_builder_resource_request: The resource requirement and replicas requests for image builder job. - :param env_vars: List of environment variables to be passed to the model container. - :param transformer: The service to be deployed alongside the model for pre/post-processing steps. - :param logger: Response/Request logging configuration for model or transformer. - :param deployment_mode: mode of deployment for the endpoint (default: DeploymentMode.SERVERLESS) - :param autoscaling_policy: autoscaling policy to be used for the deployment (default: None) - :param protocol: protocol to be used by the deployed model (default: HTTP_JSON) - :param enable_model_observability: flag to determine whether model observability enabled for the endpoint - :return: VersionEndpoint object + :param model_version: If model_version is not given it will deploy active model version + :param environment_name: target environment to which the model version will be deployed to. If left empty it will deploy to default environment. + :param resource_request: The resource requirement and replicas requests for model version endpoint. + :param image_builder_resource_request: The resource requirement and replicas requests for image builder job. + :param env_vars: List of environment variables to be passed to the model container. + :param transformer: The service to be deployed alongside the model for pre/post-processing steps. + :param logger: Response/Request logging configuration for model or transformer. + :param deployment_mode: mode of deployment for the endpoint (default: DeploymentMode.SERVERLESS) + :param autoscaling_policy: autoscaling policy to be used for the deployment (default: None) + :param protocol: protocol to be used by the deployed model (default: HTTP_JSON) + :param enable_model_observability: flag to determine whether model observability enabled for the endpoint + :return: VersionEndpoint object """ _check_active_client() if model_version is None: @@ -388,7 +388,7 @@ def deploy( deployment_mode, autoscaling_policy, protocol, - enable_model_observability + enable_model_observability, ) return _merlin_client.deploy( # type: ignore @@ -402,7 +402,7 @@ def deploy( deployment_mode, autoscaling_policy, protocol, - enable_model_observability + enable_model_observability, ) diff --git a/python/sdk/merlin/model.py b/python/sdk/merlin/model.py index 33d08658e..357440d41 100644 --- a/python/sdk/merlin/model.py +++ b/python/sdk/merlin/model.py @@ -24,40 +24,49 @@ from time import sleep from typing import Any, Dict, List, Optional, Tuple, Union +import client import docker import mlflow import pyprind import yaml +from client import ( + EndpointApi, + EnvironmentApi, + ModelEndpointsApi, + ModelsApi, + SecretApi, + VersionApi, +) from docker import APIClient -from docker.errors import BuildError from docker.models.containers import Container -from mlflow.entities import Run, RunData -from mlflow.exceptions import MlflowException -from mlflow.pyfunc import PythonModel - -import client -from client import (EndpointApi, EnvironmentApi, ModelEndpointsApi, ModelsApi, - SecretApi, VersionApi) from merlin import pyfunc -from merlin.autoscaling import (RAW_DEPLOYMENT_DEFAULT_AUTOSCALING_POLICY, - SERVERLESS_DEFAULT_AUTOSCALING_POLICY, - AutoscalingPolicy) +from merlin.autoscaling import ( + RAW_DEPLOYMENT_DEFAULT_AUTOSCALING_POLICY, + SERVERLESS_DEFAULT_AUTOSCALING_POLICY, + AutoscalingPolicy, +) from 
merlin.batch.config import PredictionJobConfig from merlin.batch.job import PredictionJob from merlin.batch.sink import BigQuerySink from merlin.batch.source import BigQuerySource from merlin.deployment_mode import DeploymentMode -from merlin.docker.docker import (copy_pyfunc_dockerfile, - copy_standard_dockerfile) +from merlin.docker.docker import copy_standard_dockerfile, wait_build_complete from merlin.endpoint import ModelEndpoint, Status, VersionEndpoint from merlin.logger import Logger from merlin.protocol import Protocol +from merlin.pyfunc import run_pyfunc_local_server from merlin.resource_request import ResourceRequest from merlin.transformer import Transformer -from merlin.util import (autostr, download_files_from_gcs, guess_mlp_ui_url, - valid_name_check) +from merlin.util import ( + autostr, + download_files_from_gcs, + guess_mlp_ui_url, + valid_name_check, +) from merlin.validation import validate_model_dir -from merlin.version import VERSION +from mlflow.entities import Run, RunData +from mlflow.exceptions import MlflowException +from mlflow.pyfunc import PythonModel # Ensure backward compatibility after moving PyFuncModel and PyFuncV2Model to pyfunc.py # This allows users to do following import statement @@ -686,7 +695,7 @@ def __init__( self._labels = version.labels self._custom_predictor = version.custom_predictor self._python_version = version.python_version - mlflow.set_tracking_uri(model.project.mlflow_tracking_url) # type: ignore # noqa + mlflow.set_tracking_uri(model.project.mlflow_tracking_url) # type: ignore # noqa @property def id(self) -> int: @@ -811,7 +820,7 @@ def get_run(self) -> Optional[Run]: Get MLFlow Run in a model version """ try: - return mlflow.get_run(self._mlflow_run_id) # type: ignore # noqa + return mlflow.get_run(self._mlflow_run_id) # type: ignore # noqa except MlflowException: return None @@ -936,7 +945,9 @@ def log_pyfunc_model( and self._model.type != ModelType.PYFUNC_V2 and self._model.type != ModelType.PYFUNC_V3 ): - raise ValueError("log_pyfunc_model is only for PyFunc, PyFuncV2 and PyFuncV3 model") + raise ValueError( + "log_pyfunc_model is only for PyFunc, PyFuncV2 and PyFuncV3 model" + ) # add/replace python version in conda to match that used to create model version conda_env = _process_conda_env(conda_env, self._python_version) @@ -1016,7 +1027,7 @@ def log_custom_model( writer.write(f"args = {args}\n") validate_model_dir(self._model.type, model_dir) - mlflow.log_artifacts(model_dir, DEFAULT_MODEL_PATH) # type: ignore # noqa + mlflow.log_artifacts(model_dir, DEFAULT_MODEL_PATH) # type: ignore # noqa if is_using_temp_dir: """ @@ -1062,7 +1073,7 @@ def deploy( deployment_mode: DeploymentMode = None, autoscaling_policy: AutoscalingPolicy = None, protocol: Protocol = None, - enable_model_observability: bool = False + enable_model_observability: bool = False, ) -> VersionEndpoint: """ Deploy current model to MLP One of log_model, log_pytorch_model, @@ -1079,7 +1090,11 @@ def deploy( :return: VersionEndpoint object """ env_list = self._get_env_list() - target_env_name = _get_default_target_env_name(env_list) if environment_name is None else environment_name + target_env_name = ( + _get_default_target_env_name(env_list) + if environment_name is None + else environment_name + ) current_endpoint = self._get_endpoint_in_environment(target_env_name) @@ -1097,9 +1112,13 @@ def deploy( if current_endpoint is None: target_deployment_mode = DeploymentMode.SERVERLESS.value target_protocol = Protocol.HTTP_JSON.value - target_resource_request = 
ModelVersion._get_default_resource_request(target_env_name, env_list) + target_resource_request = ModelVersion._get_default_resource_request( + target_env_name, env_list + ) target_autoscaling_policy = ModelVersion._get_default_autoscaling_policy( - deployment_mode.value if deployment_mode is not None else target_deployment_mode + deployment_mode.value + if deployment_mode is not None + else target_deployment_mode ) if deployment_mode is not None: @@ -1117,8 +1136,8 @@ def deploy( resource_request.memory_request, ) if ( - resource_request.gpu_request is not None - and resource_request.gpu_name is not None + resource_request.gpu_request is not None + and resource_request.gpu_name is not None ): for env in env_list: for gpu in env.gpus: @@ -1150,8 +1169,13 @@ def deploy( target_env_vars = ModelVersion._add_env_vars(target_env_vars, env_vars) if transformer is not None: - target_transformer = ModelVersion._create_transformer_spec(transformer, target_env_name, env_list) - if current_endpoint is not None and current_endpoint.transformer is not None: + target_transformer = ModelVersion._create_transformer_spec( + transformer, target_env_name, env_list + ) + if ( + current_endpoint is not None + and current_endpoint.transformer is not None + ): target_transformer.id = current_endpoint.transformer.id if logger is not None: @@ -1170,7 +1194,7 @@ def deploy( deployment_mode=target_deployment_mode, autoscaling_policy=target_autoscaling_policy, protocol=target_protocol, - enable_model_observability=enable_model_observability + enable_model_observability=enable_model_observability, ) if current_endpoint is not None: # This allows a serving deployment to be updated while it is serving @@ -1388,20 +1412,24 @@ def start_server( self, env_vars: Dict[str, str] = None, port: int = 8080, + protocol: Protocol = Protocol.HTTP_JSON, pyfunc_base_image: str = None, kill_existing_server: bool = False, tmp_dir: Optional[str] = os.environ.get("MERLIN_TMP_DIR"), build_image: bool = False, + debug: bool = False, ): """ Start a local server running the model version :param env_vars: dictionary of environment variables to be passed to the server :param port: host port that will be used to expose model server + :param protocol: protocol to be used by the deployed model (default: HTTP_JSON) :param pyfunc_base_image: (optional, default=None) docker image to be used as base image for building pyfunc model :param kill_existing_server: (optional, default=False) kill existing server if has been started previously :param tmp_dir: (optional, default=None) specify base path for storing model artifact :param build_image: (optional, default=False) build image for standard model instead of directly mounting the model artifact to model container + :param debug: (optional, default=False) enable debug mode that will print docker build log :return: """ if tmp_dir is None: @@ -1426,8 +1454,27 @@ def start_server( model_type = self.model.type if model_type == ModelType.PYFUNC: - self._run_pyfunc_local_server( - artifact_path, env_vars, port, pyfunc_base_image + context_path = ( + f"{tmp_dir}/merlin/{self.model.project.name}/{self.model.name}" + ) + + conda_env = f"{tmp_dir}/merlin/{self.model.project.name}/{self.model.name}/{self.id}/{DEFAULT_MODEL_PATH}/conda.yaml" + dependencies_path = f"{context_path}/env.yaml" + shutil.copy(conda_env, dependencies_path) + + artifact_path = f"{self.id}/{DEFAULT_MODEL_PATH}" + + run_pyfunc_local_server( + context_path=context_path, + dependencies_path=dependencies_path, + artifact_path=artifact_path, + 
model_name=self.model.name, + model_version=f"{self.id}", + pyfunc_base_image=pyfunc_base_image, + port=port, + env_vars=env_vars, + protocol=protocol, + debug=debug, ) return @@ -1482,7 +1529,7 @@ def _run_standard_model_local_server( dockerfile=os.path.basename(dockerfile_path), decode=True, ) - self._wait_build_complete(logs) + wait_build_complete(logs) image_name = image_tag print(f"Starting model server {container_name} at port: {port}") @@ -1511,79 +1558,15 @@ def _run_standard_model_local_server( if container is not None: container.remove(force=True) - def _run_pyfunc_local_server( - self, artifact_path, env_vars, port, pyfunc_base_image - ): - if pyfunc_base_image is None: - if "dev" in VERSION: - pyfunc_base_image = "ghcr.io/caraml-dev/merlin-pyfunc-base:dev" - else: - pyfunc_base_image = f"ghcr.io/caraml-dev/merlin-pyfunc-base:v{VERSION}" - - dockerfile_path = copy_pyfunc_dockerfile(artifact_path) - image_tag = f"{self.model.project.name}-{self.model.name}:{self.id}" - client = docker.from_env() - apiClient = APIClient() - print(f"Building pyfunc image: {image_tag}") - logs = apiClient.build( - path=artifact_path, - tag=image_tag, - buildargs={"BASE_IMAGE": pyfunc_base_image, "MODEL_PATH": artifact_path}, - dockerfile=os.path.basename(dockerfile_path), - decode=True, - ) - self._wait_build_complete(logs) - - container: Optional[Container] = None # type: ignore - try: - container_name = self._container_name() - print(f"Starting model server {container_name} at port: {port}") - - if env_vars is None: - env_vars = {} - - env_vars["MODEL_NAME"] = f"{self.model.name}-{self.id}" - env_vars["WORKERS"] = 1 - env_vars["PORT"] = 8080 - container = client.containers.run( - image=image_tag, - name=container_name, - labels={"managed-by": "merlin"}, - ports={"8080/tcp": port}, - environment=env_vars, - detach=True, - remove=True, - ) - - # continously print docker log until the process is interrupted - for log in container.logs(stream=True): - print(log) - finally: - if container is not None: - container.remove(force=True) - def _container_name(self): return f"{self.model.project.name}-{self.model.name}-{self.id}" - def _wait_build_complete(self, logs): - for chunk in logs: - if "error" in chunk: - raise BuildError(chunk["error"], logs) - if "stream" in chunk: - match = re.search( - r"(^Successfully built |sha256:)([0-9a-f]+)$", chunk["stream"] - ) - if match: - image_id = match.group(2) - last_event = chunk - if image_id: - return - raise BuildError("Unknown", logs) - def _get_env_list(self) -> List[client.models.Environment]: return EnvironmentApi(self._api_client).environments_get() - def _get_endpoint_in_environment(self, environment_name: Optional[str]) -> Optional[VersionEndpoint]: + def _get_endpoint_in_environment( + self, environment_name: Optional[str] + ) -> Optional[VersionEndpoint]: """ Return the FIRST endpoint of this model version that is deployed in the environment specified @@ -1596,7 +1579,9 @@ def _get_endpoint_in_environment(self, environment_name: Optional[str]) -> Optio return None @staticmethod - def _get_default_resource_request(env_name: str, env_list: List[client.models.Environment]) -> client.ResourceRequest: + def _get_default_resource_request( + env_name: str, env_list: List[client.models.Environment] + ) -> client.ResourceRequest: resource_request = None for env in env_list: if env.name == env_name: @@ -1609,53 +1594,78 @@ def _get_default_resource_request(env_name: str, env_list: List[client.models.En # This case is when the default resource request is not 
specified in the environment config if resource_request is None: - raise ValueError("default resource request not found in the environment config") + raise ValueError( + "default resource request not found in the environment config" + ) resource_request.validate() return client.ResourceRequest( - resource_request.min_replica, resource_request.max_replica, - resource_request.cpu_request, resource_request.memory_request) + resource_request.min_replica, + resource_request.max_replica, + resource_request.cpu_request, + resource_request.memory_request, + ) @staticmethod - def _get_default_autoscaling_policy(deployment_mode: str) -> client.AutoscalingPolicy: + def _get_default_autoscaling_policy( + deployment_mode: str, + ) -> client.AutoscalingPolicy: if deployment_mode == DeploymentMode.RAW_DEPLOYMENT.value: autoscaling_policy = RAW_DEPLOYMENT_DEFAULT_AUTOSCALING_POLICY else: autoscaling_policy = SERVERLESS_DEFAULT_AUTOSCALING_POLICY - return client.AutoscalingPolicy(autoscaling_policy.metrics_type.value, autoscaling_policy.target_value) + return client.AutoscalingPolicy( + autoscaling_policy.metrics_type.value, autoscaling_policy.target_value + ) @staticmethod def _add_env_vars(target_env_vars, new_env_vars): if not isinstance(new_env_vars, dict): raise ValueError( - f"env_vars should be dictionary, got: {type(new_env_vars)}") + f"env_vars should be dictionary, got: {type(new_env_vars)}" + ) if len(new_env_vars) > 0: for name, value in new_env_vars.items(): - target_env_vars.append( - client.EnvVar(str(name), str(value))) + target_env_vars.append(client.EnvVar(str(name), str(value))) return target_env_vars @staticmethod - def _create_transformer_spec(transformer: Transformer, target_env_name: str, env_list: List[client.models.Environment]) -> client.Transformer: + def _create_transformer_spec( + transformer: Transformer, + target_env_name: str, + env_list: List[client.models.Environment], + ) -> client.Transformer: resource_request = transformer.resource_request if resource_request is None: - target_resource_request = ModelVersion._get_default_resource_request(target_env_name, env_list) + target_resource_request = ModelVersion._get_default_resource_request( + target_env_name, env_list + ) else: resource_request.validate() target_resource_request = client.ResourceRequest( - resource_request.min_replica, resource_request.max_replica, - resource_request.cpu_request, resource_request.memory_request) + resource_request.min_replica, + resource_request.max_replica, + resource_request.cpu_request, + resource_request.memory_request, + ) target_env_vars: List[client.models.Environment] = [] if transformer.env_vars is not None: - target_env_vars = ModelVersion._add_env_vars(target_env_vars, transformer.env_vars) + target_env_vars = ModelVersion._add_env_vars( + target_env_vars, transformer.env_vars + ) return client.Transformer( - id=transformer.id, enabled=transformer.enabled, - transformer_type=transformer.transformer_type.value, image=transformer.image, - command=transformer.command, args=transformer.args, - resource_request=target_resource_request, env_vars=target_env_vars) + id=transformer.id, + enabled=transformer.enabled, + transformer_type=transformer.transformer_type.value, + image=transformer.image, + command=transformer.command, + args=transformer.args, + resource_request=target_resource_request, + env_vars=target_env_vars, + ) def delete_model_version(self) -> int: """ @@ -1676,8 +1686,9 @@ def _get_default_target_env_name(env_list: List[client.models.Environment]) -> s if env.is_default: 
target_env_name = env.name if target_env_name is None: - raise ValueError("Unable to find default environment, " - "pass environment_name to the method") + raise ValueError( + "Unable to find default environment, " "pass environment_name to the method" + ) return target_env_name diff --git a/python/sdk/merlin/pyfunc.py b/python/sdk/merlin/pyfunc.py index 2d6714f38..24e8467f3 100644 --- a/python/sdk/merlin/pyfunc.py +++ b/python/sdk/merlin/pyfunc.py @@ -1,15 +1,22 @@ +import os +import shutil from abc import abstractmethod -from typing import Union, List, Optional from dataclasses import dataclass +from typing import Any, Dict, List, Optional, Union +import docker import grpc import numpy import pandas from caraml.upi.v1 import upi_pb2 +from docker import APIClient +from git import Repo +from merlin.docker.docker import copy_pyfunc_dockerfile, wait_build_complete +from merlin.protocol import Protocol +from merlin.version import VERSION from mlflow.pyfunc import PythonModel - -from merlin.protocol import Protocol +import mlflow PYFUNC_EXTRA_ARGS_KEY = "__EXTRA_ARGS__" PYFUNC_MODEL_INPUT_KEY = "__INPUT__" @@ -56,12 +63,15 @@ def predict(self, context, model_input): def _do_http_predict(self, model_input, **kwargs): if self._use_kwargs_infer: try: - http_response = self.infer(model_input, **kwargs) + http_response = self.infer(model_input, **kwargs) return http_response except TypeError as e: if "infer() got an unexpected keyword argument" in str(e): print( - 'Fallback to the old infer() method, got TypeError exception: {}'.format(e)) + "Fallback to the old infer() method, got TypeError exception: {}".format( + e + ) + ) self._use_kwargs_infer = False else: raise e @@ -101,8 +111,9 @@ def infer(self, request: dict, **kwargs) -> dict: raise NotImplementedError("infer is not implemented") @abstractmethod - def upiv1_infer(self, request: upi_pb2.PredictValuesRequest, - context: grpc.ServicerContext) -> upi_pb2.PredictValuesResponse: + def upiv1_infer( + self, request: upi_pb2.PredictValuesRequest, context: grpc.ServicerContext + ) -> upi_pb2.PredictValuesResponse: """ Do inference. 
@@ -114,9 +125,8 @@ def upiv1_infer(self, request: upi_pb2.PredictValuesRequest, :return: Prediction result as PredictValuesResponse proto """ raise NotImplementedError("upiv1_infer is not implemented") - - + @dataclass class Values: columns: List[str] @@ -126,11 +136,8 @@ def to_dict(self): data = self.data if isinstance(self.data, numpy.ndarray): data = self.data.tolist() - - return { - "columns": self.columns, - "data": data - } + + return {"columns": self.columns, "data": data} @dataclass @@ -148,25 +155,29 @@ class ModelInput: def features_dict(self) -> Optional[dict]: if self.features is None: return None - + val = self.features if isinstance(self.features, pandas.DataFrame): - val = Values(self.features.columns.values.tolist(), self.features.values.tolist()) - + val = Values( + self.features.columns.values.tolist(), self.features.values.tolist() + ) + result = val.to_dict() result["row_ids"] = self.prediction_ids return result - + def entities_dict(self) -> Optional[dict]: if self.entities is None: return None val = self.entities if isinstance(self.entities, pandas.DataFrame): - val = Values(self.entities.columns.values.tolist(), self.entities.values.tolist()) - result = val.to_dict() + val = Values( + self.entities.columns.values.tolist(), self.entities.values.tolist() + ) + result = val.to_dict() result["row_ids"] = self.prediction_ids return result - + @dataclass class ModelOutput: @@ -180,15 +191,19 @@ class ModelOutput: def predictions_dict(self) -> dict: if self.predictions is None: return None - + predictions = self.predictions if isinstance(self.predictions, pandas.DataFrame): - predictions = Values(self.predictions.columns.values.tolist(), self.predictions.values.tolist()) - + predictions = Values( + self.predictions.columns.values.tolist(), + self.predictions.values.tolist(), + ) + result = predictions.to_dict() result["row_ids"] = self.prediction_ids return result + @dataclass class PyFuncOutput: # final pyfunc response payload when using `HTTP_JSON` protocol @@ -207,7 +222,7 @@ def get_session_id(self) -> Optional[str]: def contains_prediction_log(self): return self.model_input is not None and self.model_output is not None - + class PyFuncV3Model(PythonModel): def load_context(self, context): @@ -242,7 +257,11 @@ def predict(self, context, model_input): ml_model_input = self.upiv1_preprocess(input, grpc_context) ml_model_output = self.infer(ml_model_input) final_output = self.upiv1_postprocess(ml_model_output, input) - return PyFuncOutput(upi_response=final_output, model_input=ml_model_input, model_output=ml_model_output) + return PyFuncOutput( + upi_response=final_output, + model_input=ml_model_input, + model_output=ml_model_output, + ) else: raise NotImplementedError(f"protocol {protocol} is not supported") @@ -251,7 +270,11 @@ def _do_http_predict(self, model_input, **kwargs): ml_model_output = self.infer(ml_model_input) final_output = self.postprocess(ml_model_output, model_input) - return PyFuncOutput(http_response=final_output, model_input=ml_model_input, model_output=ml_model_output) + return PyFuncOutput( + http_response=final_output, + model_input=ml_model_input, + model_output=ml_model_output, + ) @abstractmethod def initialize(self, artifacts: dict): @@ -263,7 +286,6 @@ def initialize(self, artifacts: dict): """ pass - @abstractmethod def preprocess(self, request: dict, **kwargs) -> ModelInput: """ @@ -283,7 +305,7 @@ def infer(self, model_input: ModelInput) -> ModelOutput: :return: model output """ raise NotImplementedError("infer is not implemented") - + 
@abstractmethod def postprocess(self, model_output: ModelOutput, request: dict) -> dict: """ @@ -296,8 +318,9 @@ def postprocess(self, model_output: ModelOutput, request: dict) -> dict: raise NotImplementedError("postprocess is not implemented") @abstractmethod - def upiv1_preprocess(self, request: upi_pb2.PredictValuesRequest, - context: grpc.ServicerContext) -> ModelInput: + def upiv1_preprocess( + self, request: upi_pb2.PredictValuesRequest, context: grpc.ServicerContext + ) -> ModelInput: """ upiv1_preprocess is the preprocessing method that only applicable for UPI_V1 protocol. basically the method is the same with `preprocess` the difference is on the type of the incoming request @@ -307,9 +330,11 @@ def upiv1_preprocess(self, request: upi_pb2.PredictValuesRequest, :return: model input """ raise NotImplementedError("upiv1_preprocess is not implemented") - + @abstractmethod - def upiv1_postprocess(self, model_output: ModelOutput, request: upi_pb2.PredictValuesRequest) -> upi_pb2.PredictValuesResponse: + def upiv1_postprocess( + self, model_output: ModelOutput, request: upi_pb2.PredictValuesRequest + ) -> upi_pb2.PredictValuesResponse: """ upiv1_postprocess is the postprocessing method that only applicable for UPI_V1 protocol. :param model_output: the output of the `infer` function @@ -350,9 +375,9 @@ def initialize(self, artifacts: dict): """ pass - def infer(self, model_input: pandas.DataFrame) -> Union[numpy.ndarray, - pandas.Series, - pandas.DataFrame]: + def infer( + self, model_input: pandas.DataFrame + ) -> Union[numpy.ndarray, pandas.Series, pandas.DataFrame]: """ Infer method is the main method that will be called when calculating the inference result for both online prediction and batch @@ -369,3 +394,210 @@ def infer(self, model_input: pandas.DataFrame) -> Union[numpy.ndarray, """ raise NotImplementedError("infer is not implemented") + + +def run_pyfunc_model( + model_instance: Any, + conda_env: str, + code_dir: List[str] = None, + artifacts: Dict[str, str] = None, + pyfunc_base_image: str = None, + port: int = 8080, + env_vars: Dict[str, str] = None, + protocol: Protocol = Protocol.HTTP_JSON, + debug: bool = False, +): + """ + Run pyfunc model locally using Docker. The function will log the model artifacts onto local mlflow, build a docker image, and run the container locally. + + :param model_instance: instance of python function model + :param conda_env: path to conda env.yaml file + :param code_dir: additional code directory that will be loaded with ModelType.PYFUNC model + :param artifacts: dictionary of artifact that will be stored together with the model. This will be passed to PythonModel.initialize. 
Example: {"config": "config/staging.yaml"} + :param pyfunc_base_image: base image for building pyfunc model + :param port: port to expose the model + :param env_vars: dictionary of environment variables to be passed to the server + :param protocol: protocol to be used by the deployed model (default: HTTP_JSON) + :param debug: flag to enable debug mode that will print docker build log + """ + + model_name = str.lower(model_instance.__class__.__name__) + + # Log model to local mlflow + print("Logging model to local MLflow") + tracking_uri = f"file:///tmp/merlin/pyfunc-models" + mlflow.set_tracking_uri(tracking_uri) + experiment = mlflow.set_experiment(model_name) + + model_info = mlflow.pyfunc.log_model( + artifact_path="model", + python_model=model_instance, + conda_env=conda_env, + code_path=code_dir, + artifacts=artifacts, + ) + + context_path = experiment.artifact_location + if "file://" in context_path: + context_path = context_path.replace("file://", "") + + dependencies_path = f"{context_path}/env.yaml" + shutil.copy(conda_env, dependencies_path) + + artifact_path = f"{model_info.run_id}/artifacts/model" + + run_pyfunc_local_server( + context_path=context_path, + dependencies_path=dependencies_path, + artifact_path=artifact_path, + model_name=model_name, + model_version="dev", + pyfunc_base_image=pyfunc_base_image, + port=port, + env_vars=env_vars, + protocol=protocol, + debug=debug, + ) + + +def run_pyfunc_local_server( + context_path: str, + dependencies_path: str, + artifact_path: str, + model_name: str, + model_version: str, + pyfunc_base_image: str = None, + port: int = 8080, + env_vars: Dict[str, str] = None, + protocol: Protocol = Protocol.HTTP_JSON, + debug: bool = False, +): + if pyfunc_base_image is None: + if "dev" in VERSION or "0.0.0" in VERSION: + pyfunc_base_image = "ghcr.io/caraml-dev/merlin/merlin-pyfunc-base:0.38.1" + else: + pyfunc_base_image = ( + f"ghcr.io/caraml-dev/merlin/merlin-pyfunc-base:{VERSION}" + ) + + dockerfile_path = copy_pyfunc_dockerfile(context_path) + + _clone_merlin_repo(context_path) + + image_tag = f"{model_name}-{model_version}" + + print(f"Building Docker image {image_tag}") + if debug: + print("Context path:", context_path) + print("Dockerfile path:", dockerfile_path) + print("Dependencies path:", dependencies_path) + print("Model artifacts path:", artifact_path) + + _build_image( + image_tag=image_tag, + context_path=context_path, + dependencies_path=dependencies_path, + artifact_path=artifact_path, + pyfunc_base_image=pyfunc_base_image, + dockerfile_path=dockerfile_path, + debug=debug, + ) + + print("Running PyFunc local server") + _run_container( + image_tag=image_tag, + model_name=model_name, + model_version=model_version, + model_full_name=f"{model_name}-{model_version}", + port=port, + env_vars=env_vars, + protocol=protocol, + ) + + +def _clone_merlin_repo(context_path: str): + repo_url = "https://github.com/caraml-dev/merlin.git" + repo_path = f"{context_path}/merlin" + if not os.path.isdir(repo_path): + repo = Repo.clone_from(repo_url, repo_path) + if "dev" in VERSION or "0.0.0" in VERSION: + repo.git.checkout("main") + else: + repo.git.checkout(f"v{VERSION}") + + +def _build_image( + image_tag, + context_path, + dependencies_path, + artifact_path, + pyfunc_base_image, + dockerfile_path, + debug, +): + docker_api_client = APIClient() + logs = docker_api_client.build( + path=context_path, + tag=image_tag, + buildargs={ + "BASE_IMAGE": pyfunc_base_image, + "MODEL_DEPENDENCIES_URL": os.path.basename(dependencies_path), + 
"MODEL_ARTIFACTS_URL": artifact_path, + }, + dockerfile=os.path.basename(dockerfile_path), + decode=True, + ) + + wait_build_complete(logs, debug) + + +def _run_container( + image_tag, + model_name, + model_version, + model_full_name, + port, + env_vars: Dict[str, str] = None, + protocol: Protocol = Protocol.HTTP_JSON, +): + docker_client = docker.from_env() + + # Stop all previous containers to avoid port conflict + started_containers = docker_client.containers.list( + filters={"name": model_full_name} + ) + for started_container in started_containers: + started_container.remove(force=True) + + if env_vars is None: + env_vars = {} + + env_vars["CARAML_HTTP_PORT"] = "8080" + env_vars["CARAML_GRPC_PORT"] = "9000" + env_vars["CARAML_MODEL_NAME"] = model_name + env_vars["CARAML_MODEL_VERSION"] = model_version + env_vars["CARAML_MODEL_FULL_NAME"] = model_full_name + env_vars["WORKERS"] = "1" + + ports = {"8080/tcp": port} + if protocol == Protocol.UPI_V1: + env_vars["CARAML_PROTOCOL"] = protocol.value + ports = {"9000/tcp": port} + + try: + container = docker_client.containers.run( + image=image_tag, + name=model_name, + labels={"managed-by": "merlin"}, + ports=ports, + environment=env_vars, + detach=True, + remove=True, + ) + + # continously print docker log until the process is interrupted + for log in container.logs(stream=True): + print(log) + finally: + if container is not None: + container.remove(force=True) diff --git a/python/sdk/setup.py b/python/sdk/setup.py index 1bd914f26..ca7c7ca36 100644 --- a/python/sdk/setup.py +++ b/python/sdk/setup.py @@ -29,8 +29,9 @@ "Click>=7.0,<8.1.4", "cloudpickle==2.0.0", # used by mlflow "cookiecutter>=1.7.2", - "dataclasses-json>=0.5.2", # allow Flyte version 1.2.0 or above to import Merlin SDK - "docker>=4.2.1", + "dataclasses-json>=0.5.2", # allow Flyte version 1.2.0 or above to import Merlin SDK + "docker<=6.1.3", + "GitPython>=3.1.40", "google-cloud-storage>=1.19.0", "protobuf>=3.12.0,<5.0.0", # Determined by the mlflow dependency "mlflow>=1.26.1,<2.0.0", diff --git a/python/sdk/test/local_server_test.py b/python/sdk/test/local_server_test.py index da39446ee..4bf00aabe 100644 --- a/python/sdk/test/local_server_test.py +++ b/python/sdk/test/local_server_test.py @@ -23,12 +23,7 @@ import requests from merlin.model import ModelType -request_json = { - "instances": [ - [2.8, 1.0, 6.8, 0.4], - [3.1, 1.4, 4.5, 1.6] - ] -} +request_json = {"instances": [[2.8, 1.0, 6.8, 0.4], [3.1, 1.4, 4.5, 1.6]]} if os.environ.get("CI_SERVER"): host = "172.17.0.1" @@ -38,7 +33,9 @@ @pytest.mark.integration @pytest.mark.local_server_test -@pytest.mark.dependency(depends=["test/integration_test.py::test_sklearn"], scope='session') +@pytest.mark.dependency( + depends=["test/integration_test.py::test_sklearn"], scope="session" +) def test_sklearn(integration_test_url, project_name, use_google_oauth): merlin.set_url(integration_test_url, use_google_oauth=use_google_oauth) merlin.set_project(project_name) @@ -52,13 +49,15 @@ def test_sklearn(integration_test_url, project_name, use_google_oauth): assert resp.status_code == 200 assert resp.json() is not None - assert len(resp.json()['predictions']) == len(request_json['instances']) + assert len(resp.json()["predictions"]) == len(request_json["instances"]) p.terminate() @pytest.mark.integration @pytest.mark.local_server_test -@pytest.mark.dependency(depends=["test/integration_test.py::test_xgboost"], scope='session') +@pytest.mark.dependency( + depends=["test/integration_test.py::test_xgboost"], scope="session" +) def 
test_xgboost(integration_test_url, project_name, use_google_oauth): merlin.set_url(integration_test_url, use_google_oauth=use_google_oauth) merlin.set_project(project_name) @@ -72,13 +71,15 @@ def test_xgboost(integration_test_url, project_name, use_google_oauth): assert resp.status_code == 200 assert resp.json() is not None - assert len(resp.json()['predictions']) == len(request_json['instances']) + assert len(resp.json()["predictions"]) == len(request_json["instances"]) p.terminate() @pytest.mark.integration @pytest.mark.local_server_test -@pytest.mark.dependency(depends=["test/integration_test.py::test_tensorflow"], scope='session') +@pytest.mark.dependency( + depends=["test/integration_test.py::test_tensorflow"], scope="session" +) def test_tensorflow(integration_test_url, project_name, use_google_oauth): merlin.set_url(integration_test_url, use_google_oauth=use_google_oauth) merlin.set_project(project_name) @@ -91,23 +92,33 @@ def test_tensorflow(integration_test_url, project_name, use_google_oauth): request_json = { "signature_name": "predict", "instances": [ - {"sepal_length": 2.8, "sepal_width": 1.0, "petal_length": 6.8, - "petal_width": 0.4}, - {"sepal_length": 0.1, "sepal_width": 0.5, "petal_length": 1.8, - "petal_width": 2.4} - ] + { + "sepal_length": 2.8, + "sepal_width": 1.0, + "petal_length": 6.8, + "petal_width": 0.4, + }, + { + "sepal_length": 0.1, + "sepal_width": 0.5, + "petal_length": 1.8, + "petal_width": 2.4, + }, + ], } resp = requests.post(_get_local_endpoint(v, port), json=request_json) assert resp.status_code == 200 assert resp.json() is not None - assert len(resp.json()['predictions']) == len(request_json['instances']) + assert len(resp.json()["predictions"]) == len(request_json["instances"]) p.terminate() @pytest.mark.integration @pytest.mark.local_server_test -@pytest.mark.dependency(depends=["test/integration_test.py::test_pytorch"], scope='session') +@pytest.mark.dependency( + depends=["test/integration_test.py::test_pytorch"], scope="session" +) def test_pytorch(integration_test_url, project_name, use_google_oauth): merlin.set_url(integration_test_url, use_google_oauth=use_google_oauth) merlin.set_project(project_name) @@ -125,13 +136,15 @@ def test_pytorch(integration_test_url, project_name, use_google_oauth): assert resp.status_code == 200 assert resp.json() is not None - assert len(resp.json()['predictions']) == len(request_json['instances']) + assert len(resp.json()["predictions"]) == len(request_json["instances"]) p.terminate() @pytest.mark.local_server_test @pytest.mark.integration -@pytest.mark.dependency(depends=["test/pyfunc_integration_test.py::test_pyfunc"], scope='session') +@pytest.mark.dependency( + depends=["test/pyfunc_integration_test.py::test_pyfunc"], scope="session" +) def test_pyfunc(integration_test_url, project_name, use_google_oauth): merlin.set_url(integration_test_url, use_google_oauth=use_google_oauth) merlin.set_project(project_name) @@ -145,7 +158,7 @@ def test_pyfunc(integration_test_url, project_name, use_google_oauth): assert resp.status_code == 200 assert resp.json() is not None - assert len(resp.json()['predictions']) == len(request_json['instances']) + assert len(resp.json()["predictions"]) == len(request_json["instances"]) p.terminate() @@ -158,10 +171,10 @@ def _get_latest_version(model): def _get_free_port(): sock = None try: - while (True): + while True: port = random.randint(8000, 9000) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - result = sock.connect_ex(('127.0.0.1', port)) + result = 
sock.connect_ex(("127.0.0.1", port)) if result != 0: return port finally: @@ -171,7 +184,7 @@ def _get_free_port(): def _wait_server_ready(url, timeout_second=300, tick_second=5): ellapsed_second = 0 - while (ellapsed_second < timeout_second): + while ellapsed_second < timeout_second: try: resp = requests.get(url) if resp.status_code == 200: diff --git a/python/sdk/test/pyfunc_integration_test.py b/python/sdk/test/pyfunc_integration_test.py index 736257c9c..da97c8edc 100644 --- a/python/sdk/test/pyfunc_integration_test.py +++ b/python/sdk/test/pyfunc_integration_test.py @@ -17,16 +17,13 @@ from test.utils import undeploy_all_version import joblib +import merlin import numpy as np import pytest import xgboost as xgb - -import merlin -from merlin.model import ModelType, PyFuncModel -from merlin.resource_request import ResourceRequest from merlin.model import ModelType, PyFuncModel, PyFuncV3Model from merlin.pyfunc import ModelInput, ModelOutput, Values -from sklearn import svm +from merlin.resource_request import ResourceRequest from sklearn.datasets import load_iris warnings.filterwarnings("ignore") @@ -49,6 +46,7 @@ def infer(self, model_input): result_2 = self._model_2.predict_proba(inputs) return {"predictions": ((result_1 + result_2) / 2).tolist()} + class EnvVarModel(PyFuncModel): def initialize(self, artifacts): self.env_var = {} @@ -58,21 +56,24 @@ def initialize(self, artifacts): def infer(self, model_input): return self.env_var - + + class ModelObservabilityModel(PyFuncV3Model): def initialize(self, artifacts): - self._feature_names = ['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)', 'petal width (cm)'] - self._target_names = ['setosa', 'versicolor', 'virginica'] + self._feature_names = [ + "sepal length (cm)", + "sepal width (cm)", + "petal length (cm)", + "petal width (cm)", + ] + self._target_names = ["setosa", "versicolor", "virginica"] self._model = xgb.Booster(model_file=artifacts["xgb_model"]) def preprocess(self, request: dict, **kwargs) -> ModelInput: - features_data = request['instances'] + features_data = request["instances"] return ModelInput( prediction_ids=["prediction_1", "prediction_2"], - features=Values( - columns=self._feature_names, - data=features_data - ) + features=Values(columns=self._feature_names, data=features_data), ) def infer(self, model_input: ModelInput) -> ModelOutput: @@ -80,16 +81,11 @@ def infer(self, model_input: ModelInput) -> ModelOutput: outputs = self._model.predict(dmatrix).tolist() return ModelOutput( prediction_ids=model_input.prediction_ids, - predictions=Values( - columns=self._target_names, - data = outputs - ) + predictions=Values(columns=self._target_names, data=outputs), ) def postprocess(self, model_output: ModelOutput, request: dict) -> dict: - return { - "predictions": model_output.predictions.data - } + return {"predictions": model_output.predictions.data} @pytest.mark.pyfunc @@ -186,10 +182,13 @@ def test_pyfunc_env_vars( merlin.undeploy(v) + @pytest.mark.pyfunc @pytest.mark.integration @pytest.mark.dependency() -def test_pyfunc_model_observability(integration_test_url, project_name, use_google_oauth, requests): +def test_pyfunc_model_observability( + integration_test_url, project_name, use_google_oauth, requests +): merlin.set_url(integration_test_url, use_google_oauth=use_google_oauth) merlin.set_project(project_name) merlin.set_model("pyfunc-mlobs", ModelType.PYFUNC_V3) @@ -197,14 +196,16 @@ def test_pyfunc_model_observability(integration_test_url, project_name, use_goog undeploy_all_version() with 
merlin.new_model_version() as v: iris = load_iris() - y = iris['target'] - X = iris['data'] + y = iris["target"] + X = iris["data"] xgb_path = train_xgboost_model(X, y) - v.log_pyfunc_model(model_instance=ModelObservabilityModel(), - conda_env="test/pyfunc/env.yaml", - code_dir=["test"], - artifacts={"xgb_model": xgb_path}) + v.log_pyfunc_model( + model_instance=ModelObservabilityModel(), + conda_env="test/pyfunc/env.yaml", + code_dir=["test"], + artifacts={"xgb_model": xgb_path}, + ) endpoint = merlin.deploy(v, enable_model_observability=True) @@ -212,7 +213,7 @@ def test_pyfunc_model_observability(integration_test_url, project_name, use_goog assert resp.status_code == 200 assert resp.json() is not None - assert len(resp.json()['predictions']) == len(request_json['instances']) + assert len(resp.json()["predictions"]) == len(request_json["instances"]) merlin.undeploy(v)
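
As a quick sanity check for the refactor above — where the Docker build polling was pulled out of `ModelVersion._wait_build_complete` and into `merlin.docker.docker.wait_build_complete` — the helper can now be exercised on its own against docker-py's low-level build log stream. The sketch below is illustrative only and is not part of the patch; the build context path and image tag are placeholders.

```python
# Minimal sketch: drive the extracted wait_build_complete() helper directly.
# Assumes Docker is running locally; "." and "my-pyfunc:dev" are placeholder
# build context and tag, not values used anywhere in this patch.
from docker import APIClient
from docker.errors import BuildError

from merlin.docker.docker import wait_build_complete

api_client = APIClient()
# decode=True yields dict chunks ({"stream": ...} / {"error": ...}),
# which is the format wait_build_complete() iterates over.
logs = api_client.build(path=".", tag="my-pyfunc:dev", decode=True)

try:
    # debug=True echoes each build log line, mirroring the new `debug`
    # flag threaded through start_server() and run_pyfunc_model().
    wait_build_complete(logs, debug=True)
    print("image built successfully")
except BuildError as err:
    print(f"build failed: {err}")
```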