Skip to content

Commit

Permalink
Merge branch 'main' into dev/runtime-compatibility-hot-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
elliotzh committed Apr 29, 2024
2 parents 605b88b + 9b31614 commit 4236e7d
Show file tree
Hide file tree
Showing 48 changed files with 797 additions and 327 deletions.
1 change: 1 addition & 0 deletions .cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@
"autogen",
"spawnve",
"addrs",
"pycache",
"pywin",
"STARTF",
"mltable",
Expand Down
39 changes: 24 additions & 15 deletions .github/workflows/promptflow-import-linter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,16 @@ jobs:
- uses: snok/install-poetry@v1
- name: Install all packages
run: |
cd ${{ env.WORKING_DIRECTORY }}/src/promptflow-tracing
touch promptflow/__init__.py
poetry install --with dev
cd ${{ env.WORKING_DIRECTORY }}/src/promptflow-core
touch promptflow/__init__.py
poetry install --with dev
cd ${{ env.WORKING_DIRECTORY }}/src/promptflow-devkit
touch promptflow/__init__.py
poetry install --with dev
cd ${{ env.WORKING_DIRECTORY }}/src/promptflow-azure
touch promptflow/__init__.py
poetry install --with dev
cd ${{ env.WORKING_DIRECTORY }}/src/promptflow-evals
touch promptflow/__init__.py
poetry install --with dev
touch src/promptflow-tracing/promptflow/__init__.py
poetry install --with dev -C ${{ env.WORKING_DIRECTORY }}/src/promptflow-tracing
touch src/promptflow-core/promptflow/__init__.py
poetry install --with dev -C ${{ env.WORKING_DIRECTORY }}/src/promptflow-core
touch src/promptflow-devkit/promptflow/__init__.py
poetry install --with dev -C ${{ env.WORKING_DIRECTORY }}/src/promptflow-devkit
touch src/promptflow-azure/promptflow/__init__.py
poetry install --with dev -C ${{ env.WORKING_DIRECTORY }}/src/promptflow-azure
touch src/promptflow-evals/promptflow/__init__.py
poetry install --with dev -C ${{ env.WORKING_DIRECTORY }}/src/promptflow-evals
working-directory: ${{ env.WORKING_DIRECTORY }}
- name: import lint
run: |
Expand All @@ -59,4 +54,18 @@ jobs:
cd ${{ env.WORKING_DIRECTORY }}/src/promptflow-evals
poetry run lint-imports
working-directory: ${{ env.WORKING_DIRECTORY }}
- name: import lint testing private imports from global
working-directory: ${{ env.WORKING_DIRECTORY }}/src/promptflow-azure
run: |
set -xe
rm ${{ env.WORKING_DIRECTORY }}/src/promptflow-tracing/promptflow/__init__.py
rm ${{ env.WORKING_DIRECTORY }}/src/promptflow-core/promptflow/__init__.py
rm ${{ env.WORKING_DIRECTORY }}/src/promptflow-devkit/promptflow/__init__.py
rm ${{ env.WORKING_DIRECTORY }}/src/promptflow-azure/promptflow/__init__.py
rm ${{ env.WORKING_DIRECTORY }}/src/promptflow-evals/promptflow/__init__.py
echo "=== Add more import linter when facing more import errors ==="
echo "=== promptflow-azure full lints ==="
poetry run pip install langchain
poetry run python ${{ github.workspace }}/scripts/import_linter/import_linter.py
4 changes: 2 additions & 2 deletions examples/flex-flows/chat-stream/data.jsonl
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{"question": "What is Prompt flow?", "statements": {"correctness": "should explain what's 'Prompt flow'"}}
{"question": "What is ChatGPT? Please explain with consise statement", "statements": { "correctness": "should explain what's ChatGPT", "consise": "It is a consise statement."}}
{"question": "What is Prompt flow?", "chat_history": [], "statements": { "correctness": "result should be 1", "consise": "It is a consise statement."}}
{"question": "What is ChatGPT? Please explain with consise statement", "chat_history": [], "statements": { "correctness": "result should be 1", "consise": "It is a consise statement."}}
{"question": "How many questions did user ask?", "chat_history": [{"role": "user","content": "where is the nearest coffee shop?"},{"role": "system","content": "I'm sorry, I don't know that. Would you like me to look it up for you?"}], "statements": { "correctness": "result should be 1", "consise": "It is a consise statement."}}
56 changes: 56 additions & 0 deletions scripts/import_linter/import_linter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import os
import subprocess
import multiprocessing
import importlib

