add triton paraformer large online (#1242)
zhu-gu-an authored Jan 13, 2024
1 parent 08f7eaf commit 49e8e9d
Showing 12 changed files with 10,109 additions and 0 deletions.
64 changes: 64 additions & 0 deletions runtime/triton_gpu/README_ONLINE.md
@@ -0,0 +1,64 @@
### Steps:
1. Prepare model repo files
   * `git clone https://www.modelscope.cn/damo/speech_paraformer-large_asr_nat-zh-cn-16k-common-vocab8404-online-onnx.git`
   * Export the lfr_cmvn_pe.onnx model, for example: `python export_lfr_cmvn_pe_onnx.py` (a rough sketch of such an export follows the file listing below)
   * After the export, you should have the following model files in `${MODEL_DIR}`:
```
├── README.md
└── model_repo_paraformer_large_online
    ├── cif_search
    │   ├── 1
    │   │   └── model.py
    │   └── config.pbtxt
    ├── decoder
    │   ├── 1
    │   │   └── decoder.onnx
    │   └── config.pbtxt
    ├── encoder
    │   ├── 1
    │   │   └── model.onnx
    │   └── config.pbtxt
    ├── feature_extractor
    │   ├── 1
    │   │   └── model.py
    │   ├── config.pbtxt
    │   └── config.yaml
    ├── lfr_cmvn_pe
    │   ├── 1
    │   │   └── lfr_cmvn_pe.onnx
    │   ├── am.mvn
    │   ├── config.pbtxt
    │   └── export_lfr_cmvn_pe_onnx.py
    └── streaming_paraformer
        ├── 1
        └── config.pbtxt
```
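
As a rough, hypothetical illustration of what the lfr_cmvn_pe export looks like (the module below is only a placeholder; the real LFR + CMVN + positional-encoding logic lives in `export_lfr_cmvn_pe_onnx.py`), a `torch.onnx.export` call could be sketched as:

```python
# Hypothetical sketch only -- not the actual export script shipped with the repo.
import torch


class LfrCmvnPe(torch.nn.Module):
    """Placeholder for the LFR + CMVN + positional-encoding front end."""

    def forward(self, feats):
        # The real module stacks LFR frames, applies CMVN from am.mvn and adds
        # positional encodings; here the input is passed through unchanged.
        return feats


dummy = torch.randn(1, 100, 560)  # (batch, frames, feat_dim) -- shapes are assumptions
torch.onnx.export(
    LfrCmvnPe().eval(), dummy, "lfr_cmvn_pe.onnx",
    input_names=["feats"], output_names=["feats_out"],
    dynamic_axes={"feats": {0: "batch", 1: "frames"},
                  "feats_out": {0: "batch", 1: "frames"}},
    opset_version=13,
)
```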

2. Follow the instructions below to launch the Triton server (an optional readiness check follows the commands)
```sh
# build the server image from Dockerfile/Dockerfile.server
docker build . -f Dockerfile/Dockerfile.server -t triton-paraformer:23.01
docker run -it --rm --name "paraformer_triton_server" --gpus all -v <path_host/model_repo_paraformer_large_online>:/workspace/ --shm-size 1g --net host triton-paraformer:23.01

# launch the service
cd /workspace
tritonserver --model-repository model_repo_paraformer_large_online \
--pinned-memory-pool-byte-size=512000000 \
--cuda-memory-pool-byte-size=0:1024000000

```
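
Once the server reports that the models are loaded, you can optionally verify it from Python. This is a minimal sketch that assumes the `tritonclient` package (`pip install tritonclient[http]`) and the default HTTP endpoint on port 8000:

```python
import tritonclient.http as httpclient

client = httpclient.InferenceServerClient(url="localhost:8000")
print(client.is_server_ready())                       # True once startup finishes
print(client.is_model_ready("streaming_paraformer"))  # entry-point model from the repo above
```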

### Performance benchmark with a single A10

* FP32, ONNX, [paraformer large online](https://modelscope.cn/models/damo/speech_paraformer-large_asr_nat-zh-cn-16k-common-vocab8404-online-onnx/summary). The chunk size is 10 * 960 / 16000 = 0.6 s of audio, so per-chunk latency must stay below 0.6 s for the service to run in real time (a quick sanity check of this number follows the table below).


| Concurrency | Throughput | Latency_p50 (ms) | Latency_p90 (ms) | Latency_p95 (ms) | Latency_p99 (ms) |
|-------------|------------|------------------|------------------|------------------|------------------|
| 20 | 309.252 | 56.913 | 76.267 | 85.598 | 138.462 |
| 40 | 391.058 | 97.911 | 145.509 | 150.545 | 185.399 |
| 60 | 426.269 | 138.244 | 185.855 | 201.016 | 236.528 |
| 80 | 431.781 | 170.991 | 227.983 | 252.453 | 412.273 |
| 100 | 473.351 | 206.205 | 262.612 | 288.964 | 463.337 |
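
For reference, the 0.6 s figure can be reproduced with a quick back-of-the-envelope check; the 960-sample frame shift is taken from the arithmetic above:

```python
samples_per_frame = 960   # encoder frame shift implied by the numbers above
chunk_frames = 10         # middle entry of chunk_size = [5, 10, 5]
sample_rate = 16000
print(chunk_frames * samples_per_frame / sample_rate)  # 0.6 s of audio per chunk
```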

@@ -0,0 +1,268 @@
# Created on 2024-01-01
# Author: GuAn Zhu

import triton_python_backend_utils as pb_utils
import numpy as np
from torch.utils.dlpack import from_dlpack
import json
import yaml
import asyncio
from collections import OrderedDict


class LimitedDict(OrderedDict):
    """FIFO-bounded dict: once max_length entries exist, the oldest one is evicted."""

    def __init__(self, max_length):
        super().__init__()
        self.max_length = max_length

    def __setitem__(self, key, value):
        if len(self) >= self.max_length:
            self.popitem(last=False)
        super().__setitem__(key, value)


class CIFSearch:
    """CIFSearch: https://github.com/alibaba-damo-academy/FunASR/blob/main/runtime/python/onnxruntime/funasr_onnx
    /paraformer_online_bin.py """

    def __init__(self):
        # Streaming cache: leftover CIF weight/frame carried over from the previous
        # chunk, plus a flag marking whether the current chunk is the last one.
        self.cache = {"cif_hidden": np.zeros((1, 1, 512)).astype(np.float32),
                      "cif_alphas": np.zeros((1, 1)).astype(np.float32), "last_chunk": False}
        self.chunk_size = [5, 10, 5]  # [left context, current chunk, right context] in encoder frames
        self.tail_threshold = 0.45
        self.cif_threshold = 1.0

    def infer(self, hidden, alphas):
        batch_size, len_time, hidden_size = hidden.shape
        token_length = []
        list_fires = []
        list_frames = []
        cache_alphas = []
        cache_hiddens = []
        # Only the current chunk contributes CIF weight; zero out the context frames.
        alphas[:, :self.chunk_size[0]] = 0.0
        alphas[:, sum(self.chunk_size[:2]):] = 0.0

        # Prepend the weight/frame carried over from the previous chunk.
        if self.cache is not None and "cif_alphas" in self.cache and "cif_hidden" in self.cache:
            hidden = np.concatenate((self.cache["cif_hidden"], hidden), axis=1)
            alphas = np.concatenate((self.cache["cif_alphas"], alphas), axis=1)
        # On the last chunk, append a tail frame so a trailing partial token can still fire.
        if self.cache is not None and "last_chunk" in self.cache and self.cache["last_chunk"]:
            tail_hidden = np.zeros((batch_size, 1, hidden_size)).astype(np.float32)
            tail_alphas = np.array([[self.tail_threshold]]).astype(np.float32)
            tail_alphas = np.tile(tail_alphas, (batch_size, 1))
            hidden = np.concatenate((hidden, tail_hidden), axis=1)
            alphas = np.concatenate((alphas, tail_alphas), axis=1)

        len_time = alphas.shape[1]
        for b in range(batch_size):
            integrate = 0.0
            frames = np.zeros(hidden_size).astype(np.float32)
            list_frame = []
            list_fire = []
            for t in range(len_time):
                alpha = alphas[b][t]
                if alpha + integrate < self.cif_threshold:
                    # Keep integrating until the accumulated weight reaches the threshold.
                    integrate += alpha
                    list_fire.append(integrate)
                    frames += alpha * hidden[b][t]
                else:
                    # Fire a token: emit the weighted frame and carry the remainder over.
                    frames += (self.cif_threshold - integrate) * hidden[b][t]
                    list_frame.append(frames)
                    integrate += alpha
                    list_fire.append(integrate)
                    integrate -= self.cif_threshold
                    frames = integrate * hidden[b][t]

            # Remember the un-fired remainder for the next chunk.
            cache_alphas.append(integrate)
            if integrate > 0.0:
                cache_hiddens.append(frames / integrate)
            else:
                cache_hiddens.append(frames)

            token_length.append(len(list_frame))
            list_fires.append(list_fire)
            list_frames.append(list_frame)

        # Pad every batch entry to the longest token sequence in the batch.
        max_token_len = max(token_length)
        list_ls = []
        for b in range(batch_size):
            pad_frames = np.zeros((max_token_len - token_length[b], hidden_size)).astype(np.float32)
            if token_length[b] == 0:
                list_ls.append(pad_frames)
            else:
                list_ls.append(np.concatenate((list_frames[b], pad_frames), axis=0))

        self.cache["cif_alphas"] = np.stack(cache_alphas, axis=0)
        self.cache["cif_alphas"] = np.expand_dims(self.cache["cif_alphas"], axis=0)
        self.cache["cif_hidden"] = np.stack(cache_hiddens, axis=0)
        self.cache["cif_hidden"] = np.expand_dims(self.cache["cif_hidden"], axis=0)

        return np.stack(list_ls, axis=0).astype(np.float32), np.stack(token_length, axis=0).astype(np.int32)
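
# Illustrative usage (not executed by Triton): how a single chunk flows through
# CIFSearch; the shapes assume the 5 + 10 + 5 streaming chunk configuration above.
#
#   searcher = CIFSearch()
#   hidden = np.random.randn(1, 20, 512).astype(np.float32)  # encoder hidden states
#   alphas = np.random.rand(1, 20).astype(np.float32)        # per-frame CIF weights
#   acoustic, acoustic_len = searcher.infer(hidden, alphas)
#   # acoustic: (1, n_tokens, 512) padded token embeddings; acoustic_len: tokens fired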


class TritonPythonModel:
    """Your Python model must use the same class name. Every Python model
    that is created must have "TritonPythonModel" as the class name.
    """

    def initialize(self, args):
        """`initialize` is called only once when the model is being loaded.
        Implementing `initialize` function is optional. This function allows
        the model to initialize any state associated with this model.
        Parameters
        ----------
        args : dict
          Both keys and values are strings. The dictionary keys and values are:
          * model_config: A JSON string containing the model configuration
          * model_instance_kind: A string containing model instance kind
          * model_instance_device_id: A string containing model instance device ID
          * model_repository: Model repository path
          * model_version: Model version
          * model_name: Model name
        """
        self.model_config = model_config = json.loads(args['model_config'])
        self.max_batch_size = max(model_config["max_batch_size"], 1)

        # Get OUTPUT0 configuration
        output0_config = pb_utils.get_output_config_by_name(
            model_config, "transcripts")
        # Convert Triton types to numpy types
        self.out0_dtype = pb_utils.triton_string_to_numpy(
            output0_config['data_type'])

        self.init_vocab(self.model_config['parameters'])

        # Per-correlation-id state, bounded so abandoned streams do not leak memory.
        self.cif_search_cache = LimitedDict(1024)
        self.start = LimitedDict(1024)

    def init_vocab(self, parameters):
        for key, value in parameters.items():
            value = value["string_value"]
            if key == "vocabulary":
                self.vocab_dict = self.load_vocab(value)

    def load_vocab(self, vocab_file):
        with open(str(vocab_file), 'rb') as f:
            config = yaml.load(f, Loader=yaml.Loader)
        return config['token_list']
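
    # The vocabulary file is assumed to be a YAML config exposing a top-level
    # `token_list` entry, for example (illustrative values only):
    #
    #   token_list:
    #     - <blank>
    #     - <s>
    #     - </s>
    #     - ...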

    async def execute(self, requests):
        """`execute` must be implemented in every Python model. `execute`
        function receives a list of pb_utils.InferenceRequest as the only
        argument. This function is called when an inference is requested
        for this model.
        Parameters
        ----------
        requests : list
          A list of pb_utils.InferenceRequest
        Returns
        -------
        list
          A list of pb_utils.InferenceResponse. The length of this list must
          be the same as `requests`
        """
        # Every Python backend must iterate through list of requests and create
        # an instance of pb_utils.InferenceResponse class for each of them. You
        # should avoid storing any of the input Tensors in the class attributes
        # as they will be overridden in subsequent inference requests. You can
        # make a copy of the underlying NumPy array and store it if it is
        # required.

        batch_end = []
        responses = []
        batch_corrid = []
        qualified_corrid = []
        batch_result = {}
        inference_response_awaits = []

        for request in requests:
            hidden = pb_utils.get_input_tensor_by_name(request, "enc")
            hidden = from_dlpack(hidden.to_dlpack()).cpu().numpy()
            alphas = pb_utils.get_input_tensor_by_name(request, "alphas")
            alphas = from_dlpack(alphas.to_dlpack()).cpu().numpy()
            hidden_len = pb_utils.get_input_tensor_by_name(request, "enc_len")
            hidden_len = from_dlpack(hidden_len.to_dlpack()).cpu().numpy()

            in_start = pb_utils.get_input_tensor_by_name(request, "START")
            start = in_start.as_numpy()[0][0]

            in_corrid = pb_utils.get_input_tensor_by_name(request, "CORRID")
            corrid = in_corrid.as_numpy()[0][0]

            in_end = pb_utils.get_input_tensor_by_name(request, "END")
            end = in_end.as_numpy()[0][0]

            batch_end.append(end)
            batch_corrid.append(corrid)

            # A new stream gets a fresh CIF searcher; the last chunk is flagged so
            # the searcher appends its tail frame.
            if start:
                self.cif_search_cache[corrid] = CIFSearch()
                self.start[corrid] = 1
            if end:
                self.cif_search_cache[corrid].cache["last_chunk"] = True

            acoustic, acoustic_len = self.cif_search_cache[corrid].infer(hidden, alphas)
            batch_result[corrid] = ''
            if acoustic.shape[1] == 0:
                # No token fired in this chunk; return an empty transcript.
                continue
            else:
                qualified_corrid.append(corrid)
                input_tensor0 = pb_utils.Tensor("enc", hidden)
                input_tensor1 = pb_utils.Tensor("enc_len", np.array([hidden_len], dtype=np.int32))
                input_tensor2 = pb_utils.Tensor("acoustic_embeds", acoustic)
                input_tensor3 = pb_utils.Tensor("acoustic_embeds_len", np.array([acoustic_len], dtype=np.int32))
                input_tensors = [input_tensor0, input_tensor1, input_tensor2, input_tensor3]

                # Triton sequence flags: 1 = SEQUENCE_START, 2 = SEQUENCE_END,
                # 3 = both (a stream that starts and ends in the same chunk).
                if self.start[corrid] and end:
                    flag = 3
                elif end:
                    flag = 2
                elif self.start[corrid]:
                    flag = 1
                    self.start[corrid] = 0
                else:
                    flag = 0
                # BLS request to the stateful decoder model.
                inference_request = pb_utils.InferenceRequest(
                    model_name='decoder',
                    requested_output_names=['sample_ids'],
                    inputs=input_tensors,
                    request_id='',
                    correlation_id=corrid,
                    flags=flag
                )
                inference_response_awaits.append(inference_request.async_exec())

        inference_responses = await asyncio.gather(*inference_response_awaits)

        for index_corrid, inference_response in zip(qualified_corrid, inference_responses):
            if inference_response.has_error():
                raise pb_utils.TritonModelException(inference_response.error().message())
            else:
                sample_ids = pb_utils.get_output_tensor_by_name(inference_response, 'sample_ids')
                token_ids = from_dlpack(sample_ids.to_dlpack()).cpu().numpy()[0]

                # Change integer ids to tokens
                tokens = [self.vocab_dict[token_id] for token_id in token_ids]
                batch_result[index_corrid] = "".join(tokens)

        for i, index_corrid in enumerate(batch_corrid):
            sent = np.array([batch_result[index_corrid]])
            out0 = pb_utils.Tensor("transcripts", sent.astype(self.out0_dtype))
            inference_response = pb_utils.InferenceResponse(output_tensors=[out0])
            responses.append(inference_response)

            # Drop per-stream state once the stream has ended.
            if batch_end[i]:
                del self.cif_search_cache[index_corrid]
                del self.start[index_corrid]

        return responses

    def finalize(self):
        """`finalize` is called only once when the model is being unloaded.
        Implementing `finalize` function is optional. This function allows
        the model to perform any necessary clean ups before exit.
        """
        print('Cleaning up...')
