Skip to content

Commit

Permalink
Reuse response allocations
Browse files Browse the repository at this point in the history
  • Loading branch information
pskiran1 committed Jan 20, 2025
1 parent 596925a commit c9bcbd9
Showing 1 changed file with 82 additions and 30 deletions.
112 changes: 82 additions & 30 deletions src/grpc/infer_handler.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// Copyright 2023-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
Expand Down Expand Up @@ -102,9 +102,9 @@ struct RequestReleasePayload final {
//
// ResponseQueue
//
// A simple queue holding the responses to be written. Uses a
// vector of persistent message objects to prevent allocating
// memory for each response to be written.
// This class implements a queue to manage responses that need to be written.
// It internally uses a reusable pool of persistent message objects to avoid
// allocating memory for each response individually.
//
template <typename ResponseType>
class ResponseQueue {
Expand All @@ -113,19 +113,29 @@ class ResponseQueue {

~ResponseQueue()
{
  // Every message pointer still held by this object is deleted here:
  // first the idle ones parked in the reusable pool ...
  for (auto it = reusable_pool_.begin(); it != reusable_pool_.end(); ++it) {
    delete *it;
  }

  // ... then the ones still sitting in the response queue.
  for (auto it = responses_.begin(); it != responses_.end(); ++it) {
    delete *it;
  }
}

// Resets the queue
// Resets the queue to its initial state
void Reset()
{
std::lock_guard<std::mutex> lock(mtx_);
alloc_count_ = 0;
ready_count_ = 0;
current_index_ = 0;
for (auto response : responses_) {
response->Clear();
pop_count_ = 0;

while (!responses_.empty()) {
responses_.front()->Clear();
reusable_pool_.push_back(responses_.front());
responses_.pop_front();
}
}

Expand All @@ -137,17 +147,27 @@ class ResponseQueue {
std::lock_guard<std::mutex> lock(mtx_);
alloc_count_ = 1;
if (responses_.size() < 1) {
responses_.push_back(new ResponseType());
if (!reusable_pool_.empty()) {
responses_.push_back(reusable_pool_.front());
reusable_pool_.pop_front();
} else {
responses_.push_back(new ResponseType());
}
}
return responses_[0];
}

// Allocates a response on the head of the queue
// Allocates a response at the end of the queue
void AllocateResponse()
{
std::lock_guard<std::mutex> lock(mtx_);
alloc_count_++;
if (responses_.size() < alloc_count_) {

// Use a response from the reusable pool if available
if (!reusable_pool_.empty()) {
responses_.push_back(reusable_pool_.front());
reusable_pool_.pop_front();
} else {
responses_.push_back(new ResponseType());
}
}
Expand All @@ -156,12 +176,15 @@ class ResponseQueue {
ResponseType* GetLastAllocatedResponse()
{
std::lock_guard<std::mutex> lock(mtx_);
if (responses_.size() < alloc_count_) {

// Ensure that the requested response has been allocated
if ((responses_.size() + pop_count_) < alloc_count_) {
LOG_ERROR
<< "[INTERNAL] Attempting to access the response not yet allocated";
return nullptr;
}
return responses_[alloc_count_ - 1];

return responses_.back();
}

// Marks the next non-ready response complete
Expand All @@ -178,64 +201,93 @@ class ResponseQueue {
return true;
}

// Gets the current response from the tail of
// the queue.
// Gets the current response from the front of the queue
ResponseType* GetCurrentResponse()
{
std::lock_guard<std::mutex> lock(mtx_);
if (current_index_ >= ready_count_) {
if (pop_count_ >= ready_count_) {
LOG_ERROR << "[INTERNAL] Attempting to access current response when it "
"is not ready";
return nullptr;
}
return responses_[current_index_];
if (responses_.empty()) {
LOG_ERROR << "[INTERNAL] No responses are available in the queue.";
return nullptr;
}

return responses_.front();
}

// Gets the response at the specified index
ResponseType* GetResponseAt(const uint32_t index)
{
std::lock_guard<std::mutex> lock(mtx_);

// Check if the index is valid for allocated responses
if (index >= alloc_count_) {
LOG_ERROR << "[INTERNAL] Attempting to access response which is not yet "
"allocated";
return nullptr;
}
return responses_[index];
if (index < pop_count_) {
LOG_ERROR << "[INTERNAL] Attempting to access a response that has "
"already been removed from the queue.";
return nullptr;
}

// Adjust index based on number of popped responses to get actual index in
// 'responses_'
return responses_[index - pop_count_];
}

// Pops the response from the tail of the queue
// Removes the current response from the front of the queue
void PopResponse()
{
std::lock_guard<std::mutex> lock(mtx_);
current_index_++;

// Ensure there are responses in the queue to pop
if (responses_.empty()) {
LOG_ERROR << "[INTERNAL] No responses in the queue to pop.";
return;
}

// Clear and move the current response to the reusable pool
auto response = responses_.front();
response->Clear();
reusable_pool_.push_back(response);
responses_.pop_front();
pop_count_++;
}

// Returns whether the queue is empty
bool IsEmpty()
{
std::lock_guard<std::mutex> lock(mtx_);
return ((alloc_count_ == ready_count_) && (alloc_count_ == current_index_));
return (
(alloc_count_ == ready_count_) && (alloc_count_ == pop_count_) &&
responses_.empty());
}

// Returns whether the queue has responses
// ready to be written.
bool HasReadyResponse()
{
std::lock_guard<std::mutex> lock(mtx_);
return (ready_count_ > current_index_);
return (ready_count_ > pop_count_);
}

private:
std::vector<ResponseType*> responses_;
// Stores responses that need to be written. The front of the queue indicates
// the current response, while the back indicates the last allocated response.
std::deque<ResponseType*> responses_;
// Stores completed responses that can be reused
std::deque<ResponseType*> reusable_pool_;
std::mutex mtx_;

// There are three indices to track the responses in the queue
// Tracks the allocated response
uint32_t alloc_count_;
// Tracks the response that is ready to be written
uint32_t ready_count_;
// Tracks the response next in the queue to be written
uint32_t current_index_;
// Three counters are used to track and manage responses in the queue
uint32_t alloc_count_; // Number of allocated responses
uint32_t ready_count_; // Number of ready-to-write responses
uint32_t pop_count_; // Number of removed responses from the queue
};


Expand Down

0 comments on commit c9bcbd9

Please sign in to comment.