-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathThreadManager.cpp
81 lines (67 loc) · 3 KB
/
ThreadManager.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
#include <cstdio>
#include <iomanip>
#include "ThreadManager.h"
#include "easylogging++.h"
#include "Logger.h"
void ThreadManager::processRequestMessage(const int receivedMessage[], MPI_Status receivedMessageStatus) {
LOG(INFO) << REC_MESS << REQ_MESS << this->toString() <<
"Request from [" << receivedMessageStatus.MPI_SOURCE << "]";
int receivedClock, receivedWeight;
receivedClock = receivedMessage[0];
receivedWeight = receivedMessage[1];
this->lock();
this->updateClock(receivedClock);
this->addRequestToQueue(
QueueElement(receivedMessageStatus.MPI_SOURCE, receivedClock, receivedWeight));
//TODO It could be moved outside this function
std::stringstream s2;
s2 << REC_MESS << this->toString() << "Queue: ";
for (QueueElement &elem : this->getQueue()) {
s2 << elem.toString() << ",";
}
LOG(INFO) << s2.str();
int *msg = this->constructMessage();
LOG(INFO) << REC_MESS << REQ_MESS << this->toString() << "I will send ACK";
MPI_Send(msg, ThreadManagerBase::MSG_SIZE, MPI_INT, receivedMessageStatus.MPI_SOURCE, ACK, MPI_COMM_WORLD);
LOG(INFO) << REC_MESS << REQ_MESS << this->toString() << "ACK was sent";
this->unlock();
}
/**
 * Handles an ACK message received from another process.
 *
 * Between lock() and unlock(): sets the sender's slot in tabAcks to 1, calls
 * signal() (presumably to wake a thread waiting on the ACK table - confirm
 * against ThreadManagerBase), and logs the ACK table as two aligned rows
 * (process ids and their ACK flags).
 *
 * @param receivedMessageStatus MPI status of the received message; its
 *                              MPI_SOURCE field identifies the sender
 */
void ThreadManager::processAckMessage(MPI_Status receivedMessageStatus) {
    const int sender = receivedMessageStatus.MPI_SOURCE;

    // This handler processes an ACK, so the entry log says so (the previous
    // text, "Request from", was copied from the request handler).
    LOG(INFO) << REC_MESS << ACK_MESS << this->toString() << "Ack from [" << sender << "]";

    this->lock();

    this->tabAcks[sender] = 1;

    // Tagged ACK_MESS: this signal is emitted while handling an ACK, not a release.
    LOG(INFO) << REC_MESS << ACK_MESS << this->toString() << "Signal";
    this->signal();

    // TODO: the table dump below could live outside this function.
    std::stringstream summary, idRow, ackRow;
    summary << REC_MESS << ACK_MESS << this->toString() << "Ack was received from [" << sender
            << "] and ACK Table was updated" << '\n';
    idRow << REC_MESS << ACK_MESS << this->toString() << std::left << std::setw(4) << "ID:";
    ackRow << REC_MESS << ACK_MESS << this->toString() << std::left << std::setw(4) << "TAB:";

    // Bind the table once instead of calling getTabAcks() on every iteration;
    // a const reference works whether the getter returns a reference or a copy.
    const auto &acks = this->getTabAcks();
    const int processCount = this->getSize();
    for (int i = 0; i < processCount; i++) {
        idRow << std::setw(2) << i;
        ackRow << std::setw(2) << acks.at(i);
    }
    idRow << '\n';

    const std::string result = summary.str() + idRow.str() + ackRow.str();
    LOG(INFO) << result;

    this->unlock();
}
/**
 * Handles a RELEASE message received from another process.
 *
 * Between lock() and unlock(): removes the sender's entry from the queue via
 * removeFromQueueById(), logs the remaining queue contents and calls signal()
 * (presumably to wake a thread waiting on the queue state - confirm against
 * ThreadManagerBase).
 *
 * @param receivedMessageStatus MPI status of the received message; its
 *                              MPI_SOURCE field identifies the sender
 */
void ThreadManager::processReleaseMessage(MPI_Status receivedMessageStatus) {
    this->lock();

    // This handler processes a RELEASE, so the log says so (the previous text,
    // "Request from", was copied from the request handler).
    LOG(INFO) << REC_MESS << REL_MESS << this->toString() <<
              "Release from [" << receivedMessageStatus.MPI_SOURCE << "]";

    this->removeFromQueueById(receivedMessageStatus.MPI_SOURCE);

    // TODO: the queue dump below could live outside this function.
    std::stringstream queueDump;
    queueDump << REC_MESS << this->toString() << "Queue: ";
    for (QueueElement &entry : this->getQueue()) {
        queueDump << entry.toString() << ",";
    }
    LOG(INFO) << queueDump.str();

    LOG(INFO) << REC_MESS << REL_MESS << this->toString() << "Signal";
    this->signal();

    this->unlock();
}
/**
 * Constructs a ThreadManager by forwarding rank, size and processorName,
 * unchanged, to the ThreadManagerBase constructor. The body does nothing else.
 */
ThreadManager::ThreadManager(int rank, int size, char processorName[])
        : ThreadManagerBase(rank, size, processorName) {
}