Skip to content

Commit

Permalink
Add pipelining example
Browse files Browse the repository at this point in the history
  • Loading branch information
ZzEeKkAa committed Jan 3, 2024
1 parent 0301ac8 commit 69546fe
Showing 1 changed file with 145 additions and 0 deletions.
145 changes: 145 additions & 0 deletions numba_dpex/examples/kernel/pipelining.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
# SPDX-FileCopyrightText: 2020 - 2023 Intel Corporation
#
# SPDX-License-Identifier: Apache-2.0

"""
This example shows how you can run kernels asynchronously and pipeline them.
In theory it runs computation and memory copy independently but because memory
copy takes less time than the deviation on computation time, I was unable to
capture any measurable time improvement.
"""

import argparse
import time

import dpctl
import dpnp
import numpy as np

import numba_dpex as dpex
import numba_dpex.experimental as dpex_exp


@dpex_exp.kernel
def async_kernel(x):
    """Add the partial sum of ``1 / k**3`` for ``k = 1..1300`` to ``x``.

    Each work item reads its global id along dimension 0 and updates the
    element of ``x`` at that index in place. The arithmetic is carried out in
    the element type of ``x``.
    """
    gid = dpex.get_global_id(0)
    one = x.dtype.type(1)

    for step in range(1300):
        den = x.dtype.type(step + 1)
        cube = den * den * den
        x[gid] += one / cube


def run_serial(host_arr, n_itr):
    """Time ``n_itr`` copy-then-compute steps executed one after another.

    Every iteration copies ``host_arr`` into one row of a device buffer with
    ``SyclQueue.memcpy`` and then launches ``async_kernel`` on that row with
    ``call_kernel``. Queue creation and buffer allocation are included in the
    measured time.

    Args:
        host_arr: Input array; it is converted to a USM host array with
            ``dpnp.asarray``.
        n_itr: Number of copy-then-compute iterations (and rows allocated on
            the device).

    Returns:
        A tuple ``(elapsed_seconds, None, None)``. The two trailing slots are
        always ``None``.
    """
    # perf_counter is the clock intended for measuring short intervals.
    t0 = time.perf_counter()
    q = dpctl.SyclQueue()

    a_host = dpnp.asarray(host_arr, usm_type="host", sycl_queue=q)
    usm_host_data = dpnp.get_usm_ndarray(a_host).usm_data

    batch_shape = (n_itr,) + a_host.shape
    # Allocate with the host dtype so that one device row has exactly as many
    # bytes as the host buffer copied into it. Without an explicit dtype the
    # device default float type is used, which may be wider than the input.
    device_alloc = dpnp.empty(
        batch_shape,
        dtype=a_host.dtype,
        usm_type="device",
        sycl_queue=q,
    )

    for offset in range(n_itr):
        _a = device_alloc[offset]
        # NOTE(review): confirm that ``usm_data`` of a row view addresses the
        # row itself and not the start of the whole allocation.
        _a_data = dpnp.get_usm_ndarray(_a).usm_data

        q.memcpy(_a_data, usm_host_data, usm_host_data.nbytes)

        dpex_exp.call_kernel(
            async_kernel,
            dpex.Range(len(_a)),
            _a,
        )

    dt = time.perf_counter() - t0

    return dt, None, None


def run_pipeline(host_arr, n_itr):
    """Time ``n_itr`` copy-then-compute steps submitted asynchronously.

    Every iteration submits an asynchronous copy of ``host_arr`` into one row
    of a device buffer, followed by an asynchronous launch of ``async_kernel``
    on that row which depends on the copy. Two event chains are alternated so
    that consecutive iterations do not depend on each other. The host only
    blocks once, on ``q.wait()`` after all work has been submitted. Queue
    creation and buffer allocation are included in the measured time.

    Args:
        host_arr: Input array; it is converted to a USM host array with
            ``dpnp.asarray``.
        n_itr: Number of copy-then-compute iterations (and rows allocated on
            the device).

    Returns:
        A tuple ``(elapsed_seconds, None, None)``. The two trailing slots are
        always ``None``.
    """
    # perf_counter is the clock intended for measuring short intervals.
    t0 = time.perf_counter()
    q = dpctl.SyclQueue()

    a_host = dpnp.asarray(host_arr, usm_type="host", sycl_queue=q)
    usm_host_data = dpnp.get_usm_ndarray(a_host).usm_data

    batch_shape = (n_itr,) + a_host.shape
    # Allocate with the host dtype so that one device row has exactly as many
    # bytes as the host buffer copied into it. Without an explicit dtype the
    # device default float type is used, which may be wider than the input.
    device_alloc = dpnp.empty(
        batch_shape,
        dtype=a_host.dtype,
        usm_type="device",
        sycl_queue=q,
    )

    # Last event of each of the two alternating dependency chains.
    e_a = None
    e_b = None

    for offset in range(n_itr):
        _a = device_alloc[offset]
        # NOTE(review): confirm that ``usm_data`` of a row view addresses the
        # row itself and not the start of the whole allocation.
        _a_data = dpnp.get_usm_ndarray(_a).usm_data

        # The copy waits only for the previous kernel of the same chain.
        e_a = q.memcpy_async(
            _a_data,
            usm_host_data,
            usm_host_data.nbytes,
            [e_a] if e_a is not None else [],
        )

        # No host-side wait here: ordering is expressed through the event
        # dependency, so the host can keep submitting work while the device
        # is busy. Blocking on every event would serialize the pipeline.
        _, e_a = dpex_exp.call_kernel_async(
            async_kernel,
            dpex.Range(len(_a)),
            (e_a,),
            _a,
        )

        # Switch chains so the next copy may overlap with this kernel.
        e_a, e_b = e_b, e_a

    # Single synchronization point for everything submitted to the queue.
    q.wait()
    dt = time.perf_counter() - t0

    return dt, None, None


def main():
    """Parse the command line and repeatedly time the chosen algorithm.

    Options: ``--n`` (input array length), ``--n_itr`` (iterations per run),
    ``--reps`` (number of timed runs) and ``--algo`` (``pipeline`` or
    ``serial``). The result of every run is printed as it completes.
    """
    cli = argparse.ArgumentParser(description="Process some integers.")
    cli.add_argument(
        "--n",
        type=int,
        default=2_000_000,
        help="an integer for the input array",
    )
    cli.add_argument(
        "--n_itr",
        type=int,
        default=100,
        help="number of iterations",
    )
    cli.add_argument(
        "--reps",
        type=int,
        default=5,
        help="number of repeats",
    )
    cli.add_argument(
        "--algo",
        type=str,
        default="pipeline",
        choices=["pipeline", "serial"],
        help="algo",
    )
    args = cli.parse_args()

    print(
        "timing %d elements for %d iterations" % (args.n, args.n_itr),
        flush=True,
    )

    # 4 bytes per element: the input below is created as float32.
    size_mb = args.n * 4 / 1024 / 1024
    print("using %f MB of memory" % (size_mb), flush=True)

    a = np.arange(args.n, dtype=np.float32)

    # argparse restricts --algo to the keys of this table.
    runners = {
        "serial": run_serial,
        "pipeline": run_pipeline,
    }
    runner = runners[args.algo]

    for _ in range(args.reps):
        dtp = runner(a, args.n_itr)
        print(f"{args.algo} time tot|pci|cmp|speedup: {dtp}", flush=True)


# Run the benchmark only when this file is executed as a script.
if __name__ == "__main__":
    main()

0 comments on commit 69546fe

Please sign in to comment.