Shared tensor mechanism for avoiding tensor serialization/ipc costs #58
Conversation
Thank you so much for the PR with an amazing speedup! The description is very clear and the code is quite clean.
I've left a few minor comments; please check them out!
runner_model.py (outdated)
```diff
@@ -17,6 +17,11 @@ def input_shape(self):
         """Returns the expected shape of the input tensor to this model."""
         raise NotImplementedError
 
+    @staticmethod
+    def output_shape():
+        """Returns the expected shape of the output tensor of this model."""
```
As we discussed previously, could we add a description for the case of None in the input/output shape (i.e., the value at that position is not a Tensor)?
The change should be applied to `input_shape()` as well.
Will do.
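For illustration only, the None convention under discussion might look like this in a concrete model (a sketch: the `ExampleModel` class, the nested-tuple return value, and the exact semantics are assumptions drawn from this thread, not code from the PR):

```python
class ExampleModel(RunnerModel):
    @staticmethod
    def output_shape():
        # Assumed convention: the first element is a Tensor of shape
        # (8, 3, 224, 224); None marks the second element as a
        # non-Tensor value, so no shared tensor is allocated for it.
        return ((8, 3, 224, 224), None)
```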
@yunseong I've addressed your comments, thank you very much! Could you take another look? I've made quite a few changes to both the code and the comments regarding the tensor/non-tensor generalization.
Will do. Sorry for the extra commits; I left out a few details. I'm really ready for a review now.
Looks great to me. Thank you so much for fixing this issue.
I'm merging this.
Closes #57.
This PR implements the solution described in #57, using shared tensors and integer signals to pass intermediate tensor values instead of directly serializing tensors and sending them through queues (which are Linux pipes internally).
First, the main benchmark process examines the given pipeline and creates all shared tensors in advance. Each runner process is given a certain number of output tensors to write its outputs to. The device of each output tensor is set to match that of its corresponding process, and the shape is assumed to be given by the model implementation (`RunnerModel.output_shape()`). Note that all output tensors for step N are given to all processes of step N+1 as input tensors. The newly added class `BenchmarkTensors` acts as an abstraction for managing these shared tensors.
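As a rough sketch of how this pre-allocation could work (the helper name `create_shared_tensors` and its signature are illustrative assumptions, not the PR's actual `BenchmarkTensors` code):

```python
import torch

def create_shared_tensors(shape, device, tensors_per_process):
    """Pre-allocate a pool of shared output tensors for one runner process.

    CPU tensors are moved into shared memory so that other processes can
    read them without serialization; CUDA tensors are shareable across
    processes through torch.multiprocessing's IPC handles.
    """
    tensors = []
    for _ in range(tensors_per_process):
        t = torch.empty(shape, device=device)
        if t.device.type == "cpu":
            t.share_memory_()  # move the underlying storage to shared memory
        tensors.append(t)
    return tensors
```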
The number of output tensors per process can be configured via the new command line argument `-t`/`--tensors_per_process`, which has been added to the log directory format as well.
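For reference, defining such a flag might look roughly like this (a sketch; the default value and help text are illustrative, not taken from the PR):

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("-t", "--tensors_per_process", type=int, default=10,
                    help="Number of shared output tensors per runner process "
                         "(default here is illustrative).")
```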
Now, the only information that needs to be passed through queues is an index indicating which shared tensor is ready. I've implemented the index as a tuple of two integers: one denoting the process index (there can be multiple processes per step) and one denoting the tensor index (out of `tensors_per_process` tensors). Each process writes its outputs to one of its `tensors_per_process` output tensors and then sends the corresponding index. Once all `tensors_per_process` tensors have been used, it wraps around to the first tensor and repeats the procedure, like a ring buffer. To ensure that a process does not overwrite an output tensor that has not yet been consumed by a process from the next step, I've added a `multiprocessing.Event` object to every shared tensor so that consumers can notify waiting producers. See the comments in the new `TensorEvent` class and its usages in `runner.py` for more details.
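To make the signaling scheme concrete, here is a minimal sketch of the producer side (the `TensorEvent` fields, the `producer_loop` helper, and its arguments are assumptions based on this description, not the PR's exact code):

```python
import itertools
from multiprocessing import Event

class TensorEvent:
    """Pairs one shared tensor with an Event marking it safe to overwrite."""
    def __init__(self, tensor):
        self.tensor = tensor
        self.consumed = Event()
        self.consumed.set()  # initially free: nothing has been written yet

def producer_loop(process_index, tensor_events, signal_queue, outputs):
    """Write each output into the shared tensor pool, ring-buffer style."""
    slots = itertools.cycle(range(len(tensor_events)))
    for output, tensor_index in zip(outputs, slots):
        te = tensor_events[tensor_index]
        te.consumed.wait()   # block until the next step consumed the old value
        te.consumed.clear()
        te.tensor.copy_(output)                          # write in place
        signal_queue.put((process_index, tensor_index))  # "tensor is ready"

# Consumer side (sketch): after reading te.tensor, call te.consumed.set()
# so the producer may reuse that slot.
```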
Test:
Before this PR, the average time taken between `inference0_finish` and `runner1_start` (the IPC part) was 13.010609 ms. After applying this PR, the same section takes only 2.661710 ms.