diff --git a/cluster_pack/__init__.py b/cluster_pack/__init__.py index f73211c..5dda644 100644 --- a/cluster_pack/__init__.py +++ b/cluster_pack/__init__.py @@ -15,3 +15,5 @@ PEX_PACKER, get_pyenv_usage_from_archive ) + +from cluster_pack.skein import skein_launcher as yarn_launcher diff --git a/examples/skein-project/skein_project/client.py b/examples/skein-project/skein_project/client.py index d737df2..8fead61 100644 --- a/examples/skein-project/skein_project/client.py +++ b/examples/skein-project/skein_project/client.py @@ -3,7 +3,8 @@ import tempfile import cluster_pack -from cluster_pack.skein import skein_config_builder, skein_launcher +from cluster_pack import yarn_launcher +from cluster_pack.skein import skein_config_builder if __name__ == "__main__": @@ -28,8 +29,8 @@ spec = skein.ApplicationSpec(services={"service": service}) app_id = client.submit(spec) - skein_launcher.wait_for_finished(client, app_id) - logs = skein_launcher.get_application_logs(client, app_id, 2) + yarn_launcher.wait_for_finished(client, app_id) + logs = yarn_launcher.get_application_logs(client, app_id, 2) if logs: for key, value in logs.items(): print(f"skein logs:{key} {value}") diff --git a/tests/integration/test_skein_launcher.py b/tests/integration/test_skein_launcher.py index 8dcf12f..eb539c1 100644 --- a/tests/integration/test_skein_launcher.py +++ b/tests/integration/test_skein_launcher.py @@ -1,13 +1,11 @@ -import getpass import logging import functools -import os import pytest import skein import uuid import tempfile -from cluster_pack.skein import skein_launcher +from cluster_pack import yarn_launcher from cluster_pack import filesystem pytestmark = pytest.mark.hadoop @@ -36,13 +34,13 @@ def path_to_hdfs(): def _submit_and_await_app_master(func, assert_result_status=True, assert_log_content=None): with skein.Client() as client: log_output_path = f"hdfs:///tmp/{uuid.uuid4()}.log" - app_id = skein_launcher.submit_func( + app_id = yarn_launcher.submit_func( client, func=func, args=[], memory="2 GiB", - process_logs=functools.partial(skein_launcher.upload_logs_to_hdfs, log_output_path)) - result = skein_launcher.wait_for_finished(client, app_id) + process_logs=functools.partial(yarn_launcher.upload_logs_to_hdfs, log_output_path)) + result = yarn_launcher.wait_for_finished(client, app_id) fs, _ = filesystem.resolve_filesystem_and_path(log_output_path) with fs.open(log_output_path, "rb") as f: @@ -78,8 +76,8 @@ def launch_skein(): ) spec = skein.ApplicationSpec(services={"service": service}) app_id = client.submit(spec) - skein_launcher.wait_for_finished(client, app_id) - logs = skein_launcher.get_application_logs(client, app_id, 2) + yarn_launcher.wait_for_finished(client, app_id) + logs = yarn_launcher.get_application_logs(client, app_id, 2) for key, value in logs.items(): print(f"skein logs:{key} {value}")