Skip to content

Commit

Permalink
from Ex
Browse files Browse the repository at this point in the history
  • Loading branch information
SamuelSze1 committed Aug 13, 2024
1 parent 2e2f097 commit 95355f0
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 16 deletions.
4 changes: 2 additions & 2 deletions src/base_cmd.cc
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ void BaseCmd::ServeAndUnblockConns(PClient* client) {
auto& key_to_conns = g_pikiwidb->GetMapFromKeyToConns();
auto it = key_to_conns.find(key);
if (it == key_to_conns.end()) {
// no client is waitting for this key
return;
// no client is waitting for this key
return;
}
read_latch.unlock();

Expand Down
7 changes: 3 additions & 4 deletions src/base_cmd.h
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ class BaseCmd : public std::enable_shared_from_this<BaseCmd> {

uint32_t GetCmdID() const;

void ServeAndUnblockConns(PClient* client);
void ServeAndUnblockConns(PClient* client);

protected:
// Execute a specific command
Expand Down Expand Up @@ -370,10 +370,9 @@ class BaseCmdGroup : public BaseCmd {
class BlockedConnNode {
public:
virtual ~BlockedConnNode() {}
BlockedConnNode(int64_t expire_time, PClient* client)
: expire_time_(expire_time), client_(client) {}
BlockedConnNode(int64_t expire_time, PClient* client) : expire_time_(expire_time), client_(client) {}
bool IsExpired();
PClient* GetBlockedClient() {return client_;}
PClient* GetBlockedClient() { return client_; }

private:
int64_t expire_time_;
Expand Down
8 changes: 4 additions & 4 deletions src/cmd_list.cc
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
*/

#include "cmd_list.h"
#include "pikiwidb.h"
#include "pstd_string.h"
#include "store.h"
#include "pikiwidb.h"

namespace pikiwidb {
LPushCmd::LPushCmd(const std::string& name, int16_t arity)
Expand Down Expand Up @@ -141,7 +141,7 @@ bool BLPopCmd::DoInitial(PClient* client) {
constexpr int64_t seconds_of_ten_years = 10 * 365 * 24 * 3600;
if (timeout < 0 || timeout > seconds_of_ten_years) {
client->SetRes(CmdRes::kErrOther,
"timeout can't be a negative value and can't exceed the number of seconds in 10 years");
"timeout can't be a negative value and can't exceed the number of seconds in 10 years");
return false;
}

Expand All @@ -161,13 +161,13 @@ void BLPopCmd::DoCmd(PClient* client) {
client->AppendString(client->Key());
client->AppendString(elements[0]);
return;
} else if (s.IsNotFound()){
} else if (s.IsNotFound()) {
BlockThisClientToWaitLRPush(elements, expire_time_, client);
} else {
client->SetRes(CmdRes::kErrOther, s.ToString());
return;
}
}
}

void BLPopCmd::BlockThisClientToWaitLRPush(std::vector<std::string>& keys, int64_t expire_time, PClient* client) {
std::unique_lock<std::shared_mutex> latch(g_pikiwidb->GetBlockMtx());
Expand Down
4 changes: 2 additions & 2 deletions src/pikiwidb.cc
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ bool PikiwiDB::Init() {

event_server_->InitTimer(10);

//auto task = std::bind(&PikiwiDB::ScanExpiredBlockedConnsOfBlrpop, this);
//loop->ScheduleRepeatedly(250, task);
// auto task = std::bind(&PikiwiDB::ScanExpiredBlockedConnsOfBlrpop, this);
// loop->ScheduleRepeatedly(250, task);

auto timerTask = std::make_shared<net::CommonTimerTask>(1000);
timerTask->SetCallback([]() { PREPL.Cron(); });
Expand Down
9 changes: 5 additions & 4 deletions src/pikiwidb.h
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ class PikiwiDB final {
event_server_->SendPacket(client, std::move(msg));
}

std::unordered_map<pikiwidb::BlockKey, std::unique_ptr<std::list<pikiwidb::BlockedConnNode>>, pikiwidb::BlockKeyHash>& GetMapFromKeyToConns() {
std::unordered_map<pikiwidb::BlockKey, std::unique_ptr<std::list<pikiwidb::BlockedConnNode>>, pikiwidb::BlockKeyHash>&
GetMapFromKeyToConns() {
return key_to_blocked_conns_;
}

std::shared_mutex& GetBlockMtx() { return block_mtx_; };

void ScanExpiredBlockedConnsOfBlrpop();


inline void SendPacket2Client(const std::shared_ptr<pikiwidb::PClient>& client, std::string&& msg) {
event_server_->SendPacket(client, std::move(msg));
}
Expand All @@ -84,15 +84,16 @@ class PikiwiDB final {

private:
pikiwidb::CmdThreadPool cmd_threads_;

/*
* Blpop/BRpop used
*/
/* key_to_blocked_conns_:
* mapping from key to a list that stored the nodes of client-connections that
* were blocked by command blpop/brpop with key.
*/
std::unordered_map<pikiwidb::BlockKey, std::unique_ptr<std::list<pikiwidb::BlockedConnNode>>, pikiwidb::BlockKeyHash> key_to_blocked_conns_;
std::unordered_map<pikiwidb::BlockKey, std::unique_ptr<std::list<pikiwidb::BlockedConnNode>>, pikiwidb::BlockKeyHash>
key_to_blocked_conns_;

/*
* latch of above map.
Expand Down

0 comments on commit 95355f0

Please sign in to comment.