From 9b59aebebea0833f322812b54923e31853a1c34c Mon Sep 17 00:00:00 2001 From: Andy Heusser Date: Thu, 30 Jan 2025 10:59:03 -0500 Subject: [PATCH 01/29] config file for environment specific variable --- config.yaml | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 config.yaml diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..ae5327a --- /dev/null +++ b/config.yaml @@ -0,0 +1,12 @@ +production: + instanceName: "gpt-bayes" + serverName: "nextgen-mmm.pymc-labs.com" + ipAddress: "35.208.203.115" + port: "5000" + region: "us-central1" +development: + instanceName: "dev-gpt-bayes" + serverName: "dev-nextgen-mmm.pymc-labs.com" + ipAddress: "" + port: "5000" + region: "us-central1" From cfe1f3d39631cec93616921089fece8debadfff1 Mon Sep 17 00:00:00 2001 From: Andy Heusser Date: Thu, 30 Jan 2025 11:02:05 -0500 Subject: [PATCH 02/29] update deploy script to require environment parameter --- deploy.sh | 35 +++++++++++++++++++++++++++-------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/deploy.sh b/deploy.sh index 987c645..984c5e0 100755 --- a/deploy.sh +++ b/deploy.sh @@ -1,5 +1,24 @@ #!/bin/bash +# Validate environment argument +if [ "$1" != "production" ] && [ "$1" != "development" ]; then + echo "Error: Environment must be specified as either 'production' or 'development'" + echo "Usage: $0 " + exit 1 +fi + +ENVIRONMENT=$1 + +# Function to read config value +get_config() { + local key=$1 + yq eval ".$ENVIRONMENT.$key" config.yaml +} + +# Get configuration values +INSTANCE_NAME=$(get_config "instanceName") +ZONE=$(get_config "region")-a + # Function to check if rebuild is needed should_rebuild() { read -p "Did you modify Dockerfile, environment.yml or other dependencies? (y/N): " answer @@ -18,20 +37,20 @@ if should_rebuild; then gcloud builds submit echo "Cleaning up docker system..." - gcloud compute ssh gpt-bayes --zone us-central1-a --command 'docker system prune -f -a' + gcloud compute ssh "$INSTANCE_NAME" --zone "$ZONE" --command 'docker system prune -f -a' echo "Updating container..." - gcloud compute instances update-container gpt-bayes \ - --zone=us-central1-a \ - --container-image=us-central1-docker.pkg.dev/bayes-gpt/gpt-bayes/gpt-bayes:latest + gcloud compute instances update-container "$INSTANCE_NAME" \ + --zone="$ZONE" \ + --container-image=$ZONE-docker.pkg.dev/bayes-gpt/$INSTANCE_NAME/$INSTANCE_NAME:latest else echo "Copying new files and restarting application..." # First, copy files to the instance - gcloud compute scp --zone=us-central1-a --recurse ./*.py gpt-bayes:/tmp/ + gcloud compute scp --zone="$ZONE" --recurse ./*.py "$INSTANCE_NAME":/tmp/ # Then copy files into the container and restart services - gcloud compute ssh gpt-bayes --zone=us-central1-a --command " - CONTAINER_ID=\$(docker ps --filter ancestor=us-central1-docker.pkg.dev/bayes-gpt/gpt-bayes/gpt-bayes:latest -q) && \ + gcloud compute ssh "$INSTANCE_NAME" --zone="$ZONE" --command " + CONTAINER_ID=\$(docker ps --filter ancestor=$ZONE-docker.pkg.dev/bayes-gpt/$INSTANCE_NAME/$INSTANCE_NAME:latest -q) && \ if [ -z \"\$CONTAINER_ID\" ]; then echo \"Error: Could not find running container\" exit 1 @@ -50,7 +69,7 @@ else # Restart the container echo "Restarting container..." 
- gcloud compute ssh gpt-bayes --zone=us-central1-a --command "docker restart \$(docker ps --filter ancestor=us-central1-docker.pkg.dev/bayes-gpt/gpt-bayes/gpt-bayes:latest -q)" + gcloud compute ssh "$INSTANCE_NAME" --zone="$ZONE" --command "docker restart \$(docker ps --filter ancestor=$ZONE-docker.pkg.dev/bayes-gpt/$INSTANCE_NAME/$INSTANCE_NAME:latest -q)" fi echo "Deployment complete!" From 1ac4ee72b019dbcbfc1c32f319da2fd7555201c8 Mon Sep 17 00:00:00 2001 From: Andy Heusser Date: Thu, 30 Jan 2025 11:19:33 -0500 Subject: [PATCH 03/29] new script to build a docker container and f ix to deploy script, update reqs --- build.sh | 24 ++++++++++++++++++++++++ deploy.sh | 2 +- environment.yml | 1 + 3 files changed, 26 insertions(+), 1 deletion(-) create mode 100755 build.sh diff --git a/build.sh b/build.sh new file mode 100755 index 0000000..1ffcde7 --- /dev/null +++ b/build.sh @@ -0,0 +1,24 @@ +#!/bin/bash + +# Validate environment argument +if [ "$1" != "production" ] && [ "$1" != "development" ]; then + echo "Error: Environment must be specified as either 'production' or 'development'" + echo "Usage: $0 " + exit 1 +fi + +ENVIRONMENT=$1 + +# Function to read config value +get_config() { + local key=$1 + yq ".$ENVIRONMENT.$key" config.yaml +} + +# Get configuration values +INSTANCE_NAME=$(get_config "instanceName") +ZONE=$(get_config "region")-a + +echo $INSTANCE_NAME + +gcloud builds submit --config cloudbuild.yaml --substitutions=_REPO_NAME=$INSTANCE_NAME,_SERVICE_NAME=$INSTANCE_NAME,_REGION=$ZONE \ No newline at end of file diff --git a/deploy.sh b/deploy.sh index 984c5e0..69f6413 100755 --- a/deploy.sh +++ b/deploy.sh @@ -12,7 +12,7 @@ ENVIRONMENT=$1 # Function to read config value get_config() { local key=$1 - yq eval ".$ENVIRONMENT.$key" config.yaml + yq ".$ENVIRONMENT.$key" config.yaml } # Get configuration values diff --git a/environment.yml b/environment.yml index 70e2d7c..ed21a2a 100644 --- a/environment.yml +++ b/environment.yml @@ -12,4 +12,5 @@ dependencies: - vine=5.1.0 - redis-py=4.5.0 - procps-ng + - yq=2.12.0 From 45887fbb99f4051469f35863ea1819ba82f63976 Mon Sep 17 00:00:00 2001 From: Andy Heusser Date: Thu, 30 Jan 2025 12:05:21 -0500 Subject: [PATCH 04/29] update config with external IP for development server --- config.yaml | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/config.yaml b/config.yaml index ae5327a..ddba523 100644 --- a/config.yaml +++ b/config.yaml @@ -1,12 +1,14 @@ production: - instanceName: "gpt-bayes" - serverName: "nextgen-mmm.pymc-labs.com" - ipAddress: "35.208.203.115" - port: "5000" - region: "us-central1" + instanceName: gpt-bayes + serverName: nextgen-mmm.pymc-labs.com + ipAddress: 35.208.203.115 + port: 5000 + region: us-central1 + zone: us-central1-a development: - instanceName: "dev-gpt-bayes" - serverName: "dev-nextgen-mmm.pymc-labs.com" - ipAddress: "" - port: "5000" - region: "us-central1" + instanceName: dev-gpt-bayes + serverName: dev-nextgen-mmm.pymc-labs.com + ipAddress: 34.59.137.141 + port: 5000 + region: us-central1 + zone: us-central1-a From 882811c54e95cc5c8346f0321783811f233c9a8b Mon Sep 17 00:00:00 2001 From: Andy Heusser Date: Thu, 30 Jan 2025 12:06:59 -0500 Subject: [PATCH 05/29] update build script to use env specific params --- build.sh | 8 ++++---- cloudbuild.yaml | 3 --- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/build.sh b/build.sh index 1ffcde7..8685b43 100755 --- a/build.sh +++ b/build.sh @@ -12,13 +12,13 @@ ENVIRONMENT=$1 # Function to read config value 
get_config() { local key=$1 - yq ".$ENVIRONMENT.$key" config.yaml + yq ".$ENVIRONMENT.$key" config.yaml | tr -d '"' } # Get configuration values INSTANCE_NAME=$(get_config "instanceName") -ZONE=$(get_config "region")-a +REGION=$(get_config "region") -echo $INSTANCE_NAME +echo "Building $INSTANCE_NAME in $REGION" -gcloud builds submit --config cloudbuild.yaml --substitutions=_REPO_NAME=$INSTANCE_NAME,_SERVICE_NAME=$INSTANCE_NAME,_REGION=$ZONE \ No newline at end of file +gcloud builds submit --config cloudbuild.yaml --substitutions=_REPO_NAME=$INSTANCE_NAME,_SERVICE_NAME=$INSTANCE_NAME,_REGION=$REGION diff --git a/cloudbuild.yaml b/cloudbuild.yaml index 5881765..32503cb 100644 --- a/cloudbuild.yaml +++ b/cloudbuild.yaml @@ -20,6 +20,3 @@ images: substitutions: _SERVICE_FOLDER: . - _REPO_NAME: gpt-bayes # Name of Google Artifact Registry (GAR)'s repo - _SERVICE_NAME: gpt-bayes # Name of Cloud Run Service - _REGION: us-central1 From 9bd8da50f2f83990fc44e77cd37cd05466e7b5eb Mon Sep 17 00:00:00 2001 From: Andy Heusser Date: Thu, 30 Jan 2025 12:42:23 -0500 Subject: [PATCH 06/29] updated development IP and deployment script --- config.yaml | 2 +- deploy.sh | 13 +++++++------ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/config.yaml b/config.yaml index ddba523..4fec6ac 100644 --- a/config.yaml +++ b/config.yaml @@ -8,7 +8,7 @@ production: development: instanceName: dev-gpt-bayes serverName: dev-nextgen-mmm.pymc-labs.com - ipAddress: 34.59.137.141 + ipAddress: 34.71.232.125 port: 5000 region: us-central1 zone: us-central1-a diff --git a/deploy.sh b/deploy.sh index 69f6413..3bde53c 100755 --- a/deploy.sh +++ b/deploy.sh @@ -12,12 +12,13 @@ ENVIRONMENT=$1 # Function to read config value get_config() { local key=$1 - yq ".$ENVIRONMENT.$key" config.yaml + yq ".$ENVIRONMENT.$key" config.yaml | tr -d '"' } # Get configuration values INSTANCE_NAME=$(get_config "instanceName") -ZONE=$(get_config "region")-a +REGION=$(get_config "region") +ZONE=$(get_config "zone") # Function to check if rebuild is needed should_rebuild() { @@ -34,7 +35,7 @@ should_rebuild() { if should_rebuild; then echo "Rebuilding container..." - gcloud builds submit + ./build.sh $ENVIRONMENT echo "Cleaning up docker system..." gcloud compute ssh "$INSTANCE_NAME" --zone "$ZONE" --command 'docker system prune -f -a' @@ -42,7 +43,7 @@ if should_rebuild; then echo "Updating container..." gcloud compute instances update-container "$INSTANCE_NAME" \ --zone="$ZONE" \ - --container-image=$ZONE-docker.pkg.dev/bayes-gpt/$INSTANCE_NAME/$INSTANCE_NAME:latest + --container-image=$REGION-docker.pkg.dev/bayes-gpt/$INSTANCE_NAME/$INSTANCE_NAME:latest else echo "Copying new files and restarting application..." # First, copy files to the instance @@ -50,7 +51,7 @@ else # Then copy files into the container and restart services gcloud compute ssh "$INSTANCE_NAME" --zone="$ZONE" --command " - CONTAINER_ID=\$(docker ps --filter ancestor=$ZONE-docker.pkg.dev/bayes-gpt/$INSTANCE_NAME/$INSTANCE_NAME:latest -q) && \ + CONTAINER_ID=\$(docker ps --filter ancestor=$REGION-docker.pkg.dev/bayes-gpt/$INSTANCE_NAME/$INSTANCE_NAME:latest -q) && \ if [ -z \"\$CONTAINER_ID\" ]; then echo \"Error: Could not find running container\" exit 1 @@ -69,7 +70,7 @@ else # Restart the container echo "Restarting container..." 
- gcloud compute ssh "$INSTANCE_NAME" --zone="$ZONE" --command "docker restart \$(docker ps --filter ancestor=$ZONE-docker.pkg.dev/bayes-gpt/$INSTANCE_NAME/$INSTANCE_NAME:latest -q)" + gcloud compute ssh "$INSTANCE_NAME" --zone="$ZONE" --command "docker restart \$(docker ps --filter ancestor=$REGION-docker.pkg.dev/bayes-gpt/$INSTANCE_NAME/$INSTANCE_NAME:latest -q)" fi echo "Deployment complete!" From 7a728e27ba89c71a97f6ac918fdd37dba933dcbd Mon Sep 17 00:00:00 2001 From: Andy Heusser Date: Thu, 30 Jan 2025 13:46:52 -0500 Subject: [PATCH 07/29] attempt at setting up nginx, deployment failed --- config.yaml | 20 +++++++++---- nginx/cloudbuild.yaml | 5 +--- nginx/deploy.sh | 36 +++++++++++++++++++++++ nginx/{nginx.conf => nginx.conf.template} | 4 +-- 4 files changed, 53 insertions(+), 12 deletions(-) create mode 100755 nginx/deploy.sh rename nginx/{nginx.conf => nginx.conf.template} (85%) diff --git a/config.yaml b/config.yaml index 4fec6ac..d5b02d6 100644 --- a/config.yaml +++ b/config.yaml @@ -1,14 +1,22 @@ production: instanceName: gpt-bayes - serverName: nextgen-mmm.pymc-labs.com - ipAddress: 35.208.203.115 - port: 5000 region: us-central1 zone: us-central1-a + nginx: + repoName: gcr2gce-proxy + serviceName: gcr2gce-proxy + serverName: nextgen-mmm.pymc-labs.com + ipAddress: 35.208.203.115 + port: 5000 + region: us-central1 development: instanceName: dev-gpt-bayes - serverName: dev-nextgen-mmm.pymc-labs.com - ipAddress: 34.71.232.125 - port: 5000 region: us-central1 zone: us-central1-a + nginx: + repoName: dev-gcr2gce-proxy + serviceName: dev-gcr2gce-proxy + serverName: dev-nextgen-mmm.pymc-labs.com + ipAddress: 34.71.232.125 + port: 5000 + region: us-central1 \ No newline at end of file diff --git a/nginx/cloudbuild.yaml b/nginx/cloudbuild.yaml index fb65fe7..0c9c84b 100644 --- a/nginx/cloudbuild.yaml +++ b/nginx/cloudbuild.yaml @@ -37,7 +37,4 @@ images: - ${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPO_NAME}/${_SERVICE_NAME} substitutions: - _SERVICE_FOLDER: . - _REPO_NAME: gcr2gce-proxy # Name of Google Artifact Registry (GAR)'s repo - _SERVICE_NAME: gcr2gce-proxy # Name of Cloud Run Service - _REGION: us-central1 + _SERVICE_FOLDER: . 
\ No newline at end of file diff --git a/nginx/deploy.sh b/nginx/deploy.sh new file mode 100755 index 0000000..6b40fac --- /dev/null +++ b/nginx/deploy.sh @@ -0,0 +1,36 @@ +#!/bin/bash + +# Validate environment argument +if [ "$1" != "production" ] && [ "$1" != "development" ]; then + echo "Error: Environment must be specified as either 'production' or 'development'" + echo "Usage: $0 " + exit 1 +fi + +ENVIRONMENT=$1 + +# Function to read config value +get_nginx_config() { + local key=$1 + yq ".$ENVIRONMENT.nginx.$key" ../config.yaml | tr -d '"' +} + +# Get configuration values +SERVER_NAME=$(get_nginx_config "serverName") +IP_ADDRESS=$(get_nginx_config "ipAddress") +PORT=$(get_nginx_config "port") +REPO_NAME=$(get_nginx_config "repoName") +SERVICE_NAME=$(get_nginx_config "serviceName") +REGION=$(get_nginx_config "region") + +# Replace placeholders in nginx.conf.template +sed -e "s/\${SERVER_NAME}/$SERVER_NAME/g" \ + -e "s/\${IP_ADDRESS}/$IP_ADDRESS/g" \ + -e "s/\${PORT}/$PORT/g" nginx.conf.template > nginx.conf + + +# Deploy +gcloud builds submit --config cloudbuild.yaml --substitutions=_REPO_NAME=$REPO_NAME,_SERVICE_NAME=$SERVICE_NAME,_REGION=$REGION + +# Clean up +rm nginx.conf diff --git a/nginx/nginx.conf b/nginx/nginx.conf.template similarity index 85% rename from nginx/nginx.conf rename to nginx/nginx.conf.template index 8687395..e62aeae 100644 --- a/nginx/nginx.conf +++ b/nginx/nginx.conf.template @@ -14,10 +14,10 @@ http { server { listen 8080; - server_name nextgen-mmm.pymc-labs.com; + server_name ${SERVER_NAME}; location / { - proxy_pass http://35.208.203.115:5000; + proxy_pass http://${IP_ADDRESS}:${PORT}; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; From f52d19e2383002ed8438fedc840149a497d872fc Mon Sep 17 00:00:00 2001 From: Andy Heusser Date: Thu, 30 Jan 2025 14:01:54 -0500 Subject: [PATCH 08/29] update readme for development env --- README.md | 63 ++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 42 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index f39e2cf..97ed337 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,8 @@ A cloud-based Marketing Mix Modeling (MMM) solution deployed on Google Cloud Pla ## Quick Deployment ```bash -./deploy.sh # Deploy the latest version to production +./deploy.sh production # Deploy the latest version to production +./deploy.sh development # Deploy the latest version to development ``` ## System Architecture @@ -18,6 +19,7 @@ GPT-Bayes consists of two main components: 2. **Backend Service** - Production URL: https://nextgen-mmm.pymc-labs.com + - Development URL: https://dev-nextgen-mmm.pymc-labs.com - Function: Handles model fitting and parameter management via API endpoints - Infrastructure: Hosted on Google Cloud Engine (GCE) under the `gpt-bayes` project @@ -31,8 +33,10 @@ GPT-Bayes consists of two main components: - `nginx/` - NGINX reverse proxy settings - `dockerfile` - Container specifications - `start.sh` - Container initialization +- `build.sh` - Build the container image - `deploy.sh` - Deployment automation - `environment.yml` - Development environment specifications +- `config.yaml` - Environment configuration settings ### AI Agent Settings - `gpt-agent/gpt_prompt.md` - System instructions @@ -45,23 +49,25 @@ GPT-Bayes consists of two main components: ## Deployment Guide -The application runs on Google Compute Engine (GCE) under the `gpt-bayes` project, accessible at `https://nextgen-mmm.pymc-labs.com`. 
+The application runs on Google Compute Engine (GCE) under the `gpt-bayes` project, accessible at `https://nextgen-mmm.pymc-labs.com` (production) and `https://dev-nextgen-mmm.pymc-labs.com` (development). ### Standard Deployment Use `deploy.sh` to update the application. This script handles: - Updating the container in Google Artifact Registry (GAR) -- Deploying to the production environment +- Deploying to the specified environment ```bash -./deploy.sh +./deploy.sh production # Deploy the latest version to production +./deploy.sh development # Deploy the latest version to development ``` ### Server Management -Access the production server: +Access the specified server: ```bash gcloud compute ssh gpt-bayes --zone us-central1-a +gcloud compute ssh dev-gpt-bayes --zone us-central1-a ``` Container management commands: @@ -81,10 +87,11 @@ docker exec -it CONTAINER_ID /bin/bash Build and publish to Google Artifact Registry: ```bash -gcloud builds submit +./build.sh production # Build and publish to production +./build.sh development # Build and publish to development ``` -Note: This updates the container image but doesn't affect the production deployment. +Note: This updates the container image but doesn't affect the specified deployment. ### Server Instance Management @@ -93,28 +100,41 @@ View available Container-Optimized OS images: gcloud compute images list --project cos-cloud --no-standard-images ``` -Update production container: +Update specified container: ```bash # Clear existing containers gcloud compute ssh gpt-bayes --zone us-central1-a --command 'docker system prune -f -a' +gcloud compute ssh dev-gpt-bayes --zone us-central1-a --command 'docker system prune -f -a' # Deploy new container gcloud compute instances update-container gpt-bayes \ --zone=us-central1-a \ --container-image=us-central1-docker.pkg.dev/bayes-gpt/gpt-bayes/gpt-bayes:latest + +gcloud compute instances update-container dev-gpt-bayes \ + --zone=us-central1-a \ + --container-image=us-central1-docker.pkg.dev/bayes-gpt/dev-gpt-bayes/dev-gpt-bayes:latest ``` Create new server instance: ```bash -gcloud compute instances create gpt-bayes \ - --machine-type e2-standard-4 \ - --boot-disk-size 20GB \ - --image image-name \ - --image-project cos-cloud \ - --zone us-central1 \ - --metadata container-image=your-container-image-name \ - --tags http-server \ - --firewall-create allow-http + gcloud compute instances create-with-container gpt-bayes \ + --machine-type e2-standard-4 \ + --boot-disk-size 20GB \ + --image cos-stable-117-18613-164-4 \ + --image-project cos-cloud \ + --zone us-central1-a \ + --container-image=us-central1-docker.pkg.dev/bayes-gpt/gpt-bayes/gpt-bayes:latest \ + --tags http-server,https-server + + gcloud compute instances create-with-container dev-gpt-bayes \ + --machine-type e2-standard-4 \ + --boot-disk-size 20GB \ + --image cos-stable-117-18613-164-4 \ + --image-project cos-cloud \ + --zone us-central1-a \ + --container-image=us-central1-docker.pkg.dev/bayes-gpt/dev-gpt-bayes/dev-gpt-bayes:latest \ + --tags http-server,https-server ``` ### NGINX Configuration (Advanced) @@ -122,13 +142,14 @@ gcloud compute instances create gpt-bayes \ Deploy NGINX reverse proxy updates: ```bash cd nginx -gcloud builds submit +./deploy.sh production # Deploy the latest version to production +./deploy.sh development # Deploy the latest version to development ``` Update backend IP address: -1. Navigate to `nginx/nginx.conf` -2. Modify the `proxy_pass` directive with the new IP -3. 
Example: `proxy_pass http://35.208.203.115:5000;` +1. Navigate to `config.yaml` +2. Modify the `ipAddress` directive with the new IP +3. Example: `ipAddress: 35.208.203.115` ## Local Development From bb90dcbde8e6ae7844d799635e5d36d26faaab12 Mon Sep 17 00:00:00 2001 From: Andy Heusser Date: Thu, 30 Jan 2025 14:04:47 -0500 Subject: [PATCH 09/29] more readme updates --- README.md | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 97ed337..9160177 100644 --- a/README.md +++ b/README.md @@ -51,9 +51,17 @@ GPT-Bayes consists of two main components: The application runs on Google Compute Engine (GCE) under the `gpt-bayes` project, accessible at `https://nextgen-mmm.pymc-labs.com` (production) and `https://dev-nextgen-mmm.pymc-labs.com` (development). +### Build and Push Docker Image + +Build and push the Docker image to Google Artifact Registry (GAR). +```bash +./build.sh production # Build and publish to production +./build.sh development # Build and publish to development +``` + ### Standard Deployment -Use `deploy.sh` to update the application. This script handles: +Once the Docker image is built and pushed to GAR, use `deploy.sh` to update the application. This script handles: - Updating the container in Google Artifact Registry (GAR) - Deploying to the specified environment From 1815ac667b35aa2ecd16f6afdda9a2caf0a937ca Mon Sep 17 00:00:00 2001 From: Andy Heusser Date: Fri, 31 Jan 2025 12:57:57 -0500 Subject: [PATCH 10/29] update dev VM IP address --- config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config.yaml b/config.yaml index d5b02d6..9a866a9 100644 --- a/config.yaml +++ b/config.yaml @@ -17,6 +17,6 @@ development: repoName: dev-gcr2gce-proxy serviceName: dev-gcr2gce-proxy serverName: dev-nextgen-mmm.pymc-labs.com - ipAddress: 34.71.232.125 + ipAddress: 34.59.137.141 port: 5000 region: us-central1 \ No newline at end of file From 206c8c9385f51a4c6c2c8d9483181e4322449db3 Mon Sep 17 00:00:00 2001 From: Andy Heusser Date: Tue, 4 Feb 2025 16:01:28 -0500 Subject: [PATCH 11/29] attempts to fix redis misconfiguration --- Dockerfile | 3 ++- app.py | 17 +++++++++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/Dockerfile b/Dockerfile index 9909084..c7a2c9c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,7 +14,8 @@ RUN micromamba install -y -n base -f /tmp/env.yaml && \ mkdir -p /opt/conda/var/db && \ chown $MAMBA_USER:$MAMBA_USER /opt/conda/var/db && \ mkdir -p /opt/conda/var/db/redis && \ - chown $MAMBA_USER:$MAMBA_USER /opt/conda/var/db/redis + chown -R $MAMBA_USER:$MAMBA_USER /opt/conda/var/db/redis && \ + chmod -R 750 /opt/conda/var/db/redis # Copy the start script COPY --chown=$MAMBA_USER:$MAMBA_USER start.sh $APP_HOME/start.sh diff --git a/app.py b/app.py index 6543466..79aa2f5 100644 --- a/app.py +++ b/app.py @@ -53,7 +53,6 @@ app.config['broker_url'] = 'redis://localhost:6379/0' app.config['result_backend'] = 'redis://localhost:6379/0' - celery = Celery(app.name, broker=app.config['broker_url']) celery.conf.update(app.config) celery.conf.update( @@ -62,9 +61,23 @@ task_time_limit=600, # Add 1-hour timeout broker_connection_retry_on_startup=True, # Retry broker connection on startup worker_redirect_stdouts=False, # Don't redirect stdout/stderr - worker_redirect_stdouts_level='DEBUG' # Log level for stdout/stderr + worker_redirect_stdouts_level='DEBUG', # Log level for stdout/stderr + broker_transport_options={ + 'retry_on_timeout': True, + 'max_retries': 3, + }, + 
redis_max_connections=10, + broker_pool_limit=None, # Disable connection pool size limit ) +# Add Redis error logging +logging.info("Initializing Celery with Redis backend") +try: + celery.backend.client.ping() + logging.info("Successfully connected to Redis backend") +except Exception as e: + logging.error("Failed to connect to Redis: %s", str(e), exc_info=True) + logging.info("App started. Version: %s", __version__) # Create a data directory if it doesn't exist From 4ba6eccb4cf7aae1fa558fa115a6fd6e4c5dd482 Mon Sep 17 00:00:00 2001 From: Andy Heusser Date: Tue, 4 Feb 2025 16:25:08 -0500 Subject: [PATCH 12/29] update tests to include development server --- test_mmm_async.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test_mmm_async.py b/test_mmm_async.py index d228909..5ce8ae0 100644 --- a/test_mmm_async.py +++ b/test_mmm_async.py @@ -87,10 +87,12 @@ def test_async_mmm_run(base_url): if environment == "local": base_url = "http://localhost:5001" - elif environment == "deployed": + elif environment == "deployed-production": base_url = "https://nextgen-mmm.pymc-labs.com" + elif environment == "deployed-development": + base_url = "https://dev-nextgen-mmm.pymc-labs.com" else: - print("Invalid argument. Use 'local' or 'deployed'.") + print("Invalid argument. Use 'local' or 'deployed-production' or 'deployed-development'.") sys.exit(1) test_async_mmm_run(base_url) From 79742e46e477c687201b20fb1950bb2ad93d0f78 Mon Sep 17 00:00:00 2001 From: Andy Heusser Date: Tue, 4 Feb 2025 16:26:27 -0500 Subject: [PATCH 13/29] fixes permissions, no R needed --- Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index c7a2c9c..bd99506 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,8 +14,8 @@ RUN micromamba install -y -n base -f /tmp/env.yaml && \ mkdir -p /opt/conda/var/db && \ chown $MAMBA_USER:$MAMBA_USER /opt/conda/var/db && \ mkdir -p /opt/conda/var/db/redis && \ - chown -R $MAMBA_USER:$MAMBA_USER /opt/conda/var/db/redis && \ - chmod -R 750 /opt/conda/var/db/redis + chown $MAMBA_USER:$MAMBA_USER /opt/conda/var/db/redis && \ + chmod 750 /opt/conda/var/db/redis # Copy the start script COPY --chown=$MAMBA_USER:$MAMBA_USER start.sh $APP_HOME/start.sh From 911da849602fac12c819afa93d9a256f38cb11aa Mon Sep 17 00:00:00 2001 From: Andy Heusser Date: Tue, 4 Feb 2025 16:26:52 -0500 Subject: [PATCH 14/29] add allow-tcp-5000 to networking tags --- README.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 9160177..761e5e2 100644 --- a/README.md +++ b/README.md @@ -133,7 +133,7 @@ Create new server instance: --image-project cos-cloud \ --zone us-central1-a \ --container-image=us-central1-docker.pkg.dev/bayes-gpt/gpt-bayes/gpt-bayes:latest \ - --tags http-server,https-server + --tags http-server,https-server,allow-tcp-5000 gcloud compute instances create-with-container dev-gpt-bayes \ --machine-type e2-standard-4 \ @@ -142,7 +142,8 @@ Create new server instance: --image-project cos-cloud \ --zone us-central1-a \ --container-image=us-central1-docker.pkg.dev/bayes-gpt/dev-gpt-bayes/dev-gpt-bayes:latest \ - --tags http-server,https-server + --tags http-server,https-server,allow-tcp-5000 + ``` ### NGINX Configuration (Advanced) From fcd491d3edeacf59022efcd16728cbcaca9b0e9b Mon Sep 17 00:00:00 2001 From: Andy Heusser Date: Thu, 6 Feb 2025 10:01:10 -0500 Subject: [PATCH 15/29] checking in progress --- app.py | 94 +++++++++++++++++++++++------------------------ test_mmm_async.py | 2 +- 2 files 
changed, 46 insertions(+), 50 deletions(-) diff --git a/app.py b/app.py index 79aa2f5..5f432c6 100644 --- a/app.py +++ b/app.py @@ -87,8 +87,6 @@ os.chmod(DATA_DIR, 0o777) - - @celery.task(bind=True) def run_mmm_task(self, data): """Run Marketing Mix Model analysis task. @@ -140,10 +138,6 @@ def is_valid_dates(df, column): if not is_valid_dates(df, date_column): raise ValueError(f"Date column must be in YYYY-MM-DD format (e.g. 2023-12-31). Found values like: {df[date_column].iloc[0]} with dtype: {df[date_column].dtype}") - - # Define and fit the MMM model - # import ipdb; ipdb.set_trace() - logging.debug("Creating MMM model") mmm = MMM( adstock=GeometricAdstock(l_max=int(adstock_max_lag)), @@ -159,43 +153,15 @@ def is_valid_dates(df, column): y = df['sales'] logging.debug("Starting model fitting.") - - # mmm.fit(X, y, random_seed=42, cores=1) mmm.fit(X, y) logging.info("Model fitting completed.") - - # Extract and return summary statistics - summary = az.summary(mmm.fit_result) - - # Filter only the most important statistics - important_params = summary[summary.index.str.contains('alpha|beta|sigma|intercept|lam|gamma_control', case=False)] - # Limit decimal places and convert to more compact format - important_params = important_params.round(5) - - summary_json = important_params.to_json(orient="split", double_precision=5) - logging.info("Summary statistics extracted.") - logging.info("summary_json=%s", summary_json) - - # Add model metrics - response = { - "status": "completed", - "summary": summary_json, - # "model_info": { - # "num_observations": len(df), - # "channels": channel_columns, - # "adstock_max_lag": adstock_max_lag, - # "yearly_seasonality": yearly_seasonality - # } - } - logging.info("run_mmm_task completed successfully.") - logging.debug("response=%s", response) - return response + return mmm except Exception as e: logging.error("run_mmm_task failed: %s\nJSON data: %s", str(e), data, exc_info=True) - return {"status": "failed", "error": str(e)} + return {"status": "failed", "error": str(e)} @app.route('/run_mmm_async', methods=['POST']) @@ -208,21 +174,15 @@ def run_mmm_async(): task = run_mmm_task.apply_async(args=[data]) logging.info("Task submitted with ID: %s", task.id) - # session[task.id] = "STARTED" - return jsonify({"task_id": task.id}) except Exception as e: logging.error("Error in run_mmm_async: %s", str(e), exc_info=True) return jsonify({"error": str(e)}), 500 -@app.route('/get_results', methods=['GET']) -def get_results(): - try: - task_id = request.args.get('task_id') - logging.info("Received request for get_results with task_id: %s", task_id) - # if task_id not in session: - # return jsonify({'status': "failure", "error":'No such task'}), 404 +def check_task_status(task_id): + try: + logging.info("Received request for check_task_status with task_id: %s", task_id) task = run_mmm_task.AsyncResult(task_id) if task.state == 'PENDING': @@ -230,15 +190,51 @@ def get_results(): response = {"status": "pending"} elif task.state != 'FAILURE': logging.info("Task %s completed successfully.", task_id) - response = task.result + response = {"status": "completed"} else: logging.error("Task %s failed.", task_id) response = {"status": "failure", "error": str(task.info)} - return jsonify(response) + return response except Exception as e: - logging.error("Error in get_results: %s", str(e), exc_info=True) - return jsonify({"error": str(e)}), 500 + logging.error("Error in check_task_status: %s", str(e), exc_info=True) + return {"status": "failure", "error": str(e)} + + 
+@app.route('/extract_summary_statistics', methods=['GET']) +def extract_summary_statistics(): + try: + task_id = request.args.get('task_id') + logging.info("Received request for extract_summary_statistics with task_id: %s", task_id) + + task_status = check_task_status(task_id) + logging.info("Task status: %s with task_id: %s", task_status, task_id) + + if task_status["status"] == "completed": + task = run_mmm_task.AsyncResult(task_id) + mmm = task.result + logging.info("MMM model: %s", mmm) + import pdb; pdb.set_trace() + + # Extract and return summary statistics + summary = az.summary(mmm.fit_result) + + # Filter only the most important statistics + important_params = summary[summary.index.str.contains('alpha|beta|sigma|intercept|lam|gamma_control', case=False)] + # Limit decimal places and convert to more compact format + important_params = important_params.round(5) + + summary_json = important_params.to_json(orient="split", double_precision=5) + logging.info("Summary statistics extracted.") + logging.info("summary_json=%s", summary_json) + + return jsonify({"status": "completed", "summary": summary_json}) + else: + return jsonify(task_status) + except Exception as e: + logging.error("Error in extract_summary_statistics: %s", str(e), exc_info=True) + return jsonify({"status": "failure", "error": str(e)}), 500 + if __name__ == '__main__': from argparse import ArgumentParser diff --git a/test_mmm_async.py b/test_mmm_async.py index 5ce8ae0..5e1f044 100644 --- a/test_mmm_async.py +++ b/test_mmm_async.py @@ -52,7 +52,7 @@ def test_async_mmm_run(base_url): print(f"Got task_id {task_id}") # Polling URL - results_url = f"{base_url}/get_results?task_id={task_id}" + results_url = f"{base_url}/extract_summary_statistics?task_id={task_id}" # Poll for results while True: From 99754c329e69eaa8c3876c98f9f0ac27d145d345 Mon Sep 17 00:00:00 2001 From: Andy Heusser Date: Fri, 7 Feb 2025 14:48:39 -0500 Subject: [PATCH 16/29] updated code to handle new payload, cant yet test locally --- app.py | 52 +++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 37 insertions(+), 15 deletions(-) diff --git a/app.py b/app.py index 5f432c6..56fb8be 100644 --- a/app.py +++ b/app.py @@ -46,6 +46,8 @@ # Additional local logging configuration (if needed) # For example, you can set a file handler or a stream handler for local logging pass + # from celery.utils.log import get_task_logger + # logging = get_task_logger(__name__) # Initialize Celery @@ -110,12 +112,28 @@ def run_mmm_task(self, data): # Ensure the file is readable/writable os.chmod(data_file, 0o666) + params = data.get("params", {}) + try: - df = pd.read_json(io.StringIO(data["df"]), orient="split") - except Exception as e: - logging.info("Error reading JSON data attempting to read CSV: %s", str(e), exc_info=True) - df = pd.read_csv(io.StringIO(data["df"])) + file_refs = params.get("openaiFileIdRefs", []) + if len(file_refs) == 0: + logging.info("No file references found") + raise ValueError("No file references found") + else: + download_url = file_refs[0].get("download_link", "") # TODO: handle multiple files + logging.info("Downloading data from %s", download_url) + df = pd.read_csv(download_url) + logging.info("Data downloaded successfully") + + logging.info("Saving data to file") + file_name = file_refs[0].get("name", "") + file_path = os.path.join(DATA_DIR, file_name) + df.to_csv(file_path, index=False) + logging.info("Data saved to file %s", file_path) + except Exception as e: + logging.info("Error reading data attempting to read CSV: %s", 
str(e), exc_info=True) + raise e logging.info("DataFrame loaded with shape=%s and columns=%s", df.shape, df.columns) logging.info("First 5 rows:\n%s", df.head(5)) @@ -124,11 +142,12 @@ def run_mmm_task(self, data): if len(df) < 15: raise ValueError(f"DataFrame must have at least 15 rows for reliable model fitting. Current shape: {df.shape}") # Extract optional parameters from 'data' - date_column = data.get('date_column', 'date') - channel_columns = data.get('channel_columns', []) - adstock_max_lag = data.get('adstock_max_lag', 8) - yearly_seasonality = data.get('yearly_seasonality', 2) - control_columns = data.get('control_columns', None) + date_column = params.get('date_column', 'date') + channel_columns = params.get('channel_columns', []) + adstock_max_lag = params.get('adstock_max_lag', 8) + yearly_seasonality = params.get('yearly_seasonality', 2) + control_columns = params.get('control_columns', None) + y_column = params.get('y_column', 'sales') logging.debug("Parameters extracted: date_column=%s, channel_columns=%s, adstock_max_lag=%d, yearly_seasonality=%d, control_columns=%s", date_column, channel_columns, adstock_max_lag, yearly_seasonality, control_columns) @@ -140,7 +159,7 @@ def is_valid_dates(df, column): logging.debug("Creating MMM model") mmm = MMM( - adstock=GeometricAdstock(l_max=int(adstock_max_lag)), + adstock=GeometricAdstock(l_max=adstock_max_lag), saturation=LogisticSaturation(), date_column=date_column, channel_columns=channel_columns, @@ -148,11 +167,15 @@ def is_valid_dates(df, column): yearly_seasonality=yearly_seasonality, ) logging.info("MMM model defined.") - - X = df.drop('sales', axis=1) - y = df['sales'] - logging.debug("Starting model fitting.") + # Ensure date_week is in datetime format + df[date_column] = pd.to_datetime(df[date_column]) + + + # X = df.drop(y_column, axis=1).astype(float) + X = df.drop(y_column, axis=1) + y = df[y_column].astype(float) + mmm.fit(X, y) logging.info("Model fitting completed.") logging.info("run_mmm_task completed successfully.") @@ -214,7 +237,6 @@ def extract_summary_statistics(): task = run_mmm_task.AsyncResult(task_id) mmm = task.result logging.info("MMM model: %s", mmm) - import pdb; pdb.set_trace() # Extract and return summary statistics summary = az.summary(mmm.fit_result) From a9468661a90933e734ba06db035c866c5e6cd70a Mon Sep 17 00:00:00 2001 From: Andy Heusser Date: Mon, 10 Feb 2025 10:45:00 -0500 Subject: [PATCH 17/29] model persistence working, able to run extract_summary_statistics --- app.py | 85 +++++++++++++++++++++++------------------------ environment.yml | 1 + test_mmm_async.py | 49 +++++++++++++++------------ 3 files changed, 70 insertions(+), 65 deletions(-) diff --git a/app.py b/app.py index 56fb8be..5ff6c59 100644 --- a/app.py +++ b/app.py @@ -12,7 +12,7 @@ import logging -import pickle +import dill as pickle # more robust than pickle import os import io @@ -180,7 +180,10 @@ def is_valid_dates(df, column): logging.info("Model fitting completed.") logging.info("run_mmm_task completed successfully.") - return mmm + logging.info("Serializing MMM model") + mmm_pickle = pickle.dumps(mmm) + logging.info("MMM model serialized successfully") + return mmm_pickle except Exception as e: logging.error("run_mmm_task failed: %s\nJSON data: %s", str(e), data, exc_info=True) @@ -203,56 +206,52 @@ def run_mmm_async(): return jsonify({"error": str(e)}), 500 -def check_task_status(task_id): - try: - logging.info("Received request for check_task_status with task_id: %s", task_id) +def check_task_status(f): + def 
wrapper(*args, **kwargs): + try: + task_id = kwargs.get('task_id') or request.args.get('task_id') + logging.info("Checking task status with task_id: %s", task_id) - task = run_mmm_task.AsyncResult(task_id) - if task.state == 'PENDING': - logging.info("Task %s is still pending.", task_id) - response = {"status": "pending"} - elif task.state != 'FAILURE': + task = run_mmm_task.AsyncResult(task_id) + if task.state == 'PENDING': + logging.info("Task %s is still pending.", task_id) + return jsonify({"status": "pending"}) + elif task.state == 'FAILURE': + logging.error("Task %s failed.", task_id) + return jsonify({"status": "failure", "error": str(task.info)}) + + # If task completed successfully, proceed with the decorated function logging.info("Task %s completed successfully.", task_id) - response = {"status": "completed"} - else: - logging.error("Task %s failed.", task_id) - response = {"status": "failure", "error": str(task.info)} - - return response - except Exception as e: - logging.error("Error in check_task_status: %s", str(e), exc_info=True) - return {"status": "failure", "error": str(e)} - + return f(*args, **kwargs) + + except Exception as e: + logging.error("Error in check_task_status: %s", str(e), exc_info=True) + return jsonify({"status": "failure", "error": str(e)}), 500 + + return wrapper @app.route('/extract_summary_statistics', methods=['GET']) +@check_task_status def extract_summary_statistics(): try: task_id = request.args.get('task_id') - logging.info("Received request for extract_summary_statistics with task_id: %s", task_id) - - task_status = check_task_status(task_id) - logging.info("Task status: %s with task_id: %s", task_status, task_id) - - if task_status["status"] == "completed": - task = run_mmm_task.AsyncResult(task_id) - mmm = task.result - logging.info("MMM model: %s", mmm) + task = run_mmm_task.AsyncResult(task_id) + mmm = pickle.loads(task.result) + logging.info("MMM model: %s", mmm) - # Extract and return summary statistics - summary = az.summary(mmm.fit_result) - - # Filter only the most important statistics - important_params = summary[summary.index.str.contains('alpha|beta|sigma|intercept|lam|gamma_control', case=False)] - # Limit decimal places and convert to more compact format - important_params = important_params.round(5) - - summary_json = important_params.to_json(orient="split", double_precision=5) - logging.info("Summary statistics extracted.") - logging.info("summary_json=%s", summary_json) + # Extract and return summary statistics + summary = az.summary(mmm.fit_result) + + # Filter only the most important statistics + important_params = summary[summary.index.str.contains('alpha|beta|sigma|intercept|lam|gamma_control', case=False)] + # Limit decimal places and convert to more compact format + important_params = important_params.round(5) + + summary_json = important_params.to_json(orient="split", double_precision=5) + logging.info("Summary statistics extracted.") + logging.info("summary_json=%s", summary_json) - return jsonify({"status": "completed", "summary": summary_json}) - else: - return jsonify(task_status) + return jsonify({"status": "completed", "summary": summary_json}) except Exception as e: logging.error("Error in extract_summary_statistics: %s", str(e), exc_info=True) return jsonify({"status": "failure", "error": str(e)}), 500 diff --git a/environment.yml b/environment.yml index ed21a2a..addddce 100644 --- a/environment.yml +++ b/environment.yml @@ -13,4 +13,5 @@ dependencies: - redis-py=4.5.0 - procps-ng - yq=2.12.0 + - dill=0.3.9 diff 
--git a/test_mmm_async.py b/test_mmm_async.py index 5e1f044..cc7c2b4 100644 --- a/test_mmm_async.py +++ b/test_mmm_async.py @@ -6,35 +6,40 @@ import sys import io -def create_payload_csv(): - # Load the user-uploaded data file - data = pd.read_csv('test-data/mmm_example.csv') - # Rename the 'y' column to 'sales' and select relevant columns - data.rename(columns={'y': 'sales'}, inplace=True) - mmm_data = data[['date_week', 'sales', 'x1', 'x2', 'event_1', 'event_2', 't']] - - # Convert 'date_week' to datetime format - mmm_data.loc[:, 'date_week'] = pd.to_datetime(mmm_data['date_week']).dt.strftime('%Y-%m-%d') - - # Convert the prepared data to JSON format for payload, ensuring proper formatting - data_json = mmm_data.to_json(orient="split", index=False) - #print(data_json) - # Example payload +def create_payload(): + payload = { - "df": data_json, - "date_column": "date_week", - "channel_columns": ["x1", "x2"], - "adstock_max_lag": 2, - "yearly_seasonality": 8, - "control_columns": ["event_1", "event_2", "t"] + "domain": "dev-nextgen-mmm.pymc-labs.com", + "method": "post", + "path": "/run_mmm_async", + "operation": "runMMMAsync", + "operation_hash": "0c869884cb92378e2dfe2ae377cac236cbc2b9d0", + "is_consequential": True, + "params": { + "openaiFileIdRefs": [ + { + "name": "mmm_example.csv", + "id": "file-1234567890", + "mime_type": "text/csv", + "download_link": "https://raw.githubusercontent.com/pymc-labs/pymc-marketing/refs/heads/main/data/mmm_example.csv" + } + ], + "date_column": "date_week", + "channel_columns": [ + "x1", + "x2" + ], + "adstock_max_lag": 8, + "yearly_seasonality": 2, + "y_column": "y" + } } - return payload def test_async_mmm_run(base_url): # Payload that includes data - payload = create_payload_csv() + payload = create_payload() # Replace with your API endpoint for async run run_url = f"{base_url}/run_mmm_async" From 2fcc068e58d28a6edf4aca27dbc9881f28f126a9 Mon Sep 17 00:00:00 2001 From: Andy Heusser Date: Tue, 11 Feb 2025 13:09:41 -0500 Subject: [PATCH 18/29] saving latest progress --- app.py | 90 ++++++++++++++++++++++++++++++++++++++--------- test_mmm_async.py | 31 ++++++++-------- 2 files changed, 88 insertions(+), 33 deletions(-) diff --git a/app.py b/app.py index 5ff6c59..ea67d18 100644 --- a/app.py +++ b/app.py @@ -1,3 +1,4 @@ +import json from flask import Flask, request, jsonify from celery import Celery @@ -16,6 +17,8 @@ import os import io +from functools import wraps + __version__ = "0.3" @@ -101,7 +104,6 @@ def run_mmm_task(self, data): try: logging.info("Starting run_mmm_task here!!") - # Use the dedicated data directory data_file = os.path.join(DATA_DIR, f"data_{self.request.id}.pkl") @@ -112,18 +114,33 @@ def run_mmm_task(self, data): # Ensure the file is readable/writable os.chmod(data_file, 0o666) - params = data.get("params", {}) - try: - file_refs = params.get("openaiFileIdRefs", []) + file_refs = data.get("openaiFileIdRefs", []) if len(file_refs) == 0: logging.info("No file references found") raise ValueError("No file references found") else: download_url = file_refs[0].get("download_link", "") # TODO: handle multiple files logging.info("Downloading data from %s", download_url) - df = pd.read_csv(download_url) - logging.info("Data downloaded successfully") + + # Add headers to the request + headers = { + 'User-Agent': 'Mozilla/5.0', + 'Accept': 'text/csv' + } + + try: + # Use requests library for better control over the HTTP request + import requests + response = requests.get(download_url, headers=headers) + response.raise_for_status() # Raise an 
exception for bad status codes + + # Read CSV from the response content + df = pd.read_csv(io.StringIO(response.text)) + logging.info("Data downloaded successfully") + except requests.exceptions.RequestException as e: + logging.error("Failed to download file: %s", str(e), exc_info=True) + raise ValueError(f"Failed to download file: {str(e)}") logging.info("Saving data to file") file_name = file_refs[0].get("name", "") @@ -132,7 +149,7 @@ def run_mmm_task(self, data): logging.info("Data saved to file %s", file_path) except Exception as e: - logging.info("Error reading data attempting to read CSV: %s", str(e), exc_info=True) + logging.error("Error reading data attempting to read CSV: %s", str(e), exc_info=True) raise e logging.info("DataFrame loaded with shape=%s and columns=%s", df.shape, df.columns) @@ -142,12 +159,12 @@ def run_mmm_task(self, data): if len(df) < 15: raise ValueError(f"DataFrame must have at least 15 rows for reliable model fitting. Current shape: {df.shape}") # Extract optional parameters from 'data' - date_column = params.get('date_column', 'date') - channel_columns = params.get('channel_columns', []) - adstock_max_lag = params.get('adstock_max_lag', 8) - yearly_seasonality = params.get('yearly_seasonality', 2) - control_columns = params.get('control_columns', None) - y_column = params.get('y_column', 'sales') + date_column = data.get('date_column', 'date') + channel_columns = data.get('channel_columns', []) + adstock_max_lag = data.get('adstock_max_lag', 8) + yearly_seasonality = data.get('yearly_seasonality', 2) + control_columns = data.get('control_columns', None) + y_column = data.get('y_column', 'y') logging.debug("Parameters extracted: date_column=%s, channel_columns=%s, adstock_max_lag=%d, yearly_seasonality=%d, control_columns=%s", date_column, channel_columns, adstock_max_lag, yearly_seasonality, control_columns) @@ -204,12 +221,22 @@ def run_mmm_async(): except Exception as e: logging.error("Error in run_mmm_async: %s", str(e), exc_info=True) return jsonify({"error": str(e)}), 500 + +@app.route('/get_task_status', methods=['GET']) +def get_task_status(): + task_id = request.args.get('task_id') + task = run_mmm_task.AsyncResult(task_id) + return jsonify({"status": task.state}) def check_task_status(f): + @wraps(f) # Preserve function metadata def wrapper(*args, **kwargs): try: - task_id = kwargs.get('task_id') or request.args.get('task_id') + task_id = request.args.get('task_id') # Simplify task_id extraction + if not task_id: + return jsonify({"status": "failure", "error": "No task_id provided"}), 400 + logging.info("Checking task status with task_id: %s", task_id) task = run_mmm_task.AsyncResult(task_id) @@ -230,9 +257,9 @@ def wrapper(*args, **kwargs): return wrapper -@app.route('/extract_summary_statistics', methods=['GET']) +@app.route('/get_summary_statistics', methods=['GET']) @check_task_status -def extract_summary_statistics(): +def get_summary_statistics(): try: task_id = request.args.get('task_id') task = run_mmm_task.AsyncResult(task_id) @@ -257,6 +284,37 @@ def extract_summary_statistics(): return jsonify({"status": "failure", "error": str(e)}), 500 +@app.route('/get_posterior_predictive', methods=['GET']) +@check_task_status +def get_posterior_predictive(): + try: + task_id = request.args.get('task_id') + task = run_mmm_task.AsyncResult(task_id) + mmm = pickle.loads(task.result) + logging.info("MMM model: %s", mmm) + + logging.info("Sampling posterior predictive") + mmm.sample_posterior_predictive(mmm.X, extend_idata = True, combined = True) + 
logging.info("Posterior predictive sampled") + + logging.info("Generating posterior predictive plot") + fig = mmm.plot_posterior_predictive() + logging.info("Posterior predictive plot generated") + + axes = fig.get_axes()[0] + posterior_predictive_dict = { + 'obs_xdata': list(map(lambda x: pd.to_datetime(x).strftime('%Y-%m-%d'), axes.get_lines()[0].get_xdata())), + 'obs_ydata': list(axes.get_lines()[0].get_ydata()), + 'pred_ydata': list(axes.get_lines()[1].get_ydata()) + } + posterior_predictive_json = json.dumps(posterior_predictive_dict) + logging.info("Posterior predictive JSON generated") + return jsonify({"status": "completed", "posterior_predictive": posterior_predictive_json}) + except Exception as e: + logging.error("Error in get_posterior_predictive: %s", str(e), exc_info=True) + return jsonify({"status": "failure", "error": str(e)}), 500 + + if __name__ == '__main__': from argparse import ArgumentParser parser = ArgumentParser() diff --git a/test_mmm_async.py b/test_mmm_async.py index cc7c2b4..a4cb669 100644 --- a/test_mmm_async.py +++ b/test_mmm_async.py @@ -7,7 +7,6 @@ import io def create_payload(): - payload = { "domain": "dev-nextgen-mmm.pymc-labs.com", "method": "post", @@ -15,24 +14,22 @@ def create_payload(): "operation": "runMMMAsync", "operation_hash": "0c869884cb92378e2dfe2ae377cac236cbc2b9d0", "is_consequential": True, - "params": { - "openaiFileIdRefs": [ - { - "name": "mmm_example.csv", - "id": "file-1234567890", - "mime_type": "text/csv", - "download_link": "https://raw.githubusercontent.com/pymc-labs/pymc-marketing/refs/heads/main/data/mmm_example.csv" - } - ], - "date_column": "date_week", - "channel_columns": [ + "openaiFileIdRefs": [ + { + "name": "mmm_example.csv", + "id": "file-1234567890", + "mime_type": "text/csv", + "download_link": "https://raw.githubusercontent.com/pymc-labs/pymc-marketing/refs/heads/main/data/mmm_example.csv" + } + ], + "date_column": "date_week", + "channel_columns": [ "x1", "x2" - ], - "adstock_max_lag": 8, - "yearly_seasonality": 2, - "y_column": "y" - } + ], + "adstock_max_lag": 8, + "yearly_seasonality": 2, + "y_column": "y" } return payload From db4a7df6dcdde37898201b9a93fafa61859f0b6f Mon Sep 17 00:00:00 2001 From: Andy Heusser Date: Tue, 11 Feb 2025 13:19:46 -0500 Subject: [PATCH 19/29] saving updated api spec and prompt --- gpt-agent/api_spec.json | 98 ++++++++++++++++++++++++++++++++++++++--- gpt-agent/gpt_prompt.md | 43 +++++++----------- 2 files changed, 107 insertions(+), 34 deletions(-) diff --git a/gpt-agent/api_spec.json b/gpt-agent/api_spec.json index 05d8270..7f556fa 100644 --- a/gpt-agent/api_spec.json +++ b/gpt-agent/api_spec.json @@ -7,7 +7,7 @@ }, "servers": [ { - "url": "https://nextgen-mmm.pymc-labs.com" + "url": "https://dev-nextgen-mmm.pymc-labs.com" } ], "paths": { @@ -16,16 +16,18 @@ "description": "Initiates an asynchronous MMM model run.", "operationId": "runMMMAsync", "requestBody": { - "description": "Data and parameters for the MMM model.", + "description": "Both raw and cleaned data for the MMM model.", "required": true, "content": { "application/json": { "schema": { "type": "object", "properties": { - "df": { - "type": "string", - "description": "Data in CSV format for the model. Must contain at least 15 rows." 
+ "openaiFileIdRefs": { + "type": "array", + "items": { + "type": "string" + } }, "date_column": { "type": "string", @@ -74,10 +76,49 @@ } } }, - "/get_results": { + "/get_task_status": { + "get": { + "description": "Retrieves the status of an MMM model run.", + "operationId": "getTaskStatus", + "parameters": [ + { + "name": "task_id", + "in": "query", + "required": true, + "description": "The task ID of the MMM model run.", + "schema": { + "type": "string" + } + } + ], + "responses": { + "200": { + "description": "Status of the model execution.", + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "status": { + "type": "string", + "description": "Status of the model execution." + }, + "error": { + "type": "string", + "description": "Error message, present if status is 'failed'." + } + } + } + } + } + } + } + } + }, + "/get_summary_statistics": { "get": { "description": "Retrieves the results of an MMM model run.", - "operationId": "getMMMResults", + "operationId": "getSummaryStatistics", "parameters": [ { "name": "task_id", @@ -116,6 +157,49 @@ } } } + }, + "/get_posterior_predictive": { + "get": { + "description": "Retrieves the posterior predictive check of an MMM model run.", + "operationId": "getPosteriorPredictive", + "parameters": [ + { + "name": "task_id", + "in": "query", + "required": true, + "description": "The task ID of the MMM model run.", + "schema": { + "type": "string" + } + } + ], + "responses": { + "200": { + "description": "Status of the model execution.", + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "status": { + "type": "string", + "description": "Status of the model execution." + }, + "summary": { + "type": "string", + "description": "JSON string of posterior predictive samples, present if status is 'completed'." + }, + "error": { + "type": "string", + "description": "Error message, present if status is 'failed'." + } + } + } + } + } + } + } + } } }, "components": { diff --git a/gpt-agent/gpt_prompt.md b/gpt-agent/gpt_prompt.md index 2f94e1e..6da33ac 100644 --- a/gpt-agent/gpt_prompt.md +++ b/gpt-agent/gpt_prompt.md @@ -1,17 +1,18 @@ # Bayes MMM: Your Marketing Mix Modeling Assistant BayesMMM is a specialized assistant for marketing analytics, focusing on Marketing Mix Modeling (MMM) for analysts and business stakeholders. -It leverages the `nextgen-mmm.pymc-labs.com` API to run MMM models and retrive fitted parameters. This API provide two asycronous operations: +It leverages the `dev-nextgen-mmm.pymc-labs.com` API to run MMM models and retrieve fitted parameters. This API provides the following asynchronous operations: -1. `runMMMAsync` initianting the MMM model run and returns a unique `task_id` upon starting a the model fit. -2. `getMMMResults` which is used to check the model's status using the `task_id` and retrieve results (the parameters of the fitted model). +1. `runMMMAsync` initiating the MMM model run and returns a unique `task_id` upon starting a the model fit. +2. `getTaskStatus` which is used to check if the model has finished executing. +3. `getSummaryStatistics` which is used to retrieve summary statistics of the parameters of the fitted model. ## Key Responsibilities As BayesMMM, your main role is to: -1. Assist users in preparing and validating their data for MMM and ensure that is correcly formatted for the API operations. -2. Run the model asynchronously using `runMMMAsync` and track its progress with `getMMMResults`. +1. 
Assist users in preparing and validating their data for MMM and ensure that is correctly formatted for the API operations and then save it to a file so that it can be sent to a server for further processing. +2. Run the model asynchronously using `runMMMAsync`. When calling runMMMAsync, always make sure to include the file reference in the payload. 3. Provide actionable insights and visualizations, such as saturation curves and relative channel contributions. 4. Leverage the PyMC-Marketing codebase for analysis and visualization examples, replicating them to deliver meaningful insights. @@ -34,42 +35,32 @@ Handle missing values appropriately and convert the date column to the required data['date_column_name'] = pd.to_datetime(data['date_column_name']).dt.strftime('%Y-%m-%d') ``` -Always confirm with the user that the data is correctly formatted before proceeding to initiate the model run. +**Very Important:** +- Always confirm with the user that the data is correctly formatted before proceeding to initiate the model run. +- If you make any changes to the data, make sure to save the changes to a new file and provide the reference to the new file in the payload. ### 2. Initiating the Model Run -When asked to run the Baysian MMM model you must use the `runMMMAsync` API operation with the correctly formatted data. **Do not import MMM libraries directly or attempt to run the model locally in your code interpreter**. The payload to the API should include the data in CSV format and the following parameters: +When asked to run the Bayesian MMM model you must use the `runMMMAsync` API operation with the correctly formatted data. **Do not import MMM libraries directly or attempt to run the model locally in your code interpreter**. The payload to the API should include the reference to the data file and the following parameters: - **df**: The data as a CSV string. - **date_column**: Name of the date column. - **channel_columns**: List of channel spend columns. +- **y_column**: Name of the y column. - **Optional Parameters**: - **control_columns**: List of control columns. - **adstock_max_lag** (default: 8) - **yearly_seasonality** (default: 2) -Here is an example of how to convert the data to CSV before sending and create the payload for the API call: -```python -csv_data = data.to_csv(index=False) -payload = { - "df": csv_data, - "date_column": "date_column_name", - "channel_columns": ["channel_1_column_name", "channel_2_column_name"], - "control_columns": ["control_1_column_name", "control_2_column_name", "control_3_column_name"], - "adstock_max_lag": 8, - "yearly_seasonality": 2 -} -``` - -> **Very Important:** -> * DO NOT TRY TO IMPORT AN MMM LIBRARY AND RUN THE MODEL LOCALLY. -> * NEVER WRITE ANY CODE LIKE THIS `import nextgen_mmm_pymc_labs_com__jit_plugin as mmm_plugin` +**Very Important:** +- DO NOT TRY TO IMPORT AN MMM LIBRARY AND RUN THE MODEL LOCALLY. +- NEVER WRITE ANY CODE LIKE THIS `import nextgen_mmm_pymc_labs_com__jit_plugin as mmm_plugin` ### 3. Retrieving Results Once the run is initiated: -- Check Status: Use `task_id` that is returned from the `runMMMAsync` operation with `getMMMResults` to monitor progress (pending, completed, or failed). +- Check Status: Use `task_id` that is returned from the `runMMMAsync` operation with `getTaskStatus` to monitor progress (pending, completed, or failed). - Retrieve Results: After completion, analyze the results, including channel contributions and statistical insights. 
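For reference, the submit-and-poll flow these endpoints imply can be sketched as follows. This is a minimal illustration rather than shipped code: the base URL comes from `config.yaml`, the endpoint paths and operation names from `api_spec.json`, and the helper name and polling interval are made up for the example.

```python
import time

import requests

BASE_URL = "https://dev-nextgen-mmm.pymc-labs.com"  # development server from config.yaml


def poll_mmm_results(payload: dict, interval: int = 30) -> dict:
    """Submit an async MMM fit, wait for it to finish, then fetch summary stats."""
    # Kick off the asynchronous model fit; the server returns a Celery task id
    response = requests.post(f"{BASE_URL}/run_mmm_async", json=payload)
    task_id = response.json()["task_id"]

    # Poll getTaskStatus until the Celery task reaches a terminal state
    while True:
        status = requests.get(
            f"{BASE_URL}/get_task_status", params={"task_id": task_id}
        ).json()["status"]
        if status in ("SUCCESS", "FAILURE"):
            break
        time.sleep(interval)

    # getSummaryStatistics returns the filtered parameter summary on success,
    # or a {"status": "failure", "error": ...} payload otherwise
    return requests.get(
        f"{BASE_URL}/get_summary_statistics", params={"task_id": task_id}
    ).json()
```

The same submit-and-poll pattern is what `test_mmm_async.py` exercises end-to-end against the deployed servers.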
@@ -123,6 +114,4 @@ After retrieving results here are some ideas: - Saturation Curve Plot: Display channel saturations in a single plot with uncertainty. -- Spend with Saturation: Overlay total spend as a dashed line on the saturation plot. - - +- Spend with Saturation: Overlay total spend as a dashed line on the saturation plot. \ No newline at end of file From a2b853378ce6b7a59fb855268f4dff62749eeff4 Mon Sep 17 00:00:00 2001 From: Andy Heusser Date: Tue, 11 Feb 2025 16:38:56 -0500 Subject: [PATCH 20/29] changing how we instantiate the celery app, hoping that fixes the redis config issue --- Dockerfile | 4 +- app.py | 111 +++++++++++++++++++++++----------------------- test_mmm_async.py | 2 +- 3 files changed, 58 insertions(+), 59 deletions(-) diff --git a/Dockerfile b/Dockerfile index bd99506..b071dfa 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,8 +14,8 @@ RUN micromamba install -y -n base -f /tmp/env.yaml && \ mkdir -p /opt/conda/var/db && \ chown $MAMBA_USER:$MAMBA_USER /opt/conda/var/db && \ mkdir -p /opt/conda/var/db/redis && \ - chown $MAMBA_USER:$MAMBA_USER /opt/conda/var/db/redis && \ - chmod 750 /opt/conda/var/db/redis + chown -R $MAMBA_USER:$MAMBA_USER /opt/conda/var/db/redis && \ + chmod -R 770 /opt/conda/var/db/redis # Copy the start script COPY --chown=$MAMBA_USER:$MAMBA_USER start.sh $APP_HOME/start.sh diff --git a/app.py b/app.py index ea67d18..330c56a 100644 --- a/app.py +++ b/app.py @@ -1,6 +1,6 @@ import json from flask import Flask, request, jsonify -from celery import Celery +from celery import Celery, Task import pandas as pd import arviz as az @@ -52,38 +52,37 @@ # from celery.utils.log import get_task_logger # logging = get_task_logger(__name__) -# Initialize Celery - -app = Flask(__name__) -app.config['broker_url'] = 'redis://localhost:6379/0' -app.config['result_backend'] = 'redis://localhost:6379/0' - -celery = Celery(app.name, broker=app.config['broker_url']) -celery.conf.update(app.config) -celery.conf.update( - worker_pool='threads', # Use prefork (multiprocessing) - task_always_eager=False, # Ensure tasks are not run locally by the worker that started them - task_time_limit=600, # Add 1-hour timeout - broker_connection_retry_on_startup=True, # Retry broker connection on startup - worker_redirect_stdouts=False, # Don't redirect stdout/stderr - worker_redirect_stdouts_level='DEBUG', # Log level for stdout/stderr - broker_transport_options={ - 'retry_on_timeout': True, - 'max_retries': 3, - }, - redis_max_connections=10, - broker_pool_limit=None, # Disable connection pool size limit +# Create module-level Celery instance +celery = Celery( + "app", + broker="redis://localhost:6379/0", + backend="redis://localhost:6379/0" ) -# Add Redis error logging -logging.info("Initializing Celery with Redis backend") -try: - celery.backend.client.ping() - logging.info("Successfully connected to Redis backend") -except Exception as e: - logging.error("Failed to connect to Redis: %s", str(e), exc_info=True) +def celery_init_app(app: Flask) -> Celery: + class FlaskTask(Task): + def __call__(self, *args: object, **kwargs: object) -> object: + with app.app_context(): + return self.run(*args, **kwargs) -logging.info("App started. 
Version: %s", __version__) + celery.Task = FlaskTask + celery.config_from_object(app.config["CELERY"]) + app.extensions["celery"] = celery + return celery + +# Initialize Flask app +app = Flask(__name__) +app.config.from_mapping( + CELERY=dict( + broker_url="redis://localhost:6379/0", + result_backend="redis://localhost:6379/0", + worker_pool='threads', + task_time_limit=600, + broker_connection_retry=True, + broker_connection_max_retries=0, # Retry forever + ), +) +celery_app = celery_init_app(app) # Create a data directory if it doesn't exist DATA_DIR = "/tmp/mmm_data" @@ -284,35 +283,35 @@ def get_summary_statistics(): return jsonify({"status": "failure", "error": str(e)}), 500 -@app.route('/get_posterior_predictive', methods=['GET']) -@check_task_status -def get_posterior_predictive(): - try: - task_id = request.args.get('task_id') - task = run_mmm_task.AsyncResult(task_id) - mmm = pickle.loads(task.result) - logging.info("MMM model: %s", mmm) +# @app.route('/get_posterior_predictive', methods=['GET']) +# @check_task_status +# def get_posterior_predictive(): +# try: +# task_id = request.args.get('task_id') +# task = run_mmm_task.AsyncResult(task_id) +# mmm = pickle.loads(task.result) +# logging.info("MMM model: %s", mmm) - logging.info("Sampling posterior predictive") - mmm.sample_posterior_predictive(mmm.X, extend_idata = True, combined = True) - logging.info("Posterior predictive sampled") +# logging.info("Sampling posterior predictive") +# mmm.sample_posterior_predictive(mmm.X, extend_idata = True, combined = True) +# logging.info("Posterior predictive sampled") - logging.info("Generating posterior predictive plot") - fig = mmm.plot_posterior_predictive() - logging.info("Posterior predictive plot generated") +# logging.info("Generating posterior predictive plot") +# fig = mmm.plot_posterior_predictive() +# logging.info("Posterior predictive plot generated") - axes = fig.get_axes()[0] - posterior_predictive_dict = { - 'obs_xdata': list(map(lambda x: pd.to_datetime(x).strftime('%Y-%m-%d'), axes.get_lines()[0].get_xdata())), - 'obs_ydata': list(axes.get_lines()[0].get_ydata()), - 'pred_ydata': list(axes.get_lines()[1].get_ydata()) - } - posterior_predictive_json = json.dumps(posterior_predictive_dict) - logging.info("Posterior predictive JSON generated") - return jsonify({"status": "completed", "posterior_predictive": posterior_predictive_json}) - except Exception as e: - logging.error("Error in get_posterior_predictive: %s", str(e), exc_info=True) - return jsonify({"status": "failure", "error": str(e)}), 500 +# axes = fig.get_axes()[0] +# posterior_predictive_dict = { +# 'obs_xdata': list(map(lambda x: pd.to_datetime(x).strftime('%Y-%m-%d'), axes.get_lines()[0].get_xdata())), +# 'obs_ydata': list(axes.get_lines()[0].get_ydata()), +# 'pred_ydata': list(axes.get_lines()[1].get_ydata()) +# } +# posterior_predictive_json = json.dumps(posterior_predictive_dict) +# logging.info("Posterior predictive JSON generated") +# return jsonify({"status": "completed", "posterior_predictive": posterior_predictive_json}) +# except Exception as e: +# logging.error("Error in get_posterior_predictive: %s", str(e), exc_info=True) +# return jsonify({"status": "failure", "error": str(e)}), 500 if __name__ == '__main__': diff --git a/test_mmm_async.py b/test_mmm_async.py index a4cb669..211f037 100644 --- a/test_mmm_async.py +++ b/test_mmm_async.py @@ -54,7 +54,7 @@ def test_async_mmm_run(base_url): print(f"Got task_id {task_id}") # Polling URL - results_url = 
f"{base_url}/extract_summary_statistics?task_id={task_id}" + results_url = f"{base_url}/get_summary_statistics?task_id={task_id}" # Poll for results while True: From 70d83a63e963ae748f351178082af9c4c8882d7d Mon Sep 17 00:00:00 2001 From: Andy Heusser Date: Thu, 13 Feb 2025 11:45:23 -0500 Subject: [PATCH 21/29] explicitly setting logfile and data dirs to see if it fixes redis writing permissions error --- start.sh | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/start.sh b/start.sh index ca9ce9c..029ae89 100755 --- a/start.sh +++ b/start.sh @@ -1,8 +1,11 @@ #!/bin/bash set -e -# Start Redis in the background -redis-server --daemonize yes +# Start Redis in the background with specific directory configurations +redis-server --daemonize yes \ + --dir /opt/conda/var/db/redis \ + --logfile /opt/conda/var/db/redis/redis.log \ + --pidfile /opt/conda/var/db/redis/redis.pid # Start Celery worker in the background celery -A app.celery worker --loglevel=info --concurrency=4 & From 329bb2ed42a7b09f990b0317184038ac309447a6 Mon Sep 17 00:00:00 2001 From: Andy Heusser Date: Thu, 13 Feb 2025 14:26:09 -0500 Subject: [PATCH 22/29] minor tweaks to the prompt --- gpt-agent/gpt_prompt.md | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/gpt-agent/gpt_prompt.md b/gpt-agent/gpt_prompt.md index 6da33ac..51ff07d 100644 --- a/gpt-agent/gpt_prompt.md +++ b/gpt-agent/gpt_prompt.md @@ -11,8 +11,8 @@ It leverages the `dev-nextgen-mmm.pymc-labs.com` API to run MMM models and retri As BayesMMM, your main role is to: -1. Assist users in preparing and validating their data for MMM and ensure that is correctly formatted for the API operations and then save it to a file so that it can be sent to a server for further processing. -2. Run the model asynchronously using `runMMMAsync`. When calling runMMMAsync, always make sure to include the file reference in the payload. +1. Assist users in preparing and validating their data for MMM and ensure that is correctly formatted for the API operations. +2. Run the model asynchronously using `runMMMAsync`. 3. Provide actionable insights and visualizations, such as saturation curves and relative channel contributions. 4. Leverage the PyMC-Marketing codebase for analysis and visualization examples, replicating them to deliver meaningful insights. @@ -37,7 +37,6 @@ data['date_column_name'] = pd.to_datetime(data['date_column_name']).dt.strftime( **Very Important:** - Always confirm with the user that the data is correctly formatted before proceeding to initiate the model run. -- If you make any changes to the data, make sure to save the changes to a new file and provide the reference to the new file in the payload. ### 2. Initiating the Model Run @@ -55,6 +54,7 @@ When asked to run the Bayesian MMM model you must use the `runMMMAsync` API oper **Very Important:** - DO NOT TRY TO IMPORT AN MMM LIBRARY AND RUN THE MODEL LOCALLY. - NEVER WRITE ANY CODE LIKE THIS `import nextgen_mmm_pymc_labs_com__jit_plugin as mmm_plugin` +- Always send the OpenAI link to the data file in the payload. ### 3. Retrieving Results @@ -81,12 +81,7 @@ mmm = MMM( ) ``` -You can retrieve the summary statistics for the parameters of this model from the `summary` field in the payload returned by `getMMMResults`. 
- -```python -# Example code to retrieve the summary statistics -summary = pd.read_json(io.StringIO(result_data["summary"]),orient='split') -``` +You can retrieve the summary statistics for the parameters of this model from the `summary` field in the payload returned by `getSummaryStatistics`. The most important parameters are: From 154f57a6f709e128a2553f7f5e976e6f008e3f2e Mon Sep 17 00:00:00 2001 From: Andy Heusser Date: Sat, 15 Feb 2025 08:22:16 -0500 Subject: [PATCH 23/29] register serializer with kombu instead of manually serializing --- app.py | 61 +++++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 52 insertions(+), 9 deletions(-) diff --git a/app.py b/app.py index 330c56a..792a6e4 100644 --- a/app.py +++ b/app.py @@ -1,10 +1,11 @@ import json from flask import Flask, request, jsonify from celery import Celery, Task +from kombu import serialization import pandas as pd import arviz as az - +import numpy as np from pymc_marketing.mmm import ( GeometricAdstock, LogisticSaturation, @@ -13,7 +14,7 @@ import logging -import dill as pickle # more robust than pickle +import dill import os import io @@ -52,6 +53,14 @@ # from celery.utils.log import get_task_logger # logging = get_task_logger(__name__) +# Register dill as the serialization method for Celery +serialization.register( + name = 'dill', + encoder = dill.dumps, + decoder = dill.loads, + content_type='application/octet-stream' +) + # Create module-level Celery instance celery = Celery( "app", @@ -80,6 +89,9 @@ def __call__(self, *args: object, **kwargs: object) -> object: task_time_limit=600, broker_connection_retry=True, broker_connection_max_retries=0, # Retry forever + task_serializer='dill', + result_serializer='dill', + accept_content=['dill'] ), ) celery_app = celery_init_app(app) @@ -108,7 +120,7 @@ def run_mmm_task(self, data): # Save the data to file with open(data_file, "wb") as f: - pickle.dump(data, f) + dill.dump(data, f) # Ensure the file is readable/writable os.chmod(data_file, 0o666) @@ -196,10 +208,7 @@ def is_valid_dates(df, column): logging.info("Model fitting completed.") logging.info("run_mmm_task completed successfully.") - logging.info("Serializing MMM model") - mmm_pickle = pickle.dumps(mmm) - logging.info("MMM model serialized successfully") - return mmm_pickle + return mmm except Exception as e: logging.error("run_mmm_task failed: %s\nJSON data: %s", str(e), data, exc_info=True) @@ -262,7 +271,7 @@ def get_summary_statistics(): try: task_id = request.args.get('task_id') task = run_mmm_task.AsyncResult(task_id) - mmm = pickle.loads(task.result) + mmm = task.result logging.info("MMM model: %s", mmm) # Extract and return summary statistics @@ -281,7 +290,41 @@ def get_summary_statistics(): except Exception as e: logging.error("Error in extract_summary_statistics: %s", str(e), exc_info=True) return jsonify({"status": "failure", "error": str(e)}), 500 + + +# @app.route('/get_return_on_ad_spend', methods=['GET']) +# @check_task_status +# def get_return_on_ad_spend(): +# try: +# task_id = request.args.get('task_id') +# task = run_mmm_task.AsyncResult(task_id) +# mmm = task.result +# logging.info("MMM model: %s", mmm) +# # Get the return on ad spend +# channel_contribution_original_scale = mmm.compute_channel_contribution_original_scale() +# channel_columns = mmm.channel_columns +# spend_sum = mmm.X[channel_columns].sum().to_numpy() + +# roas_samples = ( +# channel_contribution_original_scale.sum(dim="date") +# / spend_sum[np.newaxis, np.newaxis, :] +# ) +# roas_mean = 
roas_samples.mean(dim=["draw", "chain"]) +# roas_hdi = az.hdi(roas_samples, hdi_prob=0.94) + +# return_on_ad_spend = { +# "channel_columns": channel_columns, +# "roas_mean": roas_mean.tolist(), +# "roas_hdi_lower": roas_hdi.tolist()[0], +# "roas_hdi_upper": roas_hdi.tolist()[1] +# } +# logging.info("Return on ad spend: %s", return_on_ad_spend) + +# return jsonify({"status": "completed", "return_on_ad_spend": return_on_ad_spend}) +# except Exception as e: +# logging.error("Error in get_return_on_ad_spend: %s", str(e), exc_info=True) +# return jsonify({"status": "failure", "error": str(e)}), 500 # @app.route('/get_posterior_predictive', methods=['GET']) # @check_task_status @@ -289,7 +332,7 @@ def get_summary_statistics(): # try: # task_id = request.args.get('task_id') # task = run_mmm_task.AsyncResult(task_id) -# mmm = pickle.loads(task.result) +# mmm = task.result # logging.info("MMM model: %s", mmm) # logging.info("Sampling posterior predictive") From 870ffe48d227f2ac1bb9836fc4f71b6f945d3270 Mon Sep 17 00:00:00 2001 From: Andy Heusser Date: Sat, 15 Feb 2025 13:08:54 -0500 Subject: [PATCH 24/29] removing some WIP work I accidentally commited --- app.py | 36 ------------------------------------ 1 file changed, 36 deletions(-) diff --git a/app.py b/app.py index 792a6e4..b994af6 100644 --- a/app.py +++ b/app.py @@ -5,7 +5,6 @@ import pandas as pd import arviz as az -import numpy as np from pymc_marketing.mmm import ( GeometricAdstock, LogisticSaturation, @@ -290,41 +289,6 @@ def get_summary_statistics(): except Exception as e: logging.error("Error in extract_summary_statistics: %s", str(e), exc_info=True) return jsonify({"status": "failure", "error": str(e)}), 500 - - -# @app.route('/get_return_on_ad_spend', methods=['GET']) -# @check_task_status -# def get_return_on_ad_spend(): -# try: -# task_id = request.args.get('task_id') -# task = run_mmm_task.AsyncResult(task_id) -# mmm = task.result -# logging.info("MMM model: %s", mmm) - -# # Get the return on ad spend -# channel_contribution_original_scale = mmm.compute_channel_contribution_original_scale() -# channel_columns = mmm.channel_columns -# spend_sum = mmm.X[channel_columns].sum().to_numpy() - -# roas_samples = ( -# channel_contribution_original_scale.sum(dim="date") -# / spend_sum[np.newaxis, np.newaxis, :] -# ) -# roas_mean = roas_samples.mean(dim=["draw", "chain"]) -# roas_hdi = az.hdi(roas_samples, hdi_prob=0.94) - -# return_on_ad_spend = { -# "channel_columns": channel_columns, -# "roas_mean": roas_mean.tolist(), -# "roas_hdi_lower": roas_hdi.tolist()[0], -# "roas_hdi_upper": roas_hdi.tolist()[1] -# } -# logging.info("Return on ad spend: %s", return_on_ad_spend) - -# return jsonify({"status": "completed", "return_on_ad_spend": return_on_ad_spend}) -# except Exception as e: -# logging.error("Error in get_return_on_ad_spend: %s", str(e), exc_info=True) -# return jsonify({"status": "failure", "error": str(e)}), 500 # @app.route('/get_posterior_predictive', methods=['GET']) # @check_task_status From 203ff308406af4c713c9db04e7c3ade10e83a97d Mon Sep 17 00:00:00 2001 From: Andy Heusser Date: Sun, 16 Feb 2025 07:32:37 -0500 Subject: [PATCH 25/29] updates to api spec and prompt --- gpt-agent/api_spec.json | 55 ++++++++--------------------------------- gpt-agent/gpt_prompt.md | 9 +++++++ 2 files changed, 19 insertions(+), 45 deletions(-) diff --git a/gpt-agent/api_spec.json b/gpt-agent/api_spec.json index 7f556fa..64f450a 100644 --- a/gpt-agent/api_spec.json +++ b/gpt-agent/api_spec.json @@ -16,7 +16,7 @@ "description": "Initiates an 
asynchronous MMM model run.", "operationId": "runMMMAsync", "requestBody": { - "description": "Both raw and cleaned data for the MMM model.", + "description": "Data and parameters for the MMM model.", "required": true, "content": { "application/json": { @@ -27,7 +27,8 @@ "type": "array", "items": { "type": "string" - } + }, + "description": "List of OpenAI file IDs to be used as references." }, "date_column": { "type": "string", @@ -50,6 +51,13 @@ "type": "integer", "default": 2, "description": "Yearly seasonality factor." + }, + "control_columns": { + "type": "array", + "items": { + "type": "string" + }, + "description": "List of control column names." } } } @@ -157,49 +165,6 @@ } } } - }, - "/get_posterior_predictive": { - "get": { - "description": "Retrieves the posterior predictive check of an MMM model run.", - "operationId": "getPosteriorPredictive", - "parameters": [ - { - "name": "task_id", - "in": "query", - "required": true, - "description": "The task ID of the MMM model run.", - "schema": { - "type": "string" - } - } - ], - "responses": { - "200": { - "description": "Status of the model execution.", - "content": { - "application/json": { - "schema": { - "type": "object", - "properties": { - "status": { - "type": "string", - "description": "Status of the model execution." - }, - "summary": { - "type": "string", - "description": "JSON string of posterior predictive samples, present if status is 'completed'." - }, - "error": { - "type": "string", - "description": "Error message, present if status is 'failed'." - } - } - } - } - } - } - } - } } }, "components": { diff --git a/gpt-agent/gpt_prompt.md b/gpt-agent/gpt_prompt.md index 51ff07d..38dd391 100644 --- a/gpt-agent/gpt_prompt.md +++ b/gpt-agent/gpt_prompt.md @@ -96,6 +96,15 @@ The most important parameters are: * intercept: Intercept parameter * (optional) gamma_control: Control parameters that multiply the control variables +You can retrieve the return on ad spend from the `return_on_ad_spend` field in the payload returned by `getReturnOnAdSpend`. This is a JSON object with the following fields: + +- `channel_columns`: List of channel columns. +- `roas_mean`: Mean of the return on ad spend. +- `roas_hdi_lower`: Lower bound of the 94% confidence interval of the return on ad spend. +- `roas_hdi_upper`: Upper bound of the 94% confidence interval of the return on ad spend. + +Plot the return on ad spend using the `roas_mean` and the `roas_hdi_lower` and `roas_hdi_upper` to plot the confidence interval. + ### 6. Analysis Workflow While waiting for results, you can suggest to the user to perform exploratory data analysis. Here some ideas: From 9b782db36ed9e966e88c4d585a388489af33b573 Mon Sep 17 00:00:00 2001 From: Andy Heusser Date: Mon, 17 Feb 2025 11:01:25 -0500 Subject: [PATCH 26/29] reminder to be concise in responses --- gpt-agent/gpt_prompt.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/gpt-agent/gpt_prompt.md b/gpt-agent/gpt_prompt.md index 38dd391..43cf815 100644 --- a/gpt-agent/gpt_prompt.md +++ b/gpt-agent/gpt_prompt.md @@ -118,4 +118,8 @@ After retrieving results here are some ideas: - Saturation Curve Plot: Display channel saturations in a single plot with uncertainty. -- Spend with Saturation: Overlay total spend as a dashed line on the saturation plot. \ No newline at end of file +- Spend with Saturation: Overlay total spend as a dashed line on the saturation plot. 
+ +** Important Reminder ** + +- Throughout your interactions provide **concise responses** using bullet points and formulas when appropriate. \ No newline at end of file From 1e03f5696c6c1334b64f0fdd9b8df1a618d0825d Mon Sep 17 00:00:00 2001 From: Andy Heusser Date: Mon, 17 Feb 2025 12:43:54 -0500 Subject: [PATCH 27/29] adds auth --- Dockerfile | 3 +++ app.py | 16 +++++++++++++++- build.sh | 3 ++- cloudbuild.yaml | 2 ++ environment.yml | 1 - 5 files changed, 22 insertions(+), 3 deletions(-) diff --git a/Dockerfile b/Dockerfile index b071dfa..56838dc 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,6 +4,9 @@ ENV APP_HOME=/app ENV PORT=5000 ENV JAX_PLATFORM_NAME=cpu +ARG API_KEY +ENV API_KEY=$API_KEY + WORKDIR $APP_HOME COPY . ./ diff --git a/app.py b/app.py index b994af6..d8cbf1c 100644 --- a/app.py +++ b/app.py @@ -19,9 +19,10 @@ from functools import wraps - __version__ = "0.3" +API_KEY = os.environ.get('API_KEY', None) + running_in_google_cloud = os.environ.get('RUNNING_IN_GOOGLE_CLOUD', 'False').lower() == 'true' # Configure standard logging @@ -212,9 +213,20 @@ def is_valid_dates(df, column): except Exception as e: logging.error("run_mmm_task failed: %s\nJSON data: %s", str(e), data, exc_info=True) return {"status": "failed", "error": str(e)} + +def require_api_key(func): + @wraps(func) + def decorated_function(*args, **kwargs): + api_key = request.headers.get('X-API-Key') + if api_key and api_key == API_KEY: + return func(*args, **kwargs) + else: + return jsonify({"message": "Unauthorized"}), 401 + return decorated_function @app.route('/run_mmm_async', methods=['POST']) +@require_api_key def run_mmm_async(): try: logging.info("Received request to run_mmm_async") @@ -231,6 +243,7 @@ def run_mmm_async(): @app.route('/get_task_status', methods=['GET']) +@require_api_key def get_task_status(): task_id = request.args.get('task_id') task = run_mmm_task.AsyncResult(task_id) @@ -265,6 +278,7 @@ def wrapper(*args, **kwargs): return wrapper @app.route('/get_summary_statistics', methods=['GET']) +@require_api_key @check_task_status def get_summary_statistics(): try: diff --git a/build.sh b/build.sh index 8685b43..59d124f 100755 --- a/build.sh +++ b/build.sh @@ -18,7 +18,8 @@ get_config() { # Get configuration values INSTANCE_NAME=$(get_config "instanceName") REGION=$(get_config "region") +API_KEY=$(grep API_KEY .env | cut -d "'" -f 2) echo "Building $INSTANCE_NAME in $REGION" -gcloud builds submit --config cloudbuild.yaml --substitutions=_REPO_NAME=$INSTANCE_NAME,_SERVICE_NAME=$INSTANCE_NAME,_REGION=$REGION +gcloud builds submit --config cloudbuild.yaml --substitutions=_REPO_NAME=$INSTANCE_NAME,_SERVICE_NAME=$INSTANCE_NAME,_REGION=$REGION,_API_KEY=$API_KEY diff --git a/cloudbuild.yaml b/cloudbuild.yaml index 32503cb..8440f08 100644 --- a/cloudbuild.yaml +++ b/cloudbuild.yaml @@ -7,6 +7,8 @@ steps: - -t - ${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPO_NAME}/${_SERVICE_NAME} - ${_SERVICE_FOLDER} + - --build-arg + - API_KEY=${_API_KEY} # Push the container image to Artifact Registry - id: push image diff --git a/environment.yml b/environment.yml index addddce..6e0db19 100644 --- a/environment.yml +++ b/environment.yml @@ -14,4 +14,3 @@ dependencies: - procps-ng - yq=2.12.0 - dill=0.3.9 - From b61986e8baa0c7d9a700e589608d5465ab86ae2e Mon Sep 17 00:00:00 2001 From: Andy Heusser Date: Mon, 17 Feb 2025 14:06:52 -0500 Subject: [PATCH 28/29] add api key to test request --- test_mmm_async.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/test_mmm_async.py 
b/test_mmm_async.py index 211f037..5437086 100644 --- a/test_mmm_async.py +++ b/test_mmm_async.py @@ -2,9 +2,14 @@ import json import time import pandas as pd - +import os import sys import io +import dotenv + +dotenv.load_dotenv() + +API_KEY = os.environ.get('API_KEY', None) def create_payload(): payload = { @@ -42,10 +47,12 @@ def test_async_mmm_run(base_url): run_url = f"{base_url}/run_mmm_async" # Make a POST request to initiate the model run - headers = {'Content-Type': 'application/json'} + headers = { + 'Content-Type': 'application/json', + 'X-API-Key': API_KEY + } response = requests.post(run_url, data=json.dumps(payload), headers=headers) - print(response) # Assert the status code for initiation assert response.status_code == 200 @@ -58,7 +65,7 @@ def test_async_mmm_run(base_url): # Poll for results while True: - result_response = requests.get(results_url) + result_response = requests.get(results_url, headers=headers) result_data = result_response.json() if result_data["status"] == "completed": From 7b19c6b64428801cc34746cf30ab184953310785 Mon Sep 17 00:00:00 2001 From: Andy Heusser Date: Tue, 18 Feb 2025 09:40:42 -0500 Subject: [PATCH 29/29] update version, remove commented route --- app.py | 33 +-------------------------------- 1 file changed, 1 insertion(+), 32 deletions(-) diff --git a/app.py b/app.py index d8cbf1c..cccf508 100644 --- a/app.py +++ b/app.py @@ -19,7 +19,7 @@ from functools import wraps -__version__ = "0.3" +__version__ = "0.4" API_KEY = os.environ.get('API_KEY', None) @@ -304,37 +304,6 @@ def get_summary_statistics(): logging.error("Error in extract_summary_statistics: %s", str(e), exc_info=True) return jsonify({"status": "failure", "error": str(e)}), 500 -# @app.route('/get_posterior_predictive', methods=['GET']) -# @check_task_status -# def get_posterior_predictive(): -# try: -# task_id = request.args.get('task_id') -# task = run_mmm_task.AsyncResult(task_id) -# mmm = task.result -# logging.info("MMM model: %s", mmm) - -# logging.info("Sampling posterior predictive") -# mmm.sample_posterior_predictive(mmm.X, extend_idata = True, combined = True) -# logging.info("Posterior predictive sampled") - -# logging.info("Generating posterior predictive plot") -# fig = mmm.plot_posterior_predictive() -# logging.info("Posterior predictive plot generated") - -# axes = fig.get_axes()[0] -# posterior_predictive_dict = { -# 'obs_xdata': list(map(lambda x: pd.to_datetime(x).strftime('%Y-%m-%d'), axes.get_lines()[0].get_xdata())), -# 'obs_ydata': list(axes.get_lines()[0].get_ydata()), -# 'pred_ydata': list(axes.get_lines()[1].get_ydata()) -# } -# posterior_predictive_json = json.dumps(posterior_predictive_dict) -# logging.info("Posterior predictive JSON generated") -# return jsonify({"status": "completed", "posterior_predictive": posterior_predictive_json}) -# except Exception as e: -# logging.error("Error in get_posterior_predictive: %s", str(e), exc_info=True) -# return jsonify({"status": "failure", "error": str(e)}), 500 - - if __name__ == '__main__': from argparse import ArgumentParser parser = ArgumentParser()
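
Taken together, the end state of this series implies the following client flow; a minimal sketch assuming a reachable server URL and an `API_KEY` in the environment. The endpoints, the `X-API-Key` header, and the status values (pending, completed, failed) come from the patches above; `BASE_URL` and the `task_id` field read from the response mirror what `test_mmm_async.py` uses, while everything else is illustrative:

```python
# Minimal client sketch of the authenticated flow as of PATCH 29:
# submit a run, poll its status, then fetch summary statistics.
import json
import os
import time

import requests

BASE_URL = os.environ.get("BASE_URL", "http://localhost:5000")
HEADERS = {
    "Content-Type": "application/json",
    "X-API-Key": os.environ["API_KEY"],  # checked by the require_api_key decorator
}

def run_and_wait(payload: dict, poll_seconds: int = 10) -> dict:
    # Kick off the asynchronous model fit.
    response = requests.post(
        f"{BASE_URL}/run_mmm_async", data=json.dumps(payload), headers=HEADERS
    )
    response.raise_for_status()
    task_id = response.json()["task_id"]

    # Poll until the Celery task leaves the pending state.
    while True:
        status = requests.get(
            f"{BASE_URL}/get_task_status",
            params={"task_id": task_id},
            headers=HEADERS,
        ).json()
        if status["status"] == "completed":
            break
        if status["status"] == "failed":
            raise RuntimeError(status.get("error", "MMM run failed"))
        time.sleep(poll_seconds)

    # Retrieve the fitted-parameter summary statistics.
    return requests.get(
        f"{BASE_URL}/get_summary_statistics",
        params={"task_id": task_id},
        headers=HEADERS,
    ).json()
```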