Skip to content

Commit

Permalink
fix multi bug and compatible ACL test
Browse files Browse the repository at this point in the history
  • Loading branch information
liuyuecai committed Jul 19, 2024
1 parent afa99e1 commit 7ff7a4b
Show file tree
Hide file tree
Showing 8 changed files with 224 additions and 433 deletions.
2 changes: 1 addition & 1 deletion include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class PikaClientConn : public net::RedisConn {
void AddKeysToWatch(const std::vector<std::string>& db_keys);
void RemoveWatchedKeys();
void SetTxnFailedFromKeys(const std::vector<std::string>& db_keys);
void SetAllTxnFailed();
void SetTxnFailedIfKeyExists(const std::string target_db_name = "");
void SetTxnFailedFromDBs(std::string db_name);
void ExitTxn();
bool IsInTxn();
Expand Down
2 changes: 1 addition & 1 deletion pikatests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ function setup_pika_bin {
exit 1
fi
cp $PIKA_BIN src/redis-server
cp conf/pika.conf tests/assets/default.conf
cp tests/conf/pika.conf tests/assets/default.conf
}


Expand Down
55 changes: 38 additions & 17 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,29 +186,29 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
}
}

// Process Command
c_ptr->Execute();
time_stat_->process_done_ts_ = pstd::NowMicros();
auto cmdstat_map = g_pika_cmd_table_manager->GetCommandStatMap();
(*cmdstat_map)[opt].cmd_count.fetch_add(1);
(*cmdstat_map)[opt].cmd_time_consuming.fetch_add(time_stat_->total_time());

if (c_ptr->res().ok() && c_ptr->is_write() && name() != kCmdNameExec) {
LOG(INFO) << c_ptr->name();
if (c_ptr->name() == kCmdNameFlushdb) {
auto flushdb = std::dynamic_pointer_cast<FlushdbCmd>(c_ptr);
SetTxnFailedFromDBs(flushdb->GetFlushDBname());
SetTxnFailedIfKeyExists(flushdb->GetFlushDBname());
} else if (c_ptr->name() == kCmdNameFlushall) {
SetAllTxnFailed();
SetTxnFailedIfKeyExists();
} else {
auto table_keys = c_ptr->current_key();
for (auto& key : table_keys) {
key = c_ptr->db_name().append(key);
key = c_ptr->db_name().append("_").append(key);
}
SetTxnFailedFromKeys(table_keys);
}
}

// Process Command
c_ptr->Execute();
time_stat_->process_done_ts_ = pstd::NowMicros();
auto cmdstat_map = g_pika_cmd_table_manager->GetCommandStatMap();
(*cmdstat_map)[opt].cmd_count.fetch_add(1);
(*cmdstat_map)[opt].cmd_time_consuming.fetch_add(time_stat_->total_time());

if (g_pika_conf->slowlog_slower_than() >= 0) {
ProcessSlowlog(argv, c_ptr->GetDoDuration());
}
Expand Down Expand Up @@ -394,14 +394,35 @@ void PikaClientConn::SetTxnFailedFromKeys(const std::vector<std::string>& db_key
}
}

