From 06d77db0dca6f04e7ea12d497eafed136c1cddbe Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Mon, 23 Sep 2024 17:14:17 -0700 Subject: [PATCH 01/46] Register model with MLflow PySDK now that retries are baked in. This cleans up the code a little and prevents us from having forked logic in Composer to fetch by run_id --- llmfoundry/callbacks/hf_checkpointer.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index 65bdcb3b6c..d9ee3d11ef 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -107,14 +107,14 @@ def _maybe_get_license_filename( return None -def _register_model_with_run_id_multiprocess( +def _register_model_multiprocess( mlflow_logger: MLFlowLogger, composer_logging_level: int, model_uri: str, name: str, await_creation_for: int, ): - """Call MLFlowLogger.register_model_with_run_id. + """Call MLFlowLogger.register_model. Used mainly to register from a child process. """ @@ -128,7 +128,7 @@ def _register_model_with_run_id_multiprocess( logging.getLogger('composer').setLevel(composer_logging_level) # Register model. - mlflow_logger.register_model_with_run_id( + mlflow_logger.register_model( model_uri=model_uri, name=name, await_creation_for=await_creation_for, @@ -672,7 +672,7 @@ def tensor_hook( log.debug('Logging Hugging Face model to MLFlow') for i, mlflow_logger in enumerate(self.mlflow_loggers): log.debug( - f'Registering model to UC at {mlflow_logger.model_registry_prefix}.{self.mlflow_registered_model_name}', + f'Logging model to UC at {mlflow_logger.model_registry_prefix}.{self.mlflow_registered_model_name}', ) local_save_path = str( Path(temp_save_dir) / f'mlflow_save_{i}', @@ -706,7 +706,7 @@ def tensor_hook( 'transformers', 'torch', ] - mlflow_logger.save_model(**model_saving_kwargs) + mlflow_logger.log_model(**model_saving_kwargs) # Upload the license file generated by mlflow during the model saving. license_filename = _maybe_get_license_filename( @@ -730,7 +730,7 @@ def tensor_hook( # Spawn a new process to register the model. process = SpawnProcess( - target=_register_model_with_run_id_multiprocess, + target=_register_model_multiprocess, kwargs={ 'mlflow_logger': mlflow_logger, From e40e5dd27331fb136c21f0a1ddf17bf46a8247d3 Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Mon, 23 Sep 2024 17:43:25 -0700 Subject: [PATCH 02/46] Register model with MLflow PySDK now that retries are baked in. This cleans up the code a little and prevents us from having forked logic in Composer to fetch by run_id --- llmfoundry/callbacks/hf_checkpointer.py | 61 +++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index d9ee3d11ef..6098bbeefd 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -135,6 +135,43 @@ def _register_model_multiprocess( ) +def _log_model_multiprocess( + mlflow_logger: MLFlowLogger, + composer_logging_level: int, + input_example: dict[str, Any], + task: str, + name: str, + model_name: str, + await_creation_for: int, +): + """ + Call MLFlowLogger.log_model. + + Used mainly to log from a child process. + """ + # Setup logging for child process. This ensures that any logs from composer are surfaced. + if composer_logging_level > 0: + # If logging_level is 0, then the composer logger was unset. + logging.basicConfig( + format= + f'%(asctime)s: rank{dist.get_global_rank()}[%(process)d][%(threadName)s]: %(levelname)s: %(name)s: %(message)s', + ) + logging.getLogger('composer').setLevel(composer_logging_level) + mlflow_logger.log_model( + flavor='transformers', + artifact_path="model", + input_example=input_example, + task=task, + metadata={ + "task": task, + "databricks_model_source": "genai-fine-tuning", + "pretrained_model_name": model_name, + }, # This metadata is currently needed for optimized serving + registered_model_name=name, + await_creation_for=await_creation_for + ) + + class HuggingFaceCheckpointer(Callback): """Save a huggingface formatted checkpoint during training. @@ -729,6 +766,30 @@ def tensor_hook( monitor_process = None # Spawn a new process to register the model. + # Slower method to register the model via log_model. + # TODO: design this with some extra param in the model saving config to invoke this + if True: + process = SpawnProcess( + target=_log_model_multiprocess, + kwargs={ + 'mlflow_logger': + mlflow_logger, + 'composer_logging_level': + logging.getLogger('composer').level, + 'model_uri': + local_save_path, + 'name': + self.mlflow_registered_model_name, + 'model_name': + self.pretrained_model_name, + 'input_example': + self.mlflow_logging_config['input_example'], + 'await_creation_for': + 3600, + }, + ) + process.start() + # Faster method to register model in parallel. process = SpawnProcess( target=_register_model_multiprocess, kwargs={ From 454e18b3aae56cd7bc761880970529fafd9724cb Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Mon, 23 Sep 2024 17:56:03 -0700 Subject: [PATCH 03/46] small changes --- llmfoundry/callbacks/hf_checkpointer.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index 6098bbeefd..8956858a93 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -239,6 +239,7 @@ def __init__( + f'Defaulting to final_register_only=False and saving the HuggingFace checkpoint to {save_folder=}.', ) + self.use_mlflow_log_model = False # mlflow config setup if mlflow_logging_config is None: @@ -269,6 +270,8 @@ def __init__( 'input_example', default_input_example, ) + if mlflow_logging_config['use_mlflow_log_model']: + self.use_mlflow_log_model = True self.mlflow_logging_config = mlflow_logging_config if 'metadata' in self.mlflow_logging_config: @@ -709,7 +712,7 @@ def tensor_hook( log.debug('Logging Hugging Face model to MLFlow') for i, mlflow_logger in enumerate(self.mlflow_loggers): log.debug( - f'Logging model to UC at {mlflow_logger.model_registry_prefix}.{self.mlflow_registered_model_name}', + f'Registering model to UC at {mlflow_logger.model_registry_prefix}.{self.mlflow_registered_model_name}', ) local_save_path = str( Path(temp_save_dir) / f'mlflow_save_{i}', @@ -743,7 +746,7 @@ def tensor_hook( 'transformers', 'torch', ] - mlflow_logger.log_model(**model_saving_kwargs) + mlflow_logger.save_model(**model_saving_kwargs) # Upload the license file generated by mlflow during the model saving. license_filename = _maybe_get_license_filename( @@ -767,8 +770,7 @@ def tensor_hook( # Spawn a new process to register the model. # Slower method to register the model via log_model. - # TODO: design this with some extra param in the model saving config to invoke this - if True: + if self.use_mlflow_log_model: process = SpawnProcess( target=_log_model_multiprocess, kwargs={ From c8bd06f26610aec7a970b23045136b492000cc90 Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Mon, 23 Sep 2024 18:03:23 -0700 Subject: [PATCH 04/46] isolated changes --- llmfoundry/callbacks/hf_checkpointer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index 8956858a93..61d348967c 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -107,7 +107,7 @@ def _maybe_get_license_filename( return None -def _register_model_multiprocess( +def _register_model_with_run_id_multiprocess( mlflow_logger: MLFlowLogger, composer_logging_level: int, model_uri: str, @@ -128,7 +128,7 @@ def _register_model_multiprocess( logging.getLogger('composer').setLevel(composer_logging_level) # Register model. - mlflow_logger.register_model( + mlflow_logger.register_model_with_run_id( model_uri=model_uri, name=name, await_creation_for=await_creation_for, @@ -793,7 +793,7 @@ def tensor_hook( process.start() # Faster method to register model in parallel. process = SpawnProcess( - target=_register_model_multiprocess, + target=_register_model_with_run_id_multiprocess, kwargs={ 'mlflow_logger': mlflow_logger, From 0d3f9cefd29fdb2eeff93d48a289ac285d18c654 Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Tue, 1 Oct 2024 17:26:51 -0700 Subject: [PATCH 05/46] pr feedback with a print statement for testing --- llmfoundry/callbacks/hf_checkpointer.py | 30 ++++++++++++++----------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index 61d348967c..f0cfe742f9 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -138,10 +138,11 @@ def _register_model_with_run_id_multiprocess( def _log_model_multiprocess( mlflow_logger: MLFlowLogger, composer_logging_level: int, + flavor: str, input_example: dict[str, Any], + log_model_metadata: dict[str, str], task: str, - name: str, - model_name: str, + registered_model_name: str, await_creation_for: int, ): """ @@ -158,16 +159,12 @@ def _log_model_multiprocess( ) logging.getLogger('composer').setLevel(composer_logging_level) mlflow_logger.log_model( - flavor='transformers', - artifact_path="model", + flavor=flavor, + artifact_path="model", # TODO: where should we define this parent dir name? input_example=input_example, + metadata=log_model_metadata, # This metadata is currently needed for optimized serving task=task, - metadata={ - "task": task, - "databricks_model_source": "genai-fine-tuning", - "pretrained_model_name": model_name, - }, # This metadata is currently needed for optimized serving - registered_model_name=name, + registered_model_name=registered_model_name, await_creation_for=await_creation_for ) @@ -769,18 +766,25 @@ def tensor_hook( monitor_process = None # Spawn a new process to register the model. + # TODO: how do we fix intermediate checkpointing logic to use this too but not register + # the model with that param + # TODO: pass in model correctly # Slower method to register the model via log_model. if self.use_mlflow_log_model: + print("----------------- REACHED MLFLOW LOG MODEL -----------------") process = SpawnProcess( target=_log_model_multiprocess, kwargs={ 'mlflow_logger': mlflow_logger, + 'flavor': + 'peft' if self.using_peft else 'transformers', 'composer_logging_level': logging.getLogger('composer').level, - 'model_uri': - local_save_path, - 'name': + 'task': + self.mlflow_logging_config['metadata']['task'], + 'log_model_metadata': self.mlflow_logging_config['metadata'], + 'registered_model_name': self.mlflow_registered_model_name, 'model_name': self.pretrained_model_name, From 6ea8de5d04a100bc6a649a90301fe05c57bcc07a Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Thu, 3 Oct 2024 17:36:05 -0700 Subject: [PATCH 06/46] some more todos and need to test --- llmfoundry/callbacks/hf_checkpointer.py | 123 +++++++++++++++--------- 1 file changed, 80 insertions(+), 43 deletions(-) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index f0cfe742f9..9a6cd6272f 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -142,13 +142,23 @@ def _log_model_multiprocess( input_example: dict[str, Any], log_model_metadata: dict[str, str], task: str, - registered_model_name: str, await_creation_for: int, + registered_model_name: Optional[str] = None, ): """ Call MLFlowLogger.log_model. Used mainly to log from a child process. + + Inputs: + - mlflow_logger: MLFlowLogger: MLflow logger object + - composer_logging_level: int: logging level for composer + - flavor: str: transformers or peft + - input_example: dict[str, Any]: model serving input example for model + - log_model_metadata: dict[str, str]: This metadata is currently needed for optimized serving + - task: str: LLM task for model deployment (i.e. chat or completions) + - await_creation_for: int: time to wait for model creation + - registered_model_name: Optional """ # Setup logging for child process. This ensures that any logs from composer are surfaced. if composer_logging_level > 0: @@ -158,15 +168,53 @@ def _log_model_multiprocess( f'%(asctime)s: rank{dist.get_global_rank()}[%(process)d][%(threadName)s]: %(levelname)s: %(name)s: %(message)s', ) logging.getLogger('composer').setLevel(composer_logging_level) - mlflow_logger.log_model( - flavor=flavor, - artifact_path="model", # TODO: where should we define this parent dir name? - input_example=input_example, - metadata=log_model_metadata, # This metadata is currently needed for optimized serving - task=task, - registered_model_name=registered_model_name, - await_creation_for=await_creation_for - ) + + # monkey patch to prevent duplicate tokenizer upload + import mlflow + original_save_model = mlflow.transformers.save_model + def save_model_patch(*args, **kwargs): + original_save_model(*args, **kwargs) + print(f"List of root path: {os.listdir(kwargs['path'])}") + components_path = os.path.join(kwargs['path'], 'components') + if os.path.exists(components_path): + print(f"List of components path: {components_path}: {os.listdir(components_path)}") + tokenizer_path = os.path.join(kwargs['path'], 'components', 'tokenizer') + if os.path.exists(tokenizer_path): + tokenizer_files = os.listdir(os.path.join(kwargs['path'], 'components', 'tokenizer')) + print(f"Tokenizer files: {tokenizer_files}") + try: + print(f"List of model/model/ files: {os.listdir(os.path.join(kwargs['path'], 'model'))}") + except Exception as e: + print(f"exception", e) + # TODO: what do we do here in code?? + for tokenizer_file_name in tokenizer_files: + try: + dupe_file = os.path.isfile(os.path.join(kwargs['path'], 'model', tokenizer_file_name)) + if dupe_file: + os.remove(os.path.join(kwargs['path'], 'model', tokenizer_file_name)) + except Exception as e: + print(f"exception", e) + mlflow.transformers.save_model = save_model_patch + + if registered_model_name is not None: + mlflow_logger.log_model( + flavor=flavor, + artifact_path='model', # TODO: where should we define this parent dir name? + input_example=input_example, + metadata=log_model_metadata, + task=task, + registered_model_name=registered_model_name, + await_creation_for=await_creation_for + ) + else: + mlflow_logger.log_model( + flavor=flavor, + artifact_path='model', # TODO: where should we define this parent dir name? + input_example=input_example, + metadata=log_model_metadata, + task=task, + await_creation_for=await_creation_for + ) class HuggingFaceCheckpointer(Callback): @@ -676,23 +724,29 @@ def tensor_hook( self.flatten_imports, ) - if self.remote_ud is not None: - for filename in os.listdir(temp_save_dir): - remote_file_name = os.path.join(save_dir, filename) - remote_file_uri = self.remote_ud.remote_backend.get_uri( - remote_file_name, - ) - log.info( - f'Uploading HuggingFace formatted checkpoint to {remote_file_uri}', - ) - self.remote_ud.upload_file( - state=state, - remote_file_name=remote_file_name, - file_path=Path( - os.path.join(temp_save_dir, filename), - ), - overwrite=self.overwrite, + # TODO: Log the model without registering + for i, mlflow_logger in enumerate(self.mlflow_loggers): + process = SpawnProcess( + target=_log_model_multiprocess, + kwargs={ + 'mlflow_logger': + mlflow_logger, + 'flavor': + 'peft' if self.using_peft else 'transformers', + 'composer_logging_level': + logging.getLogger('composer').level, + 'task': + self.mlflow_logging_config['metadata']['task'], + 'log_model_metadata': self.mlflow_logging_config['metadata'], + 'model_name': + self.pretrained_model_name, + 'input_example': + self.mlflow_logging_config['input_example'], + 'await_creation_for': + 3600, + }, ) + process.start() dist.barrier() @@ -795,23 +849,6 @@ def tensor_hook( }, ) process.start() - # Faster method to register model in parallel. - process = SpawnProcess( - target=_register_model_with_run_id_multiprocess, - kwargs={ - 'mlflow_logger': - mlflow_logger, - 'composer_logging_level': - logging.getLogger('composer').level, - 'model_uri': - local_save_path, - 'name': - self.mlflow_registered_model_name, - 'await_creation_for': - 3600, - }, - ) - process.start() # Restore the monitor process. if monitor_process is not None: From 81306d8ecc39438cf19a641149ec12f27ec3fba3 Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Thu, 10 Oct 2024 17:25:56 -0700 Subject: [PATCH 07/46] need to test --- llmfoundry/callbacks/hf_checkpointer.py | 60 ++++++++++++------------- 1 file changed, 28 insertions(+), 32 deletions(-) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index 9a6cd6272f..a8255e5087 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -138,6 +138,7 @@ def _register_model_with_run_id_multiprocess( def _log_model_multiprocess( mlflow_logger: MLFlowLogger, composer_logging_level: int, + transformers_model_path: str, flavor: str, input_example: dict[str, Any], log_model_metadata: dict[str, str], @@ -160,6 +161,7 @@ def _log_model_multiprocess( - await_creation_for: int: time to wait for model creation - registered_model_name: Optional """ + print("----------------- REACHED MLFLOW LOG MODEL -----------------") # Setup logging for child process. This ensures that any logs from composer are surfaced. if composer_logging_level > 0: # If logging_level is 0, then the composer logger was unset. @@ -168,36 +170,10 @@ def _log_model_multiprocess( f'%(asctime)s: rank{dist.get_global_rank()}[%(process)d][%(threadName)s]: %(levelname)s: %(name)s: %(message)s', ) logging.getLogger('composer').setLevel(composer_logging_level) - - # monkey patch to prevent duplicate tokenizer upload - import mlflow - original_save_model = mlflow.transformers.save_model - def save_model_patch(*args, **kwargs): - original_save_model(*args, **kwargs) - print(f"List of root path: {os.listdir(kwargs['path'])}") - components_path = os.path.join(kwargs['path'], 'components') - if os.path.exists(components_path): - print(f"List of components path: {components_path}: {os.listdir(components_path)}") - tokenizer_path = os.path.join(kwargs['path'], 'components', 'tokenizer') - if os.path.exists(tokenizer_path): - tokenizer_files = os.listdir(os.path.join(kwargs['path'], 'components', 'tokenizer')) - print(f"Tokenizer files: {tokenizer_files}") - try: - print(f"List of model/model/ files: {os.listdir(os.path.join(kwargs['path'], 'model'))}") - except Exception as e: - print(f"exception", e) - # TODO: what do we do here in code?? - for tokenizer_file_name in tokenizer_files: - try: - dupe_file = os.path.isfile(os.path.join(kwargs['path'], 'model', tokenizer_file_name)) - if dupe_file: - os.remove(os.path.join(kwargs['path'], 'model', tokenizer_file_name)) - except Exception as e: - print(f"exception", e) - mlflow.transformers.save_model = save_model_patch if registered_model_name is not None: mlflow_logger.log_model( + transformers_model=transformers_model_path, flavor=flavor, artifact_path='model', # TODO: where should we define this parent dir name? input_example=input_example, @@ -208,6 +184,7 @@ def save_model_patch(*args, **kwargs): ) else: mlflow_logger.log_model( + transformers_model=transformers_model_path, flavor=flavor, artifact_path='model', # TODO: where should we define this parent dir name? input_example=input_example, @@ -724,13 +701,34 @@ def tensor_hook( self.flatten_imports, ) - # TODO: Log the model without registering + if self.remote_ud is not None: + for filename in os.listdir(temp_save_dir): + remote_file_name = os.path.join(save_dir, filename) + remote_file_uri = self.remote_ud.remote_backend.get_uri( + remote_file_name, + ) + log.info( + f'Uploading HuggingFace formatted checkpoint to {remote_file_uri}', + ) + self.remote_ud.upload_file( + state=state, + remote_file_name=remote_file_name, + file_path=Path( + os.path.join(temp_save_dir, filename), + ), + overwrite=self.overwrite, + ) + + # Log intermediate checkpoints to MLflow, but don't register the model + # TODO: is this the right place to place this code? for i, mlflow_logger in enumerate(self.mlflow_loggers): process = SpawnProcess( target=_log_model_multiprocess, kwargs={ 'mlflow_logger': mlflow_logger, + 'transformers_model_path': + temp_save_dir, 'flavor': 'peft' if self.using_peft else 'transformers', 'composer_logging_level': @@ -820,17 +818,15 @@ def tensor_hook( monitor_process = None # Spawn a new process to register the model. - # TODO: how do we fix intermediate checkpointing logic to use this too but not register - # the model with that param - # TODO: pass in model correctly # Slower method to register the model via log_model. if self.use_mlflow_log_model: - print("----------------- REACHED MLFLOW LOG MODEL -----------------") process = SpawnProcess( target=_log_model_multiprocess, kwargs={ 'mlflow_logger': mlflow_logger, + 'transformers_model_path': + local_save_path, 'flavor': 'peft' if self.using_peft else 'transformers', 'composer_logging_level': From bc73f65d0fd52999955d4b8bd0baa2c8ba35ac92 Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Mon, 14 Oct 2024 17:13:35 -0700 Subject: [PATCH 08/46] use mlflow log model by default --- llmfoundry/callbacks/hf_checkpointer.py | 110 ++++++------------------ 1 file changed, 26 insertions(+), 84 deletions(-) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index f5453dabba..eb647f64fd 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -107,34 +107,6 @@ def _maybe_get_license_filename( return None -def _register_model_with_run_id_multiprocess( - mlflow_logger: MLFlowLogger, - composer_logging_level: int, - model_uri: str, - name: str, - await_creation_for: int, -): - """Call MLFlowLogger.register_model. - - Used mainly to register from a child process. - """ - # Setup logging for child process. This ensures that any logs from composer are surfaced. - if composer_logging_level > 0: - # If logging_level is 0, then the composer logger was unset. - logging.basicConfig( - format= - f'%(asctime)s: rank{dist.get_global_rank()}[%(process)d][%(threadName)s]: %(levelname)s: %(name)s: %(message)s', - ) - logging.getLogger('composer').setLevel(composer_logging_level) - - # Register model. - mlflow_logger.register_model_with_run_id( - model_uri=model_uri, - name=name, - await_creation_for=await_creation_for, - ) - - def _log_model_multiprocess( mlflow_logger: MLFlowLogger, composer_logging_level: int, @@ -230,7 +202,7 @@ class HuggingFaceCheckpointer(Callback): def __init__( self, - save_folder: str, + save_folder: Optional[str], save_interval: Union[str, int, Time], huggingface_folder_name: str = 'ba{batch}', precision: str = 'float32', @@ -261,7 +233,6 @@ def __init__( + f'Defaulting to final_register_only=False and saving the HuggingFace checkpoint to {save_folder=}.', ) - self.use_mlflow_log_model = False # mlflow config setup if mlflow_logging_config is None: @@ -292,8 +263,6 @@ def __init__( 'input_example', default_input_example, ) - if mlflow_logging_config['use_mlflow_log_model']: - self.use_mlflow_log_model = True self.mlflow_logging_config = mlflow_logging_config if 'metadata' in self.mlflow_logging_config: @@ -719,32 +688,6 @@ def tensor_hook( ), overwrite=self.overwrite, ) - - # Log intermediate checkpoints to MLflow, but don't register the model - # TODO: is this the right place to place this code? - for i, mlflow_logger in enumerate(self.mlflow_loggers): - process = SpawnProcess( - target=_log_model_multiprocess, - kwargs={ - 'mlflow_logger': - mlflow_logger, - 'transformers_model_path': - temp_save_dir, - 'flavor': - 'peft' if self.using_peft else 'transformers', - 'composer_logging_level': - logging.getLogger('composer').level, - 'task': - self.mlflow_logging_config['metadata']['task'], - 'log_model_metadata': self.mlflow_logging_config['metadata'], - 'model_name': - self.pretrained_model_name, - 'input_example': - self.mlflow_logging_config['input_example'], - 'await_creation_for': - 3600, - }, - ) process.start() dist.barrier() @@ -820,32 +763,31 @@ def tensor_hook( # Spawn a new process to register the model. # Slower method to register the model via log_model. - if self.use_mlflow_log_model: - process = SpawnProcess( - target=_log_model_multiprocess, - kwargs={ - 'mlflow_logger': - mlflow_logger, - 'transformers_model_path': - local_save_path, - 'flavor': - 'peft' if self.using_peft else 'transformers', - 'composer_logging_level': - logging.getLogger('composer').level, - 'task': - self.mlflow_logging_config['metadata']['task'], - 'log_model_metadata': self.mlflow_logging_config['metadata'], - 'registered_model_name': - self.mlflow_registered_model_name, - 'model_name': - self.pretrained_model_name, - 'input_example': - self.mlflow_logging_config['input_example'], - 'await_creation_for': - 3600, - }, - ) - process.start() + process = SpawnProcess( + target=_log_model_multiprocess, + kwargs={ + 'mlflow_logger': + mlflow_logger, + 'transformers_model_path': + local_save_path, + 'flavor': + 'peft' if self.using_peft else 'transformers', + 'composer_logging_level': + logging.getLogger('composer').level, + 'task': + self.mlflow_logging_config['metadata']['task'], + 'log_model_metadata': self.mlflow_logging_config['metadata'], + 'registered_model_name': + self.mlflow_registered_model_name, + 'model_name': + self.pretrained_model_name, + 'input_example': + self.mlflow_logging_config['input_example'], + 'await_creation_for': + 3600, + }, + ) + process.start() # Restore the monitor process. if monitor_process is not None: From 191504278a9008ee4e21afcc7763a11fd4a88ee4 Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Mon, 14 Oct 2024 17:44:38 -0700 Subject: [PATCH 09/46] patch push --- llmfoundry/callbacks/hf_checkpointer.py | 33 ++++++++++++++++++------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index eb647f64fd..a9edf4840b 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -142,6 +142,29 @@ def _log_model_multiprocess( f'%(asctime)s: rank{dist.get_global_rank()}[%(process)d][%(threadName)s]: %(levelname)s: %(name)s: %(message)s', ) logging.getLogger('composer').setLevel(composer_logging_level) + + # monkey patch to prevent duplicate tokenizer upload + import mlflow + original_save_model = mlflow.transformers.save_model + def save_model_patch(*args: Any, **kwargs: Any): + original_save_model(*args, **kwargs) + log.debug(f"List of root path: {os.listdir(kwargs['path'])}") + components_path = os.path.join(kwargs['path'], 'components') + if os.path.exists(components_path): + log.debug(f"List of components path: {components_path}: {os.listdir(components_path)}") + tokenizer_path = os.path.join(kwargs['path'], 'components', 'tokenizer') + tokenizer_files = [] + if os.path.exists(tokenizer_path): + tokenizer_files = os.listdir(os.path.join(kwargs['path'], 'components', 'tokenizer')) + log.debug(f"Tokenizer files: {tokenizer_files}") + try: + for tokenizer_file_name in tokenizer_files: + dupe_file = os.path.isfile(os.path.join(kwargs['path'], 'model', tokenizer_file_name)) + if dupe_file: + os.remove(os.path.join(kwargs['path'], 'model', tokenizer_file_name)) + except Exception as e: + log.error(f"Exception when removing duplicate tokenizer files in the model directory", e) + mlflow.transformers.save_model = save_model_patch if registered_model_name is not None: mlflow_logger.log_model( @@ -732,14 +755,6 @@ def tensor_hook( True, ) if is_te_imported and state.precision == Precision.AMP_FP8 else contextlib.nullcontext( ) - with context_manager: - # Add the pip requirements directly to avoid mlflow - # attempting to run inference on the model - model_saving_kwargs['pip_requirements'] = [ - 'transformers', - 'torch', - ] - mlflow_logger.save_model(**model_saving_kwargs) # Upload the license file generated by mlflow during the model saving. license_filename = _maybe_get_license_filename( @@ -769,7 +784,7 @@ def tensor_hook( 'mlflow_logger': mlflow_logger, 'transformers_model_path': - local_save_path, + temp_save_dir, 'flavor': 'peft' if self.using_peft else 'transformers', 'composer_logging_level': From 99589c799449aa31accb0604ffe08fc6e710a106 Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Tue, 22 Oct 2024 16:39:23 -0700 Subject: [PATCH 10/46] add log statements --- llmfoundry/callbacks/hf_checkpointer.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index 161178a872..8380f7917c 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -133,7 +133,6 @@ def _log_model_multiprocess( - await_creation_for: int: time to wait for model creation - registered_model_name: Optional """ - print("----------------- REACHED MLFLOW LOG MODEL -----------------") # Setup logging for child process. This ensures that any logs from composer are surfaced. if composer_logging_level > 0: # If logging_level is 0, then the composer logger was unset. @@ -143,24 +142,26 @@ def _log_model_multiprocess( ) logging.getLogger('composer').setLevel(composer_logging_level) + log.info("----------------- REACHED MLFLOW LOG MODEL -----------------") # monkey patch to prevent duplicate tokenizer upload import mlflow original_save_model = mlflow.transformers.save_model def save_model_patch(*args: Any, **kwargs: Any): original_save_model(*args, **kwargs) - log.debug(f"List of root path: {os.listdir(kwargs['path'])}") + log.info(f"List of root path: {os.listdir(kwargs['path'])}") components_path = os.path.join(kwargs['path'], 'components') if os.path.exists(components_path): - log.debug(f"List of components path: {components_path}: {os.listdir(components_path)}") + log.info(f"List of components path: {components_path}: {os.listdir(components_path)}") tokenizer_path = os.path.join(kwargs['path'], 'components', 'tokenizer') tokenizer_files = [] if os.path.exists(tokenizer_path): tokenizer_files = os.listdir(os.path.join(kwargs['path'], 'components', 'tokenizer')) - log.debug(f"Tokenizer files: {tokenizer_files}") + log.info(f"Tokenizer files: {tokenizer_files}") try: for tokenizer_file_name in tokenizer_files: dupe_file = os.path.isfile(os.path.join(kwargs['path'], 'model', tokenizer_file_name)) if dupe_file: + log.info(f"Removing duplicate tokenizer file: {tokenizer_file_name}") os.remove(os.path.join(kwargs['path'], 'model', tokenizer_file_name)) except Exception as e: log.error(f"Exception when removing duplicate tokenizer files in the model directory", e) From 04ddfaac43aa125d7dadf98b05b869c8bdf6398e Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Wed, 23 Oct 2024 11:41:00 -0700 Subject: [PATCH 11/46] add log outside of process --- llmfoundry/callbacks/hf_checkpointer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index 8380f7917c..fe0847c3b5 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -781,6 +781,7 @@ def tensor_hook( # Spawn a new process to register the model. # Slower method to register the model via log_model. + log.info('USING MY BRANCH!!!!!!!!!!!!!!') process = SpawnProcess( target=_log_model_multiprocess, kwargs={ From 8e4221796194ea5da1bc64b913f12053937563e6 Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Fri, 25 Oct 2024 16:31:20 -0700 Subject: [PATCH 12/46] fix --- llmfoundry/callbacks/hf_checkpointer.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index fe0847c3b5..51d012483f 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -76,6 +76,11 @@ def _maybe_get_license_filename( If the license file does not exist, returns None. """ + # Early return if no local directory exists + if not os.path.exists(local_dir): + return None + + # Try to find the license file try: license_filename = next( file for file in os.listdir(local_dir) From be04e3d68ba901a405c0f4dd07decd5fc597fb65 Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Fri, 25 Oct 2024 16:39:55 -0700 Subject: [PATCH 13/46] bug --- llmfoundry/callbacks/hf_checkpointer.py | 1 - 1 file changed, 1 deletion(-) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index 51d012483f..5103a13d42 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -719,7 +719,6 @@ def tensor_hook( ), overwrite=self.overwrite, ) - process.start() dist.barrier() From 5ab2cc7e63e5e447e730430e2731d93d1eb98690 Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Fri, 25 Oct 2024 17:13:39 -0700 Subject: [PATCH 14/46] print the registered model name --- llmfoundry/callbacks/hf_checkpointer.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index 5103a13d42..c9a56f0c1f 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -114,7 +114,7 @@ def _maybe_get_license_filename( def _log_model_multiprocess( mlflow_logger: MLFlowLogger, - composer_logging_level: int, + python_logging_level: int, transformers_model_path: str, flavor: str, input_example: dict[str, Any], @@ -130,7 +130,7 @@ def _log_model_multiprocess( Inputs: - mlflow_logger: MLFlowLogger: MLflow logger object - - composer_logging_level: int: logging level for composer + - python_logging_level: int: logging level - flavor: str: transformers or peft - input_example: dict[str, Any]: model serving input example for model - log_model_metadata: dict[str, str]: This metadata is currently needed for optimized serving @@ -139,13 +139,13 @@ def _log_model_multiprocess( - registered_model_name: Optional """ # Setup logging for child process. This ensures that any logs from composer are surfaced. - if composer_logging_level > 0: + if python_logging_level > 0: # If logging_level is 0, then the composer logger was unset. logging.basicConfig( format= f'%(asctime)s: rank{dist.get_global_rank()}[%(process)d][%(threadName)s]: %(levelname)s: %(name)s: %(message)s', ) - logging.getLogger('composer').setLevel(composer_logging_level) + logging.getLogger('llmfoundry').setLevel(python_logging_level) log.info("----------------- REACHED MLFLOW LOG MODEL -----------------") # monkey patch to prevent duplicate tokenizer upload @@ -678,6 +678,7 @@ def tensor_hook( log.debug('Saving Hugging Face checkpoint to disk') + log.debug(f"UPLOAD_TO_SAVE_FOLDER: {upload_to_save_folder}") if upload_to_save_folder: # This context manager casts the TE extra state in io.BytesIO format to tensor format # Needed for proper hf ckpt saving. @@ -785,7 +786,7 @@ def tensor_hook( # Spawn a new process to register the model. # Slower method to register the model via log_model. - log.info('USING MY BRANCH!!!!!!!!!!!!!!') + log.info(f'USING MY BRANCH!!!!!!!!!!!!!! REGISTERED MODEL NAME: {self.mlflow_registered_model_name}') process = SpawnProcess( target=_log_model_multiprocess, kwargs={ @@ -795,8 +796,8 @@ def tensor_hook( temp_save_dir, 'flavor': 'peft' if self.using_peft else 'transformers', - 'composer_logging_level': - logging.getLogger('composer').level, + 'python_logging_level': + logging.getLogger('llmfoundry').level, 'task': self.mlflow_logging_config['metadata']['task'], 'log_model_metadata': self.mlflow_logging_config['metadata'], From 79356d87d68b5cacfcd6eb03b3626df6f31c53de Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Fri, 25 Oct 2024 17:28:57 -0700 Subject: [PATCH 15/46] update the model registry prefix --- llmfoundry/callbacks/hf_checkpointer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index c9a56f0c1f..db38900ede 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -180,7 +180,7 @@ def save_model_patch(*args: Any, **kwargs: Any): input_example=input_example, metadata=log_model_metadata, task=task, - registered_model_name=registered_model_name, + registered_model_name=mlflow_logger.model_registry_prefix + registered_model_name, await_creation_for=await_creation_for ) else: @@ -803,8 +803,8 @@ def tensor_hook( 'log_model_metadata': self.mlflow_logging_config['metadata'], 'registered_model_name': self.mlflow_registered_model_name, - 'model_name': - self.pretrained_model_name, + # 'model_name': + # self.pretrained_model_name, 'input_example': self.mlflow_logging_config['input_example'], 'await_creation_for': From 4327257745fdb73541869769541567d4b571726c Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Fri, 25 Oct 2024 17:50:45 -0700 Subject: [PATCH 16/46] move the download code out of the if statement --- llmfoundry/callbacks/hf_checkpointer.py | 25 ++++++++++--------------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index db38900ede..62ed8e430d 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -400,6 +400,7 @@ def run_event(self, event: Event, state: State, logger: Logger) -> None: if self._any_register_processes_error( state.device, ) and self.final_register_only: + time.sleep(60) # give me some debugging time log.error( 'An error occurred in one or more registration processes. Fallback to saving the HuggingFace checkpoint.', ) @@ -679,22 +680,16 @@ def tensor_hook( log.debug('Saving Hugging Face checkpoint to disk') log.debug(f"UPLOAD_TO_SAVE_FOLDER: {upload_to_save_folder}") + # This context manager casts the TE extra state in io.BytesIO format to tensor format + # Needed for proper hf ckpt saving. + context_manager = te.onnx_export( + True, + ) if is_te_imported and state.precision == Precision.AMP_FP8 else contextlib.nullcontext( + ) + with context_manager: + new_model_instance.save_pretrained(temp_save_dir) + original_tokenizer.save_pretrained(temp_save_dir) if upload_to_save_folder: - # This context manager casts the TE extra state in io.BytesIO format to tensor format - # Needed for proper hf ckpt saving. - context_manager = te.onnx_export( - True, - ) if is_te_imported and state.precision == Precision.AMP_FP8 else contextlib.nullcontext( - ) - with context_manager: - new_model_instance.save_pretrained(temp_save_dir) - if original_tokenizer is not None: - assert isinstance( - original_tokenizer, - PreTrainedTokenizerBase, - ) - original_tokenizer.save_pretrained(temp_save_dir) - # Only need to edit files for MPT because it has custom code if new_model_instance.config.model_type == 'mpt': log.debug('Editing MPT files for HuggingFace compatibility') From 6c5fb05968e43fcffff70c0963adace9c3bb8e88 Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Fri, 25 Oct 2024 18:00:23 -0700 Subject: [PATCH 17/46] try registering just the model name --- llmfoundry/callbacks/hf_checkpointer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index 62ed8e430d..76d2309cac 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -180,7 +180,7 @@ def save_model_patch(*args: Any, **kwargs: Any): input_example=input_example, metadata=log_model_metadata, task=task, - registered_model_name=mlflow_logger.model_registry_prefix + registered_model_name, + registered_model_name=registered_model_name, # not the full path? mlflow_logger.model_registry_prefix await_creation_for=await_creation_for ) else: From bb0dd6a1792e5cb549716d5c62c45a6eacde36d9 Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Fri, 25 Oct 2024 18:13:33 -0700 Subject: [PATCH 18/46] connect the existing mlflow run id --- llmfoundry/callbacks/hf_checkpointer.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index 76d2309cac..14b3819a2e 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -150,6 +150,9 @@ def _log_model_multiprocess( log.info("----------------- REACHED MLFLOW LOG MODEL -----------------") # monkey patch to prevent duplicate tokenizer upload import mlflow + mlflow.start_run( + run_id=mlflow_logger._run_id, + ) original_save_model = mlflow.transformers.save_model def save_model_patch(*args: Any, **kwargs: Any): original_save_model(*args, **kwargs) From c5ae4ffd9265dd57a2bc50ed716e14e2a7db64bf Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Fri, 25 Oct 2024 18:33:54 -0700 Subject: [PATCH 19/46] omg it works --- llmfoundry/callbacks/hf_checkpointer.py | 17 ++--------------- 1 file changed, 2 insertions(+), 15 deletions(-) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index 14b3819a2e..8efff363dd 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -147,7 +147,6 @@ def _log_model_multiprocess( ) logging.getLogger('llmfoundry').setLevel(python_logging_level) - log.info("----------------- REACHED MLFLOW LOG MODEL -----------------") # monkey patch to prevent duplicate tokenizer upload import mlflow mlflow.start_run( @@ -156,20 +155,13 @@ def _log_model_multiprocess( original_save_model = mlflow.transformers.save_model def save_model_patch(*args: Any, **kwargs: Any): original_save_model(*args, **kwargs) - log.info(f"List of root path: {os.listdir(kwargs['path'])}") - components_path = os.path.join(kwargs['path'], 'components') - if os.path.exists(components_path): - log.info(f"List of components path: {components_path}: {os.listdir(components_path)}") - tokenizer_path = os.path.join(kwargs['path'], 'components', 'tokenizer') tokenizer_files = [] - if os.path.exists(tokenizer_path): - tokenizer_files = os.listdir(os.path.join(kwargs['path'], 'components', 'tokenizer')) - log.info(f"Tokenizer files: {tokenizer_files}") + # Check if there are duplicate tokenizer files in the model directory and remove them. try: for tokenizer_file_name in tokenizer_files: dupe_file = os.path.isfile(os.path.join(kwargs['path'], 'model', tokenizer_file_name)) if dupe_file: - log.info(f"Removing duplicate tokenizer file: {tokenizer_file_name}") + log.debug(f"Removing duplicate tokenizer file: {tokenizer_file_name}") os.remove(os.path.join(kwargs['path'], 'model', tokenizer_file_name)) except Exception as e: log.error(f"Exception when removing duplicate tokenizer files in the model directory", e) @@ -403,7 +395,6 @@ def run_event(self, event: Event, state: State, logger: Logger) -> None: if self._any_register_processes_error( state.device, ) and self.final_register_only: - time.sleep(60) # give me some debugging time log.error( 'An error occurred in one or more registration processes. Fallback to saving the HuggingFace checkpoint.', ) @@ -682,7 +673,6 @@ def tensor_hook( log.debug('Saving Hugging Face checkpoint to disk') - log.debug(f"UPLOAD_TO_SAVE_FOLDER: {upload_to_save_folder}") # This context manager casts the TE extra state in io.BytesIO format to tensor format # Needed for proper hf ckpt saving. context_manager = te.onnx_export( @@ -784,7 +774,6 @@ def tensor_hook( # Spawn a new process to register the model. # Slower method to register the model via log_model. - log.info(f'USING MY BRANCH!!!!!!!!!!!!!! REGISTERED MODEL NAME: {self.mlflow_registered_model_name}') process = SpawnProcess( target=_log_model_multiprocess, kwargs={ @@ -801,8 +790,6 @@ def tensor_hook( 'log_model_metadata': self.mlflow_logging_config['metadata'], 'registered_model_name': self.mlflow_registered_model_name, - # 'model_name': - # self.pretrained_model_name, 'input_example': self.mlflow_logging_config['input_example'], 'await_creation_for': From 4c86e638b9f338b299c9709cfdc29f52c966f77d Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Mon, 28 Oct 2024 21:41:01 -0700 Subject: [PATCH 20/46] pr feedback --- llmfoundry/callbacks/hf_checkpointer.py | 132 ++++++++++-------------- 1 file changed, 55 insertions(+), 77 deletions(-) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index 8efff363dd..15b6abd59e 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -113,14 +113,12 @@ def _maybe_get_license_filename( def _log_model_multiprocess( + await_creation_for: int, + flavor: str, mlflow_logger: MLFlowLogger, + mlflow_logging_config: dict[str, Any], python_logging_level: int, transformers_model_path: str, - flavor: str, - input_example: dict[str, Any], - log_model_metadata: dict[str, str], - task: str, - await_creation_for: int, registered_model_name: Optional[str] = None, ): """ @@ -128,15 +126,14 @@ def _log_model_multiprocess( Used mainly to log from a child process. - Inputs: - - mlflow_logger: MLFlowLogger: MLflow logger object - - python_logging_level: int: logging level - - flavor: str: transformers or peft - - input_example: dict[str, Any]: model serving input example for model - - log_model_metadata: dict[str, str]: This metadata is currently needed for optimized serving - - task: str: LLM task for model deployment (i.e. chat or completions) - - await_creation_for: int: time to wait for model creation - - registered_model_name: Optional + Args: + await_creation_for: int: time to wait for model creation + flavor: str: transformers or peft + mlflow_logger: MLFlowLogger: MLflow logger object + mlflow_logging_config: dict: mlflow logging config + python_logging_level: int: logging level + transformers_model_path: str: path to the transformers model + registered_model_name: Optional name to register the model under in the MLflow model registry """ # Setup logging for child process. This ensures that any logs from composer are surfaced. if python_logging_level > 0: @@ -146,6 +143,7 @@ def _log_model_multiprocess( f'%(asctime)s: rank{dist.get_global_rank()}[%(process)d][%(threadName)s]: %(levelname)s: %(name)s: %(message)s', ) logging.getLogger('llmfoundry').setLevel(python_logging_level) + logging.getLogger('composer').setLevel(python_logging_level) # monkey patch to prevent duplicate tokenizer upload import mlflow @@ -167,27 +165,16 @@ def save_model_patch(*args: Any, **kwargs: Any): log.error(f"Exception when removing duplicate tokenizer files in the model directory", e) mlflow.transformers.save_model = save_model_patch - if registered_model_name is not None: - mlflow_logger.log_model( - transformers_model=transformers_model_path, - flavor=flavor, - artifact_path='model', # TODO: where should we define this parent dir name? - input_example=input_example, - metadata=log_model_metadata, - task=task, - registered_model_name=registered_model_name, # not the full path? mlflow_logger.model_registry_prefix - await_creation_for=await_creation_for - ) - else: - mlflow_logger.log_model( - transformers_model=transformers_model_path, - flavor=flavor, - artifact_path='model', # TODO: where should we define this parent dir name? - input_example=input_example, - metadata=log_model_metadata, - task=task, - await_creation_for=await_creation_for - ) + mlflow_logger.log_model( + transformers_model=transformers_model_path, + flavor='transformers', + artifact_path='last_model_checkpoint', + input_example=mlflow_logging_config['input_example'], + metadata=mlflow_logging_config['metadata'], + task=mlflow_logging_config['metadata']['task'], + registered_model_name=registered_model_name, + await_creation_for=await_creation_for + ) class HuggingFaceCheckpointer(Callback): @@ -226,7 +213,7 @@ class HuggingFaceCheckpointer(Callback): def __init__( self, - save_folder: Optional[str], + save_folder: str, save_interval: Union[str, int, Time], huggingface_folder_name: str = 'ba{batch}', precision: str = 'float32', @@ -682,32 +669,31 @@ def tensor_hook( with context_manager: new_model_instance.save_pretrained(temp_save_dir) original_tokenizer.save_pretrained(temp_save_dir) - if upload_to_save_folder: - # Only need to edit files for MPT because it has custom code - if new_model_instance.config.model_type == 'mpt': - log.debug('Editing MPT files for HuggingFace compatibility') - edit_files_for_hf_compatibility( - temp_save_dir, - self.flatten_imports, - ) + # Only need to edit files for MPT because it has custom code + if new_model_instance.config.model_type == 'mpt': + log.debug('Editing MPT files for HuggingFace compatibility') + edit_files_for_hf_compatibility( + temp_save_dir, + self.flatten_imports, + ) - if self.remote_ud is not None: - for filename in os.listdir(temp_save_dir): - remote_file_name = os.path.join(save_dir, filename) - remote_file_uri = self.remote_ud.remote_backend.get_uri( - remote_file_name, - ) - log.info( - f'Uploading HuggingFace formatted checkpoint to {remote_file_uri}', - ) - self.remote_ud.upload_file( - state=state, - remote_file_name=remote_file_name, - file_path=Path( - os.path.join(temp_save_dir, filename), - ), - overwrite=self.overwrite, - ) + if upload_to_save_folder and self.remote_ud is not None: + for filename in os.listdir(temp_save_dir): + remote_file_name = os.path.join(save_dir, filename) + remote_file_uri = self.remote_ud.remote_backend.get_uri( + remote_file_name, + ) + log.info( + f'Uploading HuggingFace formatted checkpoint to {remote_file_uri}', + ) + self.remote_ud.upload_file( + state=state, + remote_file_name=remote_file_name, + file_path=Path( + os.path.join(temp_save_dir, filename), + ), + overwrite=self.overwrite, + ) dist.barrier() @@ -747,11 +733,6 @@ def tensor_hook( model_saving_kwargs['transformers_model'] = components model_saving_kwargs.update(self.mlflow_logging_config) - context_manager = te.onnx_export( - True, - ) if is_te_imported and state.precision == Precision.AMP_FP8 else contextlib.nullcontext( - ) - # Upload the license file generated by mlflow during the model saving. license_filename = _maybe_get_license_filename( local_save_path, @@ -777,23 +758,20 @@ def tensor_hook( process = SpawnProcess( target=_log_model_multiprocess, kwargs={ - 'mlflow_logger': - mlflow_logger, - 'transformers_model_path': - temp_save_dir, + 'await_creation_for': + 3600, 'flavor': 'peft' if self.using_peft else 'transformers', + 'mlflow_logger': + mlflow_logger, + 'mlflow_logging_config': + self.mlflow_logging_config, 'python_logging_level': logging.getLogger('llmfoundry').level, - 'task': - self.mlflow_logging_config['metadata']['task'], - 'log_model_metadata': self.mlflow_logging_config['metadata'], 'registered_model_name': self.mlflow_registered_model_name, - 'input_example': - self.mlflow_logging_config['input_example'], - 'await_creation_for': - 3600, + 'transformers_model_path': + temp_save_dir, }, ) process.start() From b1477bcca959e6a381dd8fddf7dab334fd901c79 Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Tue, 29 Oct 2024 14:01:57 -0700 Subject: [PATCH 21/46] add test helper --- tests/a_scripts/inference/test_convert_composer_to_hf.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/a_scripts/inference/test_convert_composer_to_hf.py b/tests/a_scripts/inference/test_convert_composer_to_hf.py index c25432dc48..f70b2b4a7a 100644 --- a/tests/a_scripts/inference/test_convert_composer_to_hf.py +++ b/tests/a_scripts/inference/test_convert_composer_to_hf.py @@ -1665,6 +1665,10 @@ def __init__(self, config: PretrainedConfig): ) else: self.generation_config = config.generation_config + + def save_pretrained(self, output_path: str): + with open(os.path.join(output_path, 'generation_config.json'), 'w') as f: + json.dump(self.config.to_dict(), f) config = AutoConfig.from_pretrained('gpt2') # Convert dict to GenerationConfig if needed From e376621461d5b4fe74a3f99c0ff69e147b2e1250 Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Tue, 29 Oct 2024 14:11:38 -0700 Subject: [PATCH 22/46] fix tests --- llmfoundry/callbacks/hf_checkpointer.py | 4 ---- .../inference/test_convert_composer_to_hf.py | 22 +++++++++++-------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index 15b6abd59e..e37dc2cb7c 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -114,7 +114,6 @@ def _maybe_get_license_filename( def _log_model_multiprocess( await_creation_for: int, - flavor: str, mlflow_logger: MLFlowLogger, mlflow_logging_config: dict[str, Any], python_logging_level: int, @@ -128,7 +127,6 @@ def _log_model_multiprocess( Args: await_creation_for: int: time to wait for model creation - flavor: str: transformers or peft mlflow_logger: MLFlowLogger: MLflow logger object mlflow_logging_config: dict: mlflow logging config python_logging_level: int: logging level @@ -760,8 +758,6 @@ def tensor_hook( kwargs={ 'await_creation_for': 3600, - 'flavor': - 'peft' if self.using_peft else 'transformers', 'mlflow_logger': mlflow_logger, 'mlflow_logging_config': diff --git a/tests/a_scripts/inference/test_convert_composer_to_hf.py b/tests/a_scripts/inference/test_convert_composer_to_hf.py index f70b2b4a7a..6757dde2ae 100644 --- a/tests/a_scripts/inference/test_convert_composer_to_hf.py +++ b/tests/a_scripts/inference/test_convert_composer_to_hf.py @@ -343,7 +343,7 @@ def _create_mlflow_logger_mock() -> MagicMock: mlflow_logger_mock = MagicMock(spec=MLFlowLogger) mlflow_logger_mock.state_dict = lambda *args, **kwargs: {} mlflow_logger_mock.save_model = MagicMock(wraps=_save_model_mock) - mlflow_logger_mock.register_model_with_run_id = MagicMock() + mlflow_logger_mock._log_model_multiprocess = MagicMock() mlflow_logger_mock.model_registry_prefix = '' mlflow_logger_mock._experiment_id = 'mlflow-experiment-id' mlflow_logger_mock._run_id = 'mlflow-run-id' @@ -373,6 +373,10 @@ def _create_optimizer(original_model: torch.nn.Module) -> torch.optim.Optimizer: 'llmfoundry.callbacks.hf_checkpointer.SpawnProcess', new=MockSpawnProcess, ) +@patch( + 'llmfoundry.callbacks.hf_checkpointer.mlflow.start_run', + new=MockSpawnProcess, +) def test_final_register_only( mlflow_registry_error: bool, mlflow_registered_model_name: Optional[str], @@ -432,10 +436,10 @@ def test_final_register_only( if mlflow_registered_model_name is not None: # We should always attempt to register the model once - assert mlflow_logger_mock.register_model_with_run_id.call_count == 1 + assert mlflow_logger_mock._log_model_multiprocess.call_count == 1 if mlflow_registry_error: # If the registry fails, we should still save the model - assert mlflow_logger_mock.register_model_with_run_id.call_count == 1 + assert mlflow_logger_mock._log_model_multiprocess.call_count == 1 assert checkpointer_callback._save_checkpoint.call_count == 2 assert checkpointer_callback._save_checkpoint.call_args_list[ 0].kwargs == { @@ -457,7 +461,7 @@ def test_final_register_only( } else: # No mlflow_registered_model_name, so we should only save the checkpoint - assert mlflow_logger_mock.register_model_with_run_id.call_count == 0 + assert mlflow_logger_mock._log_model_multiprocess.call_count == 0 assert checkpointer_callback._save_checkpoint.call_count == 1 assert checkpointer_callback._save_checkpoint.call_args_list[ 0].kwargs == { @@ -545,12 +549,12 @@ def test_huggingface_conversion_callback_interval( ) assert checkpointer_callback.transform_model_pre_registration.call_count == 1 assert checkpointer_callback.pre_register_edit.call_count == 1 - assert mlflow_logger_mock.register_model_with_run_id.call_count == 1 + assert mlflow_logger_mock._log_model_multiprocess.call_count == 1 else: assert checkpointer_callback.transform_model_pre_registration.call_count == 0 assert checkpointer_callback.pre_register_edit.call_count == 0 assert mlflow_logger_mock.save_model.call_count == 0 - assert mlflow_logger_mock.register_model_with_run_id.call_count == 0 + assert mlflow_logger_mock._log_model_multiprocess.call_count == 0 normal_checkpoints = [ name for name in os.listdir(os.path.join(tmp_path, 'checkpoints')) @@ -724,10 +728,10 @@ def _assert_mlflow_logger_calls( 'pip_requirements': ANY, } mlflow_logger_mock.save_model.assert_called_with(**expectation) - assert mlflow_logger_mock.register_model_with_run_id.call_count == 1 + assert mlflow_logger_mock._log_model_multiprocess.call_count == 1 else: assert mlflow_logger_mock.log_model.call_count == 0 - assert mlflow_logger_mock.register_model_with_run_id.call_count == 0 + assert mlflow_logger_mock._log_model_multiprocess.call_count == 0 def _get_fsdp_config(fsdp_state_dict_type: Optional[str]): @@ -1039,7 +1043,7 @@ def test_huggingface_conversion_callback( mlflow_logger_mock = MagicMock(spec=MLFlowLogger) mlflow_logger_mock.state_dict = lambda *args, **kwargs: {} mlflow_logger_mock.save_model = MagicMock(wraps=_save_model_mock) - mlflow_logger_mock.register_model_with_run_id = MagicMock() + mlflow_logger_mock._log_model_multiprocess = MagicMock() mlflow_logger_mock.model_registry_prefix = '' mlflow_logger_mock._experiment_id = 'mlflow-experiment-id' mlflow_logger_mock._run_id = 'mlflow-run-id' From e2a9d86c7fc0797dfa5c69b2de657c8822c2a3e3 Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Tue, 29 Oct 2024 14:31:44 -0700 Subject: [PATCH 23/46] mocking mlflow start run --- llmfoundry/callbacks/hf_checkpointer.py | 106 ++++++++++++++---- .../inference/test_convert_composer_to_hf.py | 6 +- 2 files changed, 92 insertions(+), 20 deletions(-) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index e37dc2cb7c..7c0413eca7 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -120,8 +120,7 @@ def _log_model_multiprocess( transformers_model_path: str, registered_model_name: Optional[str] = None, ): - """ - Call MLFlowLogger.log_model. + """Call MLFlowLogger.log_model. Used mainly to log from a child process. @@ -175,6 +174,42 @@ def save_model_patch(*args: Any, **kwargs: Any): ) +def _register_model_with_run_id_multiprocess( + mlflow_logger: MLFlowLogger, + composer_logging_level: int, + model_uri: str, + name: str, + await_creation_for: int, +): + """Call MLFlowLogger.register_model_with_run_id. + + Used mainly to register from a child process. + + Args: + mlflow_logger: MLFlowLogger: MLflow logger object + composer_logging_level: int: logging level + model_uri: str: path to the model + name: str: name to register the model under in the MLflow model registry + await_creation_for: int: time to wait for model creation + """ + # Setup logging for child process. This ensures that any logs from composer are surfaced. + if composer_logging_level > 0: + # If logging_level is 0, then the composer logger was unset. + logging.basicConfig( + format= + f'%(asctime)s: rank{dist.get_global_rank()}[%(process)d][%(threadName)s]: %(levelname)s: %(name)s: %(message)s', + force=True, + ) + logging.getLogger('composer').setLevel(composer_logging_level) + + # Register model. + mlflow_logger.register_model_with_run_id( + model_uri=model_uri, + name=name, + await_creation_for=await_creation_for, + ) + + class HuggingFaceCheckpointer(Callback): """Save a huggingface formatted checkpoint during training. @@ -726,6 +761,19 @@ def tensor_hook( ] = temp_save_dir model_saving_kwargs[ 'metadata'] = self.mlflow_logging_config['metadata'] + # If PEFT, still use original save_model codepath + context_manager = te.onnx_export( + True, + ) if is_te_imported and state.precision == Precision.AMP_FP8 else contextlib.nullcontext( + ) + with context_manager: + # Add the pip requirements directly to avoid mlflow + # attempting to run inference on the model + model_saving_kwargs['pip_requirements'] = [ + 'transformers', + 'torch', + ] + mlflow_logger.save_model(**model_saving_kwargs) else: model_saving_kwargs['flavor'] = 'transformers' model_saving_kwargs['transformers_model'] = components @@ -753,23 +801,43 @@ def tensor_hook( # Spawn a new process to register the model. # Slower method to register the model via log_model. - process = SpawnProcess( - target=_log_model_multiprocess, - kwargs={ - 'await_creation_for': - 3600, - 'mlflow_logger': - mlflow_logger, - 'mlflow_logging_config': - self.mlflow_logging_config, - 'python_logging_level': - logging.getLogger('llmfoundry').level, - 'registered_model_name': - self.mlflow_registered_model_name, - 'transformers_model_path': - temp_save_dir, - }, - ) + if self.using_peft: + # If PEFT, use original register_model codepath until Composer + # supports logging PEFT models to MLFlow + process = SpawnProcess( + target=_register_model_with_run_id_multiprocess, + kwargs={ + 'mlflow_logger': + mlflow_logger, + 'composer_logging_level': + logging.getLogger('composer').level, + 'model_uri': + local_save_path, + 'name': + self.mlflow_registered_model_name, + 'await_creation_for': + 3600, + }, + ) + else: + # Log a transformers model + process = SpawnProcess( + target=_log_model_multiprocess, + kwargs={ + 'await_creation_for': + 3600, + 'mlflow_logger': + mlflow_logger, + 'mlflow_logging_config': + self.mlflow_logging_config, + 'python_logging_level': + logging.getLogger('llmfoundry').level, + 'registered_model_name': + self.mlflow_registered_model_name, + 'transformers_model_path': + temp_save_dir, + }, + ) process.start() # Restore the monitor process. diff --git a/tests/a_scripts/inference/test_convert_composer_to_hf.py b/tests/a_scripts/inference/test_convert_composer_to_hf.py index 6757dde2ae..b5b17f0df0 100644 --- a/tests/a_scripts/inference/test_convert_composer_to_hf.py +++ b/tests/a_scripts/inference/test_convert_composer_to_hf.py @@ -374,7 +374,7 @@ def _create_optimizer(original_model: torch.nn.Module) -> torch.optim.Optimizer: new=MockSpawnProcess, ) @patch( - 'llmfoundry.callbacks.hf_checkpointer.mlflow.start_run', + 'mlflow.start_run', new=MockSpawnProcess, ) def test_final_register_only( @@ -481,6 +481,10 @@ def test_final_register_only( 'llmfoundry.callbacks.hf_checkpointer.SpawnProcess', new=MockSpawnProcess, ) +@patch( + 'mlflow.start_run', + new=MockSpawnProcess, +) def test_huggingface_conversion_callback_interval( tmp_path: pathlib.Path, log_to_mlflow: bool, From 5784c26327fb74ccec065b934d5318bcb9f237bf Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Tue, 29 Oct 2024 16:04:18 -0700 Subject: [PATCH 24/46] fix --- llmfoundry/callbacks/hf_checkpointer.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index 7c0413eca7..9a990aac73 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -76,9 +76,9 @@ def _maybe_get_license_filename( If the license file does not exist, returns None. """ - # Early return if no local directory exists + # Error if no local directory exists if not os.path.exists(local_dir): - return None + raise FileNotFoundError(f'Local directory {local_dir} does not exist') # Try to find the license file try: @@ -151,6 +151,9 @@ def _log_model_multiprocess( def save_model_patch(*args: Any, **kwargs: Any): original_save_model(*args, **kwargs) tokenizer_files = [] + tokenizer_path = os.path.join(kwargs['path'], 'components', 'tokenizer') + if os.path.exists(tokenizer_path): + tokenizer_files = os.listdir(os.path.join(kwargs['path'], 'components', 'tokenizer')) # Check if there are duplicate tokenizer files in the model directory and remove them. try: for tokenizer_file_name in tokenizer_files: From c939752707b980fbee42812be98e19758acfd7d0 Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Tue, 29 Oct 2024 17:41:21 -0700 Subject: [PATCH 25/46] pr --- tests/a_scripts/inference/test_convert_composer_to_hf.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/a_scripts/inference/test_convert_composer_to_hf.py b/tests/a_scripts/inference/test_convert_composer_to_hf.py index b5b17f0df0..677a1fc8b1 100644 --- a/tests/a_scripts/inference/test_convert_composer_to_hf.py +++ b/tests/a_scripts/inference/test_convert_composer_to_hf.py @@ -1675,6 +1675,7 @@ def __init__(self, config: PretrainedConfig): self.generation_config = config.generation_config def save_pretrained(self, output_path: str): + os.makedirs(output_path, exist_ok=True) with open(os.path.join(output_path, 'generation_config.json'), 'w') as f: json.dump(self.config.to_dict(), f) From 9e59a218878d9f08d9ad01426804ad66e4324df9 Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Tue, 29 Oct 2024 18:24:01 -0700 Subject: [PATCH 26/46] json format --- tests/a_scripts/inference/test_convert_composer_to_hf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/a_scripts/inference/test_convert_composer_to_hf.py b/tests/a_scripts/inference/test_convert_composer_to_hf.py index 677a1fc8b1..45a999ae27 100644 --- a/tests/a_scripts/inference/test_convert_composer_to_hf.py +++ b/tests/a_scripts/inference/test_convert_composer_to_hf.py @@ -1677,7 +1677,7 @@ def __init__(self, config: PretrainedConfig): def save_pretrained(self, output_path: str): os.makedirs(output_path, exist_ok=True) with open(os.path.join(output_path, 'generation_config.json'), 'w') as f: - json.dump(self.config.to_dict(), f) + json.dump(self.config.to_json_string(), f) config = AutoConfig.from_pretrained('gpt2') # Convert dict to GenerationConfig if needed From 5e13b83b97ad7a5c4658f766fdf59a532148eeec Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Tue, 29 Oct 2024 22:02:00 -0700 Subject: [PATCH 27/46] patches --- llmfoundry/callbacks/hf_checkpointer.py | 4 +- .../inference/test_convert_composer_to_hf.py | 47 ++++++++++++++----- 2 files changed, 38 insertions(+), 13 deletions(-) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index 9a990aac73..182c553c3d 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -165,6 +165,8 @@ def save_model_patch(*args: Any, **kwargs: Any): log.error(f"Exception when removing duplicate tokenizer files in the model directory", e) mlflow.transformers.save_model = save_model_patch + model_registry_name = f'{mlflow_logger.model_registry_prefix}.{registered_model_name}' \ + if registered_model_name is not None else None mlflow_logger.log_model( transformers_model=transformers_model_path, flavor='transformers', @@ -172,7 +174,7 @@ def save_model_patch(*args: Any, **kwargs: Any): input_example=mlflow_logging_config['input_example'], metadata=mlflow_logging_config['metadata'], task=mlflow_logging_config['metadata']['task'], - registered_model_name=registered_model_name, + registered_model_name=model_registry_name, await_creation_for=await_creation_for ) diff --git a/tests/a_scripts/inference/test_convert_composer_to_hf.py b/tests/a_scripts/inference/test_convert_composer_to_hf.py index 45a999ae27..877b69b221 100644 --- a/tests/a_scripts/inference/test_convert_composer_to_hf.py +++ b/tests/a_scripts/inference/test_convert_composer_to_hf.py @@ -341,9 +341,10 @@ def is_alive(self) -> bool: def _create_mlflow_logger_mock() -> MagicMock: mlflow_logger_mock = MagicMock(spec=MLFlowLogger) + mlflow_logger_mock.log_model = MagicMock() mlflow_logger_mock.state_dict = lambda *args, **kwargs: {} mlflow_logger_mock.save_model = MagicMock(wraps=_save_model_mock) - mlflow_logger_mock._log_model_multiprocess = MagicMock() + mlflow_logger_mock._mlflow_client = MagicMock() mlflow_logger_mock.model_registry_prefix = '' mlflow_logger_mock._experiment_id = 'mlflow-experiment-id' mlflow_logger_mock._run_id = 'mlflow-run-id' @@ -409,6 +410,8 @@ def test_final_register_only( mlflow_logger_mock = _create_mlflow_logger_mock() + original_model.save_pretrained = MagicMock() + checkpointer_callback._save_checkpoint = MagicMock( wraps=checkpointer_callback._save_checkpoint, ) @@ -436,10 +439,10 @@ def test_final_register_only( if mlflow_registered_model_name is not None: # We should always attempt to register the model once - assert mlflow_logger_mock._log_model_multiprocess.call_count == 1 + assert mlflow_logger_mock.log_model.call_count == 1 if mlflow_registry_error: # If the registry fails, we should still save the model - assert mlflow_logger_mock._log_model_multiprocess.call_count == 1 + assert mlflow_logger_mock.log_model.call_count == 1 assert checkpointer_callback._save_checkpoint.call_count == 2 assert checkpointer_callback._save_checkpoint.call_args_list[ 0].kwargs == { @@ -461,7 +464,7 @@ def test_final_register_only( } else: # No mlflow_registered_model_name, so we should only save the checkpoint - assert mlflow_logger_mock._log_model_multiprocess.call_count == 0 + assert mlflow_logger_mock.log_model.call_count == 0 assert checkpointer_callback._save_checkpoint.call_count == 1 assert checkpointer_callback._save_checkpoint.call_args_list[ 0].kwargs == { @@ -483,7 +486,23 @@ def test_final_register_only( ) @patch( 'mlflow.start_run', - new=MockSpawnProcess, + new=MagicMock(), +) +@patch( + 'llmfoundry.callbacks.hf_checkpointer._maybe_get_license_filename', + new=MagicMock(), +) +@patch( + 'composer.callbacks.checkpoint_saver.CheckpointSaver._save_checkpoint', + new=MagicMock() +) +@patch( + 'llmfoundry.callbacks.hf_checkpointer._log_model_multiprocess', + new=MagicMock(), +) +@patch( + 'mlflow.transformers.save_model', + new=MagicMock(), ) def test_huggingface_conversion_callback_interval( tmp_path: pathlib.Path, @@ -520,6 +539,9 @@ def test_huggingface_conversion_callback_interval( optimizer = _create_optimizer(original_model) mlflow_logger_mock = _create_mlflow_logger_mock() + + mpt_tokenizer.save_pretrained = MagicMock() + checkpointer_callback.transform_model_pre_registration = MagicMock( wraps=checkpointer_callback.transform_model_pre_registration, ) @@ -553,12 +575,12 @@ def test_huggingface_conversion_callback_interval( ) assert checkpointer_callback.transform_model_pre_registration.call_count == 1 assert checkpointer_callback.pre_register_edit.call_count == 1 - assert mlflow_logger_mock._log_model_multiprocess.call_count == 1 + assert mlflow_logger_mock.log_model.call_count == 1 else: assert checkpointer_callback.transform_model_pre_registration.call_count == 0 assert checkpointer_callback.pre_register_edit.call_count == 0 assert mlflow_logger_mock.save_model.call_count == 0 - assert mlflow_logger_mock._log_model_multiprocess.call_count == 0 + assert mlflow_logger_mock.log_model.call_count == 0 normal_checkpoints = [ name for name in os.listdir(os.path.join(tmp_path, 'checkpoints')) @@ -732,10 +754,10 @@ def _assert_mlflow_logger_calls( 'pip_requirements': ANY, } mlflow_logger_mock.save_model.assert_called_with(**expectation) - assert mlflow_logger_mock._log_model_multiprocess.call_count == 1 + assert mlflow_logger_mock.log_model.call_count == 1 else: assert mlflow_logger_mock.log_model.call_count == 0 - assert mlflow_logger_mock._log_model_multiprocess.call_count == 0 + assert mlflow_logger_mock.log_model.call_count == 0 def _get_fsdp_config(fsdp_state_dict_type: Optional[str]): @@ -1047,7 +1069,7 @@ def test_huggingface_conversion_callback( mlflow_logger_mock = MagicMock(spec=MLFlowLogger) mlflow_logger_mock.state_dict = lambda *args, **kwargs: {} mlflow_logger_mock.save_model = MagicMock(wraps=_save_model_mock) - mlflow_logger_mock._log_model_multiprocess = MagicMock() + mlflow_logger_mock.log_model = MagicMock() mlflow_logger_mock.model_registry_prefix = '' mlflow_logger_mock._experiment_id = 'mlflow-experiment-id' mlflow_logger_mock._run_id = 'mlflow-run-id' @@ -1677,7 +1699,7 @@ def __init__(self, config: PretrainedConfig): def save_pretrained(self, output_path: str): os.makedirs(output_path, exist_ok=True) with open(os.path.join(output_path, 'generation_config.json'), 'w') as f: - json.dump(self.config.to_json_string(), f) + f.write(str(self.generation_config)) config = AutoConfig.from_pretrained('gpt2') # Convert dict to GenerationConfig if needed @@ -1688,10 +1710,11 @@ def save_pretrained(self, output_path: str): mock_model = MockModel(config) logger = MagicMock() state = MagicMock() + tokenizer = MagicMock() state.timestamp.batch = 1 state.is_model_ddp = False state.model.model = mock_model - state.model.tokenizer = None + state.model.tokenizer = tokenizer checkpointer = HuggingFaceCheckpointer( save_folder='test', From 19862d29912dd67a17b5aeca1765c2a4e3060556 Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Tue, 29 Oct 2024 22:43:54 -0700 Subject: [PATCH 28/46] still not fully working --- .../inference/test_convert_composer_to_hf.py | 22 ++++++++----------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/tests/a_scripts/inference/test_convert_composer_to_hf.py b/tests/a_scripts/inference/test_convert_composer_to_hf.py index 877b69b221..84c6926c36 100644 --- a/tests/a_scripts/inference/test_convert_composer_to_hf.py +++ b/tests/a_scripts/inference/test_convert_composer_to_hf.py @@ -548,6 +548,7 @@ def test_huggingface_conversion_callback_interval( checkpointer_callback.pre_register_edit = MagicMock( wraps=checkpointer_callback.pre_register_edit, ) + checkpointer_callback.mlflow_logging_config = MagicMock() trainer = Trainer( model=original_model, device='gpu', @@ -563,15 +564,16 @@ def test_huggingface_conversion_callback_interval( trainer.fit() if log_to_mlflow: - assert mlflow_logger_mock.save_model.call_count == 1 - mlflow_logger_mock.save_model.assert_called_with( - flavor='transformers', + assert mlflow_logger_mock.log_model.call_count == 1 + mlflow_logger_mock.log_model.assert_called_with( transformers_model=ANY, - path=ANY, - task='llm/v1/completions', + flavor='transformers', + artifact_path='last_model_checkpoint', input_example=ANY, - metadata={}, - pip_requirements=ANY, + metadata=ANY, + task=ANY, + registered_model_name=ANY, + await_creation_for=3600, ) assert checkpointer_callback.transform_model_pre_registration.call_count == 1 assert checkpointer_callback.pre_register_edit.call_count == 1 @@ -580,16 +582,10 @@ def test_huggingface_conversion_callback_interval( assert checkpointer_callback.transform_model_pre_registration.call_count == 0 assert checkpointer_callback.pre_register_edit.call_count == 0 assert mlflow_logger_mock.save_model.call_count == 0 - assert mlflow_logger_mock.log_model.call_count == 0 - normal_checkpoints = [ - name for name in os.listdir(os.path.join(tmp_path, 'checkpoints')) - if name != 'huggingface' - ] huggingface_checkpoints = list( os.listdir(os.path.join(tmp_path, 'checkpoints', 'huggingface')), ) - assert len(normal_checkpoints) == expected_normal_checkpoints assert len(huggingface_checkpoints) == expected_hf_checkpoints # Load the last huggingface checkpoint From 1eefb8423db6ce6f66d393b26b8f7490c38bb217 Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Wed, 30 Oct 2024 21:09:12 -0700 Subject: [PATCH 29/46] fixed the final_register_only test case. now need to pass the others --- .../inference/test_convert_composer_to_hf.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/tests/a_scripts/inference/test_convert_composer_to_hf.py b/tests/a_scripts/inference/test_convert_composer_to_hf.py index 84c6926c36..93734c40c0 100644 --- a/tests/a_scripts/inference/test_convert_composer_to_hf.py +++ b/tests/a_scripts/inference/test_convert_composer_to_hf.py @@ -374,9 +374,13 @@ def _create_optimizer(original_model: torch.nn.Module) -> torch.optim.Optimizer: 'llmfoundry.callbacks.hf_checkpointer.SpawnProcess', new=MockSpawnProcess, ) +@patch( + 'llmfoundry.callbacks.hf_checkpointer._maybe_get_license_filename', + new=MagicMock(), +) @patch( 'mlflow.start_run', - new=MockSpawnProcess, + new=MagicMock(), ) def test_final_register_only( mlflow_registry_error: bool, @@ -415,6 +419,7 @@ def test_final_register_only( checkpointer_callback._save_checkpoint = MagicMock( wraps=checkpointer_callback._save_checkpoint, ) + checkpointer_callback.mlflow_logging_config = MagicMock() trainer = Trainer( model=original_model, device='gpu', @@ -582,12 +587,19 @@ def test_huggingface_conversion_callback_interval( assert checkpointer_callback.transform_model_pre_registration.call_count == 0 assert checkpointer_callback.pre_register_edit.call_count == 0 assert mlflow_logger_mock.save_model.call_count == 0 + + normal_checkpoints = [ + name for name in os.listdir(os.path.join(tmp_path, 'checkpoints')) + if name != 'huggingface' + ] huggingface_checkpoints = list( os.listdir(os.path.join(tmp_path, 'checkpoints', 'huggingface')), ) + assert len(normal_checkpoints) == expected_normal_checkpoints assert len(huggingface_checkpoints) == expected_hf_checkpoints + # Load the last huggingface checkpoint loaded_model = transformers.AutoModelForCausalLM.from_pretrained( os.path.join( From 9282fe013ef6aacac53f3d35d50980c3943edbb1 Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Thu, 31 Oct 2024 22:07:25 -0700 Subject: [PATCH 30/46] overloading the config mapper still not working --- tests/a_scripts/inference/test_convert_composer_to_hf.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/a_scripts/inference/test_convert_composer_to_hf.py b/tests/a_scripts/inference/test_convert_composer_to_hf.py index 93734c40c0..a9e9ccd503 100644 --- a/tests/a_scripts/inference/test_convert_composer_to_hf.py +++ b/tests/a_scripts/inference/test_convert_composer_to_hf.py @@ -601,6 +601,11 @@ def test_huggingface_conversion_callback_interval( # Load the last huggingface checkpoint + from transformers.models.auto.configuration_auto import CONFIG_MAPPING + CONFIG_MAPPING._extra_content['mpt'] = MPTConfig + MPTConfig.register_for_auto_class() + MPTForCausalLM.register_for_auto_class('AutoModelForCausalLM') + loaded_model = transformers.AutoModelForCausalLM.from_pretrained( os.path.join( tmp_path, From 04b520ba85b6abb52b3a9d2593390b0d02d86169 Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Thu, 31 Oct 2024 22:31:24 -0700 Subject: [PATCH 31/46] using irenes changes --- llmfoundry/callbacks/hf_checkpointer.py | 469 +++++++++++++----------- 1 file changed, 263 insertions(+), 206 deletions(-) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index a3b59dba1b..bc26a712ce 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -15,6 +15,7 @@ from pathlib import Path from typing import Any, Optional, Sequence, Union +import mlflow import numpy as np import torch import torch.nn as nn @@ -44,7 +45,6 @@ from llmfoundry.models.mpt import MPTConfig, MPTForCausalLM from llmfoundry.models.utils import init_empty_weights -from llmfoundry.utils.exceptions import StoragePermissionError from llmfoundry.utils.huggingface_hub_utils import \ edit_files_for_hf_compatibility @@ -60,6 +60,26 @@ _LICENSE_FILE_PATTERN = re.compile(r'license(\.[a-z]+|$)', re.IGNORECASE) +from contextlib import contextmanager + + +@contextmanager +def _monitor_process_saver(mlflow_logger: MLFlowLogger): + # Save the current monitor process + if hasattr(mlflow_logger, 'monitor_process'): + original_monitor_process = mlflow_logger.monitor_process # type: ignore + mlflow_logger.monitor_process = None # type: ignore + else: + original_monitor_process = None + + try: + # Yield control back to the calling code + yield + finally: + # Restore the monitor process + if original_monitor_process is not None: + mlflow_logger.monitor_process = original_monitor_process # type: ignore + def _maybe_get_license_filename( local_dir: str, @@ -77,11 +97,6 @@ def _maybe_get_license_filename( If the license file does not exist, returns None. """ - # Error if no local directory exists - if not os.path.exists(local_dir): - raise FileNotFoundError(f'Local directory {local_dir} does not exist') - - # Try to find the license file try: license_filename = next( file for file in os.listdir(local_dir) @@ -111,72 +126,99 @@ def _maybe_get_license_filename( except StopIteration: return None + +def _log_license_file_with_mlflow( + mlflow_logger: MLFlowLogger, + pretrained_model_name: Optional[str], + save_path: str, +): + # Get and log the license file. + license_filename = _maybe_get_license_filename( + save_path, + pretrained_model_name, + ) + if license_filename is not None: + mlflow_logger._mlflow_client.log_artifact( + mlflow_logger._run_id, + os.path.join(save_path, license_filename), + ) -def _log_model_multiprocess( - await_creation_for: int, +def _log_model_with_multi_process( mlflow_logger: MLFlowLogger, - mlflow_logging_config: dict[str, Any], python_logging_level: int, - transformers_model_path: str, - registered_model_name: Optional[str] = None, + transformers_model: str, + artifact_path: str, + task: str, + registered_model_name: str, + metadata: dict[str, Any], + await_registration_for: int, ): """Call MLFlowLogger.log_model. - - Used mainly to log from a child process. - Args: - await_creation_for: int: time to wait for model creation - mlflow_logger: MLFlowLogger: MLflow logger object - mlflow_logging_config: dict: mlflow logging config - python_logging_level: int: logging level - transformers_model_path: str: path to the transformers model - registered_model_name: Optional name to register the model under in the MLflow model registry + Used mainly to register from a child process. + First, patch the mlflow save_model function. + + We do two things: (1) Remove duplicate tokenizer files in the model + directory. (2) Log the license file. """ - # Setup logging for child process. This ensures that any logs from composer are surfaced. - if python_logging_level > 0: - # If logging_level is 0, then the composer logger was unset. - logging.basicConfig( - format= - f'%(asctime)s: rank{dist.get_global_rank()}[%(process)d][%(threadName)s]: %(levelname)s: %(name)s: %(message)s', - ) - logging.getLogger('llmfoundry').setLevel(python_logging_level) - logging.getLogger('composer').setLevel(python_logging_level) - - # monkey patch to prevent duplicate tokenizer upload import mlflow - mlflow.start_run( - run_id=mlflow_logger._run_id, - ) original_save_model = mlflow.transformers.save_model + def save_model_patch(*args: Any, **kwargs: Any): original_save_model(*args, **kwargs) tokenizer_files = [] - tokenizer_path = os.path.join(kwargs['path'], 'components', 'tokenizer') + save_path = kwargs['path'] + tokenizer_path = os.path.join(save_path, 'components', 'tokenizer') if os.path.exists(tokenizer_path): - tokenizer_files = os.listdir(os.path.join(kwargs['path'], 'components', 'tokenizer')) - # Check if there are duplicate tokenizer files in the model directory and remove them. + tokenizer_files = os.listdir( + os.path.join(save_path, 'components', 'tokenizer'), + ) try: + # Check if there are duplicate tokenizer files in the model directory and remove them. for tokenizer_file_name in tokenizer_files: - dupe_file = os.path.isfile(os.path.join(kwargs['path'], 'model', tokenizer_file_name)) + dupe_file = os.path.isfile( + os.path.join(save_path, 'model', tokenizer_file_name), + ) if dupe_file: - log.debug(f"Removing duplicate tokenizer file: {tokenizer_file_name}") - os.remove(os.path.join(kwargs['path'], 'model', tokenizer_file_name)) + log.debug( + f'Removing duplicate tokenizer file: {tokenizer_file_name}', + ) + os.remove( + os.path.join(save_path, 'model', tokenizer_file_name), + ) except Exception as e: - log.error(f"Exception when removing duplicate tokenizer files in the model directory", e) + log.error( + f'Exception when removing duplicate tokenizer files in the model directory', + e, + ) + mlflow.transformers.save_model = save_model_patch + + mlflow.set_tracking_uri(mlflow_logger.tracking_uri) + if mlflow_logger.model_registry_uri is not None: + mlflow.set_registry_uri(mlflow_logger.model_registry_uri) + mlflow.start_run(run_id=mlflow_logger._run_id,) + # Setup logging for child process. This ensures that any logs from composer are surfaced. + if python_logging_level > 0: + # If logging_level is 0, then the composer logger was unset. + logging.basicConfig( + format= + f'%(asctime)s: rank{dist.get_global_rank()}[%(process)d][%(threadName)s]: %(levelname)s: %(name)s: %(message)s', + force=True, + ) + logging.getLogger('composer').setLevel(python_logging_level) + logging.getLogger('llmfoundry').setLevel(python_logging_level) - model_registry_name = f'{mlflow_logger.model_registry_prefix}.{registered_model_name}' \ - if registered_model_name is not None else None mlflow_logger.log_model( - transformers_model=transformers_model_path, + transformers_model=transformers_model, flavor='transformers', - artifact_path='last_model_checkpoint', - input_example=mlflow_logging_config['input_example'], - metadata=mlflow_logging_config['metadata'], - task=mlflow_logging_config['metadata']['task'], - registered_model_name=model_registry_name, - await_creation_for=await_creation_for + artifact_path=artifact_path, + registered_model_name=registered_model_name, + task=task, + metadata=metadata, + run_id=mlflow_logger._run_id, + await_registration_for=await_registration_for, ) @@ -190,13 +232,6 @@ def _register_model_with_run_id_multiprocess( """Call MLFlowLogger.register_model_with_run_id. Used mainly to register from a child process. - - Args: - mlflow_logger: MLFlowLogger: MLflow logger object - composer_logging_level: int: logging level - model_uri: str: path to the model - name: str: name to register the model under in the MLflow model registry - await_creation_for: int: time to wait for model creation """ # Setup logging for child process. This ensures that any logs from composer are surfaced. if composer_logging_level > 0: @@ -377,16 +412,7 @@ def run_event(self, event: Event, state: State, logger: Logger) -> None: + f'Got {type(state.model)} instead.', ) if self.remote_ud is not None: - try: - self.remote_ud.init(state, logger) - except PermissionError as e: - if 'Client Error' in str( - e, - ): # thrown from composer.utils._wrap_mlflow_exceptions - raise StoragePermissionError( - 'Error when write to save_folder.', - ) from e - raise e + self.remote_ud.init(state, logger) state.callbacks.append(self.remote_ud) if self.mlflow_registered_model_name is not None: @@ -708,163 +734,194 @@ def tensor_hook( log.debug('Saving Hugging Face checkpoint to disk') - # This context manager casts the TE extra state in io.BytesIO format to tensor format - # Needed for proper hf ckpt saving. - context_manager = te.onnx_export( - True, - ) if is_te_imported and state.precision == Precision.AMP_FP8 else contextlib.nullcontext( - ) - with context_manager: - new_model_instance.save_pretrained(temp_save_dir) - original_tokenizer.save_pretrained(temp_save_dir) - # Only need to edit files for MPT because it has custom code - if new_model_instance.config.model_type == 'mpt': - log.debug('Editing MPT files for HuggingFace compatibility') - edit_files_for_hf_compatibility( - temp_save_dir, - self.flatten_imports, + if upload_to_save_folder: + # This context manager casts the TE extra state in io.BytesIO format to tensor format + # Needed for proper hf ckpt saving. + context_manager = te.onnx_export( + True, + ) if is_te_imported and state.precision == Precision.AMP_FP8 else contextlib.nullcontext( ) - - if upload_to_save_folder and self.remote_ud is not None: - for filename in os.listdir(temp_save_dir): - remote_file_name = os.path.join(save_dir, filename) - remote_file_uri = self.remote_ud.remote_backend.get_uri( - remote_file_name, - ) - log.info( - f'Uploading HuggingFace formatted checkpoint to {remote_file_uri}', + with context_manager: + new_model_instance.save_pretrained(temp_save_dir) + if original_tokenizer is not None: + assert isinstance( + original_tokenizer, + PreTrainedTokenizerBase, ) - self.remote_ud.upload_file( - state=state, - remote_file_name=remote_file_name, - file_path=Path( - os.path.join(temp_save_dir, filename), - ), - overwrite=self.overwrite, + original_tokenizer.save_pretrained(temp_save_dir) + + # Only need to edit files for MPT because it has custom code + if new_model_instance.config.model_type == 'mpt': + log.debug('Editing MPT files for HuggingFace compatibility') + edit_files_for_hf_compatibility( + temp_save_dir, + self.flatten_imports, ) + if self.remote_ud is not None: + for filename in os.listdir(temp_save_dir): + remote_file_name = os.path.join(save_dir, filename) + remote_file_uri = self.remote_ud.remote_backend.get_uri( + remote_file_name, + ) + log.info( + f'Uploading HuggingFace formatted checkpoint to {remote_file_uri}', + ) + self.remote_ud.upload_file( + state=state, + remote_file_name=remote_file_name, + file_path=Path( + os.path.join(temp_save_dir, filename), + ), + overwrite=self.overwrite, + ) + dist.barrier() if dist.get_global_rank() == 0: if register_to_mlflow: - new_model_instance = self.transform_model_pre_registration( - new_model_instance, - ) - - components = {'model': new_model_instance} - if original_tokenizer is not None: - components['tokenizer'] = original_tokenizer + if self.using_peft: - log.debug('Logging Hugging Face model to MLFlow') - for i, mlflow_logger in enumerate(self.mlflow_loggers): - log.debug( - f'Registering model to UC at {mlflow_logger.model_registry_prefix}.{self.mlflow_registered_model_name}', + # Save and register peft model to mlflow, this code path uses our older two step logic + self._save_and_register_peft_model( + state, + new_model_instance, + original_tokenizer, + temp_save_dir, ) - local_save_path = str( - Path(temp_save_dir) / f'mlflow_save_{i}', + else: + register_save_dir = os.path.join( + temp_save_dir, + 'register_save', ) - - # TODO: Remove after mlflow fixes the bug that makes this necessary - import mlflow - mlflow.store._unity_catalog.registry.rest_store.get_feature_dependencies = lambda *args, **kwargs: '' - model_saving_kwargs: dict[str, Any] = { - 'path': local_save_path, - } - if self.using_peft: - model_saving_kwargs['flavor'] = 'peft' - model_saving_kwargs['save_pretrained_dir' - ] = temp_save_dir - model_saving_kwargs[ - 'metadata'] = self.mlflow_logging_config['metadata'] - # If PEFT, still use original save_model codepath - context_manager = te.onnx_export( - True, - ) if is_te_imported and state.precision == Precision.AMP_FP8 else contextlib.nullcontext( - ) - with context_manager: - # Add the pip requirements directly to avoid mlflow - # attempting to run inference on the model - model_saving_kwargs['pip_requirements'] = [ - 'transformers', - 'torch', - ] - mlflow_logger.save_model(**model_saving_kwargs) - else: - model_saving_kwargs['flavor'] = 'transformers' - model_saving_kwargs['transformers_model'] = components - model_saving_kwargs.update(self.mlflow_logging_config) - - # Upload the license file generated by mlflow during the model saving. - license_filename = _maybe_get_license_filename( - local_save_path, - self.pretrained_model_name, + new_model_instance = self.transform_model_pre_registration( + new_model_instance, ) - if license_filename is not None: - mlflow_logger._mlflow_client.log_artifact( - mlflow_logger._run_id, - os.path.join(local_save_path, license_filename), - ) - self.pre_register_edit(local_save_path,) + new_model_instance.save_pretrained(register_save_dir) + original_tokenizer.save_pretrained(register_save_dir) - # Save the monitor process to be restored after registering the model. - if hasattr(mlflow_logger, 'monitor_process'): - monitor_process = mlflow_logger.monitor_process # type: ignore - mlflow_logger.monitor_process = None # type: ignore - else: - monitor_process = None - - # Spawn a new process to register the model. - # Slower method to register the model via log_model. - if self.using_peft: - # If PEFT, use original register_model codepath until Composer - # supports logging PEFT models to MLFlow - process = SpawnProcess( - target=_register_model_with_run_id_multiprocess, - kwargs={ - 'mlflow_logger': - mlflow_logger, - 'composer_logging_level': - logging.getLogger('composer').level, - 'model_uri': - local_save_path, - 'name': - self.mlflow_registered_model_name, - 'await_creation_for': - 3600, - }, - ) - else: - # Log a transformers model - process = SpawnProcess( - target=_log_model_multiprocess, - kwargs={ - 'await_creation_for': - 3600, - 'mlflow_logger': - mlflow_logger, - 'mlflow_logging_config': - self.mlflow_logging_config, - 'python_logging_level': - logging.getLogger('llmfoundry').level, - 'registered_model_name': - self.mlflow_registered_model_name, - 'transformers_model_path': - temp_save_dir, - }, - ) - process.start() + self.pre_register_edit(register_save_dir) - # Restore the monitor process. - if monitor_process is not None: - mlflow_logger.monitor_process = monitor_process # type: ignore - self.register_processes.append(process) + for mlflow_logger in self.mlflow_loggers: + if self.mlflow_registered_model_name: + log.debug( + f'Registering model to UC at {mlflow_logger.model_registry_prefix}.{self.mlflow_registered_model_name}', + ) - # Save the temporary directory to be cleaned up later. - if use_temp_dir: - self.temp_save_dir = temp_save_dir + _log_license_file_with_mlflow( + mlflow_logger=mlflow_logger, + pretrained_model_name=self.pretrained_model_name, + save_path=register_save_dir, + ) + + # Save the monitor process to be restored after registering the model. + with _monitor_process_saver(mlflow_logger): + process = SpawnProcess( + target=_log_model_with_multi_process, + kwargs={ + 'mlflow_logger': + mlflow_logger, + 'python_logging_level': + logging.getLogger('llmfoundry').level, + 'transformers_model': + register_save_dir, + 'artifact_path': + 'final_model_checkpoint', + 'task': + self.mlflow_logging_config['task'], + 'registered_model_name': + f'{mlflow_logger.model_registry_prefix}.{self.mlflow_registered_model_name}', + 'metadata': + self.mlflow_logging_config['metadata'], + 'await_registration_for': + 3600, + }, + ) + + process.start() + self.register_processes.append(process) + + # Save the temporary directory to be cleaned up later. + if use_temp_dir: + self.temp_save_dir = temp_save_dir else: # Clean up the temporary directory if we don't need to register to mlflow. if use_temp_dir: shutil.rmtree(temp_save_dir) dist.barrier() + + def _save_and_register_peft_model( + self, + state: State, + new_model_instance: Any, + original_tokenizer: Optional[Any], + save_dir: str, + ): + new_model_instance = self.transform_model_pre_registration( + new_model_instance, + ) + components = {'model': new_model_instance} + if original_tokenizer is not None: + components['tokenizer'] = original_tokenizer + + log.debug('Logging Hugging Face model to MLFlow') + for i, mlflow_logger in enumerate(self.mlflow_loggers): + log.debug( + f'Registering model to UC at {mlflow_logger.model_registry_prefix}.{self.mlflow_registered_model_name}', + ) + + local_save_path = str(Path(save_dir) / f'mlflow_save_{i}',) + + # TODO: Remove after mlflow fixes the bug that makes this necessary + import mlflow + mlflow.store._unity_catalog.registry.rest_store.get_feature_dependencies = lambda *args, **kwargs: '' + + model_saving_kwargs: dict[str, Any] = { + 'path': local_save_path, + } + model_saving_kwargs['flavor'] = 'peft' + model_saving_kwargs['save_pretrained_dir'] = save_dir + model_saving_kwargs['metadata'] = self.mlflow_logging_config[ + 'metadata'] + + context_manager = te.onnx_export( + True, + ) if is_te_imported and state.precision == Precision.AMP_FP8 else contextlib.nullcontext( + ) + with context_manager: + # Add the pip requirements directly to avoid mlflow + # attempting to run inference on the model + model_saving_kwargs['pip_requirements'] = [ + 'transformers', + 'torch', + ] + mlflow_logger.save_model(**model_saving_kwargs) + + # Upload the license file generated by mlflow during the model saving. + _log_license_file_with_mlflow( + mlflow_logger=mlflow_logger, + pretrained_model_name=self.pretrained_model_name, + save_path=local_save_path + ) + + self.pre_register_edit(local_save_path) + + with _monitor_process_saver(mlflow_logger): + process = SpawnProcess( + target=_register_model_with_run_id_multiprocess, + kwargs={ + 'mlflow_logger': + mlflow_logger, + 'composer_logging_level': + logging.getLogger('composer').level, + 'model_uri': + local_save_path, + 'name': + self.mlflow_registered_model_name, + 'await_creation_for': + 3600, + }, + ) + process.start() + self.register_processes.append(process) From 687e48bc55cbee647333baecb133d40a08a000e0 Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Thu, 31 Oct 2024 23:21:20 -0700 Subject: [PATCH 32/46] default name logic --- llmfoundry/callbacks/hf_checkpointer.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index bc26a712ce..1f640386d0 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -150,7 +150,7 @@ def _log_model_with_multi_process( transformers_model: str, artifact_path: str, task: str, - registered_model_name: str, + registered_model_name: Optional[str], metadata: dict[str, Any], await_registration_for: int, ): @@ -210,11 +210,12 @@ def save_model_patch(*args: Any, **kwargs: Any): logging.getLogger('composer').setLevel(python_logging_level) logging.getLogger('llmfoundry').setLevel(python_logging_level) + register_model_path = f'{mlflow_logger.model_registry_prefix}.{registered_model_name}' if registered_model_name else None mlflow_logger.log_model( transformers_model=transformers_model, flavor='transformers', artifact_path=artifact_path, - registered_model_name=registered_model_name, + registered_model_name=register_model_path, task=task, metadata=metadata, run_id=mlflow_logger._run_id, @@ -831,7 +832,7 @@ def tensor_hook( 'task': self.mlflow_logging_config['task'], 'registered_model_name': - f'{mlflow_logger.model_registry_prefix}.{self.mlflow_registered_model_name}', + self.mlflow_registered_model_name, 'metadata': self.mlflow_logging_config['metadata'], 'await_registration_for': From 65a5a1cf35d40490d76f134bfba29537ae7ca2e9 Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Thu, 31 Oct 2024 23:25:06 -0700 Subject: [PATCH 33/46] typo --- tests/a_scripts/inference/test_convert_composer_to_hf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/a_scripts/inference/test_convert_composer_to_hf.py b/tests/a_scripts/inference/test_convert_composer_to_hf.py index a9e9ccd503..84792f04fb 100644 --- a/tests/a_scripts/inference/test_convert_composer_to_hf.py +++ b/tests/a_scripts/inference/test_convert_composer_to_hf.py @@ -502,7 +502,7 @@ def test_final_register_only( new=MagicMock() ) @patch( - 'llmfoundry.callbacks.hf_checkpointer._log_model_multiprocess', + 'llmfoundry.callbacks.hf_checkpointer._log_model_with_multiprocess', new=MagicMock(), ) @patch( From ff2f4ac67160ce3e491cd02854ea1ca4931176d1 Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Thu, 31 Oct 2024 23:27:38 -0700 Subject: [PATCH 34/46] precommit --- llmfoundry/callbacks/hf_checkpointer.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index 1f640386d0..b1048cdbe5 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -126,11 +126,12 @@ def _maybe_get_license_filename( except StopIteration: return None - + + def _log_license_file_with_mlflow( - mlflow_logger: MLFlowLogger, - pretrained_model_name: Optional[str], - save_path: str, + mlflow_logger: MLFlowLogger, + pretrained_model_name: Optional[str], + save_path: str, ): # Get and log the license file. license_filename = _maybe_get_license_filename( @@ -156,11 +157,9 @@ def _log_model_with_multi_process( ): """Call MLFlowLogger.log_model. - Used mainly to register from a child process. - First, patch the mlflow save_model function. - - We do two things: (1) Remove duplicate tokenizer files in the model - directory. (2) Log the license file. + First, patch the mlflow save_model function by removing duplicate tokenizer + files in the model directory. Then, register the model to mlflow from a + child process. """ import mlflow original_save_model = mlflow.transformers.save_model @@ -194,7 +193,7 @@ def save_model_patch(*args: Any, **kwargs: Any): ) mlflow.transformers.save_model = save_model_patch - + mlflow.set_tracking_uri(mlflow_logger.tracking_uri) if mlflow_logger.model_registry_uri is not None: mlflow.set_registry_uri(mlflow_logger.model_registry_uri) @@ -903,7 +902,7 @@ def _save_and_register_peft_model( _log_license_file_with_mlflow( mlflow_logger=mlflow_logger, pretrained_model_name=self.pretrained_model_name, - save_path=local_save_path + save_path=local_save_path, ) self.pre_register_edit(local_save_path) From 67a3acc4d136e6547712a51cc126290fe445caac Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Thu, 31 Oct 2024 23:37:56 -0700 Subject: [PATCH 35/46] precommit again --- llmfoundry/callbacks/hf_checkpointer.py | 1 - 1 file changed, 1 deletion(-) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index b1048cdbe5..132d93189e 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -15,7 +15,6 @@ from pathlib import Path from typing import Any, Optional, Sequence, Union -import mlflow import numpy as np import torch import torch.nn as nn From 598d4f3c63f98b48687167f486ef7a42249f5609 Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Thu, 31 Oct 2024 23:39:41 -0700 Subject: [PATCH 36/46] precommit --- .../inference/test_convert_composer_to_hf.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/tests/a_scripts/inference/test_convert_composer_to_hf.py b/tests/a_scripts/inference/test_convert_composer_to_hf.py index 84792f04fb..6258aa062a 100644 --- a/tests/a_scripts/inference/test_convert_composer_to_hf.py +++ b/tests/a_scripts/inference/test_convert_composer_to_hf.py @@ -499,10 +499,10 @@ def test_final_register_only( ) @patch( 'composer.callbacks.checkpoint_saver.CheckpointSaver._save_checkpoint', - new=MagicMock() + new=MagicMock(), ) @patch( - 'llmfoundry.callbacks.hf_checkpointer._log_model_with_multiprocess', + 'llmfoundry.callbacks.hf_checkpointer._log_model_with_multi_process', new=MagicMock(), ) @patch( @@ -544,7 +544,7 @@ def test_huggingface_conversion_callback_interval( optimizer = _create_optimizer(original_model) mlflow_logger_mock = _create_mlflow_logger_mock() - + mpt_tokenizer.save_pretrained = MagicMock() checkpointer_callback.transform_model_pre_registration = MagicMock( @@ -587,7 +587,7 @@ def test_huggingface_conversion_callback_interval( assert checkpointer_callback.transform_model_pre_registration.call_count == 0 assert checkpointer_callback.pre_register_edit.call_count == 0 assert mlflow_logger_mock.save_model.call_count == 0 - + normal_checkpoints = [ name for name in os.listdir(os.path.join(tmp_path, 'checkpoints')) if name != 'huggingface' @@ -599,7 +599,6 @@ def test_huggingface_conversion_callback_interval( assert len(normal_checkpoints) == expected_normal_checkpoints assert len(huggingface_checkpoints) == expected_hf_checkpoints - # Load the last huggingface checkpoint from transformers.models.auto.configuration_auto import CONFIG_MAPPING CONFIG_MAPPING._extra_content['mpt'] = MPTConfig @@ -1708,10 +1707,13 @@ def __init__(self, config: PretrainedConfig): ) else: self.generation_config = config.generation_config - + def save_pretrained(self, output_path: str): os.makedirs(output_path, exist_ok=True) - with open(os.path.join(output_path, 'generation_config.json'), 'w') as f: + with open( + os.path.join(output_path, 'generation_config.json'), + 'w', + ) as f: f.write(str(self.generation_config)) config = AutoConfig.from_pretrained('gpt2') From 6c34a235d0ea1ede9bb986be7d45454768fd296a Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Thu, 31 Oct 2024 23:46:18 -0700 Subject: [PATCH 37/46] fix tests --- .../a_scripts/inference/test_convert_composer_to_hf.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/a_scripts/inference/test_convert_composer_to_hf.py b/tests/a_scripts/inference/test_convert_composer_to_hf.py index 6258aa062a..236961b436 100644 --- a/tests/a_scripts/inference/test_convert_composer_to_hf.py +++ b/tests/a_scripts/inference/test_convert_composer_to_hf.py @@ -341,15 +341,17 @@ def is_alive(self) -> bool: def _create_mlflow_logger_mock() -> MagicMock: mlflow_logger_mock = MagicMock(spec=MLFlowLogger) - mlflow_logger_mock.log_model = MagicMock() - mlflow_logger_mock.state_dict = lambda *args, **kwargs: {} - mlflow_logger_mock.save_model = MagicMock(wraps=_save_model_mock) mlflow_logger_mock._mlflow_client = MagicMock() - mlflow_logger_mock.model_registry_prefix = '' mlflow_logger_mock._experiment_id = 'mlflow-experiment-id' mlflow_logger_mock._run_id = 'mlflow-run-id' mlflow_logger_mock._enabled = True + mlflow_logger_mock.log_model = MagicMock() + mlflow_logger_mock.model_registry_prefix = '' + mlflow_logger_mock.model_registry_uri = 'databricks' + mlflow_logger_mock.state_dict = lambda *args, **kwargs: {} + mlflow_logger_mock.save_model = MagicMock(wraps=_save_model_mock) mlflow_logger_mock.run_url = 'fake-url' + mlflow_logger_mock.tracking_uri = 'databricks' return mlflow_logger_mock From a5fe32272ba7d5b5310121f3fce208a7417c13d3 Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Thu, 31 Oct 2024 23:58:04 -0700 Subject: [PATCH 38/46] license --- llmfoundry/callbacks/hf_checkpointer.py | 48 +++++++++++-------------- 1 file changed, 21 insertions(+), 27 deletions(-) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index 132d93189e..9b648dfd15 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -127,29 +127,13 @@ def _maybe_get_license_filename( return None -def _log_license_file_with_mlflow( - mlflow_logger: MLFlowLogger, - pretrained_model_name: Optional[str], - save_path: str, -): - # Get and log the license file. - license_filename = _maybe_get_license_filename( - save_path, - pretrained_model_name, - ) - if license_filename is not None: - mlflow_logger._mlflow_client.log_artifact( - mlflow_logger._run_id, - os.path.join(save_path, license_filename), - ) - - def _log_model_with_multi_process( mlflow_logger: MLFlowLogger, python_logging_level: int, transformers_model: str, artifact_path: str, task: str, + pretrained_model_name: str, registered_model_name: Optional[str], metadata: dict[str, Any], await_registration_for: int, @@ -185,6 +169,15 @@ def save_model_patch(*args: Any, **kwargs: Any): os.remove( os.path.join(save_path, 'model', tokenizer_file_name), ) + license_filename = _maybe_get_license_filename( + save_path, + pretrained_model_name, + ) + if license_filename is not None: + mlflow_logger._mlflow_client.log_artifact( + mlflow_logger._run_id, + os.path.join(save_path, license_filename), + ) except Exception as e: log.error( f'Exception when removing duplicate tokenizer files in the model directory', @@ -808,12 +801,6 @@ def tensor_hook( f'Registering model to UC at {mlflow_logger.model_registry_prefix}.{self.mlflow_registered_model_name}', ) - _log_license_file_with_mlflow( - mlflow_logger=mlflow_logger, - pretrained_model_name=self.pretrained_model_name, - save_path=register_save_dir, - ) - # Save the monitor process to be restored after registering the model. with _monitor_process_saver(mlflow_logger): process = SpawnProcess( @@ -829,6 +816,8 @@ def tensor_hook( 'final_model_checkpoint', 'task': self.mlflow_logging_config['task'], + 'pretrained_model_name': + self.pretrained_model_name, 'registered_model_name': self.mlflow_registered_model_name, 'metadata': @@ -898,11 +887,16 @@ def _save_and_register_peft_model( mlflow_logger.save_model(**model_saving_kwargs) # Upload the license file generated by mlflow during the model saving. - _log_license_file_with_mlflow( - mlflow_logger=mlflow_logger, - pretrained_model_name=self.pretrained_model_name, - save_path=local_save_path, + # Get and log the license file. + license_filename = _maybe_get_license_filename( + local_save_path, + self.pretrained_model_name, ) + if license_filename is not None: + mlflow_logger._mlflow_client.log_artifact( + mlflow_logger._run_id, + os.path.join(local_save_path, license_filename), + ) self.pre_register_edit(local_save_path) From 5d37fd52ab00213eb7fccb544b501f0aef248ef1 Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Fri, 1 Nov 2024 00:24:10 -0700 Subject: [PATCH 39/46] pr ffeedback and test remove start_run --- llmfoundry/callbacks/hf_checkpointer.py | 37 +++++++++++++++++-------- 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index 9b648dfd15..d14f052533 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -47,6 +47,8 @@ from llmfoundry.utils.huggingface_hub_utils import \ edit_files_for_hf_compatibility +from llmfoundry.utils.exceptions import StoragePermissionError + try: import transformer_engine.pytorch as te is_te_imported = True @@ -144,6 +146,17 @@ def _log_model_with_multi_process( files in the model directory. Then, register the model to mlflow from a child process. """ + # Setup logging for child process. This ensures that any logs from composer are surfaced. + if python_logging_level > 0: + # If logging_level is 0, then the composer logger was unset. + logging.basicConfig( + format= + f'%(asctime)s: rank{dist.get_global_rank()}[%(process)d][%(threadName)s]: %(levelname)s: %(name)s: %(message)s', + force=True, + ) + logging.getLogger('composer').setLevel(python_logging_level) + logging.getLogger('llmfoundry').setLevel(python_logging_level) + import mlflow original_save_model = mlflow.transformers.save_model @@ -189,17 +202,6 @@ def save_model_patch(*args: Any, **kwargs: Any): mlflow.set_tracking_uri(mlflow_logger.tracking_uri) if mlflow_logger.model_registry_uri is not None: mlflow.set_registry_uri(mlflow_logger.model_registry_uri) - mlflow.start_run(run_id=mlflow_logger._run_id,) - # Setup logging for child process. This ensures that any logs from composer are surfaced. - if python_logging_level > 0: - # If logging_level is 0, then the composer logger was unset. - logging.basicConfig( - format= - f'%(asctime)s: rank{dist.get_global_rank()}[%(process)d][%(threadName)s]: %(levelname)s: %(name)s: %(message)s', - force=True, - ) - logging.getLogger('composer').setLevel(python_logging_level) - logging.getLogger('llmfoundry').setLevel(python_logging_level) register_model_path = f'{mlflow_logger.model_registry_prefix}.{registered_model_name}' if registered_model_name else None mlflow_logger.log_model( @@ -404,6 +406,16 @@ def run_event(self, event: Event, state: State, logger: Logger) -> None: + f'Got {type(state.model)} instead.', ) if self.remote_ud is not None: + try: + self.remote_ud.init(state, logger) + except PermissionError as e: + if 'Client Error' in str( + e, + ): # thrown from composer.utils._wrap_mlflow_exceptions + raise StoragePermissionError( + 'Error when write to save_folder.', + ) from e + raise e self.remote_ud.init(state, logger) state.callbacks.append(self.remote_ud) @@ -791,7 +803,8 @@ def tensor_hook( ) new_model_instance.save_pretrained(register_save_dir) - original_tokenizer.save_pretrained(register_save_dir) + if original_tokenizer: + original_tokenizer.save_pretrained(register_save_dir) self.pre_register_edit(register_save_dir) From 0132611db7840e46687d4d9e1cc79f85cd611001 Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Fri, 1 Nov 2024 00:26:03 -0700 Subject: [PATCH 40/46] precommit --- llmfoundry/callbacks/hf_checkpointer.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index d14f052533..e5080a60bc 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -44,11 +44,10 @@ from llmfoundry.models.mpt import MPTConfig, MPTForCausalLM from llmfoundry.models.utils import init_empty_weights +from llmfoundry.utils.exceptions import StoragePermissionError from llmfoundry.utils.huggingface_hub_utils import \ edit_files_for_hf_compatibility -from llmfoundry.utils.exceptions import StoragePermissionError - try: import transformer_engine.pytorch as te is_te_imported = True @@ -156,7 +155,7 @@ def _log_model_with_multi_process( ) logging.getLogger('composer').setLevel(python_logging_level) logging.getLogger('llmfoundry').setLevel(python_logging_level) - + import mlflow original_save_model = mlflow.transformers.save_model @@ -416,7 +415,6 @@ def run_event(self, event: Event, state: State, logger: Logger) -> None: 'Error when write to save_folder.', ) from e raise e - self.remote_ud.init(state, logger) state.callbacks.append(self.remote_ud) if self.mlflow_registered_model_name is not None: From 18025f7212c39ecfd1c4ca42b47c54dc69b79e85 Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Fri, 1 Nov 2024 00:31:42 -0700 Subject: [PATCH 41/46] start run unnecessary --- tests/a_scripts/inference/test_convert_composer_to_hf.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/tests/a_scripts/inference/test_convert_composer_to_hf.py b/tests/a_scripts/inference/test_convert_composer_to_hf.py index 236961b436..2e582ccb97 100644 --- a/tests/a_scripts/inference/test_convert_composer_to_hf.py +++ b/tests/a_scripts/inference/test_convert_composer_to_hf.py @@ -380,10 +380,6 @@ def _create_optimizer(original_model: torch.nn.Module) -> torch.optim.Optimizer: 'llmfoundry.callbacks.hf_checkpointer._maybe_get_license_filename', new=MagicMock(), ) -@patch( - 'mlflow.start_run', - new=MagicMock(), -) def test_final_register_only( mlflow_registry_error: bool, mlflow_registered_model_name: Optional[str], @@ -491,10 +487,6 @@ def test_final_register_only( 'llmfoundry.callbacks.hf_checkpointer.SpawnProcess', new=MockSpawnProcess, ) -@patch( - 'mlflow.start_run', - new=MagicMock(), -) @patch( 'llmfoundry.callbacks.hf_checkpointer._maybe_get_license_filename', new=MagicMock(), From 356e3b21e9809b6c5d0fa42e6f9c3ffbc8807eb4 Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Fri, 1 Nov 2024 00:54:20 -0700 Subject: [PATCH 42/46] typing --- llmfoundry/callbacks/hf_checkpointer.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index e5080a60bc..db335edc2e 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -796,11 +796,11 @@ def tensor_hook( temp_save_dir, 'register_save', ) - new_model_instance = self.transform_model_pre_registration( - new_model_instance, - ) - - new_model_instance.save_pretrained(register_save_dir) + if new_model_instance is not None: + new_model_instance = self.transform_model_pre_registration( + new_model_instance, + ) + new_model_instance.save_pretrained(register_save_dir) if original_tokenizer: original_tokenizer.save_pretrained(register_save_dir) From 6daecb8e5b353c2a73d760a60f2481ac09f32f56 Mon Sep 17 00:00:00 2001 From: Daniel King Date: Fri, 1 Nov 2024 08:43:46 +0000 Subject: [PATCH 43/46] fix ci --- llmfoundry/callbacks/hf_checkpointer.py | 14 ++---- .../inference/test_convert_composer_to_hf.py | 48 ++++++++----------- 2 files changed, 26 insertions(+), 36 deletions(-) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index db335edc2e..1561bf0291 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -133,11 +133,10 @@ def _log_model_with_multi_process( python_logging_level: int, transformers_model: str, artifact_path: str, - task: str, pretrained_model_name: str, registered_model_name: Optional[str], - metadata: dict[str, Any], await_registration_for: int, + mlflow_logging_config: dict[str, Any], ): """Call MLFlowLogger.log_model. @@ -202,16 +201,15 @@ def save_model_patch(*args: Any, **kwargs: Any): if mlflow_logger.model_registry_uri is not None: mlflow.set_registry_uri(mlflow_logger.model_registry_uri) - register_model_path = f'{mlflow_logger.model_registry_prefix}.{registered_model_name}' if registered_model_name else None + register_model_path = f'{mlflow_logger.model_registry_prefix}.{registered_model_name}' if mlflow_logger.model_registry_prefix and registered_model_name else registered_model_name mlflow_logger.log_model( transformers_model=transformers_model, flavor='transformers', artifact_path=artifact_path, registered_model_name=register_model_path, - task=task, - metadata=metadata, run_id=mlflow_logger._run_id, await_registration_for=await_registration_for, + **mlflow_logging_config, ) @@ -825,16 +823,14 @@ def tensor_hook( register_save_dir, 'artifact_path': 'final_model_checkpoint', - 'task': - self.mlflow_logging_config['task'], 'pretrained_model_name': self.pretrained_model_name, 'registered_model_name': self.mlflow_registered_model_name, - 'metadata': - self.mlflow_logging_config['metadata'], 'await_registration_for': 3600, + 'mlflow_logging_config': + self.mlflow_logging_config, }, ) diff --git a/tests/a_scripts/inference/test_convert_composer_to_hf.py b/tests/a_scripts/inference/test_convert_composer_to_hf.py index 2e582ccb97..2f40e24999 100644 --- a/tests/a_scripts/inference/test_convert_composer_to_hf.py +++ b/tests/a_scripts/inference/test_convert_composer_to_hf.py @@ -13,6 +13,7 @@ from unittest.mock import ANY, MagicMock, patch import catalogue +import numpy as np import pytest import torch import torch.nn as nn @@ -491,14 +492,6 @@ def test_final_register_only( 'llmfoundry.callbacks.hf_checkpointer._maybe_get_license_filename', new=MagicMock(), ) -@patch( - 'composer.callbacks.checkpoint_saver.CheckpointSaver._save_checkpoint', - new=MagicMock(), -) -@patch( - 'llmfoundry.callbacks.hf_checkpointer._log_model_with_multi_process', - new=MagicMock(), -) @patch( 'mlflow.transformers.save_model', new=MagicMock(), @@ -539,15 +532,12 @@ def test_huggingface_conversion_callback_interval( mlflow_logger_mock = _create_mlflow_logger_mock() - mpt_tokenizer.save_pretrained = MagicMock() - checkpointer_callback.transform_model_pre_registration = MagicMock( wraps=checkpointer_callback.transform_model_pre_registration, ) checkpointer_callback.pre_register_edit = MagicMock( wraps=checkpointer_callback.pre_register_edit, ) - checkpointer_callback.mlflow_logging_config = MagicMock() trainer = Trainer( model=original_model, device='gpu', @@ -567,12 +557,15 @@ def test_huggingface_conversion_callback_interval( mlflow_logger_mock.log_model.assert_called_with( transformers_model=ANY, flavor='transformers', - artifact_path='last_model_checkpoint', - input_example=ANY, + artifact_path='final_model_checkpoint', + registered_model_name='dummy-registered-name', + run_id='mlflow-run-id', + await_registration_for=3600, metadata=ANY, task=ANY, - registered_model_name=ANY, - await_creation_for=3600, + input_example={ + 'prompt': np.array(['What is Machine Learning?']), + }, ) assert checkpointer_callback.transform_model_pre_registration.call_count == 1 assert checkpointer_callback.pre_register_edit.call_count == 1 @@ -580,7 +573,7 @@ def test_huggingface_conversion_callback_interval( else: assert checkpointer_callback.transform_model_pre_registration.call_count == 0 assert checkpointer_callback.pre_register_edit.call_count == 0 - assert mlflow_logger_mock.save_model.call_count == 0 + assert mlflow_logger_mock.log_model.call_count == 0 normal_checkpoints = [ name for name in os.listdir(os.path.join(tmp_path, 'checkpoints')) @@ -735,7 +728,6 @@ def _assert_mlflow_logger_calls( peft_config: Optional[dict] = None, ): if dist.get_global_rank() == 0: - assert mlflow_logger_mock.save_model.call_count == 1 if peft_config is not None: expectation = { 'flavor': 'peft', @@ -743,24 +735,24 @@ def _assert_mlflow_logger_calls( 'save_pretrained_dir': ANY, 'metadata': {}, } + assert mlflow_logger_mock.save_model.call_count == 1 else: - import numpy as np - default_input_example = { 'prompt': np.array(['What is Machine Learning?']), } - expectation = { - 'flavor': 'transformers', 'transformers_model': ANY, - 'path': ANY, - 'task': 'llm/v1/completions', + 'flavor': 'transformers', + 'artifact_path': 'final_model_checkpoint', + 'registered_model_name': 'dummy-registered-name', + 'run_id': 'mlflow-run-id', + 'await_registration_for': 3600, + 'metadata': ANY, + 'task': ANY, 'input_example': default_input_example, - 'metadata': {}, - 'pip_requirements': ANY, } - mlflow_logger_mock.save_model.assert_called_with(**expectation) - assert mlflow_logger_mock.log_model.call_count == 1 + assert mlflow_logger_mock.log_model.call_count == 1 + mlflow_logger_mock.log_model.assert_called_with(**expectation) else: assert mlflow_logger_mock.log_model.call_count == 0 assert mlflow_logger_mock.log_model.call_count == 0 @@ -1081,6 +1073,8 @@ def test_huggingface_conversion_callback( mlflow_logger_mock._run_id = 'mlflow-run-id' mlflow_logger_mock._enabled = True mlflow_logger_mock.run_url = 'fake-url' + mlflow_logger_mock.tracking_uri = None + mlflow_logger_mock.model_registry_uri = None trainer = Trainer( model=original_model, device='gpu', From e3b28bf741c7e696abf330bf6a2a3abd4381730e Mon Sep 17 00:00:00 2001 From: Daniel King Date: Fri, 1 Nov 2024 08:46:40 +0000 Subject: [PATCH 44/46] fix --- llmfoundry/callbacks/hf_checkpointer.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index 1561bf0291..ddd53a376c 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -794,11 +794,11 @@ def tensor_hook( temp_save_dir, 'register_save', ) - if new_model_instance is not None: - new_model_instance = self.transform_model_pre_registration( - new_model_instance, - ) - new_model_instance.save_pretrained(register_save_dir) + assert new_model_instance is not None + new_model_instance = self.transform_model_pre_registration( + new_model_instance, + ) + new_model_instance.save_pretrained(register_save_dir) if original_tokenizer: original_tokenizer.save_pretrained(register_save_dir) From 674506de28b82a63a5be021f0df70bf49f38b36d Mon Sep 17 00:00:00 2001 From: Daniel King Date: Fri, 1 Nov 2024 09:15:59 +0000 Subject: [PATCH 45/46] clean up tests --- .../inference/test_convert_composer_to_hf.py | 36 ++----------------- 1 file changed, 3 insertions(+), 33 deletions(-) diff --git a/tests/a_scripts/inference/test_convert_composer_to_hf.py b/tests/a_scripts/inference/test_convert_composer_to_hf.py index 2f40e24999..f599ebbc16 100644 --- a/tests/a_scripts/inference/test_convert_composer_to_hf.py +++ b/tests/a_scripts/inference/test_convert_composer_to_hf.py @@ -348,11 +348,11 @@ def _create_mlflow_logger_mock() -> MagicMock: mlflow_logger_mock._enabled = True mlflow_logger_mock.log_model = MagicMock() mlflow_logger_mock.model_registry_prefix = '' - mlflow_logger_mock.model_registry_uri = 'databricks' + mlflow_logger_mock.model_registry_uri = None mlflow_logger_mock.state_dict = lambda *args, **kwargs: {} mlflow_logger_mock.save_model = MagicMock(wraps=_save_model_mock) mlflow_logger_mock.run_url = 'fake-url' - mlflow_logger_mock.tracking_uri = 'databricks' + mlflow_logger_mock.tracking_uri = None return mlflow_logger_mock @@ -377,10 +377,6 @@ def _create_optimizer(original_model: torch.nn.Module) -> torch.optim.Optimizer: 'llmfoundry.callbacks.hf_checkpointer.SpawnProcess', new=MockSpawnProcess, ) -@patch( - 'llmfoundry.callbacks.hf_checkpointer._maybe_get_license_filename', - new=MagicMock(), -) def test_final_register_only( mlflow_registry_error: bool, mlflow_registered_model_name: Optional[str], @@ -413,12 +409,9 @@ def test_final_register_only( mlflow_logger_mock = _create_mlflow_logger_mock() - original_model.save_pretrained = MagicMock() - checkpointer_callback._save_checkpoint = MagicMock( wraps=checkpointer_callback._save_checkpoint, ) - checkpointer_callback.mlflow_logging_config = MagicMock() trainer = Trainer( model=original_model, device='gpu', @@ -488,14 +481,6 @@ def test_final_register_only( 'llmfoundry.callbacks.hf_checkpointer.SpawnProcess', new=MockSpawnProcess, ) -@patch( - 'llmfoundry.callbacks.hf_checkpointer._maybe_get_license_filename', - new=MagicMock(), -) -@patch( - 'mlflow.transformers.save_model', - new=MagicMock(), -) def test_huggingface_conversion_callback_interval( tmp_path: pathlib.Path, log_to_mlflow: bool, @@ -587,11 +572,6 @@ def test_huggingface_conversion_callback_interval( assert len(huggingface_checkpoints) == expected_hf_checkpoints # Load the last huggingface checkpoint - from transformers.models.auto.configuration_auto import CONFIG_MAPPING - CONFIG_MAPPING._extra_content['mpt'] = MPTConfig - MPTConfig.register_for_auto_class() - MPTForCausalLM.register_for_auto_class('AutoModelForCausalLM') - loaded_model = transformers.AutoModelForCausalLM.from_pretrained( os.path.join( tmp_path, @@ -755,7 +735,6 @@ def _assert_mlflow_logger_calls( mlflow_logger_mock.log_model.assert_called_with(**expectation) else: assert mlflow_logger_mock.log_model.call_count == 0 - assert mlflow_logger_mock.log_model.call_count == 0 def _get_fsdp_config(fsdp_state_dict_type: Optional[str]): @@ -1696,14 +1675,6 @@ def __init__(self, config: PretrainedConfig): else: self.generation_config = config.generation_config - def save_pretrained(self, output_path: str): - os.makedirs(output_path, exist_ok=True) - with open( - os.path.join(output_path, 'generation_config.json'), - 'w', - ) as f: - f.write(str(self.generation_config)) - config = AutoConfig.from_pretrained('gpt2') # Convert dict to GenerationConfig if needed if isinstance(generation_config, dict): @@ -1713,11 +1684,10 @@ def save_pretrained(self, output_path: str): mock_model = MockModel(config) logger = MagicMock() state = MagicMock() - tokenizer = MagicMock() state.timestamp.batch = 1 state.is_model_ddp = False state.model.model = mock_model - state.model.tokenizer = tokenizer + state.model.tokenizer = None checkpointer = HuggingFaceCheckpointer( save_folder='test', From 30b69279223a89b41c1b1be5e34f463da76f4089 Mon Sep 17 00:00:00 2001 From: Daniel King Date: Fri, 1 Nov 2024 13:00:04 -0700 Subject: [PATCH 46/46] type ignore --- llmfoundry/callbacks/hf_checkpointer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/llmfoundry/callbacks/hf_checkpointer.py b/llmfoundry/callbacks/hf_checkpointer.py index 8abe845f1b..688d8deb74 100644 --- a/llmfoundry/callbacks/hf_checkpointer.py +++ b/llmfoundry/callbacks/hf_checkpointer.py @@ -195,7 +195,7 @@ def save_model_patch(*args: Any, **kwargs: Any): e, ) - mlflow.transformers.save_model = save_model_patch + mlflow.transformers.save_model = save_model_patch # type: ignore mlflow.set_tracking_uri(mlflow_logger.tracking_uri) if mlflow_logger.model_registry_uri is not None: