Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge build-to-app into master #53

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 38 additions & 15 deletions python/ray/experimental/dag/class_node.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
from importlib import import_module
import json
from typing import Any, Dict, List, Optional, Tuple
import base64

import ray
import ray.cloudpickle as pickle
from ray.experimental.dag.dag_node import DAGNode
from ray.experimental.dag.input_node import InputNode
from ray.experimental.dag.format_utils import get_dag_node_str
Expand All @@ -7,8 +13,7 @@
PREV_CLASS_METHOD_CALL_KEY,
DAGNODE_TYPE_KEY,
)

from typing import Any, Dict, List, Optional, Tuple
from ray.serve.utils import parse_import_path


class ClassNode(DAGNode):
Expand Down Expand Up @@ -94,23 +99,31 @@ def get_import_path(self) -> str:
def to_json(self, encoder_cls) -> Dict[str, Any]:
    """Serialize this ClassNode into a JSON-safe dict.

    Prefers a fully qualified import path for the wrapped class. If the
    class is in-line defined (``__main__`` or ``<locals>`` appears in its
    path) it cannot be re-imported by name, so we fall back to a
    base64-encoded pickle of the class body (dev-mode best effort).

    Args:
        encoder_cls: JSON encoder class forwarded to ``to_json_base``.

    Returns:
        Dict with the base node fields plus an ``import_path`` entry.
    """
    json_dict = super().to_json_base(encoder_cls, ClassNode.__name__)
    import_path = self.get_import_path()
    if "__main__" in import_path or "<locals>" in import_path:
        # Best effort: no importable FQN exists, so embed the pickled
        # class body, decoded to str so the payload stays JSON-safe.
        json_dict["import_path"] = base64.b64encode(
            pickle.dumps(self._body)
        ).decode()
    else:
        json_dict["import_path"] = import_path

    return json_dict

@classmethod
def from_json(cls, input_json, module, object_hook=None):
def from_json(cls, input_json, object_hook=None):
assert input_json[DAGNODE_TYPE_KEY] == ClassNode.__name__
args_dict = super().from_json_base(input_json, object_hook=object_hook)

import_path = input_json["import_path"]
module = import_path
if isinstance(import_path, bytes):
# In dev mode we store pickled class or function body in import_path
# if we failed to get a FQN import path for it.
module = pickle.loads(base64.b64decode(json.loads(import_path)))
else:
module_name, attr_name = parse_import_path(import_path)
module = getattr(import_module(module_name), attr_name)