void PikaClientConn::SetAllTxnFailed() {
// If any watched key in target_db still exists, the MULTI transactions watching it are marked as failed
void PikaClientConn::SetTxnFailedIfKeyExists(std::string target_db_name) {
auto dispatcher = dynamic_cast<net::DispatchThread*>(server_thread());
if (dispatcher != nullptr) {
auto involved_conns = dispatcher->GetAllTxns();
for (auto& conn : involved_conns) {
if (auto c = std::dynamic_pointer_cast<PikaClientConn>(conn); c != nullptr) {
LOG(INFO) << "SetAllTxnFailed";
c->SetTxnWatchFailState(true);
if (dispatcher == nullptr) {
return;
}
auto involved_conns = dispatcher->GetAllTxns();
for (auto& conn : involved_conns) {
std::shared_ptr<PikaClientConn> c;
if (c = std::dynamic_pointer_cast<PikaClientConn>(conn); c == nullptr) {
continue;
}

for (const auto& db_key : c->watched_db_keys_) {
size_t pos = db_key.find('_');
if (pos == std::string::npos) {
continue;
}

auto db_name = db_key.substr(0, pos);
auto key = db_key.substr(pos + 1);

if (target_db_name == "" || target_db_name == "all" || target_db_name == db_name) {
auto db = g_pika_server->GetDB(db_name);
// if watched key exists, set watch state to failed
if (db->storage()->Exists({key}) > 0) {
c->SetTxnWatchFailState(true);
break;
}
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/pika_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ void ExecCmd::Do() {
if (cmd->name() == kCmdNameFlushall) {
auto flushall = std::dynamic_pointer_cast<FlushallCmd>(cmd);
flushall->FlushAllWithoutLock();
client_conn->SetAllTxnFailed();
client_conn->SetTxnFailedIfKeyExists();
} else if (cmd->name() == kCmdNameFlushdb) {
auto flushdb = std::dynamic_pointer_cast<FlushdbCmd>(cmd);
flushdb->FlushAllDBsWithoutLock();
if (cmd->res().ok()) {
cmd->res().SetRes(CmdRes::kOk);
}
client_conn->SetTxnFailedFromDBs(each_cmd_info.db_->GetDBName());
client_conn->SetTxnFailedIfKeyExists(each_cmd_info.db_->GetDBName());
} else {
cmd->Do();
if (cmd->res().ok() && cmd->is_write()) {
Expand Down Expand Up @@ -258,7 +258,7 @@ void WatchCmd::DoInitial() {
size_t pos = 1;
while (pos < argv_.size()) {
keys_.emplace_back(argv_[pos]);
db_keys_.push_back(db_name() + argv_[pos++]);
db_keys_.push_back(db_name() + "_" +argv_[pos++]);
}
}

Expand Down
99 changes: 86 additions & 13 deletions tests/conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
# Port 10221 is used for Rsync, and port 11221 is used for Replication, while the listening port is 9221.
port : 9221

db-instance-num : 3
db-instance-num : 3
rocksdb-ttl-second : 86400 * 7;
rocksdb-periodic-second : 86400 * 3;

# Random value identifying the Pika server, its string length must be 40.
# If not set, Pika will generate a random string with a length of 40 random characters.
Expand All @@ -25,10 +27,24 @@ thread-num : 1
# are dedicated to handling user requests.
thread-pool-size : 12

# The number of sync-thread for data replication from master, those are the threads work on slave nodes
# and are used to execute commands sent from master node when replicating.
# Size of the low level thread pool, The threads within this pool
# are dedicated to handling slow user requests.
slow-cmd-thread-pool-size : 1

# Slow cmd list e.g. hgetall, mset
slow-cmd-list :

# The number of threads to write DB in slaveNode when replicating.
# It's preferable to set slave's sync-thread-num value close to master's thread-pool-size.
sync-thread-num : 6

# The num of threads to write binlog in slaveNode when replicating,
# each DB could only bind to one sync-binlog-thread to write binlog in maximum
#[NOTICE] It's highly recommended to set sync-binlog-thread-num equal to conf item 'database'(then each DB could have an exclusive thread to write binlog),
# eg. if you use 8 DBs(databases_ is 8), sync-binlog-thread-num is preferable to be 8
# Valid range of sync-binlog-thread-num is [1, databases], the final value of it is Min(sync-binlog-thread-num, databases)
sync-binlog-thread-num : 1

# Directory to store log files of Pika, which contains multiple types of logs,
# Including: INFO, WARNING, ERROR log, as well as binglog(write2fine) file which
# is used for replication.
Expand Down Expand Up @@ -70,7 +86,7 @@ requirepass :
# [NOTICE] The value of this parameter must match the "requirepass" setting on the master.
masterauth :

# The [password of user], which is empty by default.(Deprecated)
# The [password of user], which is empty by default.
# [NOTICE] If this user password is the same as admin password (including both being empty),
# the value of this parameter will be ignored and all users are considered as administrators,
# in this scenario, users are not subject to the restrictions imposed by the userblacklist.
Expand All @@ -92,7 +108,9 @@ instance-mode : classic
# The default database id is DB 0. You can select a different one on
# a per-connection by using SELECT. The db id range is [0, 'databases' value -1].
# The value range of this parameter is [1, 8].
databases : 1
# [NOTICE] It's RECOMMENDED to set sync-binlog-thread-num equal to DB num(databases),
# if you've changed the value of databases, remember to check if the value of sync-binlog-thread-num is proper.
databases : 3

# The number of followers of a master. Only [0, 1, 2, 3, 4] is valid at present.
# By default, this num is set to 0, which means this feature is [not enabled]
Expand Down Expand Up @@ -219,6 +237,11 @@ slave-priority : 100
# [NOTICE]: compact-interval is prior than compact-cron.
#compact-interval :

# The disable_auto_compactions option is [true | false]
disable_auto_compactions : false

# Rocksdb max_subcompactions
max-subcompactions : 1
# The minimum disk usage ratio for checking resume.
# If the disk usage ratio is lower than min-check-resume-ratio, it will not check resume, only higher will check resume.
# Its default value is 0.7.
Expand Down Expand Up @@ -269,6 +292,7 @@ max-cache-statistic-keys : 0
# a small compact is triggered automatically if the small compaction feature is enabled.
# small-compaction-threshold default value is 5000 and the value range is [1, 100000].
small-compaction-threshold : 5000
small-compaction-duration-threshold : 10000

# The maximum total size of all live memtables of the RocksDB instance that owned by Pika.
# Flushing from memtable to disk will be triggered if the actual memory usage of RocksDB
Expand All @@ -283,6 +307,30 @@ max-write-buffer-size : 10737418240
# If max-write-buffer-num > 3, writing will be slowed down.
max-write-buffer-num : 2

# `min_write_buffer_number_to_merge` is the minimum number of memtables
# that need to be merged before being flushed to storage. For example, if the
# option is set to 2, immutable memtables will only be flushed if there
# are two of them - a single immutable memtable will never be flushed.
# If multiple memtables are merged together, less data will be written
# to storage because the two updates are merged into a single key. However,
# each Get() must linearly traverse all unmodifiable memtables and check
# whether the key exists. Setting this value too high may hurt performance.
min-write-buffer-number-to-merge : 1

# The total size of wal files, when reaches this limit, rocksdb will force the flush of column-families
# whose memtables are backed by the oldest live WAL file. Also used to control the rocksdb open time when
# process restart.
max-total-wal-size : 1073741824

# rocksdb level0_stop_writes_trigger
level0-stop-writes-trigger : 36

# rocksdb level0_slowdown_writes_trigger
level0-slowdown-writes-trigger : 20

# rocksdb level0_file_num_compaction_trigger
level0-file-num-compaction-trigger : 4

# The maximum size of the response package to client to prevent memory
# exhaustion caused by commands like 'keys *' and 'Scan' which can generate huge response.
# Supported Units [K|M|G]. The default unit is in [bytes].
Expand Down Expand Up @@ -328,6 +376,12 @@ max-bytes-for-level-multiplier : 10
# slotmigrate [yes | no]
slotmigrate : no

# slotmigrate thread num
slotmigrate-thread-num : 1

# thread-migrate-keys-num 1/8 of the write_buffer_size_
thread-migrate-keys-num : 64

# BlockBasedTable block_size, default 4k
# block-size: 4096

Expand All @@ -346,6 +400,12 @@ slotmigrate : no
# The slot number of pika when used with codis.
default-slot-num : 1024

# enable-partitioned-index-filters [yes | no]
# When `cache-index-and-filter-blocks` is enabled, `pin_l0_filter_and_index_blocks_in_cache`
# and `cache-index-and-filter-blocks` is suggested to be enabled
# https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
# enable-partitioned-index-filters: default no

# whether or not index and filter blocks is stored in block cache
# cache-index-and-filter-blocks: no

Expand All @@ -364,6 +424,10 @@ default-slot-num : 1024
# https://github.com/EighteenZi/rocksdb_wiki/blob/master/Rate-Limiter.md
#######################################################################E#######

# rate limiter mode
# 0: Read 1: Write 2: ReadAndWrite
# rate-limiter-mode : default 1

# rate limiter bandwidth, default 2000MB/s
#rate-limiter-bandwidth : 2097152000

Expand Down Expand Up @@ -416,8 +480,16 @@ default-slot-num : 1024
# The cache will be sharded into 2^blob-num-shard-bits shards.
# blob-num-shard-bits : -1

# Rsync Rate limiting configuration 200MB/s
# Rsync Rate limiting configuration [Default value is 200MB/s]
# [USED BY SLAVE] The transmitting speed(Rsync Rate) In full replication is controlled BY SLAVE NODE, You should modify the throttle-bytes-per-second in slave's pika.conf if you wanna change the rsync rate limit.
# [Dynamic Change Supported] send command 'config set throttle-bytes-per-second new_value' to SLAVE NODE can dynamically adjust rsync rate during full sync(use config rewrite can persist the changes).
throttle-bytes-per-second : 207200000
# Rsync timeout in full sync stage[Default value is 1000 ms], unnecessary retries will happen if this value is too small.
# [Dynamic Change Supported] similar to throttle-bytes-per-second, rsync-timeout-ms can be dynamically changed by configset command
# [USED BY SLAVE] Similar to throttle-bytes-per-second, you should change rsync-timeout-ms's value in slave's conf file if it is needed to adjust.
rsync-timeout-ms : 1000
# The valid range for max-rsync-parallel-num is [1, 4].
# If an invalid value is provided, max-rsync-parallel-num will automatically be reset to 4.
max-rsync-parallel-num : 4

# The synchronization mode of Pika primary/secondary replication is determined by ReplicationID. ReplicationID in one replication_cluster are the same
Expand All @@ -432,7 +504,7 @@ cache-num : 16
# cache-model 0:cache_none 1:cache_read
cache-model : 1
# cache-type: string, set, zset, list, hash, bit
cache-type: string, set, zset, list, hash
cache-type: string, set, zset, list, hash, bit

# Maximum number of keys in the zset redis cache
# On the disk DB, a zset field may have many fields. In the memory cache, we limit the maximum
Expand Down Expand Up @@ -498,18 +570,19 @@ cache-lfu-decay-time: 1
#
# aclfile : ../conf/users.acl

# (experimental)
# It is possible to change the name of dangerous commands in a shared environment.
# For instance the CONFIG command may be renamed into something Warning: To prevent
# data inconsistency caused by different configuration files, do not use the rename
# command to modify write commands on the primary and secondary servers. If necessary,
# ensure that the configuration files of the primary and secondary servers are consistent
# In addition, when using the command rename, you must not use "" to modify the command,
# for example, rename-command: FLUSHALL "360flushall" is incorrect; instead, use
# rename-command: FLUSHALL 360flushall is correct. After the rename command is executed,
# for example, rename-command: FLUSHDB "360flushdb" is incorrect; instead, use
# rename-command: FLUSHDB 360flushdb is correct. After the rename command is executed,
# it is most appropriate to use a numeric string with uppercase or lowercase letters
# for example: rename-command : FLUSHALL joYAPNXRPmcarcR4ZDgC81TbdkSmLAzRPmcarcR
# for example: rename-command : FLUSHDB joYAPNXRPmcarcR4ZDgC81TbdkSmLAzRPmcarcR
# Warning: Currently only applies to flushdb, slaveof, bgsave, shutdown, config command
# Warning: Ensure that the Settings of rename-command on the master and slave servers are consistent
#
# Example:
#
# rename-command : FLUSHALL 360flushall
# rename-command : FLUSHDB 360flushdb
# rename-command : FLUSHDB 360flushdb
24 changes: 22 additions & 2 deletions tests/test_helper.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ set ::all_tests {
unit/printver
unit/basic
unit/scan
unit/multi
unit/quit
unit/pubsub
unit/slowlog
Expand All @@ -32,6 +31,7 @@ set ::all_tests {
unit/type/zset
unit/type/string
unit/type/hash
unit/type/multi
unit/type/stream
# unit/expire
# unit/protocol
Expand Down Expand Up @@ -79,7 +79,7 @@ set ::force_failure 0
set ::timeout 600; # 10 minutes without progresses will quit the test.
set ::last_progress [clock seconds]
set ::active_servers {} ; # Pids of active Redis instances.

set ::tls 0
# Set to 1 when we are running in client mode. The Redis test uses a
# server-client model to run tests simultaneously. The server instance
# runs the specified number of client instances that will actually run tests.
Expand Down Expand Up @@ -179,6 +179,26 @@ proc cleanup {} {
if {!$::quiet} {puts "OK"}
}

proc redis_client {args} {
    # Create and return a synchronous client connection to a running test
    # server. An optional leading integer argument selects the server nesting
    # level (defaults to 0, the most recently started server).
    set level 0
    if {[llength $args] > 0 && [string is integer [lindex $args 0]]} {
        set level [lindex $args 0]
        set args [lrange $args 1 end]
    }

    # create client that won't defer reading the reply
    set client [redis [srv $level "host"] [srv $level "port"] 0 $::tls]

    # select the right db and read the response (OK), or at least ping
    # the server if we're in a singledb mode.
    if {$::singledb} {
        $client ping
    } else {
        $client select 9
    }
    return $client
}

proc test_server_main {} {
cleanup
set tclsh [info nameofexecutable]
Expand Down
Loading

0 comments on commit 7ff7a4b

Please sign in to comment.