-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathchannel.h
173 lines (162 loc) · 4.39 KB
/
channel.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
#ifndef CHANNEL_H
#define CHANNEL_H
#include <inttypes.h>
#include <math.h>
#include <vector>
#include <unordered_map>
#include "kvincr.h"
template <typename ChannelT, class DerivedT>
class ChannelStoreBase {
public:
void Add(const uint64_t index, const ChannelT value) {
static_cast<DerivedT *>(this)->AddImpl(index, value);
}
ChannelT Get(const uint64_t index) const {
return static_cast<DerivedT *>(this)->GetImpl(index);
}
uint64_t Occupied() const {
return static_cast<const DerivedT *>(this)->Occupied();
}
ChannelT Sum() const {
return static_cast<const DerivedT *>(this)->Sum();
}
};
template <typename ChannelT>
class ChannelStoreSimple :
public ChannelStoreBase< ChannelT, ChannelStoreSimple<ChannelT> >
{
friend class ChannelStoreBase< ChannelT, ChannelStoreSimple<ChannelT> >;
public:
ChannelStoreSimple(const uint64_t total_bins) : bins_(total_bins, 0) { };
uint64_t total_bins() const { return bins_.size(); }
protected:
void AddImpl(const uint64_t index, const ChannelT value) {
bins_[index] += value;
}
ChannelT GetImpl(const uint64_t index) {
return bins_[index];
}
uint64_t Occupied() const {
uint64_t occupied = 0;
for (auto i = bins_.begin(); i != bins_.end(); ++i) {
if (i > 0) occupied++;
}
return occupied;
}
ChannelT Sum() const {
ChannelT sum{};
for (auto i = bins_.begin(); i != bins_.end(); ++i) {
sum += i;
}
return sum;
}
private:
std::vector<ChannelT> bins_;
};
template <typename ChannelT>
class ChannelStoreSparse :
public ChannelStoreBase< ChannelT, ChannelStoreSparse<ChannelT> >
{
friend class ChannelStoreBase< ChannelT, ChannelStoreSparse<ChannelT> >;
public:
ChannelStoreSparse(const uint64_t num) { };
protected:
void AddImpl(const uint64_t index, const ChannelT value) {
bins_[index] += value;
}
ChannelT GetImpl(const uint64_t index) {
ChannelT result{};
auto iter = bins_.find(index);
if (iter != bins_.end())
result = iter->second;
return result;
}
uint64_t Occupied() const {
return bins_.size();
}
ChannelT Sum() const {
ChannelT sum{};
for (auto i = bins_.begin(); i != bins_.end(); ++i) {
sum += i->second;
}
return sum;
}
private:
std::unordered_map<uint64_t, ChannelT> bins_;
};
template <typename ChannelT>
class ChannelStoreKvStore :
public ChannelStoreBase< ChannelT, ChannelStoreKvStore<ChannelT> >
{
friend class ChannelStoreBase< ChannelT, ChannelStoreKvStore<ChannelT> >;
public:
ChannelStoreKvStore(const uint64_t num) : conn_(NULL) { };
~ChannelStoreKvStore() {
KvDisconnect(conn_);
}
bool Connect(const std::string &locator, const uint16_t num_nodes) {
conn_ = KvConnect(locator.c_str(), num_nodes);
return conn_ != NULL;
}
void SetName(const std::string &name) { name_ = name; }
bool Commit() {
const uint16_t max_adds = 10240;
uint64_t bins[max_adds];
int64_t valuesInt[max_adds];
double valuesFloat[max_adds];
uint16_t num_adds = 0;
for (auto i = bins_.begin(); i != bins_.end(); ++i) {
bins[num_adds] = i->first;
if (std::is_integral<ChannelT>::value) {
valuesInt[num_adds] = i->second;
valuesFloat[num_adds] = 0.0;
} else if (std::is_floating_point<ChannelT>::value) {
valuesInt[num_adds] = 0;
valuesFloat[num_adds] = i->second;
} else {
return false;
}
num_adds++;
if (num_adds == max_adds) {
bool retval =
KvIncr(conn_, name_.c_str(), num_adds, bins, valuesInt, valuesFloat);
if (!retval)
return false;
num_adds = 0;
}
}
if (num_adds > 0) {
bool retval =
KvIncr(conn_, name_.c_str(), num_adds, bins, valuesInt, valuesFloat);
if (!retval)
return false;
}
return true;
}
protected:
void AddImpl(const uint64_t index, const ChannelT value) {
bins_[index] += value;
}
ChannelT GetImpl(const uint64_t index) {
ChannelT result{};
auto iter = bins_.find(index);
if (iter != bins_.end())
result = iter->second;
return result;
}
uint64_t Occupied() const {
return bins_.size();
}
ChannelT Sum() const {
ChannelT sum{};
for (auto i = bins_.begin(); i != bins_.end(); ++i) {
sum += i->second;
}
return sum;
}
private:
std::unordered_map<uint64_t, ChannelT> bins_;
std::string name_;
KV_CONNECTION *conn_;
};
#endif