# SPDX-FileCopyrightText: 2020 - 2023 Intel Corporation
#
# SPDX-License-Identifier: Apache-2.0

"""
This example shows how you can run kernels asynchronously and pipeline them.

Host-to-device copies and kernel executions are submitted without blocking
the host, ordered only through SYCL event dependencies, so the copy for one
batch can overlap the computation of another. NOTE(review): on devices where
the copy time is small compared to the run-to-run variance of the kernel,
the overlap may not produce a measurable wall-clock improvement.
"""

import argparse
import time

import dpctl
import dpnp
import numpy as np

import numba_dpex as dpex
import numba_dpex.experimental as dpex_exp


@dpex_exp.kernel
def async_kernel(x):
    """Busy-work kernel: each work-item accumulates 1300 inverse-cube terms
    into its own element of ``x`` (in place)."""
    idx = dpex.get_global_id(0)

    for i in range(1300):
        den = x.dtype.type(i + 1)
        x[idx] += x.dtype.type(1) / (den * den * den)


def run_serial(host_arr, n_itr):
    """Copy ``host_arr`` to the device and run the kernel, ``n_itr`` times,
    fully synchronously (each copy and each kernel completes before the
    next submission).

    Returns a ``(elapsed_seconds, None, None)`` tuple; the two ``None``
    slots keep the shape parallel with potential per-phase timings.
    """
    t0 = time.time()
    q = dpctl.SyclQueue()

    # Stage the input once in host USM memory so it can be the source of
    # device memcpys.
    a_host = dpnp.asarray(host_arr, usm_type="host", sycl_queue=q)
    usm_host_data = dpnp.get_usm_ndarray(a_host).usm_data

    # One device-resident slot per iteration so batches never alias.
    batch_shape = (n_itr,) + a_host.shape
    device_alloc = dpnp.empty(batch_shape, usm_type="device", sycl_queue=q)

    for offset in range(n_itr):
        _a = device_alloc[offset]
        _a_data = dpnp.get_usm_ndarray(_a).usm_data

        # Blocking copy followed by a blocking kernel call.
        q.memcpy(_a_data, usm_host_data, usm_host_data.nbytes)

        dpex_exp.call_kernel(
            async_kernel,
            dpex.Range(len(_a)),
            _a,
        )

    dt = time.time() - t0

    return dt, None, None


def run_pipeline(host_arr, n_itr):
    """Same workload as :func:`run_serial`, but submitted asynchronously so
    copies and kernels can overlap.

    Ordering is expressed purely through events:
      * the kernel for batch ``i`` depends on the memcpy for batch ``i``;
      * via the ``e_a``/``e_b`` rotation, the memcpy for batch ``i`` depends
        on the kernel of batch ``i - 2``, which throttles the number of
        in-flight batches to two (double buffering of submissions).

    BUG FIX: the original version called ``e_a.wait()`` on the host after
    every async memcpy and again after every async kernel launch, which
    serialized the loop and made it equivalent to ``run_serial``. Those
    intra-loop waits are removed; the single ``q.wait()`` after the loop is
    the only host-side barrier, placed before the timer is read.

    Returns ``(elapsed_seconds, None, None)``, matching :func:`run_serial`.
    """
    t0 = time.time()
    q = dpctl.SyclQueue()

    a_host = dpnp.asarray(host_arr, usm_type="host", sycl_queue=q)
    usm_host_data = dpnp.get_usm_ndarray(a_host).usm_data

    batch_shape = (n_itr,) + a_host.shape
    device_alloc = dpnp.empty(batch_shape, usm_type="device", sycl_queue=q)

    # e_a: event chain of the batch being submitted this iteration.
    # e_b: event chain of the previous batch; swapped in at the end of each
    #      iteration so the next memcpy depends on the kernel two batches back.
    e_a = None
    e_b = None

    for offset in range(n_itr):
        _a = device_alloc[offset]
        _a_data = dpnp.get_usm_ndarray(_a).usm_data

        # Async host->device copy; depends on the kernel of batch offset-2
        # (carried in e_a by the swap below), or on nothing for the first
        # two iterations.
        e_a = q.memcpy_async(
            _a_data,
            usm_host_data,
            usm_host_data.nbytes,
            [e_a] if e_a is not None else [],
        )

        # Async kernel launch on this batch, ordered after its own memcpy.
        # call_kernel_async returns (host_task_event, kernel_event); only
        # the kernel event participates in the dependency chain.
        _, e_a = dpex_exp.call_kernel_async(
            async_kernel,
            dpex.Range(len(_a)),
            (e_a,),
            _a,
        )

        # Rotate the two in-flight chains (double buffering).
        e_a, e_b = e_b, e_a

    # Single host-side barrier: drain everything before stopping the clock.
    q.wait()
    dt = time.time() - t0

    return dt, None, None


def main():
    """Parse CLI options and time the selected algorithm ``--reps`` times."""
    parser = argparse.ArgumentParser(description="Process some integers.")
    parser.add_argument(
        "--n",
        type=int,
        default=2_000_000,
        help="an integer for the input array",
    )
    parser.add_argument(
        "--n_itr", type=int, default=100, help="number of iterations"
    )
    parser.add_argument("--reps", type=int, default=5, help="number of repeats")
    parser.add_argument(
        "--algo",
        type=str,
        default="pipeline",
        choices=["pipeline", "serial"],
        help="algo",
    )

    args = parser.parse_args()

    print(
        "timing %d elements for %d iterations" % (args.n, args.n_itr),
        flush=True,
    )

    # float32 input: 4 bytes per element.
    print("using %f MB of memory" % (args.n * 4 / 1024 / 1024), flush=True)

    a = np.arange(args.n, dtype=np.float32)

    # Direct indexing: argparse `choices` guarantees the key exists, so a
    # missing entry is a programming error and should raise, not yield None.
    algo_func = {
        "pipeline": run_pipeline,
        "serial": run_serial,
    }[args.algo]

    for _ in range(args.reps):
        dtp = algo_func(a, args.n_itr)
        print(f"{args.algo} time tot|pci|cmp|speedup: {dtp}", flush=True)


if __name__ == "__main__":
    main()