# Absolute path of the repository root, resolved via `git rev-parse` so the
# script works regardless of the current working directory.
git_base = subprocess.check_output(['git', 'rev-parse', '--show-toplevel']).decode().strip()


def walk_and_ignore_pycache(directory):
    """Recursively collect importable ``.py`` files under *directory*.

    Directories named ``__pycache__``, ``tests`` or ``data`` are pruned from
    the walk entirely, and ``__init__`` modules are skipped (this commit's CI
    touches empty placeholder ``promptflow/__init__.py`` files).

    :param directory: Root directory to walk.
    :return: List of paths (joined with *directory*) of the matching files.
    """
    # Renamed the accumulator from `list`, which shadowed the builtin.
    skipped_dirs = {"__pycache__", "tests", "data"}
    collected = []
    for root, dirnames, files in os.walk(directory, topdown=True):
        # In-place pruning only works with topdown=True: os.walk re-reads
        # dirnames to decide which subdirectories to descend into.
        dirnames[:] = [d for d in dirnames if d not in skipped_dirs]
        collected.extend(
            os.path.join(root, f)
            for f in files
            if f.endswith(".py") and not f.startswith("__init__")
        )
    return collected


def file_to_import(file):
    """Convert a file path into a dotted module path rooted at ``promptflow``.

    Walks the path upward, collecting components until a directory literally
    named ``promptflow`` (or the filesystem root) is reached, stripping the
    ``.py`` suffix from the file component.
    """
    segments = []
    head, tail = os.path.split(file)
    while tail != "promptflow" and head != "":
        segments.insert(0, tail[:-3] if tail.endswith(".py") else tail)
        head, tail = os.path.split(head)
    return ".".join(["promptflow"] + segments)


def subprocess_check_python_import(file):
    """Import *file* (a dotted module path) in the current worker process.

    If there is an import error, the process exits with a non-zero exit code.
    Search the output for ``importlib.import_module`` as the keyword to find
    the failure; the error below it is the import / circular-import error.
    """
    print("Checking import of {} on process ID: {}".format(file, os.getpid()))
    importlib.import_module(file)


def process_file(file):
    """Pool worker entry point: resolve a file path to its module name and import it."""
    subprocess_check_python_import(file_to_import(file))


if __name__ == '__main__':
    # Packages whose modules are import-checked; promptflow-evals is not
    # included here (it is linted separately in the workflow).
    package_dirs = [
        "/src/promptflow-tracing/",
        "/src/promptflow-core/",
        "/src/promptflow-devkit/",
        "/src/promptflow-azure/",
    ]
    # Renamed the accumulator from `list`, which shadowed the builtin.
    files = []
    for package_dir in package_dirs:
        files.extend(walk_and_ignore_pycache(git_base + package_dir))
    # Import each module in pool workers; any import error makes a worker
    # raise, which propagates out of map() and fails the script.
    pool = multiprocessing.Pool()
    try:
        pool.map(process_file, files)
    finally:
        # Guarantee worker shutdown even when map() raises.
        pool.close()
        pool.join()
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,19 @@ async def register_artifact(self, run_id, datastore_name, relative_path, path):
"relativePath": relative_path,
},
}
error_msg_prefix = f"Failed to create Artifact for Run {run_id!r}"
try:
async with httpx.AsyncClient(verify=False) as client:
response = await client.post(url, headers=self._get_header(), json=payload)
if response.status_code == 401 or response.status_code == 403:
# if it's auth issue, return auth_error_message
raise UserAuthenticationError(response.text)
elif response.status_code != 200:
error_message = f"Failed to create Artifact for Run {run_id}. Code={response.status_code}."
error_message = f"{error_msg_prefix}. Code={response.status_code}. Message={response.text}"
logger.error(error_message)
raise ArtifactInternalError(error_message)
except Exception as e:
error_message = f"Failed to create Artifact for Run {run_id}: {str(e)}"
error_message = f"{error_msg_prefix}: {str(e)}"
logger.error(error_message)
raise ArtifactInternalError(error_message) from e

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from promptflow._utils.logger_utils import get_cli_sdk_logger
from promptflow.azure._storage.blob.client import _get_datastore_credential
from promptflow.azure.operations._artifact_client import AsyncArtifactClient
from promptflow.azure.operations._metrics_client import AsyncMetricClient
from promptflow.exceptions import UserErrorException

