Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shared tensor mechanism for avoiding tensor serialization/ipc costs #58

Merged
merged 5 commits into from
Jul 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 25 additions & 14 deletions benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def sanity_check(args):

# change these if you want to use different client/loader/runner impls
from rnb_logging import logmeta, logroot
from control import TerminationFlag, BenchmarkQueues
from control import TerminationFlag, SharedQueues, SharedTensors
from client import *
from runner import runner

Expand All @@ -152,17 +152,21 @@ def sanity_check(args):
parser.add_argument('-p', '--per_gpu_queue',
help='Whether to place intermediate queues on each GPU',
action='store_true')
parser.add_argument('-t', '--tensors_per_process',
help='Number of shared output tensors per process',
type=positive_int, default=100)
args = parser.parse_args()
print('Args:', args)

sanity_check(args)

job_id = '%s-mi%d-b%d-v%d-qs%d-p%d' % (dt.today().strftime('%y%m%d_%H%M%S'),
args.mean_interval_ms,
args.batch_size,
args.videos,
args.queue_size,
args.per_gpu_queue)
job_id = '%s-mi%d-b%d-v%d-qs%d-p%d-t%d' % (dt.today().strftime('%y%m%d_%H%M%S'),
args.mean_interval_ms,
args.batch_size,
args.videos,
args.queue_size,
args.per_gpu_queue,
args.tensors_per_process)

# do a quick pass through the pipeline to count the total number of runners
with open(args.config_file_path, 'r') as f:
Expand Down Expand Up @@ -191,10 +195,10 @@ def sanity_check(args):
# (mean_interval_ms = 0 is a special case where all videos are put in queues at once)
queue_size = args.queue_size if args.mean_interval_ms > 0 else args.videos + num_runners + 1

# create BenchmarkQueues object for managing queues between steps
benchmark_queues = BenchmarkQueues(Queue, queue_size, pipeline,
args.per_gpu_queue)
filename_queue = benchmark_queues.get_filename_queue()
# create SharedQueues object for managing queues between steps
shared_queues = SharedQueues(Queue, queue_size, pipeline,
args.per_gpu_queue)
filename_queue = shared_queues.get_filename_queue()

video_path_iterator = config['video_path_iterator']

Expand All @@ -210,6 +214,9 @@ def sanity_check(args):
process_client = Process(target=client_impl,
args=client_args)

# create SharedTensors object for managing shared tensors between steps
shared_tensors = SharedTensors(pipeline, args.tensors_per_process)

process_runner_list = []
for step_idx, step in enumerate(pipeline):
is_final_step = step_idx == len(pipeline) - 1
Expand All @@ -222,7 +229,10 @@ def sanity_check(args):
for instance_idx, gpu in enumerate(gpus):
is_first_instance = instance_idx == 0

prev_queue, next_queue = benchmark_queues.get_tensor_queue(step_idx, gpu)
prev_queue, next_queue = shared_queues.get_tensor_queue(step_idx, gpu)

shared_input_tensors, shared_output_tensors = \
shared_tensors.get_tensors(step_idx, instance_idx)

# check the replica index of this particular runner, for this gpu
# if this runner is the first, then give it index 0
Expand All @@ -236,11 +246,12 @@ def sanity_check(args):
process_runner = Process(target=runner,
args=(prev_queue, next_queue,
print_summary,
job_id, gpu, replica_idx,
job_id, gpu, replica_idx, instance_idx,
global_inference_counter, args.videos,
termination_flag, step_idx,
sta_bar, fin_bar,
model),
model, shared_input_tensors,
shared_output_tensors),
kwargs=step)

