This experiment uses a PyTorch-based encoder and a scikit-learn-compatible clustering model in a multi-stage machine learning pipeline built with Python and Apache Beam.
The task is to detect anomalies in the data based on the input values. In our example, we use Autoembedder to train the encoder and an HDBSCAN clustering model to detect anomalies in the processed data. We use RunInference to wrap the models and add them to a custom Beam PTransform.
All the steps are performed with Apache Beam using the Python API. You can check out training and testing by running the Jupyter notebooks:
Prepared models and parameters tailored for our data are located in ./pretrained. You can use them to test the pipeline instead of training the models from scratch.
For the demo, we use mock Salesforce data with client opportunities. To demonstrate the pipeline, we encode the values with PyTorch and then apply a clustering model to detect the anomalies.
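As a rough illustration of how these pieces fit together, the sketch below shows a composite Beam PTransform with two RunInference stages: one for the PyTorch encoder and one for the clustering model. The class name, model handler choices, and parameters are illustrative assumptions only; the actual implementation lives in the anomaly_detection package and the notebooks.

# Illustrative sketch of the two-stage RunInference transform; names and
# model handlers are assumptions, not the package's actual implementation.
import apache_beam as beam
import torch
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy

class AnomalyDetection(beam.PTransform):
    """Encodes preprocessed feature vectors, then assigns cluster labels."""

    def __init__(self, encoder_uri, model_uri, encoder_class, encoder_params):
        # Trained encoder weights (PyTorch state dict) and a saved clustering model.
        self._encoder_handler = PytorchModelHandlerTensor(
            state_dict_path=encoder_uri,
            model_class=encoder_class,
            model_params=encoder_params)
        # Shown with the scikit-learn handler for simplicity; the real package may
        # wrap HDBSCAN differently (HDBSCAN has no plain predict() method).
        self._clustering_handler = SklearnModelHandlerNumpy(model_uri=model_uri)

    def expand(self, rows):
        return (
            rows  # assumed to be numeric feature vectors after preprocessing
            | "ToTensor" >> beam.Map(lambda x: torch.tensor(x, dtype=torch.float32))
            | "Encode" >> RunInference(self._encoder_handler)
            | "ExtractEmbedding" >> beam.Map(lambda r: r.inference.numpy())
            | "Cluster" >> RunInference(self._clustering_handler))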
You have two options to include a custom Python transform in the multi-language pipeline:
Use PythonExternalTransform to stage PyPI package dependencies
If your custom Python transform is available in PyPI, you can use the withExtraPackages method of the PythonExternalTransform class and specify the dependencies required by the RunInference model handler in the arguments.
More details in Apache Beam documentation: Creating cross-language Python transforms
We've published the anomaly detection package, so we can use it along with the other necessary Python packages in our pipeline while leaving the expansion service parameter empty:
PythonExternalTransform.<PCollection<?>, PCollection<KV<String, Row>>>from(
ANOMALY_DETECTION_TRANSFORM, options.getExpansionService())
.withExtraPackages(Lists.newArrayList("akvelon-test-anomaly-detection",
"category_encoders", "torch", "hdbscan", "autoembedder"));
NOTE: Only use the Expansion Service when you need a custom environment that includes packages not available in the Beam SDK or not published in PyPI. In other cases, use the withExtraPackages method to pass the Python transform's dependencies.
Use Custom Expansion Service
To make Python inference available in a multi-language pipeline, all model inference and data preprocessing is packed into a custom PTransform that is installed in an expansion service.
The expansion service image must be supplied with all the dependencies that are used in the Python part of the pipeline. If these packages are available in PyPI, we can include them directly in the Dockerfile; otherwise, we can copy the package source to the image and install it with setup.py.
In our example, we use both pip (to collect the required packages) and our module's setup.py in the Dockerfile to build the expansion service image, which we then deploy on a remote host.
By default, the expansion service implementation used by the runner doesn't support remote connections, so it only works for a pipeline running on the same machine. Our packaged pipeline image includes a modified version of the service that accepts connections to the container from any address (0.0.0.0).
You can find the original implementation of the expansion service here:
All the files that are needed to run the service locally or in the cloud are located in ./pipeline:
The setup.py file and the ./anomaly_detection directory are used to install the custom transform in the Docker image; they also hold the main pipeline code.
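As an illustration, a minimal setup.py for such a package could look like the sketch below. The package name, version, and dependency list are assumptions based on the packages used in this example; see ./pipeline/setup.py for the real file.

# setup.py: illustrative sketch only; see ./pipeline/setup.py for the actual file.
import setuptools

setuptools.setup(
    name="anomaly-detection",
    version="0.1.0",
    packages=setuptools.find_packages(),  # picks up the ./anomaly_detection package
    install_requires=[
        "apache-beam[gcp]",
        "category_encoders",
        "torch",
        "hdbscan",
        "autoembedder",
    ],
)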
To start an expansion service for loading the custom PTransform with RunInference, you need to:
# Build the expansion service image locally and run it, exposing port 8088
docker build ./pipeline -t exp_service
docker run -p 8088:8088 exp_service
# Or build the image with the Google Cloud CLI to deploy it on Compute Engine
gcloud builds submit ./pipeline --tag gs://{gc_project_name}/{project_name}/{tag}:latest
We use the Google Cloud CLI to build the image. After that, the image will be available at the specified path, and we can use it to launch a Compute Engine instance.
To run the expansion service with all the required modules, the Compute Engine instance needs at least 25 GB of boot disk.
Create a Compute Engine instance and use the image that we built with the Google Cloud CLI. You can also check out our prepared Docker image for anomaly detection (expansion-service) and use it instead:
- You also need to make sure that port 8088 is open for incoming connections in GCP Firewall.
- Upload the model weights to the bucket:
gsutil -m cp -r /pretrained gs://{bucket-path}
- Include links to the uploaded files as the parameters encoder_uri, model_uri, and params_uri when importing the PTransform.
You'll also need to pass the URIs of the saved model weights from ./pretrained as parameters and specify the hosted expansion service's public address in the Java pipeline.
Launch the prepared machine.
You can now call the custom Beam transform by accessing the external IP on port 8088.
More details in Apache Beam documentation: Java multi-language pipelines quickstart