logger = get_cli_sdk_logger()
Expand All @@ -41,6 +42,7 @@ def __init__(self, run: Run, run_ops: "RunOperations", overwrite=True):
self.datastore = self._get_datastore_with_secrets()
self.blob_service_client = self._init_blob_service_client()
self.artifact_client = AsyncArtifactClient.from_run_operations(run_ops)
self.metric_client = AsyncMetricClient.from_run_operations(run_ops)

def _get_datastore_with_secrets(self):
"""Get datastores with secrets."""
Expand Down Expand Up @@ -117,9 +119,7 @@ async def upload(self) -> Dict:
}
return result_dict

except UserAuthenticationError:
raise
except UploadUserError:
except UserErrorException:
raise
except Exception as e:
raise UploadInternalError(f"{error_msg_prefix}. Error: {e}") from e
Expand Down Expand Up @@ -202,13 +202,6 @@ async def _upload_snapshot(self) -> str:
await self._upload_local_folder_to_blob(temp_local_folder, remote_folder)
return f"{remote_folder}/{self.run.name}/{flow_file}"

async def _upload_metrics(self) -> None:
"""Upload run metrics to cloud."""
logger.debug(f"Uploading metrics for run {self.run.name!r}.")
local_folder = self.run_output_path / LocalStorageFilenames.METRICS
remote_folder = f"{Local2Cloud.BLOB_ROOT_PROMPTFLOW}/{Local2Cloud.BLOB_METRICS}/{self.run.name}"
await self._upload_local_folder_to_blob(local_folder, remote_folder)

async def _upload_flow_logs(self) -> str:
"""Upload flow logs for each line run to cloud."""
logger.debug(f"Uploading flow logs for run {self.run.name!r}.")
Expand Down Expand Up @@ -242,6 +235,33 @@ async def _upload_instance_results(self) -> str:

return remote_file

async def _upload_metrics(self) -> Dict:
    """Write run metrics to metric service.

    Combines reserved "__pf__" system metrics from the run properties with
    user metrics read from the local metrics file, coerces every value to
    float, and writes each metric through the metric client.

    :return: The metrics that were written, keyed by metric name.
    :raises UserErrorException: If any metric value cannot be converted to float.
    """
    logger.debug(f"Uploading metrics for run {self.run.name!r}.")
    # system metrics that starts with "__pf__" are reserved for promptflow internal use
    metrics = {
        k: v for k, v in self.run.properties[FlowRunProperties.SYSTEM_METRICS].items() if k.startswith("__pf__")
    }

    # add user metrics from local metric file
    # NOTE(review): assumes the file contains a JSON object of name -> value;
    # non-dict content is silently ignored — confirm that is intentional.
    metric_file = self.run_output_path / LocalStorageFilenames.METRICS
    if metric_file.is_file():
        with open(metric_file, "r", encoding=DEFAULT_ENCODING) as f:
            user_metrics = json.load(f)
            if isinstance(user_metrics, dict):
                # user metrics override system metrics on key collision
                metrics.update(user_metrics)

    # convert metrics to float values
    try:
        metrics = {k: float(v) for k, v in metrics.items()}
    except Exception as e:
        raise UserErrorException(f"Failed to convert metrics {metrics!r} to float values. Error: {e}") from e

    # write metrics to metric service
    for k, v in metrics.items():
        await self.metric_client.log_metric(self.run.name, k, v)
    return metrics

