Skip to content

Commit

Permalink
Miscellaneous improvements (#129)
Browse files Browse the repository at this point in the history
* Add allow_large_pex to submit_func

* Return partial logs if all logs couldn't be retrieved (with a warning log).
This allows specifying a larger number of logs when the exact count is not
known, and still getting logs once the number of tries is reached

* Expose yarn_launcher at top module level

* move skein requirements from tests to module
  • Loading branch information
jcuquemelle authored Nov 22, 2024
1 parent fe938c1 commit 087d658
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 17 deletions.
2 changes: 2 additions & 0 deletions cluster_pack/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@
PEX_PACKER,
get_pyenv_usage_from_archive
)

from cluster_pack.skein import skein_launcher as yarn_launcher
8 changes: 6 additions & 2 deletions cluster_pack/skein/skein_config_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ def build_with_func(
additional_files: Optional[List[str]] = None,
tmp_dir: str = packaging._get_tmp_dir(),
log_level: str = "INFO",
process_logs: Callable[[str], Any] = None
process_logs: Callable[[str], Any] = None,
allow_large_pex: bool = False
) -> SkeinConfig:
"""Build the skein config from provided a function
Expand All @@ -35,6 +36,9 @@ def build_with_func(
:param log_level: default remote log level
:param process_logs: hook with the local log path as a parameter,
                        can be used to upload the logs somewhere
    :param allow_large_pex: Creates a non-executable pex to circumvent
                            Python's limitation with zips > 2 GB. The file will need to be unzipped
                            and the entry point will be <output>/__main__.py
:return: SkeinConfig
"""
function_name = f"function_{uuid.uuid4()}.dat"
Expand All @@ -57,7 +61,7 @@ def build_with_func(
package_path,
additional_files,
tmp_dir,
process_logs)
process_logs, allow_large_pex=allow_large_pex)


def build(
Expand Down
16 changes: 13 additions & 3 deletions cluster_pack/skein/skein_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ def submit_func(skein_client: skein.Client,
acquire_map_reduce_delegation_token: bool = False,
pre_script_hook: Optional[str] = None,
max_attempts: int = 1, max_restarts: int = 0,
process_logs: Callable[[str], Any] = None) -> str:
process_logs: Callable[[str], Any] = None,
allow_large_pex: bool = False) -> str:
"""Submit a function in a skein container
:param skein_client: skein.Client to use
Expand All @@ -111,6 +112,9 @@ def submit_func(skein_client: skein.Client,
:param max_restarts: maximum number of restarts allowed for the service
:param process_logs: hook with the local log path as a parameter,
                         can be used to upload the logs somewhere
    :param allow_large_pex: Creates a non-executable pex to circumvent
                            Python's limitation with zips > 2 GB. The file will need to be unzipped
                            and the entry point will be <output>/__main__.py
    :return: the YARN application id
"""

Expand All @@ -121,7 +125,8 @@ def submit_func(skein_client: skein.Client,
package_path=package_path,
additional_files=additional_files,
tmp_dir=tmp_dir,
process_logs=process_logs)
process_logs=process_logs,
allow_large_pex=allow_large_pex)

return _submit(
skein_client, skein_config,
Expand Down Expand Up @@ -218,17 +223,22 @@ def get_application_logs(
wait_for_nb_logs: Optional[int] = None,
log_tries: int = 15
) -> Optional[skein.model.ApplicationLogs]:
nb_keys = 0
for ind in range(log_tries):
try:
logs = client.application_logs(app_id)
nb_keys = len(logs.keys())
logger.info(f"Got {nb_keys}/{wait_for_nb_logs} log files")
if not wait_for_nb_logs or nb_keys == wait_for_nb_logs:
if not wait_for_nb_logs or nb_keys >= wait_for_nb_logs:
return logs
except Exception:
logger.warning(
f"Cannot collect logs (attempt {ind+1}/{log_tries})")
time.sleep(3)
if nb_keys >= 1:
logger.warning(
f"Only {nb_keys} logs retrieved instead of {wait_for_nb_logs} requested")
return logs
return None


Expand Down
7 changes: 4 additions & 3 deletions examples/skein-project/skein_project/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import tempfile

import cluster_pack
from cluster_pack.skein import skein_config_builder, skein_launcher
from cluster_pack import yarn_launcher
from cluster_pack.skein import skein_config_builder


if __name__ == "__main__":
Expand All @@ -28,8 +29,8 @@
spec = skein.ApplicationSpec(services={"service": service})
app_id = client.submit(spec)

skein_launcher.wait_for_finished(client, app_id)
logs = skein_launcher.get_application_logs(client, app_id, 2)
yarn_launcher.wait_for_finished(client, app_id)
logs = yarn_launcher.get_application_logs(client, app_id, 2)
if logs:
for key, value in logs.items():
print(f"skein logs:{key} {value}")
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ fire
types-setuptools
wheel-filename
fsspec
skein
1 change: 0 additions & 1 deletion tests-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
pypandoc<1.8
pyspark
skein
pytest
pyflakes==2.4.0
pylama
Expand Down
14 changes: 6 additions & 8 deletions tests/integration/test_skein_launcher.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import getpass
import logging
import functools
import os
import pytest
import skein
import uuid
import tempfile

from cluster_pack.skein import skein_launcher
from cluster_pack import yarn_launcher
from cluster_pack import filesystem

pytestmark = pytest.mark.hadoop
Expand Down Expand Up @@ -36,13 +34,13 @@ def path_to_hdfs():
def _submit_and_await_app_master(func, assert_result_status=True, assert_log_content=None):
with skein.Client() as client:
log_output_path = f"hdfs:///tmp/{uuid.uuid4()}.log"
app_id = skein_launcher.submit_func(
app_id = yarn_launcher.submit_func(
client,
func=func,
args=[],
memory="2 GiB",
process_logs=functools.partial(skein_launcher.upload_logs_to_hdfs, log_output_path))
result = skein_launcher.wait_for_finished(client, app_id)
process_logs=functools.partial(yarn_launcher.upload_logs_to_hdfs, log_output_path))
result = yarn_launcher.wait_for_finished(client, app_id)

fs, _ = filesystem.resolve_filesystem_and_path(log_output_path)
with fs.open(log_output_path, "rb") as f:
Expand Down Expand Up @@ -78,8 +76,8 @@ def launch_skein():
)
spec = skein.ApplicationSpec(services={"service": service})
app_id = client.submit(spec)
skein_launcher.wait_for_finished(client, app_id)
logs = skein_launcher.get_application_logs(client, app_id, 2)
yarn_launcher.wait_for_finished(client, app_id)
logs = yarn_launcher.get_application_logs(client, app_id, 2)
for key, value in logs.items():
print(f"skein logs:{key} {value}")

Expand Down

0 comments on commit 087d658

Please sign in to comment.