Skip to content

Commit

Permalink
!16993 【PROF】Master: dynolog for dynamic profiling
Browse files Browse the repository at this point in the history
Merge pull request !16993 from liyou_b/dynolog_master
  • Loading branch information
li-you-ran authored and it-is-a-robot committed Dec 18, 2024
1 parent eee88a2 commit 6ea73e1
Show file tree
Hide file tree
Showing 16 changed files with 999 additions and 164 deletions.
2 changes: 1 addition & 1 deletion torch_npu/csrc/profiler/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FILE(GLOB _PROF_SRCS *.cpp)
FILE(GLOB _PROF_SRCS *.cpp dyno/*.h dyno/*.cpp)

LIST(APPEND PROF_SRCS ${_PROF_SRCS})

Expand Down
29 changes: 29 additions & 0 deletions torch_npu/csrc/profiler/dyno/DynoLogNpuMonitor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#include "DynoLogNpuMonitor.h"
#include "utils.h"
namespace torch_npu {
namespace profiler {
bool DynoLogNpuMonitor::Init()
{
if (isInitialized_) {
ASCEND_LOGW("DynoLog npu monitor is initialized !");
return true;
}
bool res = ipcClient_.RegisterInstance(npuId_);
if (res) {
isInitialized_ = true;
ASCEND_LOGI("DynoLog npu monitor initialized success !");
}
return res;
}
std::string DynoLogNpuMonitor::Poll()
{
std::string res = ipcClient_.IpcClientNpuConfig();
if (res.empty()) {
ASCEND_LOGI("Request for dynolog server is empty !");
return "";
}
ASCEND_LOGI("Received NPU configuration successfully");
return res;
}
} // namespace profiler
} // namespace torch_npu
20 changes: 20 additions & 0 deletions torch_npu/csrc/profiler/dyno/DynoLogNpuMonitor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#pragma once
#include <torch_npu/csrc/toolkit/profiler/common/singleton.h>
#include "MonitorBase.h"
#include "NpuIpcClient.h"
namespace torch_npu {
namespace profiler {
class DynoLogNpuMonitor : public MonitorBase, public torch_npu::toolkit::profiler::Singleton<DynoLogNpuMonitor> {
friend class torch_npu::toolkit::profiler::Singleton<DynoLogNpuMonitor>;
public:
DynoLogNpuMonitor() = default;
bool Init() override;
std::string Poll() override;
void SetNpuId(int id) override { npuId_ = id;}
private:
bool isInitialized_ = false;
int32_t npuId_ = 0;
IpcClient ipcClient_;
};
} // namespace profiler
} // namespace torch_npu
12 changes: 12 additions & 0 deletions torch_npu/csrc/profiler/dyno/MonitorBase.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#pragma once
#include <string>
namespace torch_npu {
namespace profiler {
class MonitorBase {
public:
virtual bool Init() = 0;
virtual std::string Poll() = 0;
virtual void SetNpuId(int id) = 0;
};
} // namespace profiler
} // namespace torch_npu
134 changes: 134 additions & 0 deletions torch_npu/csrc/profiler/dyno/NpuIpcClient.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
#include "NpuIpcClient.h"

namespace torch_npu {
namespace profiler {
bool torch_npu::profiler::IpcClient::RegisterInstance(int32_t id)
{
NpuContext context{
.npu = id,
.pid = getpid(),
.jobId = JOB_ID,
};
std::unique_ptr<Message> message = Message::ConstructMessage<decltype(context)>(context, "ctxt");
try {
if (!SyncSendMessage(*message, std::string(DYNO_IPC_NAME))) {
ASCEND_LOGW("Failed to send register ctxt for pid %d with dyno", context.pid);
return false;
}
} catch (const std::exception &e) {
ASCEND_LOGW("Error when SyncSendMessage %s !", e.what());
return false;
}
ASCEND_LOGI("Resigter pid %d for dynolog success !", context.pid);
return true;
}
std::string IpcClient::IpcClientNpuConfig()
{
int size = pids_.size();
auto *req = (NpuRequest *)malloc(sizeof(NpuRequest) + sizeof(int32_t) * size);
req->type = DYNO_IPC_TYPE;
req->pidSize = size;
req->jobId = JOB_ID;
for (int i = 0; i < size; i++) {
req->pids[i] = pids_[i];
}
std::unique_ptr<Message> message = Message::ConstructMessage<NpuRequest, int32_t>(*req, "req", size);
if (!SyncSendMessage(*message, std::string(DYNO_IPC_NAME))) {
ASCEND_LOGW("Failed to send config to dyno server fail !");
free(req);
req = nullptr;
return "";
}
free(req);
message = PollRecvMessage(MAX_IPC_RETRIES, MAX_SLEEP_US);
if (!message) {
ASCEND_LOGW("Failed to receive on-demand config !");
return "";
}
std::string res = std::string((char *)message->buf.get(), message->metadata.size);
return res;
}
std::unique_ptr<Message> IpcClient::ReceiveMessage()
{
std::lock_guard<std::mutex> wguard(dequeLock_);
if (msgDynoDeque_.empty()) {
return nullptr;
}
std::unique_ptr<Message> message = std::move(msgDynoDeque_.front());
msgDynoDeque_.pop_front();
return message;
}
bool IpcClient::SyncSendMessage(const Message &message, const std::string &destName, int numRetry, int seepTimeUs)
{
if (destName.empty()) {
ASCEND_LOGW("Can not send to empty socket name !");
return false;
}
int i = 0;
std::vector<NpuPayLoad> npuPayLoad{ NpuPayLoad(sizeof(struct Metadata), (void *)&message.metadata),
NpuPayLoad(message.metadata.size, message.buf.get()) };
try {
auto ctxt = ep_.BuildSendNpuCtxt(destName, npuPayLoad, std::vector<int>());
while (!ep_.TrySendMessage(*ctxt) && i < numRetry) {
i++;
usleep(seepTimeUs);
seepTimeUs *= 2;
}
} catch (const std::exception &e) {
ASCEND_LOGW("Error when SyncSendMessage %s !", e.what());
return false;
}
return i < numRetry;
}
bool IpcClient::Recv()
{
try {
Metadata recvMetadata;
std::vector<NpuPayLoad> PeekNpuPayLoad{ NpuPayLoad(sizeof(struct Metadata), &recvMetadata) };
auto peekCtxt = ep_.BuildNpuRcvCtxt(PeekNpuPayLoad);
bool successFlag = false;
try {
successFlag = ep_.TryPeekMessage(*peekCtxt);
} catch (std::exception &e) {
ASCEND_LOGW("ERROR when TryPeekMessage: %s !", e.what());
return false;
}
if (successFlag) {
std::unique_ptr<Message> npuMessage = std::make_unique<Message>(Message());
npuMessage->metadata = recvMetadata;
npuMessage->buf = std::unique_ptr<unsigned char[]>(new unsigned char[recvMetadata.size]);
npuMessage->src = std::string(ep_.GetName(*peekCtxt));
std::vector<NpuPayLoad> npuPayLoad{
NpuPayLoad(sizeof(struct Metadata), (void *)&npuMessage->metadata),
NpuPayLoad(recvMetadata.size, npuMessage->buf.get()) };
auto recvCtxt = ep_.BuildNpuRcvCtxt(npuPayLoad);
try {
successFlag = ep_.TryRcvMessage(*recvCtxt);
} catch (std::exception &e) {
ASCEND_LOGW("Error when TryRecvMsg: %s !", e.what());
return false;
}
if (successFlag) {
std::lock_guard<std::mutex> wguard(dequeLock_);
msgDynoDeque_.push_back(std::move(npuMessage));
return true;
}
}
} catch (std::exception &e) {
ASCEND_LOGW("Error in Recv(): %s !", e.what());
return false;
}
return false;
}
std::unique_ptr<Message> IpcClient::PollRecvMessage(int maxRetry, int sleeTimeUs)
{
for (int i = 0; i < maxRetry; i++) {
if (Recv()) {
return ReceiveMessage();
}
usleep(sleeTimeUs);
}
return nullptr;
}
} // namespace profiler
} // namespace torch_npu
96 changes: 96 additions & 0 deletions torch_npu/csrc/profiler/dyno/NpuIpcClient.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#pragma once
#include <vector>
#include <string>
#include <memory>
#include <mutex>
#include <cstring>
#include <deque>
#include <random>
#include <sstream>
#include "NpuIpcEndPoint.h"
#include "utils.h"
namespace torch_npu {
namespace profiler {
constexpr int TYPE_SIZE = 32;
constexpr int JOB_ID = 0;
constexpr const char *DYNO_IPC_NAME = "dynolog";
constexpr const int DYNO_IPC_TYPE = 3;
constexpr const int MAX_IPC_RETRIES = 5;
constexpr const int MAX_SLEEP_US = 10000;
struct NpuRequest {
int type;
int pidSize;
int64_t jobId;
int32_t pids[0];
};
struct NpuContext {
int32_t npu;
pid_t pid;
int64_t jobId;
};
struct Metadata {
size_t size = 0;
char type[TYPE_SIZE] = "";
};
struct Message {
Metadata metadata;
std::unique_ptr<unsigned char[]> buf;
std::string src;
template <class T> static std::unique_ptr<Message> ConstructMessage(const T &data, const std::string &type)
{
std::unique_ptr<Message> ipcNpuMessage = std::make_unique<Message>(Message());
if (type.size() + 1 > sizeof(ipcNpuMessage->metadata.type)) {
throw std::runtime_error("Type string is too long to fit in metadata.type");
}
memcpy(ipcNpuMessage->metadata.type, type.c_str(), type.size() + 1);
#if __cplusplus >= 201703L
if constexpr (std::is_same<std::string, T>::value == true) {
ipcNpuMessage->metadata.size = data.size();
ipcNpuMessage->buf = std::make_unique<unsigned char[]>(ipcNpuMessage->metadata.size);
memcpy(ipcNpuMessage->buf.get(), data.c_str(), sizeof(data));
return ipcNpuMessage;
}
#endif
static_assert(std::is_trivially_copyable<T>::value);
ipcNpuMessage->metadata.size = sizeof(data);
ipcNpuMessage->buf = std::make_unique<unsigned char[]>(ipcNpuMessage->metadata.size);
memcpy(ipcNpuMessage->buf.get(), &data, sizeof(data));
return ipcNpuMessage;
}

template <class T, class U>
static std::unique_ptr<Message> ConstructMessage(const T &data, const std::string &type, int n)
{
std::unique_ptr<Message> ipcNpuMessage = std::make_unique<Message>(Message());
if (type.size() + 1 > sizeof(ipcNpuMessage->metadata.type)) {
throw std::runtime_error("Type string is too long to fit in metadata.type");
}
memcpy(ipcNpuMessage->metadata.type, type.c_str(), type.size() + 1);
static_assert(std::is_trivially_copyable<T>::value);
static_assert(std::is_trivially_copyable<U>::value);
ipcNpuMessage->metadata.size = sizeof(data) + sizeof(U) * n;
ipcNpuMessage->buf = std::make_unique<unsigned char[]>(ipcNpuMessage->metadata.size);
memcpy(ipcNpuMessage->buf.get(), &data, ipcNpuMessage->metadata.size);
return ipcNpuMessage;
}
};
class IpcClient {
public:
IpcClient(const IpcClient &) = delete;
IpcClient &operator = (const IpcClient &) = delete;
IpcClient() = default;
bool RegisterInstance(int32_t npu);
std::string IpcClientNpuConfig();
private:
std::vector<int32_t> pids_ = GetPids();
NpuIpcEndPoint<0> ep_{ "dynoconfigclient" + GenerateUuidV4() };
std::mutex dequeLock_;
std::deque<std::unique_ptr<Message>> msgDynoDeque_;
std::unique_ptr<Message> ReceiveMessage();
bool SyncSendMessage(const Message &message, const std::string &destName, int numRetry = 10,
int seepTimeUs = 10000);
bool Recv();
std::unique_ptr<Message> PollRecvMessage(int maxRetry, int sleeTimeUs);
};
} // namespace profiler
} // namespace torch_npu
Loading

0 comments on commit 6ea73e1

Please sign in to comment.