node = cls(
module.__ray_metadata__.modified_class,
args_dict["args"],
Expand Down Expand Up @@ -220,14 +233,24 @@ def __str__(self) -> str:
def get_method_name(self) -> str:
    """Return the name of the class method this node calls."""
    return self._method_name

def get_body(self):
    """Return the parent node's underlying class body.

    Unwraps ``__ray_actor_class__`` — presumably the user's original
    (pre-decoration) class; confirm against the Ray actor wrapper.
    """
    return self._parent_class_node._body.__ray_actor_class__

def get_import_path(self) -> str:
    """Return ``module.qualname`` for the parent class body."""
    target = self.get_body()
    return ".".join((target.__module__, target.__qualname__))

def to_json(self, encoder_cls) -> Dict[str, Any]:
    """Serialize this method-call node into a JSON-safe dict.

    Includes the method name and an ``import_path`` that is either a
    fully qualified name or, for in-line defined bodies, a base64-encoded
    pickle of the body.
    """
    json_dict = super().to_json_base(encoder_cls, ClassMethodNode.__name__)
    json_dict["method_name"] = self.get_method_name()
    import_path = self.get_import_path()
    # In-line defined bodies (__main__ / <locals>) have no importable
    # FQN; fall back to a pickled, base64-encoded copy of the body.
    if any(marker in import_path for marker in ("__main__", "<locals>")):
        pickled_body = pickle.dumps(self.get_body())
        import_path = base64.b64encode(pickled_body).decode()
    json_dict["import_path"] = import_path
    return json_dict

@classmethod
Expand Down
36 changes: 25 additions & 11 deletions python/ray/experimental/dag/function_node.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import base64
from importlib import import_module
import json
from typing import Any, Dict, List


import ray
import ray.cloudpickle as pickle
from ray.experimental.dag.dag_node import DAGNode
from ray.experimental.dag.format_utils import get_dag_node_str
from ray.experimental.dag.constants import DAGNODE_TYPE_KEY
from ray.serve.utils import parse_import_path


class FunctionNode(DAGNode):
Expand Down Expand Up @@ -64,22 +69,31 @@ def get_import_path(self):
def to_json(self, encoder_cls) -> Dict[str, Any]:
    """Serialize this FunctionNode into a JSON-safe dict.

    Prefers a fully qualified import path for the wrapped function. If
    the function is in-line defined (``__main__`` or ``<locals>`` in its
    path) it cannot be re-imported by name, so we fall back to a
    base64-encoded pickle of the function body (dev-mode best effort).

    Args:
        encoder_cls: JSON encoder class forwarded to ``to_json_base``.

    Returns:
        Dict with the base node fields plus an ``import_path`` entry.
    """
    json_dict = super().to_json_base(encoder_cls, FunctionNode.__name__)
    import_path = self.get_import_path()
    if "__main__" in import_path or "<locals>" in import_path:
        # Best effort: no importable FQN exists, so embed the pickled
        # function body, decoded to str so the payload stays JSON-safe.
        json_dict["import_path"] = base64.b64encode(
            pickle.dumps(self._body)
        ).decode()
    else:
        json_dict["import_path"] = import_path

    return json_dict

@classmethod
def from_json(cls, input_json, module, object_hook=None):
def from_json(cls, input_json, object_hook=None):
assert input_json[DAGNODE_TYPE_KEY] == FunctionNode.__name__
args_dict = super().from_json_base(input_json, object_hook=object_hook)

import_path = input_json["import_path"]
module = import_path
if isinstance(import_path, bytes):
# In dev mode we store pickled class or function body in import_path
# if we failed to get a FQN import path for it.
module = pickle.loads(base64.b64decode(json.loads(import_path)))
else:
module_name, attr_name = parse_import_path(import_path)
module = getattr(import_module(module_name), attr_name)

node = cls(
module._function,
args_dict["args"],
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ def deploy(
self.log_deployment_ready(name, version, url, tag)

@_ensure_connected
def deploy_group(self, deployments: List[Dict], _blocking: bool = True):
def deploy_group(self, deployments: List[Dict], _blocking: bool = True) -> None:
deployment_args_list = []
for deployment in deployments:
deployment_args_list.append(
Expand Down
8 changes: 4 additions & 4 deletions python/ray/serve/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def add_deployment(self, deployment: Deployment):

self._deployments[deployment.name] = deployment

def deploy(self, blocking: bool = True):
def deploy(self, blocking: bool = True, return_sync_handle=True):
"""Atomically deploys the Application's deployments to the Ray cluster.

The Application's deployments can carry handles to one another.
Expand Down Expand Up @@ -102,9 +102,9 @@ def deploy(self, blocking: bool = True):

parameter_group.append(deployment_parameters)

return internal_get_global_client().deploy_group(
parameter_group, _blocking=blocking
)
internal_get_global_client().deploy_group(parameter_group, _blocking=blocking)

return self._deployments["ingress"].get_handle(sync=return_sync_handle)

def run(self, logger: Union[Logger, _CliLogger] = logger):
"""Deploys all deployments in this Application and logs status.
Expand Down
12 changes: 8 additions & 4 deletions python/ray/serve/pipeline/api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from ray.experimental.dag import DAGNode
from ray.serve.application import Application
from ray.serve.pipeline.generate import (
transform_ray_dag_to_serve_dag,
extract_deployments_from_serve_dag,
Expand Down Expand Up @@ -63,12 +64,15 @@ def build(ray_dag_root_node: DAGNode):
>>> # This will fail where enforcements are applied.
>>> deployment_yaml = app.to_yaml()
"""

serve_root_dag = ray_dag_root_node.apply_recursive(transform_ray_dag_to_serve_dag)
deployments = extract_deployments_from_serve_dag(serve_root_dag)

app = Application(deployments)

# No JSON serde or FQN import path enforced yet.
pipeline_input_node = get_pipeline_input_node(serve_root_dag)
ingress_deployment = get_ingress_deployment(serve_root_dag, pipeline_input_node)
deployments.insert(0, ingress_deployment)
app.add_deployment(ingress_deployment)

# TODO (jiaodong): Call into Application once Shreyas' PR is merged
# TODO (jiaodong): Apply enforcements at serve app to_yaml level
return deployments
return app
30 changes: 27 additions & 3 deletions python/ray/serve/pipeline/deployment_method_node.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
from typing import Any, Dict, Optional, Tuple, List, Union
import base64
from importlib import import_module

import ray.cloudpickle as pickle
from ray.experimental.dag import DAGNode
from ray.experimental.dag.format_utils import get_dag_node_str
from ray.serve.handle import RayServeSyncHandle, RayServeHandle
from ray.serve.pipeline.constants import USE_SYNC_HANDLE_KEY
from ray.experimental.dag.constants import DAGNODE_TYPE_KEY
from ray.serve.api import Deployment, DeploymentConfig
from ray.serve.utils import parse_import_path


class DeploymentMethodNode(DAGNode):
Expand Down Expand Up @@ -103,30 +107,50 @@ def get_deployment_name(self) -> str:
def get_deployment_method_name(self) -> str:
    """Return the name of the deployment method this node invokes."""
    return self._deployment_method_name

def get_body(self):
    """Return the deployment's underlying class body.

    Unwraps ``__ray_actor_class__`` — presumably the user's original
    (pre-decoration) class; confirm against the Ray actor wrapper.
    """
    return self._deployment._func_or_class.__ray_actor_class__

def get_import_path(self) -> str:
    """Return the import path string for the deployment's body.

    A ``str`` body means this node was deserialized from JSON and the
    stored value already stands in for the dag_node body.
    """
    if isinstance(self._deployment._func_or_class, str):
        # We're processing a deserialized JSON node where import_path
        # is dag_node body.
        return self._deployment._func_or_class
    else:
        body = self.get_body()
        return f"{body.__module__}.{body.__qualname__}"

def to_json(self, encoder_cls) -> Dict[str, Any]:
    """Serialize this node, including deployment and method names.

    ``import_path`` is either a fully qualified name or, for in-line
    defined bodies, a base64-encoded pickle of the body.
    """
    json_dict = super().to_json_base(encoder_cls, DeploymentMethodNode.__name__)
    json_dict["deployment_name"] = self.get_deployment_name()
    json_dict["deployment_method_name"] = self.get_deployment_method_name()
    import_path = self.get_import_path()
    needs_pickle_fallback = (
        "__main__" in import_path or "<locals>" in import_path
    )
    if needs_pickle_fallback:
        # Best effort to get FQN string import path
        import_path = base64.b64encode(pickle.dumps(self.get_body())).decode()
    json_dict["import_path"] = import_path

    return json_dict

@classmethod
def from_json(cls, input_json, object_hook=None):
assert input_json[DAGNODE_TYPE_KEY] == DeploymentMethodNode.__name__
args_dict = super().from_json_base(input_json, object_hook=object_hook)
import_path = input_json["import_path"]
module = import_path
try:
# In dev mode we store pickled class or function body in import_path
# if we failed to get a FQN import path for it.
module = pickle.loads(base64.b64decode(import_path))
except Exception:
module_name, attr_name = parse_import_path(input_json["import_path"])
module = getattr(import_module(module_name), attr_name)

return cls(
Deployment(
input_json["import_path"],
module,
input_json["deployment_name"],
# TODO: (jiaodong) Support deployment config from user input
DeploymentConfig(),
Expand Down
43 changes: 28 additions & 15 deletions python/ray/serve/pipeline/deployment_node.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
from importlib import import_module
from typing import Any, Callable, Dict, Optional, List, Tuple, Union
import base64

import ray.cloudpickle as pickle
from ray.experimental.dag import DAGNode, InputNode
from ray.serve.handle import RayServeSyncHandle, RayServeHandle
from ray.serve.pipeline.deployment_method_node import DeploymentMethodNode
from ray.serve.pipeline.constants import USE_SYNC_HANDLE_KEY
from ray.experimental.dag.constants import DAGNODE_TYPE_KEY
from ray.experimental.dag.format_utils import get_dag_node_str
from ray.serve.api import Deployment, DeploymentConfig
from ray.serve.utils import parse_import_path


class DeploymentNode(DAGNode):
Expand Down Expand Up @@ -157,40 +161,49 @@ def __str__(self) -> str:
def get_deployment_name(self):
    """Return the name of the deployment wrapped by this node."""
    return self._deployment.name

def get_body(self):
    """Return the deployment's underlying class body.

    Unwraps ``__ray_actor_class__`` — presumably the user's original
    (pre-decoration) class; confirm against the Ray actor wrapper.
    """
    # NOTE: a stray leftover signature line (`def get_import_path(self):`)
    # from the pre-merge version preceded this method; removed as a
    # diff/merge artifact.
    return self._deployment._func_or_class.__ray_actor_class__

def get_import_path(self) -> str:
    """Return the import path string for the deployment's body."""
    func_or_class = self._deployment._func_or_class
    # A str body means this node was deserialized from JSON and the
    # stored value already stands in for the dag_node body.
    if isinstance(func_or_class, str):
        return func_or_class
    body = self.get_body()
    return ".".join((body.__module__, body.__qualname__))

def to_json(self, encoder_cls) -> Dict[str, Any]:
    """Serialize this DeploymentNode into a JSON-safe dict.

    Prefers a fully qualified import path for the deployment body. If
    the body is in-line defined (``__main__`` or ``<locals>`` in its
    path) it cannot be re-imported by name, so we fall back to a
    base64-encoded pickle of the body (dev-mode best effort).

    Args:
        encoder_cls: JSON encoder class forwarded to ``to_json_base``.

    Returns:
        Dict with the base node fields plus ``deployment_name`` and
        ``import_path`` entries.
    """
    json_dict = super().to_json_base(encoder_cls, DeploymentNode.__name__)
    json_dict["deployment_name"] = self.get_deployment_name()
    import_path = self.get_import_path()
    # (Removed: leftover pre-merge assert block, a stale unconditional
    # import_path assignment, and a stray debug print.)
    if "__main__" in import_path or "<locals>" in import_path:
        # Best effort: no importable FQN exists, so embed the pickled
        # body, decoded to str so the payload stays JSON-safe.
        json_dict["import_path"] = base64.b64encode(
            pickle.dumps(self.get_body())
        ).decode()
    else:
        json_dict["import_path"] = import_path

    return json_dict

@classmethod
def from_json(cls, input_json, object_hook=None):
assert input_json[DAGNODE_TYPE_KEY] == DeploymentNode.__name__
args_dict = super().from_json_base(input_json, object_hook=object_hook)
import_path = input_json["import_path"]
module = import_path
try:
# In dev mode we store pickled class or function body in import_path
# if we failed to get a FQN import path for it.
module = pickle.loads(base64.b64decode(import_path))
except Exception:
module_name, attr_name = parse_import_path(input_json["import_path"])
module = getattr(import_module(module_name), attr_name)

return cls(
input_json["import_path"],
module,
input_json["deployment_name"],
args_dict["args"],
args_dict["kwargs"],
Expand Down
2 changes: 2 additions & 0 deletions python/ray/serve/pipeline/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,9 @@ def get_ingress_deployment(
ingress (Deployment): Generated pipeline ingress deployment to serve
user HTTP requests.
"""
print(f"serve_dag_root_node: \n{serve_dag_root_node}")
serve_dag_root_json = json.dumps(serve_dag_root_node, cls=DAGNodeEncoder)
print(f"serve_dag_root_json: \n{serve_dag_root_json}")
preprocessor_import_path = pipeline_input_node.get_preprocessor_import_path()
serve_dag_root_deployment = make_ingress_deployment(
DEFAULT_INGRESS_DEPLOYMENT_NAME,
Expand Down
Loading