Skip to content

Commit

Permalink
lci pp: add option to enable/disable in-buffer assembly of header mes…
Browse files Browse the repository at this point in the history
…sages
  • Loading branch information
JiakunYan committed Sep 25, 2023
1 parent bcc714a commit 0c003b2
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ namespace hpx::parcelset::policies::lci {
static int ndevices;
// How many completion managers to use
static int ncomps;
// Whether to enable in-buffer assembly for the header messages.
static bool enable_in_buffer_assembly;

static void init_config(util::runtime_configuration const& rtcfg);
};
Expand Down
31 changes: 31 additions & 0 deletions libs/full/parcelport_lci/include/hpx/parcelport_lci/header.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,37 @@ namespace hpx::parcelset::policies::lci {
pos_piggy_back_address = 8 * sizeof(value_type) + 2
};

template <typename buffer_type, typename ChunkType>
static size_t get_header_size(
parcel_buffer<buffer_type, ChunkType> const& buffer,
size_t max_header_size) noexcept
{
HPX_ASSERT(max_header_size >= pos_piggy_back_address);

size_t current_header_size = pos_piggy_back_address;
if (buffer.data_.size() <= (max_header_size - current_header_size))
{
current_header_size += buffer.data_.size();
}
int num_zero_copy_chunks = buffer.num_chunks_.first;
[[maybe_unused]] int num_non_zero_copy_chunks =
buffer.num_chunks_.second;
if (num_zero_copy_chunks != 0)
{
HPX_ASSERT(buffer.transmission_chunks_.size() ==
size_t(num_zero_copy_chunks + num_non_zero_copy_chunks));
int tchunk_size =
static_cast<int>(buffer.transmission_chunks_.size() *
sizeof(typename parcel_buffer<buffer_type,
ChunkType>::transmission_chunk_type));
if (tchunk_size <= int(max_header_size - current_header_size))
{
current_header_size += tchunk_size;
}
}
return current_header_size;
}

template <typename buffer_type, typename ChunkType>
header(parcel_buffer<buffer_type, ChunkType> const& buffer,
char* header_buffer, size_t max_header_size) noexcept
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ namespace hpx::traits {
"prepost_recv_num = 1\n"
"reg_mem = 1\n"
"ndevices = 1\n"
"ncomps = 1\n";
"ncomps = 1\n"
"enable_in_buffer_assembly = 1\n";
}
};
} // namespace hpx::traits
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ namespace hpx::parcelset::policies::lci {
hpx::chrono::high_resolution_timer timer_;
header header_;
LCI_mbuffer_t header_buffer;
std::vector<char> header_buffer_vector;
bool need_send_data;
bool need_send_tchunks;
LCI_tag_t tag;
Expand Down
2 changes: 2 additions & 0 deletions libs/full/parcelport_lci/src/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ namespace hpx::parcelset::policies::lci {
bool config_t::reg_mem;
int config_t::ndevices;
int config_t::ncomps;
bool config_t::enable_in_buffer_assembly;

void config_t::init_config(util::runtime_configuration const& rtcfg)
{
Expand Down Expand Up @@ -105,6 +106,7 @@ namespace hpx::parcelset::policies::lci {
reg_mem = util::get_entry_as(rtcfg, "hpx.parcel.lci.reg_mem", 1);
ndevices = util::get_entry_as(rtcfg, "hpx.parcel.lci.ndevices", 1);
ncomps = util::get_entry_as(rtcfg, "hpx.parcel.lci.ncomps", 1);
enable_in_buffer_assembly = util::get_entry_as(rtcfg, "hpx.parcel.lci.enable_in_buffer_assembly", 1);

if (!enable_send_immediate && enable_lci_backlog_queue)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,21 @@ namespace hpx::parcelset::policies::lci {
postprocess_handler_ = HPX_MOVE(parcel_postprocess);

// build header
while (LCI_mbuffer_alloc(device_p->device, &header_buffer) != LCI_OK)
continue;
HPX_ASSERT(header_buffer.length == (size_t) LCI_MEDIUM_SIZE);
header_ = header(
buffer_, (char*) header_buffer.address, header_buffer.length);
header_buffer.length = header_.size();
if (config_t::enable_in_buffer_assembly)
{
while (
LCI_mbuffer_alloc(device_p->device, &header_buffer) != LCI_OK)
continue;
HPX_ASSERT(header_buffer.length == (size_t) LCI_MEDIUM_SIZE);
header_ = header(
buffer_, (char*) header_buffer.address, header_buffer.length);
header_buffer.length = header_.size();
} else {
header_buffer_vector.resize(header::get_header_size(
buffer_, LCI_MEDIUM_SIZE));
header_ = header(buffer_, static_cast<char*>(header_buffer_vector.data()),
header_buffer_vector.size());
}
HPX_ASSERT((header_.num_zero_copy_chunks() == 0) ==
buffer_.transmission_chunks_.empty());
need_send_data = false;
Expand Down Expand Up @@ -85,6 +94,13 @@ namespace hpx::parcelset::policies::lci {
"Rank %d Wrap around!\n", LCI_RANK);
header_.set_device_idx(device_p->idx);
header_.set_tag(tag);
if (!config_t::enable_in_buffer_assembly) {
while (
LCI_mbuffer_alloc(device_p->device, &header_buffer) != LCI_OK)
continue;
memcpy(header_buffer.address, header_buffer_vector.data(), header_buffer_vector.size());
header_buffer.length = header_buffer_vector.size();
}
send_chunks_idx = 0;
completion = nullptr;
segment_to_use = LCI_SEGMENT_ALL;
Expand Down
6 changes: 6 additions & 0 deletions tests/performance/network/pingpong_performance2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ int hpx_main(hpx::program_options::variables_map& b_arg)
return 0;
}

if (window == 0)
{
std::cout << "window is 0!" << std::endl;
return 0;
}

std::vector<hpx::id_type> localities = hpx::find_remote_localities();

hpx::id_type to;
Expand Down

0 comments on commit 0c003b2

Please sign in to comment.