diff --git a/Dockerfile b/Dockerfile index 9909084..56838dc 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,6 +4,9 @@ ENV APP_HOME=/app ENV PORT=5000 ENV JAX_PLATFORM_NAME=cpu +ARG API_KEY +ENV API_KEY=$API_KEY + WORKDIR $APP_HOME COPY . ./ @@ -14,7 +17,8 @@ RUN micromamba install -y -n base -f /tmp/env.yaml && \ mkdir -p /opt/conda/var/db && \ chown $MAMBA_USER:$MAMBA_USER /opt/conda/var/db && \ mkdir -p /opt/conda/var/db/redis && \ - chown $MAMBA_USER:$MAMBA_USER /opt/conda/var/db/redis + chown -R $MAMBA_USER:$MAMBA_USER /opt/conda/var/db/redis && \ + chmod -R 770 /opt/conda/var/db/redis # Copy the start script COPY --chown=$MAMBA_USER:$MAMBA_USER start.sh $APP_HOME/start.sh diff --git a/README.md b/README.md index f39e2cf..761e5e2 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,8 @@ A cloud-based Marketing Mix Modeling (MMM) solution deployed on Google Cloud Pla ## Quick Deployment ```bash -./deploy.sh # Deploy the latest version to production +./deploy.sh production # Deploy the latest version to production +./deploy.sh development # Deploy the latest version to development ``` ## System Architecture @@ -18,6 +19,7 @@ GPT-Bayes consists of two main components: 2. **Backend Service** - Production URL: https://nextgen-mmm.pymc-labs.com + - Development URL: https://dev-nextgen-mmm.pymc-labs.com - Function: Handles model fitting and parameter management via API endpoints - Infrastructure: Hosted on Google Cloud Engine (GCE) under the `gpt-bayes` project @@ -31,8 +33,10 @@ GPT-Bayes consists of two main components: - `nginx/` - NGINX reverse proxy settings - `dockerfile` - Container specifications - `start.sh` - Container initialization +- `build.sh` - Build the container image - `deploy.sh` - Deployment automation - `environment.yml` - Development environment specifications +- `config.yaml` - Environment configuration settings ### AI Agent Settings - `gpt-agent/gpt_prompt.md` - System instructions @@ -45,23 +49,33 @@ GPT-Bayes consists of two main components: ## Deployment Guide -The application runs on Google Compute Engine (GCE) under the `gpt-bayes` project, accessible at `https://nextgen-mmm.pymc-labs.com`. +The application runs on Google Compute Engine (GCE) under the `gpt-bayes` project, accessible at `https://nextgen-mmm.pymc-labs.com` (production) and `https://dev-nextgen-mmm.pymc-labs.com` (development). + +### Build and Push Docker Image + +Build and push the Docker image to Google Artifact Registry (GAR). +```bash +./build.sh production # Build and publish to production +./build.sh development # Build and publish to development +``` ### Standard Deployment -Use `deploy.sh` to update the application. This script handles: +Once the Docker image is built and pushed to GAR, use `deploy.sh` to update the application. 
This script handles:
 - Updating the container in Google Artifact Registry (GAR)
-- Deploying to the production environment
+- Deploying to the specified environment
 
 ```bash
-./deploy.sh
+./deploy.sh production # Deploy the latest version to production
+./deploy.sh development # Deploy the latest version to development
 ```
 
 ### Server Management
 
-Access the production server:
+Access the production or development server:
 ```bash
 gcloud compute ssh gpt-bayes --zone us-central1-a
+gcloud compute ssh dev-gpt-bayes --zone us-central1-a
 ```
 
 Container management commands:
@@ -81,10 +95,11 @@ docker exec -it CONTAINER_ID /bin/bash
 
 Build and publish to Google Artifact Registry:
 ```bash
-gcloud builds submit
+./build.sh production # Build and publish to production
+./build.sh development # Build and publish to development
 ```
 
-Note: This updates the container image but doesn't affect the production deployment.
+Note: This updates the container image in GAR but doesn't redeploy the running environment.
 
 ### Server Instance Management
 
@@ -93,28 +108,42 @@ View available Container-Optimized OS images:
 ```bash
 gcloud compute images list --project cos-cloud --no-standard-images
 ```
 
-Update production container:
+Update the production or development container:
 ```bash
 # Clear existing containers
 gcloud compute ssh gpt-bayes --zone us-central1-a --command 'docker system prune -f -a'
+gcloud compute ssh dev-gpt-bayes --zone us-central1-a --command 'docker system prune -f -a'
 
 # Deploy new container
 gcloud compute instances update-container gpt-bayes \
     --zone=us-central1-a \
     --container-image=us-central1-docker.pkg.dev/bayes-gpt/gpt-bayes/gpt-bayes:latest
+
+gcloud compute instances update-container dev-gpt-bayes \
+    --zone=us-central1-a \
+    --container-image=us-central1-docker.pkg.dev/bayes-gpt/dev-gpt-bayes/dev-gpt-bayes:latest
 ```
 
 Create new server instance:
 ```bash
-gcloud compute instances create gpt-bayes \
-  --machine-type e2-standard-4 \
-  --boot-disk-size 20GB \
-  --image image-name \
-  --image-project cos-cloud \
-  --zone us-central1 \
-  --metadata container-image=your-container-image-name \
-  --tags http-server \
-  --firewall-create allow-http
+gcloud compute instances create-with-container gpt-bayes \
+  --machine-type e2-standard-4 \
+  --boot-disk-size 20GB \
+  --image cos-stable-117-18613-164-4 \
+  --image-project cos-cloud \
+  --zone us-central1-a \
+  --container-image=us-central1-docker.pkg.dev/bayes-gpt/gpt-bayes/gpt-bayes:latest \
+  --tags http-server,https-server,allow-tcp-5000
+
+gcloud compute instances create-with-container dev-gpt-bayes \
+  --machine-type e2-standard-4 \
+  --boot-disk-size 20GB \
+  --image cos-stable-117-18613-164-4 \
+  --image-project cos-cloud \
+  --zone us-central1-a \
+  --container-image=us-central1-docker.pkg.dev/bayes-gpt/dev-gpt-bayes/dev-gpt-bayes:latest \
+  --tags http-server,https-server,allow-tcp-5000
 ```
 
 ### NGINX Configuration (Advanced)
 
@@ -122,13 +151,14 @@ gcloud compute instances create gpt-bayes \
 Deploy NGINX reverse proxy updates:
 ```bash
 cd nginx
-gcloud builds submit
+./deploy.sh production # Deploy the latest version to production
+./deploy.sh development # Deploy the latest version to development
 ```
 
 Update backend IP address:
-1. Navigate to `nginx/nginx.conf`
-2. Modify the `proxy_pass` directive with the new IP
-3. Example: `proxy_pass http://35.208.203.115:5000;`
+1. Navigate to `config.yaml`
+2. Modify the `ipAddress` field with the new IP
+3. Example: `ipAddress: 35.208.203.115`
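+
+For reference, the deploy scripts resolve these values with `yq` (a sketch, assuming the `yq` pinned in `environment.yml`, which prints strings JSON-quoted):
+```bash
+yq '.production.nginx.ipAddress' config.yaml | tr -d '"'   # -> 35.208.203.115
+```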
 
 ## Local Development
 
diff --git a/app.py b/app.py
index 6543466..cccf508 100644
--- a/app.py
+++ b/app.py
@@ -1,9 +1,10 @@
+import json
 from flask import Flask, request, jsonify
-from celery import Celery
+from celery import Celery, Task
+from kombu import serialization
 import pandas as pd
 import arviz as az
-
 from pymc_marketing.mmm import (
     GeometricAdstock,
     LogisticSaturation,
@@ -12,12 +13,15 @@
 import logging
-import pickle
+import dill
 import os
 import io
+from functools import wraps
+
+__version__ = "0.4"
 
-__version__ = "0.3"
+API_KEY = os.environ.get('API_KEY', None)
 
 running_in_google_cloud = os.environ.get('RUNNING_IN_GOOGLE_CLOUD', 'False').lower() == 'true'
@@ -46,26 +50,51 @@
     # Additional local logging configuration (if needed)
     # For example, you can set a file handler or a stream handler for local logging
     pass
+    # from celery.utils.log import get_task_logger
+    # logging = get_task_logger(__name__)
+
+# Register dill as the serialization method for Celery
+serialization.register(
+    name='dill',
+    encoder=dill.dumps,
+    decoder=dill.loads,
+    content_type='application/octet-stream'
+)
+
+# Create module-level Celery instance
+celery = Celery(
+    "app",
+    broker="redis://localhost:6379/0",
+    backend="redis://localhost:6379/0"
+)
+
+def celery_init_app(app: Flask) -> Celery:
+    class FlaskTask(Task):
+        def __call__(self, *args: object, **kwargs: object) -> object:
+            with app.app_context():
+                return self.run(*args, **kwargs)
 
-# Initialize Celery
+    celery.Task = FlaskTask
+    celery.config_from_object(app.config["CELERY"])
+    app.extensions["celery"] = celery
+    return celery
 
+# Initialize Flask app
 app = Flask(__name__)
-app.config['broker_url'] = 'redis://localhost:6379/0'
-app.config['result_backend'] = 'redis://localhost:6379/0'
-
-
-celery = Celery(app.name, broker=app.config['broker_url'])
-celery.conf.update(app.config)
-celery.conf.update(
-    worker_pool='threads',  # Use prefork (multiprocessing)
-    task_always_eager=False,  # Ensure tasks are not run locally by the worker that started them
-    task_time_limit=600,  # Add 1-hour timeout
-    broker_connection_retry_on_startup=True,  # Retry broker connection on startup
-    worker_redirect_stdouts=False,  # Don't redirect stdout/stderr
-    worker_redirect_stdouts_level='DEBUG'  # Log level for stdout/stderr
+app.config.from_mapping(
+    CELERY=dict(
+        broker_url="redis://localhost:6379/0",
+        result_backend="redis://localhost:6379/0",
+        worker_pool='threads',
+        task_time_limit=600,
+        broker_connection_retry=True,
+        broker_connection_max_retries=0,  # Retry forever
+        task_serializer='dill',
+        result_serializer='dill',
+        accept_content=['dill']
+    ),
 )
-
-logging.info("App started. Version: %s", __version__)
+celery_app = celery_init_app(app)
 
 # Create a data directory if it doesn't exist
 DATA_DIR = "/tmp/mmm_data"
@@ -74,8 +103,6 @@
 os.chmod(DATA_DIR, 0o777)
 
-
-
 @celery.task(bind=True)
 def run_mmm_task(self, data):
     """Run Marketing Mix Model analysis task.
@@ -88,23 +115,53 @@ def run_mmm_task(self, data):
     try:
         logging.info("Starting run_mmm_task here!!")
 
-        # Use the dedicated data directory
         data_file = os.path.join(DATA_DIR, f"data_{self.request.id}.pkl")
 
         # Save the data to file
         with open(data_file, "wb") as f:
-            pickle.dump(data, f)
+            dill.dump(data, f)
 
         # Ensure the file is readable/writable
         os.chmod(data_file, 0o666)
 
         try:
-            df = pd.read_json(io.StringIO(data["df"]), orient="split")
-        except Exception as e:
-            logging.info("Error reading JSON data attempting to read CSV: %s", str(e), exc_info=True)
-            df = pd.read_csv(io.StringIO(data["df"]))
+            file_refs = data.get("openaiFileIdRefs", [])
+            if len(file_refs) == 0:
+                logging.info("No file references found")
+                raise ValueError("No file references found")
+            else:
+                download_url = file_refs[0].get("download_link", "")  # TODO: handle multiple files
+                logging.info("Downloading data from %s", download_url)
+
+                # Add headers to the request
+                headers = {
+                    'User-Agent': 'Mozilla/5.0',
+                    'Accept': 'text/csv'
+                }
+
+                try:
+                    # Use requests library for better control over the HTTP request
+                    import requests
+                    response = requests.get(download_url, headers=headers)
+                    response.raise_for_status()  # Raise an exception for bad status codes
+
+                    # Read CSV from the response content
+                    df = pd.read_csv(io.StringIO(response.text))
+                    logging.info("Data downloaded successfully")
+                except requests.exceptions.RequestException as e:
+                    logging.error("Failed to download file: %s", str(e), exc_info=True)
+                    raise ValueError(f"Failed to download file: {str(e)}")
+
+                logging.info("Saving data to file")
+                file_name = file_refs[0].get("name", "")
+                file_path = os.path.join(DATA_DIR, file_name)
+                df.to_csv(file_path, index=False)
+                logging.info("Data saved to file %s", file_path)
+        except Exception as e:
+            logging.error("Error reading the downloaded CSV data: %s", str(e), exc_info=True)
+            raise
 
         logging.info("DataFrame loaded with shape=%s and columns=%s", df.shape, df.columns)
         logging.info("First 5 rows:\n%s", df.head(5))
@@ -118,6 +175,7 @@ def run_mmm_task(self, data):
         adstock_max_lag = data.get('adstock_max_lag', 8)
         yearly_seasonality = data.get('yearly_seasonality', 2)
         control_columns = data.get('control_columns', None)
+        y_column = data.get('y_column', 'y')
 
         logging.debug("Parameters extracted: date_column=%s, channel_columns=%s, adstock_max_lag=%d, yearly_seasonality=%d, control_columns=%s",
                       date_column, channel_columns, adstock_max_lag, yearly_seasonality, control_columns)
@@ -127,13 +185,9 @@ def is_valid_dates(df, column):
 
         if not is_valid_dates(df, date_column):
            raise ValueError(f"Date column must be in YYYY-MM-DD format (e.g. 2023-12-31). Found values like: {df[date_column].iloc[0]} with dtype: {df[date_column].dtype}")
 
-        # Define and fit the MMM model
-        # import ipdb; ipdb.set_trace()
-
         logging.debug("Creating MMM model")
         mmm = MMM(
-            adstock=GeometricAdstock(l_max=int(adstock_max_lag)),
+            adstock=GeometricAdstock(l_max=adstock_max_lag),
             saturation=LogisticSaturation(),
             date_column=date_column,
             channel_columns=channel_columns,
@@ -141,51 +195,38 @@ def is_valid_dates(df, column):
             yearly_seasonality=yearly_seasonality,
         )
         logging.info("MMM model defined.")
-
-        X = df.drop('sales', axis=1)
-        y = df['sales']
-        logging.debug("Starting model fitting.")
-
-        # mmm.fit(X, y, random_seed=42, cores=1)
-        mmm.fit(X, y)
-        logging.info("Model fitting completed.")
+        # Ensure the date column is in datetime format
+        df[date_column] = pd.to_datetime(df[date_column])
 
-        # Extract and return summary statistics
-        summary = az.summary(mmm.fit_result)
-        # Filter only the most important statistics
-        important_params = summary[summary.index.str.contains('alpha|beta|sigma|intercept|lam|gamma_control', case=False)]
-        # Limit decimal places and convert to more compact format
-        important_params = important_params.round(5)
-
-        summary_json = important_params.to_json(orient="split", double_precision=5)
-        logging.info("Summary statistics extracted.")
-        logging.info("summary_json=%s", summary_json)
-
-        # Add model metrics
-        response = {
-            "status": "completed",
-            "summary": summary_json,
-            # "model_info": {
-            #     "num_observations": len(df),
-            #     "channels": channel_columns,
-            #     "adstock_max_lag": adstock_max_lag,
-            #     "yearly_seasonality": yearly_seasonality
-            # }
-        }
-
+        # X = df.drop(y_column, axis=1).astype(float)
+        X = df.drop(y_column, axis=1)
+        y = df[y_column].astype(float)
+
+        mmm.fit(X, y)
+        logging.info("Model fitting completed.")
         logging.info("run_mmm_task completed successfully.")
-        logging.debug("response=%s", response)
-        return response
+        return mmm
 
     except Exception as e:
         logging.error("run_mmm_task failed: %s\nJSON data: %s", str(e), data, exc_info=True)
-        return {"status": "failed", "error": str(e)}
+        return {"status": "failed", "error": str(e)}
+
+def require_api_key(func):
+    @wraps(func)
+    def decorated_function(*args, **kwargs):
+        api_key = request.headers.get('X-API-Key')
+        if api_key and api_key == API_KEY:
+            return func(*args, **kwargs)
+        else:
+            return jsonify({"message": "Unauthorized"}), 401
+    return decorated_function
 
 @app.route('/run_mmm_async', methods=['POST'])
+@require_api_key
 def run_mmm_async():
     try:
         logging.info("Received request to run_mmm_async")
@@ -195,37 +236,73 @@ def run_mmm_async():
 
         task = run_mmm_task.apply_async(args=[data])
         logging.info("Task submitted with ID: %s", task.id)
 
-        # session[task.id] = "STARTED"
-
         return jsonify({"task_id": task.id})
 
     except Exception as e:
         logging.error("Error in run_mmm_async: %s", str(e), exc_info=True)
         return jsonify({"error": str(e)}), 500
+
 
-@app.route('/get_results', methods=['GET'])
-def get_results():
-    try:
-        task_id = request.args.get('task_id')
-        logging.info("Received request for get_results with task_id: %s", task_id)
+@app.route('/get_task_status', methods=['GET'])
+@require_api_key
+def get_task_status():
+    task_id = request.args.get('task_id')
+    task = run_mmm_task.AsyncResult(task_id)
+    return jsonify({"status": task.state})
 
-        # if task_id not in session:
-        #     return jsonify({'status': "failure", "error":'No such task'}), 404
 
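+# Decorator for result endpoints: returns a JSON status payload while the Celery
+# task is still pending (or has failed), and only calls the wrapped view once the
+# task has completed successfully.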
+def check_task_status(f):
+    @wraps(f)  # Preserve function metadata
+    def wrapper(*args, **kwargs):
+        try:
+            task_id = request.args.get('task_id')
+            if not task_id:
+                return jsonify({"status": "failure", "error": "No task_id provided"}), 400
+
+            logging.info("Checking task status with task_id: %s", task_id)
+
+            task = run_mmm_task.AsyncResult(task_id)
+            if task.state == 'PENDING':
+                logging.info("Task %s is still pending.", task_id)
+                return jsonify({"status": "pending"})
+            elif task.state == 'FAILURE':
+                logging.error("Task %s failed.", task_id)
+                return jsonify({"status": "failure", "error": str(task.info)})
+
+            # If task completed successfully, proceed with the decorated function
+            logging.info("Task %s completed successfully.", task_id)
+            return f(*args, **kwargs)
+
+        except Exception as e:
+            logging.error("Error in check_task_status: %s", str(e), exc_info=True)
+            return jsonify({"status": "failure", "error": str(e)}), 500
+
+    return wrapper
 
+@app.route('/get_summary_statistics', methods=['GET'])
+@require_api_key
+@check_task_status
+def get_summary_statistics():
+    try:
+        task_id = request.args.get('task_id')
         task = run_mmm_task.AsyncResult(task_id)
-        if task.state == 'PENDING':
-            logging.info("Task %s is still pending.", task_id)
-            response = {"status": "pending"}
-        elif task.state != 'FAILURE':
-            logging.info("Task %s completed successfully.", task_id)
-            response = task.result
-        else:
-            logging.error("Task %s failed.", task_id)
-            response = {"status": "failure", "error": str(task.info)}
+        mmm = task.result
+        logging.info("MMM model: %s", mmm)
+
+        # Extract and return summary statistics
+        summary = az.summary(mmm.fit_result)
+
+        # Filter only the most important statistics
+        important_params = summary[summary.index.str.contains('alpha|beta|sigma|intercept|lam|gamma_control', case=False)]
+        # Limit decimal places and convert to more compact format
+        important_params = important_params.round(5)
 
-        return jsonify(response)
+        summary_json = important_params.to_json(orient="split", double_precision=5)
+        logging.info("Summary statistics extracted.")
+        logging.info("summary_json=%s", summary_json)
+
+        return jsonify({"status": "completed", "summary": summary_json})
 
     except Exception as e:
-        logging.error("Error in get_results: %s", str(e), exc_info=True)
-        return jsonify({"error": str(e)}), 500
+        logging.error("Error in get_summary_statistics: %s", str(e), exc_info=True)
+        return jsonify({"status": "failure", "error": str(e)}), 500
 
 if __name__ == '__main__':
     from argparse import ArgumentParser
diff --git a/build.sh b/build.sh
new file mode 100755
index 0000000..59d124f
--- /dev/null
+++ b/build.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+
+# Validate environment argument
+if [ "$1" != "production" ] && [ "$1" != "development" ]; then
+    echo "Error: Environment must be specified as either 'production' or 'development'"
+    echo "Usage: $0 <production|development>"
+    exit 1
+fi
+
+ENVIRONMENT=$1
+
+# Function to read config value
+get_config() {
+    local key=$1
+    yq ".$ENVIRONMENT.$key" config.yaml | tr -d '"'
+}
+
+# Get configuration values
+INSTANCE_NAME=$(get_config "instanceName")
+REGION=$(get_config "region")
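+# Read the API key from a local .env file. Assumed format (the single quotes are
+# required, since the cut below splits on '): API_KEY='your-secret-key'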
+API_KEY=$(grep API_KEY .env | cut -d "'" -f 2)
+
+echo "Building $INSTANCE_NAME in $REGION"
+
+gcloud builds submit --config cloudbuild.yaml --substitutions=_REPO_NAME=$INSTANCE_NAME,_SERVICE_NAME=$INSTANCE_NAME,_REGION=$REGION,_API_KEY=$API_KEY
diff --git a/cloudbuild.yaml b/cloudbuild.yaml
index 5881765..8440f08 100644
--- a/cloudbuild.yaml
+++ b/cloudbuild.yaml
@@ -7,6 +7,8 @@ steps:
       - -t
       - ${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPO_NAME}/${_SERVICE_NAME}
      - ${_SERVICE_FOLDER}
+      - --build-arg
+      - API_KEY=${_API_KEY}
 
  # Push the container image to Artifact Registry
   - id: push image
@@ -20,6 +22,3 @@ images:
 
 substitutions:
   _SERVICE_FOLDER: .
-  _REPO_NAME: gpt-bayes # Name of Google Artifact Registry (GAR)'s repo
-  _SERVICE_NAME: gpt-bayes # Name of Cloud Run Service
-  _REGION: us-central1
diff --git a/config.yaml b/config.yaml
new file mode 100644
index 0000000..9a866a9
--- /dev/null
+++ b/config.yaml
@@ -0,0 +1,22 @@
+production:
+  instanceName: gpt-bayes
+  region: us-central1
+  zone: us-central1-a
+  nginx:
+    repoName: gcr2gce-proxy
+    serviceName: gcr2gce-proxy
+    serverName: nextgen-mmm.pymc-labs.com
+    ipAddress: 35.208.203.115
+    port: 5000
+    region: us-central1
+development:
+  instanceName: dev-gpt-bayes
+  region: us-central1
+  zone: us-central1-a
+  nginx:
+    repoName: dev-gcr2gce-proxy
+    serviceName: dev-gcr2gce-proxy
+    serverName: dev-nextgen-mmm.pymc-labs.com
+    ipAddress: 34.59.137.141
+    port: 5000
+    region: us-central1
\ No newline at end of file
diff --git a/deploy.sh b/deploy.sh
index 987c645..3bde53c 100755
--- a/deploy.sh
+++ b/deploy.sh
@@ -1,5 +1,25 @@
 #!/bin/bash
 
+# Validate environment argument
+if [ "$1" != "production" ] && [ "$1" != "development" ]; then
+    echo "Error: Environment must be specified as either 'production' or 'development'"
+    echo "Usage: $0 <production|development>"
+    exit 1
+fi
+
+ENVIRONMENT=$1
+
+# Function to read config value
+get_config() {
+    local key=$1
+    yq ".$ENVIRONMENT.$key" config.yaml | tr -d '"'
+}
+
+# Get configuration values
+INSTANCE_NAME=$(get_config "instanceName")
+REGION=$(get_config "region")
+ZONE=$(get_config "zone")
+
 # Function to check if rebuild is needed
 should_rebuild() {
     read -p "Did you modify Dockerfile, environment.yml or other dependencies? (y/N): " answer
@@ -15,23 +35,23 @@ should_rebuild() {
 
 if should_rebuild; then
     echo "Rebuilding container..."
-    gcloud builds submit
+    ./build.sh $ENVIRONMENT
 
     echo "Cleaning up docker system..."
-    gcloud compute ssh gpt-bayes --zone us-central1-a --command 'docker system prune -f -a'
+    gcloud compute ssh "$INSTANCE_NAME" --zone "$ZONE" --command 'docker system prune -f -a'
 
     echo "Updating container..."
-    gcloud compute instances update-container gpt-bayes \
-        --zone=us-central1-a \
-        --container-image=us-central1-docker.pkg.dev/bayes-gpt/gpt-bayes/gpt-bayes:latest
+    gcloud compute instances update-container "$INSTANCE_NAME" \
+        --zone="$ZONE" \
+        --container-image=$REGION-docker.pkg.dev/bayes-gpt/$INSTANCE_NAME/$INSTANCE_NAME:latest
 else
     echo "Copying new files and restarting application..."
     # First, copy files to the instance
-    gcloud compute scp --zone=us-central1-a --recurse ./*.py gpt-bayes:/tmp/
+    gcloud compute scp --zone="$ZONE" --recurse ./*.py "$INSTANCE_NAME":/tmp/
 
     # Then copy files into the container and restart services
-    gcloud compute ssh gpt-bayes --zone=us-central1-a --command "
-        CONTAINER_ID=\$(docker ps --filter ancestor=us-central1-docker.pkg.dev/bayes-gpt/gpt-bayes/gpt-bayes:latest -q) && \
+    gcloud compute ssh "$INSTANCE_NAME" --zone="$ZONE" --command "
+        CONTAINER_ID=\$(docker ps --filter ancestor=$REGION-docker.pkg.dev/bayes-gpt/$INSTANCE_NAME/$INSTANCE_NAME:latest -q) && \
         if [ -z \"\$CONTAINER_ID\" ]; then
             echo \"Error: Could not find running container\"
             exit 1
@@ -50,7 +70,7 @@ else
 
     # Restart the container
     echo "Restarting container..."
-    gcloud compute ssh gpt-bayes --zone=us-central1-a --command "docker restart \$(docker ps --filter ancestor=us-central1-docker.pkg.dev/bayes-gpt/gpt-bayes/gpt-bayes:latest -q)"
+    gcloud compute ssh "$INSTANCE_NAME" --zone="$ZONE" --command "docker restart \$(docker ps --filter ancestor=$REGION-docker.pkg.dev/bayes-gpt/$INSTANCE_NAME/$INSTANCE_NAME:latest -q)"
 fi
 
 echo "Deployment complete!"
diff --git a/environment.yml b/environment.yml
index 70e2d7c..6e0db19 100644
--- a/environment.yml
+++ b/environment.yml
@@ -12,4 +12,5 @@ dependencies:
   - vine=5.1.0
   - redis-py=4.5.0
   - procps-ng
-
+  - yq=2.12.0
+  - dill=0.3.9
diff --git a/gpt-agent/api_spec.json b/gpt-agent/api_spec.json
index 05d8270..64f450a 100644
--- a/gpt-agent/api_spec.json
+++ b/gpt-agent/api_spec.json
@@ -7,7 +7,7 @@
   },
   "servers": [
     {
-      "url": "https://nextgen-mmm.pymc-labs.com"
+      "url": "https://dev-nextgen-mmm.pymc-labs.com"
     }
   ],
   "paths": {
@@ -23,9 +23,12 @@
             "schema": {
               "type": "object",
               "properties": {
-                "df": {
-                  "type": "string",
-                  "description": "Data in CSV format for the model. Must contain at least 15 rows."
+                "openaiFileIdRefs": {
+                  "type": "array",
+                  "items": {
+                    "type": "string"
+                  },
+                  "description": "List of OpenAI file IDs to be used as references."
                 },
                 "date_column": {
                   "type": "string",
@@ -48,6 +51,13 @@
                   "type": "integer",
                   "default": 2,
                   "description": "Yearly seasonality factor."
+                },
+                "control_columns": {
+                  "type": "array",
+                  "items": {
+                    "type": "string"
+                  },
+                  "description": "List of control column names."
                 }
               }
             }
@@ -74,10 +84,49 @@
           }
         }
       },
-    "/get_results": {
+    "/get_task_status": {
+      "get": {
+        "description": "Retrieves the status of an MMM model run.",
+        "operationId": "getTaskStatus",
+        "parameters": [
+          {
+            "name": "task_id",
+            "in": "query",
+            "required": true,
+            "description": "The task ID of the MMM model run.",
+            "schema": {
+              "type": "string"
+            }
+          }
+        ],
+        "responses": {
+          "200": {
+            "description": "Status of the model execution.",
+            "content": {
+              "application/json": {
+                "schema": {
+                  "type": "object",
+                  "properties": {
+                    "status": {
+                      "type": "string",
+                      "description": "Status of the model execution."
+                    },
+                    "error": {
+                      "type": "string",
+                      "description": "Error message, present if status is 'failed'."
+                    }
+                  }
+                }
+              }
+            }
+          }
+        }
+      }
+    },
+    "/get_summary_statistics": {
       "get": {
         "description": "Retrieves the results of an MMM model run.",
-        "operationId": "getMMMResults",
+        "operationId": "getSummaryStatistics",
         "parameters": [
           {
             "name": "task_id",
diff --git a/gpt-agent/gpt_prompt.md b/gpt-agent/gpt_prompt.md
index 2f94e1e..43cf815 100644
--- a/gpt-agent/gpt_prompt.md
+++ b/gpt-agent/gpt_prompt.md
@@ -1,17 +1,18 @@
 # Bayes MMM: Your Marketing Mix Modeling Assistant
 
 BayesMMM is a specialized assistant for marketing analytics, focusing on Marketing Mix Modeling (MMM) for analysts and business stakeholders.
-It leverages the `nextgen-mmm.pymc-labs.com` API to run MMM models and retrive fitted parameters. This API provide two asycronous operations:
+It leverages the `dev-nextgen-mmm.pymc-labs.com` API to run MMM models and retrieve fitted parameters. This API provides the following asynchronous operations:
 
-1. `runMMMAsync` initianting the MMM model run and returns a unique `task_id` upon starting a the model fit.
-2. `getMMMResults` which is used to check the model's status using the `task_id` and retrieve results (the parameters of the fitted model).
+1. `runMMMAsync`, which initiates the MMM model run and returns a unique `task_id` when the model fit starts.
+2. `getTaskStatus`, which checks whether the model has finished executing.
+3. `getSummaryStatistics`, which retrieves summary statistics for the parameters of the fitted model.
 
 ## Key Responsibilities
 
 As BayesMMM, your main role is to:
 
-1. Assist users in preparing and validating their data for MMM and ensure that is correcly formatted for the API operations.
-2. Run the model asynchronously using `runMMMAsync` and track its progress with `getMMMResults`.
+1. Assist users in preparing and validating their data for MMM and ensure that it is correctly formatted for the API operations.
+2. Run the model asynchronously using `runMMMAsync`.
 3. Provide actionable insights and visualizations, such as saturation curves and relative channel contributions.
 4. Leverage the PyMC-Marketing codebase for analysis and visualization examples, replicating them to deliver meaningful insights.
 
@@ -34,42 +35,32 @@ Handle missing values appropriately and convert the date column to the required
 data['date_column_name'] = pd.to_datetime(data['date_column_name']).dt.strftime('%Y-%m-%d')
 ```
 
-Always confirm with the user that the data is correctly formatted before proceeding to initiate the model run.
+**Very Important:**
+- Always confirm with the user that the data is correctly formatted before proceeding to initiate the model run.
 
 ### 2. Initiating the Model Run
 
-When asked to run the Baysian MMM model you must use the `runMMMAsync` API operation with the correctly formatted data. **Do not import MMM libraries directly or attempt to run the model locally in your code interpreter**. The payload to the API should include the data in CSV format and the following parameters:
+When asked to run the Bayesian MMM model you must use the `runMMMAsync` API operation with the correctly formatted data. **Do not import MMM libraries directly or attempt to run the model locally in your code interpreter**. The payload to the API should include the reference to the data file and the following parameters:
 
-- **df**: The data as a CSV string.
 - **date_column**: Name of the date column.
 - **channel_columns**: List of channel spend columns.
+- **y_column**: Name of the y column.
 - **Optional Parameters**:
   - **control_columns**: List of control columns.
   - **adstock_max_lag** (default: 8)
   - **yearly_seasonality** (default: 2)
 
-Here is an example of how to convert the data to CSV before sending and create the payload for the API call:
-```python
-csv_data = data.to_csv(index=False)
-payload = {
-    "df": csv_data,
-    "date_column": "date_column_name",
-    "channel_columns": ["channel_1_column_name", "channel_2_column_name"],
-    "control_columns": ["control_1_column_name", "control_2_column_name", "control_3_column_name"],
-    "adstock_max_lag": 8,
-    "yearly_seasonality": 2
-}
-```
-
-> **Very Important:**
-> * DO NOT TRY TO IMPORT AN MMM LIBRARY AND RUN THE MODEL LOCALLY.
-> * NEVER WRITE ANY CODE LIKE THIS `import nextgen_mmm_pymc_labs_com__jit_plugin as mmm_plugin`
+**Very Important:**
+- DO NOT TRY TO IMPORT AN MMM LIBRARY AND RUN THE MODEL LOCALLY.
+- NEVER WRITE ANY CODE LIKE THIS `import nextgen_mmm_pymc_labs_com__jit_plugin as mmm_plugin`
+- Always send the OpenAI link to the data file in the payload.
 
 ### 3. Retrieving Results
 
 Once the run is initiated:
 
-- Check Status: Use `task_id` that is returned from the `runMMMAsync` operation with `getMMMResults` to monitor progress (pending, completed, or failed).
+- Check Status: Use the `task_id` returned from the `runMMMAsync` operation with `getTaskStatus` to monitor progress (pending, completed, or failed).
 - Retrieve Results: After completion, analyze the results, including channel contributions and statistical insights.
 
@@ -90,12 +81,7 @@ mmm = MMM(
 )
 ```
 
-You can retrieve the summary statistics for the parameters of this model from the `summary` field in the payload returned by `getMMMResults`.
-
-```python
-# Example code to retrieve the summary statistics
-summary = pd.read_json(io.StringIO(result_data["summary"]),orient='split')
-```
+You can retrieve the summary statistics for the parameters of this model from the `summary` field in the payload returned by `getSummaryStatistics`.
 
 The most important parameters are:
 
@@ -110,6 +96,15 @@ The most important parameters are:
 * intercept: Intercept parameter
 * (optional) gamma_control: Control parameters that multiply the control variables
 
+You can retrieve the return on ad spend from the `return_on_ad_spend` field in the payload returned by `getReturnOnAdSpend`. This is a JSON object with the following fields:
+
+- `channel_columns`: List of channel columns.
+- `roas_mean`: Mean of the return on ad spend.
+- `roas_hdi_lower`: Lower bound of the 94% highest density interval (HDI) of the return on ad spend.
+- `roas_hdi_upper`: Upper bound of the 94% highest density interval (HDI) of the return on ad spend.
+
+Plot the return on ad spend using `roas_mean`, with `roas_hdi_lower` and `roas_hdi_upper` as the uncertainty interval.
+
 ### 6. Analysis Workflow
 
 While waiting for results, you can suggest to the user to perform exploratory data analysis. Here some ideas:
@@ -125,4 +120,6 @@ After retrieving results here are some ideas:
 
 - Spend with Saturation: Overlay total spend as a dashed line on the saturation plot.
 
+**Important Reminder**
+- Throughout your interactions provide **concise responses** using bullet points and formulas when appropriate.
\ No newline at end of file
diff --git a/nginx/cloudbuild.yaml b/nginx/cloudbuild.yaml
index fb65fe7..0c9c84b 100644
--- a/nginx/cloudbuild.yaml
+++ b/nginx/cloudbuild.yaml
@@ -37,7 +37,4 @@ images:
   - ${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPO_NAME}/${_SERVICE_NAME}
 
 substitutions:
-  _SERVICE_FOLDER: .
-  _REPO_NAME: gcr2gce-proxy # Name of Google Artifact Registry (GAR)'s repo
-  _SERVICE_NAME: gcr2gce-proxy # Name of Cloud Run Service
-  _REGION: us-central1
+  _SERVICE_FOLDER: .
\ No newline at end of file
diff --git a/nginx/deploy.sh b/nginx/deploy.sh
new file mode 100755
index 0000000..6b40fac
--- /dev/null
+++ b/nginx/deploy.sh
@@ -0,0 +1,36 @@
+#!/bin/bash
+
+# Validate environment argument
+if [ "$1" != "production" ] && [ "$1" != "development" ]; then
+    echo "Error: Environment must be specified as either 'production' or 'development'"
+    echo "Usage: $0 <production|development>"
+    exit 1
+fi
+
+ENVIRONMENT=$1
+
+# Function to read config value
+get_nginx_config() {
+    local key=$1
+    yq ".$ENVIRONMENT.nginx.$key" ../config.yaml | tr -d '"'
+}
+
+# Get configuration values
+SERVER_NAME=$(get_nginx_config "serverName")
+IP_ADDRESS=$(get_nginx_config "ipAddress")
+PORT=$(get_nginx_config "port")
+REPO_NAME=$(get_nginx_config "repoName")
+SERVICE_NAME=$(get_nginx_config "serviceName")
+REGION=$(get_nginx_config "region")
+
+# Replace placeholders in nginx.conf.template
+sed -e "s/\${SERVER_NAME}/$SERVER_NAME/g" \
+    -e "s/\${IP_ADDRESS}/$IP_ADDRESS/g" \
+    -e "s/\${PORT}/$PORT/g" nginx.conf.template > nginx.conf
+
+# Deploy
+gcloud builds submit --config cloudbuild.yaml --substitutions=_REPO_NAME=$REPO_NAME,_SERVICE_NAME=$SERVICE_NAME,_REGION=$REGION
+
+# Clean up
+rm nginx.conf
diff --git a/nginx/nginx.conf b/nginx/nginx.conf.template
similarity index 85%
rename from nginx/nginx.conf
rename to nginx/nginx.conf.template
index 8687395..e62aeae 100644
--- a/nginx/nginx.conf
+++ b/nginx/nginx.conf.template
@@ -14,10 +14,10 @@ http {
 
     server {
         listen 8080;
-        server_name nextgen-mmm.pymc-labs.com;
+        server_name ${SERVER_NAME};
 
         location / {
-            proxy_pass http://35.208.203.115:5000;
+            proxy_pass http://${IP_ADDRESS}:${PORT};
             proxy_set_header Host $host;
             proxy_set_header X-Real-IP $remote_addr;
             proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
diff --git a/start.sh b/start.sh
index ca9ce9c..029ae89 100755
--- a/start.sh
+++ b/start.sh
@@ -1,8 +1,11 @@
 #!/bin/bash
 set -e
 
-# Start Redis in the background
-redis-server --daemonize yes
+# Start Redis in the background with specific directory configurations
+redis-server --daemonize yes \
+    --dir /opt/conda/var/db/redis \
+    --logfile /opt/conda/var/db/redis/redis.log \
+    --pidfile /opt/conda/var/db/redis/redis.pid
 
 # Start Celery worker in the background
 celery -A app.celery worker --loglevel=info --concurrency=4 &
diff --git a/test_mmm_async.py b/test_mmm_async.py
index d228909..5437086 100644
--- a/test_mmm_async.py
+++ b/test_mmm_async.py
@@ -2,48 +2,57 @@
 import json
 import time
 import pandas as pd
-
+import os
 import sys
 import io
+import dotenv
 
-def create_payload_csv():
-    # Load the user-uploaded data file
-    data = pd.read_csv('test-data/mmm_example.csv')
 
-    # Rename the 'y' column to 'sales' and select relevant columns
-    data.rename(columns={'y': 'sales'}, inplace=True)
-    mmm_data = data[['date_week', 'sales', 'x1', 'x2', 'event_1', 'event_2', 't']]
+dotenv.load_dotenv()
 
-    # Convert 'date_week' to datetime format
-    mmm_data.loc[:, 'date_week'] = pd.to_datetime(mmm_data['date_week']).dt.strftime('%Y-%m-%d')
+API_KEY = os.environ.get('API_KEY', None)
 
-    # Convert the prepared data to JSON format for payload, ensuring proper formatting
-    data_json = mmm_data.to_json(orient="split", index=False)
-    #print(data_json)
 
-    # Example payload
+def create_payload():
     payload = {
-        "df": data_json,
+        "domain": "dev-nextgen-mmm.pymc-labs.com",
+        "method": "post",
+        "path": "/run_mmm_async",
+        "operation": "runMMMAsync",
+        "operation_hash": "0c869884cb92378e2dfe2ae377cac236cbc2b9d0",
+        "is_consequential": True,
+        "openaiFileIdRefs": [
+            {
+                "name": "mmm_example.csv",
"mmm_example.csv", + "id": "file-1234567890", + "mime_type": "text/csv", + "download_link": "https://raw.githubusercontent.com/pymc-labs/pymc-marketing/refs/heads/main/data/mmm_example.csv" + } + ], "date_column": "date_week", - "channel_columns": ["x1", "x2"], - "adstock_max_lag": 2, - "yearly_seasonality": 8, - "control_columns": ["event_1", "event_2", "t"] + "channel_columns": [ + "x1", + "x2" + ], + "adstock_max_lag": 8, + "yearly_seasonality": 2, + "y_column": "y" } - return payload def test_async_mmm_run(base_url): # Payload that includes data - payload = create_payload_csv() + payload = create_payload() # Replace with your API endpoint for async run run_url = f"{base_url}/run_mmm_async" # Make a POST request to initiate the model run - headers = {'Content-Type': 'application/json'} + headers = { + 'Content-Type': 'application/json', + 'X-API-Key': API_KEY + } response = requests.post(run_url, data=json.dumps(payload), headers=headers) - print(response) # Assert the status code for initiation assert response.status_code == 200 @@ -52,11 +61,11 @@ def test_async_mmm_run(base_url): print(f"Got task_id {task_id}") # Polling URL - results_url = f"{base_url}/get_results?task_id={task_id}" + results_url = f"{base_url}/get_summary_statistics?task_id={task_id}" # Poll for results while True: - result_response = requests.get(results_url) + result_response = requests.get(results_url, headers=headers) result_data = result_response.json() if result_data["status"] == "completed": @@ -87,10 +96,12 @@ def test_async_mmm_run(base_url): if environment == "local": base_url = "http://localhost:5001" - elif environment == "deployed": + elif environment == "deployed-production": base_url = "https://nextgen-mmm.pymc-labs.com" + elif environment == "deployed-development": + base_url = "https://dev-nextgen-mmm.pymc-labs.com" else: - print("Invalid argument. Use 'local' or 'deployed'.") + print("Invalid argument. Use 'local' or 'deployed-production' or 'deployed-development'.") sys.exit(1) test_async_mmm_run(base_url)