async def _upload_local_folder_to_blob(self, local_folder, remote_folder):
"""Upload local folder to remote folder in blob.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
from typing import Dict

import httpx

from promptflow._sdk._errors import MetricInternalError, SDKError, UserAuthenticationError
from promptflow._sdk._utilities.general_utils import get_promptflow_sdk_version
from promptflow._utils.logger_utils import get_cli_sdk_logger
from promptflow.azure._utils.general import get_authorization

logger = get_cli_sdk_logger()

# Template for the metric service v2.0 "batchsync" endpoint; {endpoint} is the
# regional service root and {runId} the run the metrics are attached to.
POST_METRICS_URL = (
    "{endpoint}/metric/v2.0/subscriptions/{sub}/resourceGroups/{rg}/"
    "providers/Microsoft.MachineLearningServices/workspaces/{ws}/runs/{runId}/batchsync"
)


class AsyncMetricClient:
    """Async client that writes run metrics to the workspace metric service."""

    def __init__(
        self,
        subscription_id,
        resource_group,
        workspace_name,
        service_endpoint,
        credential,
    ):
        self.subscription_id = subscription_id
        self.resource_group = resource_group
        self.workspace_name = workspace_name
        self.service_endpoint = service_endpoint
        self.credential = credential

    async def log_metric(self, run_id, metric_key: str, metric_value: float):
        """Write metric for a run.

        :param run_id: The run the metric is attached to.
        :param metric_key: Metric name.
        :param metric_value: Metric value; stored by the service as a scalar Double.
        :raises UserAuthenticationError: When the service responds 401/403.
        :raises MetricInternalError: For any other failure.
        """
        url = POST_METRICS_URL.format(
            sub=self.subscription_id,
            rg=self.resource_group,
            ws=self.workspace_name,
            endpoint=self.service_endpoint,
            runId=run_id,
        )

        logger.debug(f"Writing metrics for Run {run_id}...")

        # Payload follows the batchsync contract: one column typed "Double",
        # rendered in the UX as a scalar ("azureml.v1.scalar").
        payload = {
            "values": [
                {
                    "name": metric_key,
                    "columns": {metric_key: "Double"},
                    "properties": {"uxMetricType": "azureml.v1.scalar"},
                    "value": [{"data": {metric_key: metric_value}, "step": 0}],
                }
            ]
        }

        error_msg_prefix = f"Failed to write metrics for Run {run_id!r}"
        try:
            # NOTE(review): verify=False disables TLS certificate verification —
            # confirm this is intentional for this service endpoint.
            async with httpx.AsyncClient(verify=False) as client:
                response = await client.post(url, headers=self._get_header(), json=payload)
                if response.status_code == 401 or response.status_code == 403:
                    # if it's auth issue, return auth_error_message
                    raise UserAuthenticationError(response.text)
                elif response.status_code != 200:
                    error_message = f"{error_msg_prefix}. Code={response.status_code}. Message={response.text}"
                    logger.error(error_message)
                    raise MetricInternalError(error_message)
        except (UserAuthenticationError, MetricInternalError):
            # Fix: re-raise our own errors as-is. Previously the generic
            # handler below caught them too, converting auth failures into
            # MetricInternalError (defeating the 401/403 branch) and
            # double-wrapping the non-200 error message.
            raise
        except Exception as e:
            error_message = f"{error_msg_prefix}: {str(e)}"
            logger.error(error_message)
            raise MetricInternalError(error_message) from e

    def _get_header(self) -> Dict[str, str]:
        """Build request headers: bearer auth, JSON content type and SDK user agent."""
        headers = {
            "Authorization": get_authorization(credential=self.credential),
            "Content-Type": "application/json",
            "User-Agent": "promptflow/%s" % get_promptflow_sdk_version(),
        }
        return headers

    @classmethod
    def from_run_operations(cls, run_ops):
        """Alternate constructor that pulls workspace scope and credential from azure RunOperations.

        :raises SDKError: If *run_ops* is not an azure RunOperations instance.
        """
        from promptflow.azure.operations import RunOperations

        if not isinstance(run_ops, RunOperations):
            raise SDKError(f"run_ops should be an instance of azure RunOperations, got {type(run_ops)!r} instead.")

        return cls(
            subscription_id=run_ops._operation_scope.subscription_id,
            resource_group=run_ops._operation_scope.resource_group_name,
            workspace_name=run_ops._operation_scope.workspace_name,
            service_endpoint=run_ops._service_caller._service_endpoint[0:-1],  # remove trailing slash
            credential=run_ops._credential,
        )
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,9 @@ def _upload(self, run: Union[str, Run]):
# registry the run in the cloud
self._registry_existing_bulk_run(run=run)

# log metrics for the run, it can only be done after the run history record is created
async_run_allowing_running_loop(run_uploader._upload_metrics)

# print portal url when executing in jupyter notebook
if in_jupyter_notebook():
print(f"Portal url: {self._get_run_portal_url(run_id=run.name)}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,27 @@ def check_local_to_cloud_run(pf: PFClient, run: Run, check_run_details_in_cloud:

return cloud_run

@staticmethod
def check_run_metrics(pf: PFClient, local_pf: LocalPFClient, run: Run):
    """Check the metrics of the run are uploaded to cloud."""
    local_metrics = local_pf.runs.get_metrics(run.name)
    # Patch _is_system_metric to False so the cloud client also returns the
    # reserved "__pf__" metrics — presumably it filters them out by default;
    # verify against RunOperations.get_metrics.
    with patch.object(pf.runs, "_is_system_metric", return_value=False):
        # get the metrics of the run
        cloud_metrics = pf.runs.get_metrics(run.name)

    # check all the user metrics are uploaded to cloud
    for k, v in local_metrics.items():
        assert cloud_metrics.pop(k) == v

    # check all the rest system metrics are uploaded to cloud
    # (expected values match the classification_accuracy_evaluation flow run
    # over 3 lines used by the callers of this helper)
    assert cloud_metrics == {
        "__pf__.nodes.grade.completed": 3.0,
        "__pf__.nodes.calculate_accuracy.completed": 1.0,
        "__pf__.nodes.aggregation_assert.completed": 1.0,
        "__pf__.lines.completed": 3.0,
        "__pf__.lines.failed": 0.0,
    }


@pytest.mark.timeout(timeout=DEFAULT_TEST_TIMEOUT, method=PYTEST_TIMEOUT_METHOD)
@pytest.mark.e2etest
Expand Down Expand Up @@ -204,12 +225,19 @@ def test_upload_eval_run(self, pf: PFClient, randstr: Callable[[str], str]):
eval_run_name = randstr("eval_run_name_for_test_upload_eval_run")
local_pf = Local2CloudTestHelper.get_local_pf(eval_run_name)
eval_run = local_pf.run(
flow=f"{FLOWS_DIR}/simple_hello_world",
data=f"{DATAS_DIR}/webClassification3.jsonl",
flow=f"{FLOWS_DIR}/classification_accuracy_evaluation",
run=main_run_name,
name=eval_run_name,
column_mapping={"name": "${data.url}"},
column_mapping={
"prediction": "${run.outputs.result}",
"variant_id": "${run.outputs.result}",
"groundtruth": "${run.outputs.result}",
},
)
# check the run metrics are uploaded to cloud
Local2CloudTestHelper.check_run_metrics(pf, local_pf, eval_run)

# check other run details are uploaded to cloud
eval_run = Local2CloudTestHelper.check_local_to_cloud_run(pf, eval_run)
assert eval_run.properties["azureml.promptflow.variant_run_id"] == main_run_name

Expand Down
6 changes: 6 additions & 0 deletions src/promptflow-core/promptflow/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,12 @@ def get_all_values():
return values


class SystemMetricKeys:
    """Metric keys with the "__pf__" prefix reserved for promptflow internal use."""

    # Prefix for per-node completion counters ("__pf__.nodes.<node>.completed")
    NODE_PREFIX = "__pf__.nodes"
    LINES_COMPLETED = "__pf__.lines.completed"
    LINES_FAILED = "__pf__.lines.failed"


class ConnectionProviderConfig:
LOCAL = "local"
AZUREML = "azureml"
Expand Down
Loading

0 comments on commit 4236e7d

Please sign in to comment.