replica_dict[gpu] = replica_idx + 1
Expand Down
4 changes: 2 additions & 2 deletions client.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def poisson_client(video_path_iterator, filename_queue, beta, termination_flag,
time_card.record('enqueue_filename')

try:
filename_queue.put_nowait((video_path, time_card))
filename_queue.put_nowait(((None, video_path), time_card))
except Full:
print('[WARNING] Filename queue is full. Aborting...')
termination_flag.value = TerminationFlag.FILENAME_QUEUE_FULL
Expand Down Expand Up @@ -81,7 +81,7 @@ def bulk_client(video_path_iterator, filename_queue, num_videos, termination_fla
time_card.record('enqueue_filename')

try:
filename_queue.put_nowait((video_path, time_card))
filename_queue.put_nowait(((None, video_path), time_card))
except Full:
print('[WARNING] Filename queue is full. Aborting...')
termination_flag.value = TerminationFlag.FILENAME_QUEUE_FULL
Expand Down
103 changes: 102 additions & 1 deletion control.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
import torch
from collections import namedtuple
from torch.multiprocessing import Event
from utils.class_utils import load_class


class TerminationFlag:
"""An enum class for representing various termination states."""
UNSET = -1
Expand All @@ -6,7 +12,7 @@ class TerminationFlag:
FRAME_QUEUE_FULL = 2


class BenchmarkQueues:
class SharedQueues:
"""Manages intermediate queues that connect steps in the benchmark.

Args:
Expand Down Expand Up @@ -78,3 +84,98 @@ def get_tensor_queue(self, step_idx, gpu_idx):

def get_filename_queue(self):
return self.filename_queue


class TensorEvent:
  """A bundle of torch.Tensors paired with a multiprocessing.Event.

  The tensors act as shared buffers for handing intermediate results from a
  producer process to a consumer process. The event tracks whether the
  buffers are currently free for writing:

  * The producer calls event.wait() before writing. The call returns
    immediately while the buffers are free, and blocks until the consumer
    calls event.set() otherwise.
  * The consumer calls event.set() only AFTER it has copied the buffer
    contents somewhere safe (e.g., its own local tensors), marking the
    buffers as reusable.
  """
  def __init__(self, shapes, device, dtype=torch.float32):
    # Allocate one uninitialized tensor per requested shape. The event
    # starts out set, so the very first producer write does not block.
    buffers = []
    for shape in shapes:
      buffers.append(torch.empty(shape, dtype=dtype, device=device))
    self.tensors = tuple(buffers)
    self.event = Event()
    self.event.set()


class SharedTensors:
  """Manages intermediate tensors that are passed across steps in the benchmark.

  Args:
    pipeline: The whole pipeline info parsed from the input configuration file
    num_tensors_per_process: The number of shared output tensors that are given
        to each process, for writing tensor values. A big value allows
        processes to produce many tensors before having to block, but requires
        a lot of GPU memory. A small value saves memory, but results in early
        blocking. Note that if a step outputs several tensors during each
        iteration, then this class allocates separate memory for each tensor,
        but still treats them as one tensor when comparing the count with
        num_tensors_per_process.
  """
  def __init__(self, pipeline, num_tensors_per_process):
    # self.tensors is a 3-level list of TensorEvents, e.g.,
    # [
    #   None, (the first step does not need shared input tensors)
    #   [ (shared tensors between step 0 & 1)
    #     [tensorEvent000, tensorEvent001, ...] (outputs of process 0 in step 0)
    #     [tensorEvent010, tensorEvent011, ...] (outputs of process 1 in step 0)
    #     [tensorEvent020, tensorEvent021, ...] (outputs of process 2 in step 0)
    #   ],
    #   [ (shared tensors between step 1 & 2)
    #     [tensorEvent100, tensorEvent101, ...] (outputs of process 0 in step 1)
    #     [tensorEvent110, tensorEvent111, ...] (outputs of process 1 in step 1)
    #     [tensorEvent120, tensorEvent121, ...] (outputs of process 2 in step 1)
    #   ],
    #   ...,
    #   [None, None, ...] (the last step does not need shared output tensors)
    # ]
    self.tensors = [None]

    # we exclude the last step since the last step does not need output tensors
    for step in pipeline[:-1]:
      # load the model class to check the output tensor shape of this step
      model_module_path = step['model']
      model_class = load_class(model_module_path)
      shapes = model_class.output_shape()

      if shapes is None:
        # this step does not need shared output tensors
        # (bug fix: the original called step(['gpus']), i.e., tried to CALL
        # the dict `step`, which raises TypeError; index it instead)
        step_output_tensors = [None for _ in step['gpus']]

      else:
        step_output_tensors = []
        for gpu in step['gpus']:
          device = torch.device('cuda:%d' % gpu)
          tensors = [TensorEvent(shapes, device)
                     for _ in range(num_tensors_per_process)]
          step_output_tensors.append(tensors)

      self.tensors.append(step_output_tensors)

    # add Nones as output placeholders for the last step
    self.tensors.append([None for _ in pipeline[-1]['gpus']])

  def get_tensors(self, step_idx, instance_idx):
    """Returns the shared input tensors and output tensors for a given process.

    The shared input tensors are returned as a 2-level list, containing the
    output tensors of all processes of the previous step. On the other hand,
    the output tensors are returned as a 1-level list, since this process does
    not need to access the output tensors of other processes from the same step.
    """
    return self.tensors[step_idx], self.tensors[step_idx + 1][instance_idx]


# An integer tuple for accessing tensors from SharedTensors:
# (which producer instance wrote the tensor, which slot it occupies).
Signal = namedtuple('Signal', 'instance_idx tensor_idx')
35 changes: 29 additions & 6 deletions models/r2p1d/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,17 @@ def __init__(self, device, start_index=1, end_index=5, num_classes=400, layer_si
stream.synchronize()

def input_shape(self):
return self.input_dict[self.start_index]
return (self.input_dict[self.start_index],)

@staticmethod
def output_shape():
  """Return the shape(s) of this step's output, as a tuple of shapes.

  Used by SharedTensors to pre-allocate shared output buffers for this step.
  """
  # TODO: the output shape may not be (10, 400), depending on self.end_index
  # need to change return value accordingly
  return ((10, 400),)

def __call__(self, input):
return self.model(input)
(tensor,), _ = input
return ((self.model(tensor),), None)

class R2P1DVideoPathIterator(VideoPathIterator):
def __init__(self):
Expand Down Expand Up @@ -122,18 +129,26 @@ def __init__(self, device):
self.loader.flush()

def __call__(self, input):
self.loader.loadfile(input)
_, filename = input
self.loader.loadfile(filename)
for frames in self.loader:
pass
self.loader.flush()

frames = frames.float()
frames = frames.permute(0, 2, 1, 3, 4)
return frames
return ((frames,), None)

def __del__(self):
  # Release the underlying video loader's resources when this object
  # is garbage-collected.
  self.loader.close()

def input_shape(self):
  """Return None — presumably indicating this step takes no shared input
  tensors (it reads its input from filenames instead); confirm against the
  runner's handling of None."""
  return None

@staticmethod
def output_shape():
  """Return the shape(s) of this step's output, as a tuple of shapes.

  NOTE(review): hard-coded to 10 clips of 3x8x112x112 — looks tied to the
  loader configuration; confirm if loader settings change.
  """
  return ((10, 3, 8, 112, 112),)


class R2P1DSingleStep(RunnerModel):
"""RunnerModel impl that contains all inference logic regarding R(2+1)D.
Expand Down Expand Up @@ -187,15 +202,23 @@ def __init__(self, device, num_classes=400, layer_sizes=[2,2,2,2],
stream.synchronize()

def __call__(self, input):
self.loader.loadfile(input)
_, filename = input
self.loader.loadfile(filename)
for frames in self.loader:
pass
self.loader.flush()

frames = frames.float()
frames = frames.permute(0, 2, 1, 3, 4)

return self.model(frames)
return ((self.model(frames),), None)

def __del__(self):
  # Release the underlying video loader's resources when this object
  # is garbage-collected.
  self.loader.close()

def input_shape(self):
  """Return None — presumably indicating this step takes no shared input
  tensors (it reads its input from filenames instead); confirm against the
  runner's handling of None."""
  return None

@staticmethod
def output_shape():
  """Return the shape(s) of this step's output, as a tuple of shapes.

  Used by SharedTensors to pre-allocate shared output buffers for this step.
  """
  return ((10, 400),)
Loading