You can create isolated Python virtual environments to package multiple Python libraries for a PySpark job. Here is an example of how you can package Great Expectations and profile a set of sample data.
- Access to EMR Serverless
- Docker with the BuildKit backend
- An S3 bucket in `us-east-1` and an IAM role to run your EMR Serverless jobs
Note: If using Docker on Apple Silicon, ensure you use `--platform linux/amd64`.
Set the following variables according to your environment.
export S3_BUCKET=<YOUR_S3_BUCKET_NAME>
export APPLICATION_ID=<EMR_SERVERLESS_APPLICATION_ID>
export JOB_ROLE_ARN=<EMR_SERVERLESS_IAM_ROLE>
The example below builds a virtual environment with the dependencies needed to use Great Expectations to profile a small sample of the New York City Taxi and Limousine Commission (TLC) trip record data.
All the commands below should be executed from this directory (`examples/pyspark/dependencies`).
- Build your virtualenv archive
This command builds the included `Dockerfile` and exports the resulting `pyspark_ge.tar.gz` file to your local filesystem.
Note: The included `Dockerfile` builds for x86. If you would like to build for Graviton, update the `Dockerfile` to use `linux/arm64` as the platform, and see the EMR Serverless architecture options for more detail.
# Enable BuildKit backend
DOCKER_BUILDKIT=1 docker build --output . .
aws s3 cp pyspark_ge.tar.gz s3://${S3_BUCKET}/artifacts/pyspark/
- Copy your code
There's a sample `ge_profile.py` script included here.
aws s3 cp ge_profile.py s3://${S3_BUCKET}/code/pyspark/
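For context, a Great Expectations profiling script along these lines might look roughly like the sketch below. This is a simplified illustration rather than the contents of the included file; it assumes an older Great Expectations release where the legacy `SparkDFDataset` and `BasicDatasetProfiler` APIs are available, and the input path and row limit are placeholders.

```python
import sys

from great_expectations.dataset.sparkdf_dataset import SparkDFDataset
from great_expectations.profile.basic_dataset_profiler import BasicDatasetProfiler
from great_expectations.render.renderer import ProfilingResultsPageRenderer
from great_expectations.render.view import DefaultJinjaPageView
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # The job passes the S3 output prefix as the first argument (see entryPointArguments below)
    output_path = sys.argv[1]

    spark = SparkSession.builder.appName("ge-profile").getOrCreate()

    # Placeholder input: read a small sample of CSV trip data
    df = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv("s3://<YOUR_INPUT_BUCKET>/nyc-taxi/*.csv")
        .limit(100)
    )

    # Profile the sample with the legacy Great Expectations Spark dataset API
    suite, validation_result = BasicDatasetProfiler.profile(SparkDFDataset(df))

    # Render the profiling results as a standalone HTML document
    document_model = ProfilingResultsPageRenderer().render(validation_result)
    html = DefaultJinjaPageView().render(document_model)

    # Write the HTML out as a single part file (the part-00000 mentioned below)
    spark.sparkContext.parallelize([html], 1).saveAsTextFile(output_path)
```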
- Run your job
  - `entryPoint` should point to your script on S3
  - `entryPointArguments` defines the output location of the Great Expectations profiler
  - The virtualenv archive is added via the `--archives` parameter
  - The driver and executor Python paths are configured via the various `--conf spark.emr-serverless.*` parameters
aws emr-serverless start-job-run \
--application-id $APPLICATION_ID \
--execution-role-arn $JOB_ROLE_ARN \
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://'${S3_BUCKET}'/code/pyspark/ge_profile.py",
"entryPointArguments": ["s3://'${S3_BUCKET}'/tmp/ge-profile"],
"sparkSubmitParameters": "--conf spark.archives=s3://'${S3_BUCKET}'/artifacts/pyspark/pyspark_ge.tar.gz#environment --conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python --conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python --conf spark.emr-serverless.executorEnv.PYSPARK_PYTHON=./environment/bin/python"
}
}' \
--configuration-overrides '{
"monitoringConfiguration": {
"s3MonitoringConfiguration": {
"logUri": "s3://'${S3_BUCKET}'/logs/"
}
}
}'
When the job finishes, it will write a `part-00000` file out to `s3://${S3_BUCKET}/tmp/ge-profile`.
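If you would rather wait for the run to finish programmatically than watch the console, a minimal polling sketch with boto3 is shown below; it assumes boto3 is installed, your AWS credentials are configured, and the job run ID comes from the `start-job-run` output.

```python
import time

import boto3

# Placeholders - use the values returned by start-job-run
application_id = "<EMR_SERVERLESS_APPLICATION_ID>"
job_run_id = "<YOUR_JOB_RUN_ID>"

client = boto3.client("emr-serverless", region_name="us-east-1")

while True:
    # Look up the current state of the job run
    state = client.get_job_run(applicationId=application_id, jobRunId=job_run_id)["jobRun"]["state"]
    print(f"Job run state: {state}")
    if state in ("SUCCESS", "FAILED", "CANCELLED"):
        break
    time.sleep(30)
```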
- Copy and view the output
aws s3 cp s3://${S3_BUCKET}/tmp/ge-profile/part-00000 ./ge.html
open ./ge.html
Sometimes you need to pull in Java dependencies like Kafka or PostgreSQL libraries. As of release label `emr-6.7.0`, you can use either `spark.jars.packages` or the `--packages` flag in your `sparkSubmitParameters`, as shown below. Be sure to create your application within a VPC so that it can download the necessary dependencies.
# First create an application with release label emr-6.7.0 and a network configuration
aws emr-serverless create-application \
--release-label emr-6.7.0 \
--type SPARK \
--name spark-packages \
--network-configuration '{
"subnetIds": ["subnet-abcdef01234567890", "subnet-abcdef01234567891"],
"securityGroupIds": ["sg-abcdef01234567893"]
}'
# Then submit a job (replacing the application id, arn, and your code/packages)
aws emr-serverless start-job-run \
--name pg-query \
--application-id $APPLICATION_ID \
--execution-role-arn $JOB_ROLE_ARN \
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://'${S3_BUCKET}'/code/pyspark/pg_query.py",
"sparkSubmitParameters": "--packages org.postgresql:postgresql:42.4.0"
}
}' \
--configuration-overrides '{
"monitoringConfiguration": {
"s3MonitoringConfiguration": {
"logUri": "s3://'${S3_BUCKET}'/logs/"
}
}
}'
While `--packages` lets you easily specify additional dependencies for your job, these dependencies are not cached between job runs. In other words, each job run has to re-fetch the dependencies, potentially leading to increased startup time. To mitigate this, and to create reproducible builds, you can create a dependency uberjar and upload it to S3.
This approach can also be used with EMR release label `emr-6.6.0`.
To do this, we'll create a `pom.xml` that specifies our dependencies and use a Maven Docker container to build the uberjar. In this example, we'll package `org.postgresql:postgresql:42.4.0` and use the example script in `./pg_query.py` to query a Postgres database.
Note: The code in `pg_query.py` is for demonstration purposes only - never store credentials directly in your code. 😁
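For reference, a query script along these lines might look roughly like the sketch below. This is a simplified illustration rather than the included file; the host, database, table, and credentials are placeholders, and in a real job the credentials should come from a secrets store rather than the script itself.

```python
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.appName("pg-query").getOrCreate()

    # Read a table over JDBC using the PostgreSQL driver supplied via
    # --packages (or via the uberjar on --jars)
    df = (
        spark.read.format("jdbc")
        .option("url", "jdbc:postgresql://<YOUR_HOST>:5432/<YOUR_DATABASE>")
        .option("driver", "org.postgresql.Driver")
        .option("dbtable", "public.<YOUR_TABLE>")
        .option("user", "<YOUR_USER>")          # placeholder - never hardcode real credentials
        .option("password", "<YOUR_PASSWORD>")  # placeholder - never hardcode real credentials
        .load()
    )

    # Print a few rows to the driver stdout (visible in the SPARK_DRIVER logs)
    df.show()
```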
- Build an uberjar with your dependencies
# Enable BuildKit backend
DOCKER_BUILDKIT=1 docker build -f Dockerfile.jars --output . .
This will create an `uber-jars-1.0-SNAPSHOT.jar` file locally that you will copy to S3 in the next step.
- Copy your code and jar
aws s3 cp pg_query.py s3://${S3_BUCKET}/code/pyspark/
aws s3 cp uber-jars-1.0-SNAPSHOT.jar s3://${S3_BUCKET}/code/pyspark/jars/
- Set the following variables according to your environment.
export S3_BUCKET=<YOUR_S3_BUCKET_NAME>
export APPLICATION_ID=<EMR_SERVERLESS_APPLICATION_ID>
export JOB_ROLE_ARN=<EMR_SERVERLESS_IAM_ROLE>
- Start your job with `--jars`
aws emr-serverless start-job-run \
--name pg-query \
--application-id $APPLICATION_ID \
--execution-role-arn $JOB_ROLE_ARN \
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://'${S3_BUCKET}'/code/pyspark/pg_query.py",
"sparkSubmitParameters": "--jars s3://'${S3_BUCKET}'/code/pyspark/jars/uber-jars-1.0-SNAPSHOT.jar"
}
}' \
--configuration-overrides '{
"monitoringConfiguration": {
"s3MonitoringConfiguration": {
"logUri": "s3://'${S3_BUCKET}'/logs/"
}
}
}'
- See the output of your job!
Once your job finishes, you can copy the output locally to view the stdout.
export JOB_RUN_ID=<YOUR_JOB_RUN_ID>
aws s3 cp s3://${S3_BUCKET}/logs/applications/${APPLICATION_ID}/jobs/${JOB_RUN_ID}/SPARK_DRIVER/stdout.gz - | gunzip