From 9d040ed1827b407fea85a0fadb9410b8aa53f67d Mon Sep 17 00:00:00 2001
From: KepingYan
Date: Thu, 16 Nov 2023 15:34:03 +0800
Subject: [PATCH] Ipex upgrade (#121)

* merge and update

* upd

* update asyncio.sleep time
---
 inference/deepspeed_predictor.py   | 29 +++++++++++++++++++++++------
 inference/run_model_serve.py       | 12 +++++++-----
 inference/transformer_predictor.py | 20 +++++++++++++++++---
 requirements.txt                   |  6 +++---
 4 files changed, 50 insertions(+), 17 deletions(-)

diff --git a/inference/deepspeed_predictor.py b/inference/deepspeed_predictor.py
index f415c99ee..222e1aba6 100644
--- a/inference/deepspeed_predictor.py
+++ b/inference/deepspeed_predictor.py
@@ -3,7 +3,7 @@
 import deepspeed
 import oneccl_bindings_for_pytorch
 
-from transformers import AutoModelForCausalLM, AutoTokenizer
+from transformers import AutoModelForCausalLM, AutoTokenizer, AutoConfig
 from ray.air.util.torch_dist import (
     TorchDistributedWorker,
     init_torch_dist_process_group,
@@ -13,7 +13,6 @@
 from ray.air import ScalingConfig
 from typing import List
 import os
-# import math
 
 from peft import PeftModel
 from deltatuner import DeltaTunerModel
@@ -26,6 +25,7 @@ def __init__(
         stopping_criteria,
         dtype,
         device,
+        deployment_mode=False,
         deltatuner_model_id_or_path=None
     ):
 
@@ -34,8 +34,16 @@ def __init__(
 
         self.pad_token_id = pad_token_id
         self.stopping_criteria = stopping_criteria
+
+        config = None
+        if deployment_mode:
+            trust_remote_code = model_load_config.get("trust_remote_code", None)
+            config = AutoConfig.from_pretrained(
+                model_id_or_path, torchscript=deployment_mode, trust_remote_code=trust_remote_code
+            )
         self.model = AutoModelForCausalLM.from_pretrained(model_id_or_path,
                                                           torch_dtype=dtype,
+                                                          config=config,
                                                           low_cpu_mem_usage=True,
                                                           **model_load_config)
 
@@ -72,7 +80,7 @@ class PredictionWorker(TorchDistributedWorker):
     Multiple PredictionWorkers of the same WorkerGroup form a PyTorch DDP process group
     and work together under the orchestration of DeepSpeed.
     """
-    def __init__(self, world_size: int, model_id, model_load_config, device_name, amp_enabled, amp_dtype, pad_token_id, stopping_criteria, ipex_enabled=False, deltatuner_model_id=None):
+    def __init__(self, world_size: int, model_id, model_load_config, device_name, amp_enabled, amp_dtype, pad_token_id, stopping_criteria, ipex_enabled=False, deployment_mode=False, deltatuner_model_id=None):
         self.world_size = world_size
         self.model_id = model_id
         self.model_load_config = model_load_config
@@ -84,6 +92,7 @@ def __init__(self, world_size: int, model_id, model_load_config, device_name, am
         self.pad_token_id = pad_token_id
         self.stopping_criteria = stopping_criteria
         self.ipex_enabled = ipex_enabled
+        self.deployment_mode = deployment_mode
 
     def init_model(self, local_rank: int):
         """Initialize model for inference."""
@@ -117,9 +126,16 @@ def init_model(self, local_rank: int):
 
         if self.ipex_enabled:
             import intel_extension_for_pytorch as ipex
+
+            torch._C._jit_set_texpr_fuser_enabled(False)
             try: ipex._C.disable_jit_linear_repack()
             except: pass
-            pipe.model = ipex.optimize(pipe.model.eval(), dtype=self.amp_dtype, inplace=True)
+            pipe.model = ipex.optimize_transformers(
+                pipe.model.eval(),
+                dtype=self.amp_dtype,
+                inplace=True,
+                deployment_mode=self.deployment_mode,
+            )
 
         self.generator = pipe
 
@@ -131,7 +147,7 @@ def generate(self, inputs, **config):
 
 class DeepSpeedPredictor:
     def __init__(self, model_id, model_load_config, device_name, amp_enabled, amp_dtype, pad_token_id, stopping_criteria,
-                 ipex_enabled, cpus_per_worker, gpus_per_worker, workers_per_group, deltatuner_model_id) -> None:
+                 ipex_enabled, deployment_mode, cpus_per_worker, gpus_per_worker, workers_per_group, deltatuner_model_id) -> None:
         self.model_id = model_id
         self.model_load_config = model_load_config
         self.deltatuner_model_id = deltatuner_model_id
@@ -141,6 +157,7 @@ def __init__(self, model_id, model_load_config, device_name, amp_enabled, amp_dt
         self.pad_token_id = pad_token_id
         self.stopping_criteria = stopping_criteria
         self.ipex_enabled = ipex_enabled
+        self.deployment_mode = deployment_mode
 
         use_gpu = True if (device_name == "cuda") else False
 
@@ -195,7 +212,7 @@ def _init_worker_group(self, scaling_config: ScalingConfig):
 
         self.prediction_workers = [
             prediction_worker_cls.remote(scaling_config.num_workers, self.model_id, self.model_load_config, self.device_name, self.amp_enabled, self.amp_dtype,
-                                         self.pad_token_id, self.stopping_criteria, self.ipex_enabled, self.deltatuner_model_id)
+                                         self.pad_token_id, self.stopping_criteria, self.ipex_enabled, self.deployment_mode, self.deltatuner_model_id)
             for i in range(scaling_config.num_workers)
         ]
 
diff --git a/inference/run_model_serve.py b/inference/run_model_serve.py
index 7f456b391..9743e95ae 100644
--- a/inference/run_model_serve.py
+++ b/inference/run_model_serve.py
@@ -35,7 +35,7 @@ def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor):
 @serve.deployment
 class PredictDeployment:
     def __init__(self, model_id, tokenizer_name_or_path, model_load_config, device_name, amp_enabled, amp_dtype, chat_processor_name, prompt,
-                 ipex_enabled=False, deepspeed_enabled=False, cpus_per_worker=1, gpus_per_worker=0, workers_per_group=2, deltatuner_model_id=None):
+                 ipex_enabled=False, deployment_mode=False, deepspeed_enabled=False, cpus_per_worker=1, gpus_per_worker=0, workers_per_group=2, deltatuner_model_id=None):
         self.device_name = device_name
         self.device = torch.device(device_name)
         self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_name_or_path)
@@ -55,10 +55,10 @@ def __init__(self, model_id, tokenizer_name_or_path, model_load_config, device_n
         if self.use_deepspeed:
             self.streamer = self.create_streamer()
             self.predictor = DeepSpeedPredictor(model_id, model_load_config, device_name, amp_enabled, amp_dtype, self.tokenizer.pad_token_id, self.stopping_criteria,
-                                                ipex_enabled, cpus_per_worker, gpus_per_worker, workers_per_group, deltatuner_model_id)
+                                                ipex_enabled, deployment_mode, cpus_per_worker, gpus_per_worker, workers_per_group, deltatuner_model_id)
         else:
             self.predictor = TransformerPredictor(model_id, model_load_config, device_name, amp_enabled, amp_dtype, self.tokenizer.pad_token_id, self.stopping_criteria,
-                                                  ipex_enabled, deltatuner_model_id)
+                                                  ipex_enabled, deployment_mode, deltatuner_model_id)
         self.loop = asyncio.get_running_loop()
 
     def create_streamer(self):
@@ -125,7 +125,7 @@ async def consume_streamer_async(self, streamer: TextIteratorStreamer):
                 # The streamer raises an Empty exception if the next token
                 # hasn't been generated yet. `await` here to yield control
                 # back to the event loop so other coroutines can run.
-                await asyncio.sleep(0.1)
+                await asyncio.sleep(0.001)
 
     async def __call__(self, http_request: Request) -> Union[StreamingResponse, str]:
         json_request: str = await http_request.json()
@@ -176,6 +176,8 @@ async def __call__(self, http_request: Request) -> Union[StreamingResponse, str]
     parser.add_argument("--deepspeed", action='store_true', help="enable deepspeed inference")
     parser.add_argument("--workers_per_group", default="2", type=int, help="workers per group, used with --deepspeed")
     parser.add_argument("--ipex", action='store_true', help="enable ipex optimization")
+    parser.add_argument("--deployment_mode", action="store_true", help="Whether to apply the optimized model for deployment \
+        of model generation, only used when ipex optimization is True.")
     parser.add_argument("--device", default="cpu", type=str, help="cpu, xpu or cuda")
 
     args = parser.parse_args()
@@ -227,7 +229,7 @@ async def __call__(self, http_request: Request) -> Union[StreamingResponse, str]
             model_config["model_id_or_path"], model_config["tokenizer_name_or_path"], model_load_config,
             args.device, amp_enabled, amp_dtype,
             chat_processor_name = model_config["chat_processor"], prompt=model_config["prompt"],
-            ipex_enabled=args.ipex,
+            ipex_enabled=args.ipex, deployment_mode=args.deployment_mode,
             deepspeed_enabled=args.deepspeed, cpus_per_worker=args.cpus_per_worker,
             gpus_per_worker=args.gpus_per_worker, workers_per_group=args.workers_per_group,
             deltatuner_model_id=model_config["deltatuner_model_id_or_path"])
diff --git a/inference/transformer_predictor.py b/inference/transformer_predictor.py
index 26068d58f..7254d9fd6 100644
--- a/inference/transformer_predictor.py
+++ b/inference/transformer_predictor.py
@@ -1,16 +1,23 @@
 import torch
-from transformers import AutoModelForCausalLM
+from transformers import AutoModelForCausalLM, AutoConfig
 from peft import PeftModel
 from deltatuner import DeltaTunerModel
 
 class TransformerPredictor:
-    def __init__(self, model_id, model_load_config, device_name, amp_enabled, amp_dtype, pad_token_id, stopping_criteria, ipex_enabled, deltatuner_model_id=None):
+    def __init__(self, model_id, model_load_config, device_name, amp_enabled, amp_dtype, pad_token_id, stopping_criteria, ipex_enabled, deployment_mode, deltatuner_model_id=None):
         self.amp_enabled = amp_enabled
         self.amp_dtype = amp_dtype
         self.device = torch.device(device_name)
+        config = None
+        if deployment_mode:
+            trust_remote_code = model_load_config.get("trust_remote_code", None)
+            config = AutoConfig.from_pretrained(
+                model_id, torchscript=deployment_mode, trust_remote_code=trust_remote_code
+            )
         model = AutoModelForCausalLM.from_pretrained(
             model_id,
             torch_dtype=amp_dtype,
+            config=config,
             low_cpu_mem_usage=True,
             **model_load_config
         )
@@ -25,9 +32,16 @@ def __init__(self, model_id, model_load_config, device_name, amp_enabled, amp_dt
         # to ipex
         if ipex_enabled:
             import intel_extension_for_pytorch as ipex
+
+            torch._C._jit_set_texpr_fuser_enabled(False)
             try: ipex._C.disable_jit_linear_repack()
             except: pass
-            self.model = ipex.optimize(model, dtype=amp_dtype, inplace=True)
+            self.model = ipex.optimize_transformers(
+                model.eval(),
+                dtype=amp_dtype,
+                inplace=True,
+                deployment_mode=deployment_mode,
+            )
         else:
             self.model = model
         self.pad_token_id = pad_token_id
diff --git a/requirements.txt b/requirements.txt
index a551b1765..184f0ac78 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -15,6 +15,6 @@ einops
 peft==0.4.0
 deltatuner==1.1.9
 py-cpuinfo
-torch==2.0.0+cpu
-oneccl_bind_pt==2.0.0
-intel_extension_for_pytorch==2.0.100+cpu
\ No newline at end of file
+torch==2.1.0+cpu
+oneccl_bind_pt==2.1.0
+intel_extension_for_pytorch==2.1.0+cpu
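Note for reviewers: the sketch below is illustrative only and is not part of the patch. It walks through the same sequence the patch wires into transformer_predictor.py, loading the model against an AutoConfig with torchscript enabled and then handing it to ipex.optimize_transformers() with deployment_mode, so the flow can be tried outside Ray Serve. The model id, dtype, and prompt are placeholder assumptions.

import torch
import intel_extension_for_pytorch as ipex
from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer

model_id = "some-org/some-causal-lm"  # placeholder, not a value taken from this repo
dtype = torch.bfloat16                # assumed dtype; the serving code derives it from its model config

# torchscript=True is what deployment_mode toggles in the patch: it makes the loaded
# model traceable so ipex can prepare an optimized graph for generation.
config = AutoConfig.from_pretrained(model_id, torchscript=True, trust_remote_code=False)
model = AutoModelForCausalLM.from_pretrained(
    model_id, torch_dtype=dtype, config=config, low_cpu_mem_usage=True
)

# Same knobs the patch sets right before optimization.
torch._C._jit_set_texpr_fuser_enabled(False)
try:
    ipex._C.disable_jit_linear_repack()
except Exception:
    pass
model = ipex.optimize_transformers(model.eval(), dtype=dtype, inplace=True, deployment_mode=True)

tokenizer = AutoTokenizer.from_pretrained(model_id)
inputs = tokenizer("What is Ray Serve?", return_tensors="pt")
with torch.inference_mode(), torch.cpu.amp.autocast(enabled=True, dtype=dtype):
    output = model.generate(**inputs, max_new_tokens=32)
print(tokenizer.decode(output[0], skip_special_tokens=True))

At serve time the same path is reached by passing --ipex --deployment_mode to inference/run_model_serve.py alongside the existing model-selection arguments.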