Skip to content

Commit

Permalink
Merge pull request #107 from criteo/release-0.6
Browse files Browse the repository at this point in the history
fix examples script and add logs
  • Loading branch information
jcuquemelle authored May 15, 2023
2 parents a14cb31 + bda035c commit efa2fcf
Show file tree
Hide file tree
Showing 12 changed files with 160 additions and 60 deletions.
2 changes: 1 addition & 1 deletion tf_yarn/bin/check_hadoop_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

logger = logging.getLogger(__name__)

PATH_ON_HDFS = f"{cluster_pack.get_default_fs()}/tmp/{uuid.uuid4()}"
PATH_ON_HDFS = f"{cluster_pack.get_default_fs().replace('viewfs://', 'hdfs://')}/tmp/{uuid.uuid4()}"
FILENAME_ON_HDFS = "hello_tf_yarn.txt"
FILEPATH_ON_HDFS = f"{PATH_ON_HDFS}/{FILENAME_ON_HDFS}"
EXPECTED_CONTENT = "Hello tf-yarn!"
Expand Down
14 changes: 14 additions & 0 deletions tf_yarn/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,7 @@ def _execute_and_await_termination(
n_try: int = 0,
poll_every_secs: int = 10
) -> Optional[metrics.Metrics]:
logger.info("posting serialized Experiment fcn to skein KV store")
skein_cluster.app.kv[constants.KV_EXPERIMENT_FN] = serialized_fn
eval_metrics_logger = evaluator_metrics.EvaluatorMetricsLogger(
[task for task in _internal.iter_tasks(skein_cluster.tasks)
Expand All @@ -534,18 +535,31 @@ def _execute_and_await_termination(
)

state = None
container_log_urls: Dict[str, str] = {}

while True:
report = skein_cluster.client.application_report(skein_cluster.app.id)
logger.info(
f"Application report for {skein_cluster.app.id} (state: {report.state})")
if state != report.state:
logger.info(_format_app_report(report))

if state == "running":
try:
for key in skein_cluster.app.kv.keys():
if '/logs' in key and key not in container_log_urls.keys():
container_log_urls[key] = skein_cluster.app.kv.wait(key).decode()
except Exception:
pass # accessing the KV store raises if the app is not ready yet or is already closed; we just ignore it

if report.final_status != "undefined":
skein_cluster.event_listener.join()
log_events, result_metrics, container_status = _handle_events(skein_cluster.events,
n_try)
logger.info(log_events)
logger.info('container logs urls:')
for v in container_log_urls.values():
logger.info(v)

containers = container_status.by_container_id()
# add one for AM container
Expand Down
12 changes: 7 additions & 5 deletions tf_yarn/examples/collective_all_reduce_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import winequality
from datetime import datetime
import cluster_pack
import tensorflow_io as tfio
from tf_yarn.tensorflow import Experiment, TaskSpec, run_on_yarn

logger = logging.getLogger(__name__)
Expand All @@ -33,25 +32,27 @@
2. Upload it to HDFS
3. Pass a full URI to either of the CSV files to the example
"""
WINE_EQUALITY_FILE = f"{cluster_pack.get_default_fs()}/user/{USER}/tf_yarn_test/winequality-red.csv"
WINE_QUALITY_FILE = f"{cluster_pack.get_default_fs().replace('viewfs://', 'hdfs://')}" \
f"/user/{USER}/tf_yarn_test/winequality-red.csv"

"""
Output path of the learned model on hdfs
"""
HDFS_DIR = (f"{cluster_pack.get_default_fs()}/user/{USER}"
HDFS_DIR = (f"{cluster_pack.get_default_fs().replace('viewfs://', 'hdfs://')}/user/{USER}"
f"/tf_yarn_test/tf_yarn_{int(datetime.now().timestamp())}")


def experiment_fn() -> Experiment:
# To mitigate issue https://github.com/tensorflow/tensorflow/issues/32159 for tf >= 1.15
import tensorflow as tf
import tensorflow_io as tfio

def train_input_fn():
dataset = winequality.get_dataset(WINE_EQUALITY_FILE, split="train")
dataset = winequality.get_dataset(WINE_QUALITY_FILE, split="train")
return dataset.shuffle(1000).batch(128).repeat()

def eval_input_fn():
dataset = winequality.get_dataset(WINE_EQUALITY_FILE, split="test")
dataset = winequality.get_dataset(WINE_QUALITY_FILE, split="test")
return dataset.shuffle(1000).batch(128)

estimator = tf.compat.v1.estimator.LinearClassifier(
Expand Down Expand Up @@ -88,5 +89,6 @@ def eval_input_fn():
files={
os.path.basename(winequality.__file__): winequality.__file__,
},
file_systems=['viewfs://root', 'hdfs://root'],
custom_task_module="tf_yarn.tensorflow.tasks.gloo_allred_task"
)
25 changes: 14 additions & 11 deletions tf_yarn/examples/keras_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,40 @@
import os
from datetime import datetime

from tensorflow import keras
import cluster_pack
from cluster_pack import filesystem
import tensorflow as tf
import tensorflow_io as tfio

from tf_yarn.examples import winequality
from tf_yarn.tensorflow import TaskSpec, Experiment, run_on_yarn


logger = logging.getLogger()
USER = getpass.getuser()
WINE_EQUALITY_FILE = f"{cluster_pack.get_default_fs()}/user/{USER}/tf_yarn_test/winequality-red.csv"
WINE_QUALITY_FILE = f"{cluster_pack.get_default_fs().replace('viewfs://', 'hdfs://')}" \
f"/user/{USER}/tf_yarn_test/winequality-red.csv"
# Output path of the learned model on hdfs
HDFS_DIR = (f"{cluster_pack.get_default_fs()}/user/{USER}"
HDFS_DIR = (f"{cluster_pack.get_default_fs().replace('viewfs://', 'hdfs://')}/user/{USER}"
f"/tf_yarn_test/tf_yarn_{int(datetime.now().timestamp())}")


def importable_experiment_fn(hdfs_dir: str) -> Experiment:
import tensorflow as tf
import tensorflow_io as tfio
from tensorflow import keras

def convert_to_tensor(x, y):
return (tf.convert_to_tensor(value=list(x.values()), dtype=tf.float32),
tf.convert_to_tensor(value=y, dtype=tf.int32))

def train_input_fn():
dataset = winequality.get_dataset(WINE_EQUALITY_FILE, split="train")
dataset = winequality.get_dataset(WINE_QUALITY_FILE, split="train")
return (dataset.map(convert_to_tensor)
.shuffle(1000)
.batch(128)
.repeat())

def eval_input_fn():
dataset = winequality.get_dataset(WINE_EQUALITY_FILE, split="test")
dataset = winequality.get_dataset(WINE_QUALITY_FILE, split="test")
return (dataset.map(convert_to_tensor)
.shuffle(1000)
.batch(128))
Expand Down Expand Up @@ -78,9 +80,9 @@ def experiment_fn() -> Experiment:


def main():
fs, _ = filesystem.resolve_filesystem_and_path(WINE_EQUALITY_FILE)
if not fs.exists(WINE_EQUALITY_FILE):
raise Exception(f"{WINE_EQUALITY_FILE} not found")
fs, _ = filesystem.resolve_filesystem_and_path(WINE_QUALITY_FILE)
if not fs.exists(WINE_QUALITY_FILE):
raise Exception(f"{WINE_QUALITY_FILE} not found")

# forcing call to model_to_estimator._save_first_checkpoint l457
# https://github.com/tensorflow/estimator/blob/ \
Expand All @@ -100,7 +102,8 @@ def main():
files={
os.path.basename(winequality.__file__): winequality.__file__,
os.path.basename(__file__): __file__
}
},
file_systems=['viewfs://root', 'hdfs://root']
)


Expand Down
20 changes: 11 additions & 9 deletions tf_yarn/examples/linear_classifier_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,31 @@
import cluster_pack
from cluster_pack import filesystem
import winequality
import tensorflow_io as tfio

from tf_yarn.tensorflow import Experiment, TaskSpec, run_on_yarn


USER = getpass.getuser()
WINE_EQUALITY_FILE = f"{cluster_pack.get_default_fs()}/user/{USER}/tf_yarn_test/winequality-red.csv"
WINE_QUALITY_FILE = f"{cluster_pack.get_default_fs().replace('viewfs://', 'hdfs://')}" \
f"/user/{USER}/tf_yarn_test/winequality-red.csv"
# Output path of the learned model on hdfs
HDFS_DIR = (f"{cluster_pack.get_default_fs()}/user/{USER}"
HDFS_DIR = (f"{cluster_pack.get_default_fs().replace('viewfs://', 'hdfs://')}/user/{USER}"
f"/tf_yarn_test/tf_yarn_{int(datetime.now().timestamp())}")


def experiment_fn() -> Experiment:
# To mitigate issue https://github.com/tensorflow/tensorflow/issues/32159 for tf >= 1.15
import tensorflow as tf
import tensorflow_io as tfio

def train_input_fn():
dataset = winequality.get_dataset(WINE_EQUALITY_FILE, split="train")
dataset = winequality.get_dataset(WINE_QUALITY_FILE, split="train")
return (dataset.shuffle(1000)
.batch(128)
.repeat())

def eval_input_fn():
dataset = winequality.get_dataset(WINE_EQUALITY_FILE, split="test")
dataset = winequality.get_dataset(WINE_QUALITY_FILE, split="test")
return (dataset.shuffle(1000)
.batch(128))

Expand All @@ -60,9 +61,9 @@ def eval_input_fn():


if __name__ == "__main__":
fs, _ = filesystem.resolve_filesystem_and_path(WINE_EQUALITY_FILE)
if not fs.exists(WINE_EQUALITY_FILE):
raise Exception(f"{WINE_EQUALITY_FILE} not found")
fs, _ = filesystem.resolve_filesystem_and_path(WINE_QUALITY_FILE)
if not fs.exists(WINE_QUALITY_FILE):
raise Exception(f"{WINE_QUALITY_FILE} not found")

run_on_yarn(
experiment_fn,
Expand All @@ -73,5 +74,6 @@ def eval_input_fn():
},
files={
os.path.basename(winequality.__file__): winequality.__file__,
}
},
file_systems=['viewfs://root', 'hdfs://root']
)
21 changes: 12 additions & 9 deletions tf_yarn/examples/mlflow_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@
import cluster_pack
from cluster_pack import filesystem
import winequality
import tensorflow_io as tfio


from tf_yarn.tensorflow import Experiment, TaskSpec, run_on_yarn


logger = logging.getLogger(__name__)

USER = getpass.getuser()
WINE_EQUALITY_FILE = f"{cluster_pack.get_default_fs()}/user/{USER}/tf_yarn_test/winequality-red.csv"
WINE_QUALITY_FILE = f"{cluster_pack.get_default_fs().replace('viewfs://', 'hdfs://')}" \
f"/user/{USER}/tf_yarn_test/winequality-red.csv"
# Output path of the learned model on hdfs
HDFS_DIR = (f"{cluster_pack.get_default_fs()}/user/{USER}"
HDFS_DIR = (f"{cluster_pack.get_default_fs().replace('viewfs://', 'hdfs://')}/user/{USER}"
f"/tf_yarn_test/tf_yarn_{int(datetime.now().timestamp())}")


Expand All @@ -44,15 +45,16 @@ def _get_fs_for_tests():
def experiment_fn() -> Experiment:
# To mitigate issue https://github.com/tensorflow/tensorflow/issues/32159 for tf >= 1.15
import tensorflow as tf
import tensorflow_io as tfio

def train_input_fn():
dataset = winequality.get_dataset(WINE_EQUALITY_FILE, split="train")
dataset = winequality.get_dataset(WINE_QUALITY_FILE, split="train")
return (dataset.shuffle(1000)
.batch(128)
.repeat())

def eval_input_fn():
dataset = winequality.get_dataset(WINE_EQUALITY_FILE, split="test")
dataset = winequality.get_dataset(WINE_QUALITY_FILE, split="test")
return (dataset.shuffle(1000)
.batch(128))

Expand All @@ -74,9 +76,9 @@ def eval_input_fn():


if __name__ == "__main__":
fs, _ = filesystem.resolve_filesystem_and_path(WINE_EQUALITY_FILE)
if not fs.exists(WINE_EQUALITY_FILE):
raise Exception(f"{WINE_EQUALITY_FILE} not found")
fs, _ = filesystem.resolve_filesystem_and_path(WINE_QUALITY_FILE)
if not fs.exists(WINE_QUALITY_FILE):
raise Exception(f"{WINE_QUALITY_FILE} not found")

# You need to install mlflow (`pip install mlflow`)
# and set the MLflow tracking URI
Expand All @@ -101,7 +103,8 @@ def eval_input_fn():
},
files={
os.path.basename(winequality.__file__): winequality.__file__,
}
},
file_systems=['viewfs://root', 'hdfs://root']
)

mlflow.end_run()
Expand Down
21 changes: 12 additions & 9 deletions tf_yarn/examples/native_keras_with_gloo_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
from cluster_pack import filesystem
from tf_yarn.tensorflow import TaskSpec, KerasExperiment, run_on_yarn
import winequality
import tensorflow as tf
import tensorflow_io as tfio

logger = logging.getLogger(__name__)

Expand All @@ -34,28 +32,32 @@


USER = getpass.getuser()
WINE_EQUALITY_FILE = f"{cluster_pack.get_default_fs()}/user/{USER}/tf_yarn_test/winequality-red.csv"
WINE_QUALITY_FILE = f"{cluster_pack.get_default_fs().replace('viewfs://', 'hdfs://')}" \
f"/user/{USER}/tf_yarn_test/winequality-red.csv"

# Output path of the learned model on hdfs
HDFS_DIR = (f"{cluster_pack.get_default_fs()}/user/{USER}"
HDFS_DIR = (f"{cluster_pack.get_default_fs().replace('viewfs://', 'hdfs://')}/user/{USER}"
f"/tf_yarn_test/tf_yarn_{int(datetime.now().timestamp())}")
HVD_SIZE = 2


def experiment_fn() -> KerasExperiment:
import tensorflow as tf
import tensorflow_io as tfio

def convert_to_tensor(x, y):
return (tf.convert_to_tensor(value=list(x.values()), dtype=tf.float32),
tf.convert_to_tensor(value=y, dtype=tf.int32))

def input_data_fn():
dataset = winequality.get_dataset(WINE_EQUALITY_FILE, split="train")
dataset = winequality.get_dataset(WINE_QUALITY_FILE, split="train")
return (dataset.map(convert_to_tensor)
.shuffle(1000)
.batch(128)
.repeat())

def validation_data_fn():
dataset = winequality.get_dataset(WINE_EQUALITY_FILE, split="test")
dataset = winequality.get_dataset(WINE_QUALITY_FILE, split="test")
return (dataset.map(convert_to_tensor)
.shuffle(1000)
.batch(128))
Expand Down Expand Up @@ -89,9 +91,9 @@ def validation_data_fn():


def main():
fs, _ = filesystem.resolve_filesystem_and_path(WINE_EQUALITY_FILE)
if not fs.exists(WINE_EQUALITY_FILE):
raise Exception(f"{WINE_EQUALITY_FILE} not found")
fs, _ = filesystem.resolve_filesystem_and_path(WINE_QUALITY_FILE)
if not fs.exists(WINE_QUALITY_FILE):
raise Exception(f"{WINE_QUALITY_FILE} not found")

run_on_yarn(
experiment_fn,
Expand All @@ -104,6 +106,7 @@ def main():
os.path.basename(winequality.__file__): winequality.__file__,
os.path.basename(__file__): __file__,
},
file_systems=['viewfs://root', 'hdfs://root'],
custom_task_module="tf_yarn.tensorflow.tasks.gloo_allred_task"
)

Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import torchvision.transforms as transforms
import torch.nn.functional as F


import cluster_pack
from tf_yarn.pytorch import run_on_yarn, TaskSpec, NodeLabel
from tf_yarn.pytorch import PytorchExperiment, DataLoaderArgs
from tf_yarn.pytorch import model_ckpt
Expand Down Expand Up @@ -94,10 +94,13 @@ def experiment_fn():


if __name__ == "__main__":
zip_hdfs, _ = cluster_pack.upload_env(additional_repo='https://download.pytorch.org/whl/cu117',
allow_large_pex=True)
run_on_yarn(
experiment_fn=experiment_fn,
task_specs={
"worker": TaskSpec(memory=48*2**10, vcores=48, instances=2, label=NodeLabel.GPU)
},
pyenv_zip_path=zip_hdfs,
queue="ml-gpu"
)
Loading

0 comments on commit efa2fcf

Please sign in to comment.