Skip to content

Commit

Permalink
8 node reduce with random allocation works
Browse files Browse the repository at this point in the history
  • Loading branch information
jpkenny committed Aug 22, 2024
1 parent 2cf8651 commit 0492f32
Show file tree
Hide file tree
Showing 10 changed files with 178 additions and 32 deletions.
17 changes: 7 additions & 10 deletions src/sst/elements/iris/sumi/sim_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ class SumiServer :

void registerProc(int rank, SimTransport* proc){
int app_id = proc->sid().app_;
output.output("SumiServer registering rank %d for app %d", rank, app_id);
SimTransport*& slot = procs_[app_id][rank];
if (slot){
sst_hg_abort_printf("SumiServer: already registered rank %d for app %d on node %d",
Expand Down Expand Up @@ -235,7 +234,8 @@ SimTransport::SimTransport(SST::Params& params, SST::Hg::App* parent, SST::Compo
completion_queues_[0] = std::bind(&DefaultProgressQueue::incoming,
&default_progress_queue_, 0, std::placeholders::_1);
null_completion_notify_ = std::bind(&SimTransport::drop, this, std::placeholders::_1);
rank_ = sid().task_;
//rank_ = sid().task_;
rank_ = os_->addr();
auto* server_lib = parent->os()->lib(server_libname_);
SumiServer* server;
// only do one server per app per node
Expand Down Expand Up @@ -264,18 +264,15 @@ SimTransport::SimTransport(SST::Params& params, SST::Hg::App* parent, SST::Compo
RankMapping::addGlobalMapping(sid().app_, "foo", bar);

output.output("%d", sid().app_);
// rank_mapper_ = RankMapping::globalMapping(sid().app_);
// nproc_ = rank_mapper_->nproc();
nproc_ = 2;
//nproc_ = os_->nranks();
nproc_ = os_->nranks();

auto qos_params = params.get_scoped_params("qos");
auto qos_name = qos_params.find<std::string>("name", "null");
qos_analysis_ = SST::Hg::create<QoSAnalysis>("macro", qos_name, qos_params);

server->registerProc(rank_, this);

// if (!engine_) engine_ = new CollectiveEngine(params, this);
if (!engine_) engine_ = new CollectiveEngine(params, this);

smp_optimize_ = params.find<bool>("smp_optimize", false);
}
Expand All @@ -298,8 +295,8 @@ void
SimTransport::init()
{
if (smp_optimize_){
// engine_->barrier(-1, Message::default_cq);
// engine_->blockUntilNext(Message::default_cq);
engine_->barrier(-1, Message::default_cq);
engine_->blockUntilNext(Message::default_cq);

SumiServer* server = safe_cast(SumiServer, api_parent_app_->os()->lib(server_libname_));
auto& map = server->getProcs(sid().app_);
Expand All @@ -324,7 +321,7 @@ SimTransport::~SimTransport()
bool del = server->unregisterProc(rank_, this);
if (del) delete server;

//if (engine_) delete engine_;
if (engine_) delete engine_;

//if (spy_bytes_) delete spy_bytes_;
//if (spy_num_messages_) delete spy_num_messages_;
Expand Down
5 changes: 4 additions & 1 deletion src/sst/elements/mask-mpi/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#
#

comp_LTLIBRARIES = libmask_mpi.la sendrecv.la alltoall.la
comp_LTLIBRARIES = libmask_mpi.la sendrecv.la alltoall.la reduce.la

compdir = $(pkglibdir)

Expand Down Expand Up @@ -81,17 +81,20 @@ library_include_HEADERS = \

sendrecv_la_SOURCES = tests/sendrecv.cc
alltoall_la_SOURCES = tests/alltoall.cc
reduce_la_SOURCES = tests/reduce.cc

EXTRA_DIST = \
tests/testsuite_default_mask_mpi.py \
tests/platform_file_mask_mpi_test.py \
tests/test_alltoall.py \
tests/test_sendrecv.py \
tests/test_reduce.py \
tests/refFiles/test_sendrecv.out

libmask_mpi_la_LDFLAGS = -module -avoid-version
sendrecv_la_LDFLAGS = -module -avoid-version
alltoall_la_LDFLAGS = -module -avoid-version
reduce_la_LDFLAGS = -module -avoid-version

install-exec-hook:
$(SST_REGISTER_TOOL) SST_ELEMENT_SOURCE mask-mpi=$(abs_srcdir)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"output_buf_size" : "16kB"
})

platdef.addClassType("network_interface","sst.merlin.interface.LinkControl")
platdef.addClassType("network_interface","sst.merlin.interface.ReorderLinkControl")

platdef.addParamSet("router",{
"link_bw" : "12 GB/s",
Expand Down
95 changes: 95 additions & 0 deletions src/sst/elements/mask-mpi/tests/reduce.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
Copyright 2009-2024 National Technology and Engineering Solutions of Sandia,
LLC (NTESS). Under the terms of Contract DE-NA-0003525, the U.S. Government
retains certain rights in this software.
Sandia National Laboratories is a multimission laboratory managed and operated
by National Technology and Engineering Solutions of Sandia, LLC., a wholly
owned subsidiary of Honeywell International, Inc., for the U.S. Department of
Energy's National Nuclear Security Administration under contract DE-NA0003525.
Copyright (c) 2009-2024, NTESS
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following
disclaimer in the documentation and/or other materials provided
with the distribution.
* Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Questions? Contact sst-macro-help@sandia.gov
*/

#define ssthg_app_name reduce

#include <stddef.h>
#include <stdio.h>
#include <iostream>

#include <mask_mpi.h>
#include <mercury/common/skeleton.h>

/**
 * Eight-rank MPI_Reduce check.
 *
 * Every rank fills an 8-element buffer with its own rank number and the
 * buffers are summed element-wise onto rank 0.  Each element of the result
 * must therefore equal 0 + 1 + ... + 7 = 28.
 *
 * @param argc  argument count, forwarded to MPI_Init
 * @param argv  argument vector, forwarded to MPI_Init
 * @return EXIT_SUCCESS normally; EXIT_FAILURE if the job is not 8 ranks wide
 *         or if rank 0 observes a wrong reduction result.
 */
int main(int argc, char* argv[])
{
  const int expected_ranks = 8;
  // Sum of the rank numbers 0..7 that every reduced element must equal.
  const int expected_sum = expected_ranks * (expected_ranks - 1) / 2;

  MPI_Init(&argc, &argv);

  // Get number of processes and check that 8 processes are used
  int size;
  MPI_Comm_size(MPI_COMM_WORLD, &size);
  if (size != expected_ranks)
  {
    printf("This application is meant to be run with 8 MPI processes.\n");
    MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
    // Do not fall through if MPI_Abort hands control back: with a different
    // communicator size my_rank could index past the 8-element buffers below.
    return EXIT_FAILURE;
  }

  // Get my rank
  int my_rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);

  // Define my value: every slot of the send buffer carries this rank's number
  int values[expected_ranks];
  for (int i = 0; i < expected_ranks; i++)
  {
    values[i] = my_rank;
  }
  printf("Process %d, my value = %d\n", my_rank, values[my_rank]);
  for (int i = 0; i < expected_ranks; i++) {
    printf("%d ", values[i]);
  }
  printf("\n");

  MPI_Barrier(MPI_COMM_WORLD);

  int recv_values[expected_ranks] = {};
  MPI_Reduce(values, recv_values, expected_ranks, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);

  int status = EXIT_SUCCESS;
  if (my_rank == 0) {
    // Only the root receives the reduced data; verify every element rather
    // than just the first and last one.
    int first_bad = -1;
    for (int i = 0; i < expected_ranks; i++) {
      if (recv_values[i] != expected_sum) {
        first_bad = i;
        break;
      }
    }

    if (first_bad < 0) {
      printf("SUCCESS! %d\n", recv_values[0]);
    } else {
      // Report the mismatch instead of exiting silently with a success code.
      printf("FAILURE! recv_values[%d] = %d, expected %d\n",
             first_bad, recv_values[first_bad], expected_sum);
      status = EXIT_FAILURE;
    }
  }

  MPI_Finalize();

  return status;
}
44 changes: 44 additions & 0 deletions src/sst/elements/mask-mpi/tests/test_reduce.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#!/usr/bin/env python
#
# Copyright 2009-2024 NTESS. Under the terms
# of Contract DE-NA0003525 with NTESS, the U.S.
# Government retains certain rights in this software.
#
# Copyright (c) 2009-2024, NTESS
# All rights reserved.
#
# This file is part of the SST software package. For license
# information, see the LICENSE file in the top level directory of the
# distribution.

import sst
from sst.merlin.base import *
from sst.merlin.endpoint import *
from sst.merlin.interface import *
from sst.merlin.topology import *
from sst.hg import *

if __name__ == "__main__":

    # Load the shared mask-mpi test platform and make it the active one.
    PlatformDefinition.loadPlatformFile("platform_file_mask_mpi_test")
    PlatformDefinition.setCurrentPlatform("platform_mask_mpi_test")
    active_platform = PlatformDefinition.getCurrentPlatform()

    # Operating-system parameters: run the "reduce" skeleton app with the
    # system, SUMI transport and MPI API libraries listed below.
    os_params = {
        "verbose" : "0",
        "app1.name" : "reduce",
        "app1.exe" : "reduce.so",
        "app1.apis" : ["systemAPI:libsystemapi.so", "SimTransport:libsumi.so", "MpiApi:libmask_mpi.so"],
    }
    active_platform.addParamSet("operating_system", os_params)

    # Network: merlin's topoSingle with 32 ports and 20ns link latency.
    network = topoSingle()
    network.link_latency = "20ns"
    network.num_ports = 32

    # Job 0 spanning 8 nodes (HgJob takes job_id, then the node count).
    reduce_job = HgJob(0,8)

    # Assemble the system; nodes are handed out with the "random" allocator.
    # NOTE(review): 42 is presumably the allocator's seed -- confirm against merlin.
    sim_system = System()
    sim_system.setTopology(network)
    sim_system.allocateNodes(reduce_job,"random",42)

    sim_system.build()
8 changes: 4 additions & 4 deletions src/sst/elements/mercury/components/nic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ NIC::NIC(uint32_t id, SST::Params& params, Node* parent) :
SST::Hg::SubComponent(id),
//NIC::NIC(uint32_t id, SST::Params& params, Node* parent) :
// ConnectableSubcomponent(id, "nic", parent),
parent_(parent),
my_addr_(parent->addr()-1),
parent_(parent),
my_addr_(parent->os()->addr()),
// logp_link_(nullptr),
// spy_bytes_(nullptr),
// xmit_flows_(nullptr),
Expand Down Expand Up @@ -200,8 +200,8 @@ NIC::incomingPacket(int vn){
auto* payload = myreq->takePayload();
MessageEvent* ev = payload ? static_cast<MessageEvent*>(payload) : nullptr;
Flow* flow = cq_.recv(myreq->flow_id, bytes, ev ? ev->msg() : nullptr);
// nic_debug("receiving packet of size %d for flow %lu on vn %d: %s",
// (myreq->size_in_bits/8), myreq->flow_id, vn, (flow ? flow->toString().c_str() : "no flow"));
// printf("Rank %d receiving packet of size %d for flow %lu on vn %d: %s",
// my_addr_,(myreq->size_in_bits/8), myreq->flow_id, vn, (flow ? flow->toString().c_str() : "no flow"));
if (flow){
auto* msg = static_cast<NetworkMessage*>(flow);
// nic_debug("fully received message %s", msg->toString().c_str());
Expand Down
5 changes: 3 additions & 2 deletions src/sst/elements/mercury/components/node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ extern template SST::TimeConverter* HgBase<SST::Component>::time_converter_;
Node::Node(ComponentId_t id, Params &params)
: SST::Hg::Component(id), nic_(0) {

my_addr_ = getId();
my_addr_ = params.find<unsigned int>("logicalID",-1);
unsigned int verbose = params.find<unsigned int>("verbose",0);
out_ = std::unique_ptr<SST::Output>(new SST::Output(sprintf("Node%d:",my_addr_), verbose, 0, Output::STDOUT));

Expand All @@ -49,7 +49,8 @@ Node::Node(ComponentId_t id, Params &params)
netLink_ = configureLink("network");
}


unsigned int nranks = params.find<unsigned int>("nranks",-1);
os_->set_nranks(nranks);

int ncores_ = params.find<std::int32_t>("ncores", 1);
int nsockets_ = params.find<std::int32_t>("nsockets",1);
Expand Down
2 changes: 1 addition & 1 deletion src/sst/elements/mercury/components/operating_system.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ OperatingSystem::OperatingSystem(SST::ComponentId_t id, SST::Params& params, Nod
active_os_.resize(num_ranks.thread);
}

my_addr_ = node_ ? node_->addr() : 0;
my_addr_ = node_->addr();
//auto os_params = params.get_scoped_params("operating_system");
//params.print_all_params(std::cerr);
unsigned int verbose = params.find<unsigned int>("verbose",0);
Expand Down
11 changes: 8 additions & 3 deletions src/sst/elements/mercury/components/operating_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ class OperatingSystem : public SST::Hg::SubComponent {
/// to this context on every context switch.
ThreadContext *des_context_;

int nranks_;
Node* node_;
Thread* active_thread_;
Thread* blocked_thread_;
Expand Down Expand Up @@ -242,9 +243,13 @@ class OperatingSystem : public SST::Hg::SubComponent {
// return NodeId( rank_mapper_->mapRank(rank) );
// }

// int32_t nranks() {
// return rank_mapper_->getWorldSize();
// }
/// Stores the rank count in nranks_, from where nranks() hands it back.
/// @param ranks  number of ranks to record; stored as given, no validation.
void set_nranks(int32_t ranks) {
nranks_ = ranks;
}

/// Returns the rank count currently held in nranks_.
/// NOTE(review): nranks_ is declared without an initializer, so this looks
/// like it is only meaningful after set_nranks() has been called -- confirm
/// the constructor does not already set it.
int32_t nranks() {
return nranks_;
}

// SST::Ember::EmberRankMap* rank_mapper_;

Expand Down
21 changes: 11 additions & 10 deletions src/sst/elements/mercury/pymercury.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
from sst.merlin import *

class HgJob(Job):
def __init__(self, job_id, num_nodes, numCores = 1, nicsPerNode = 1):
Job.__init__(self,job_id,num_nodes * nicsPerNode)
def __init__(self, job_id, numNodes, numCores = 1, nicsPerNode = 1):
Job.__init__(self,job_id,numNodes * nicsPerNode)
self._declareClassVariables(["_numCores","_nicsPerNode","node","nic","os","_params","_numNodes"])
self._numCores = numCores
self._nicsPerNode = nicsPerNode
self._numNodes = num_nodes
self._numNodes = numNodes
self.node = HgNode()
self.os = HgOS()

Expand All @@ -35,15 +35,13 @@ def getName(self):


def build(self, nodeID, extraKeys):
node = self.node.build(nodeID)
logical_id = self._nid_map[nodeID]
node = self.node.build(nodeID,logical_id,self._numNodes * self._numCores)
os = self.os.build(node,"os_slot")
nic = node.setSubComponent("nic_slot", "hg.nic")

# Build NetworkInterface
# FIXME
#logical_id = self._nid_map[nodeID]
logical_id = 0
networkif, port_name = self.network_interface.build(node,"link_control_slot",0,self.job_id,self.size,logical_id,False)
networkif, port_name = self.network_interface.build(node,"link_control_slot",0,self.job_id,self.size,logical_id,True)

return (networkif, port_name)

Expand All @@ -52,8 +50,11 @@ class HgNode(TemplateBase):
def __init__(self):
TemplateBase.__init__(self)

def build(self,nID):
node = sst.Component("node" + str(nID), "hg.node")
def build(self,nid,lid,nranks):
node = sst.Component("node" + str(nid), "hg.node")
node.addParam("nodeID", nid)
node.addParam("logicalID", lid)
node.addParam("nranks", nranks)
return node

class HgOS(TemplateBase):
Expand Down

0 comments on commit 0492f32

Please sign in to comment.