Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set dr-test nodes to incoherent_wait #4944

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bbinc/comdb2_machine_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#define __INCLUDED_MACHINE_INFO_H

struct comdb2_machine_info {
int (*machine_is_up)(const char *host);
int (*machine_is_up)(const char *host, int *drtest);
int (*machine_status_init)(void);
int (*machine_class)(const char *host);
int (*machine_my_class)(void);
Expand Down
4 changes: 2 additions & 2 deletions bbinc/rtcpu.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
#ifndef INCLUDED_RTCPU_H
#define INCLUDED_RTCPU_H

void register_rtcpu_callbacks(int (*a)(const char *), int (*b)(void), int (*c)(const char *), int (*d)(void),
void register_rtcpu_callbacks(int (*a)(const char *, int *), int (*b)(void), int (*c)(const char *), int (*d)(void),
int (*e)(const char *), int (*f)(const char *), int (*g)(const char *, const char **),
int (*h)(const char **), int (*i)(const char *, int *, const char ***),
int (*j)(const char *, const char *));
int machine_is_up(const char *host);
int machine_is_up(const char *host, int *isdrtest);
int machine_class(const char *host);
int machine_my_class(void);
int machine_dc(const char *host);
Expand Down
4 changes: 4 additions & 0 deletions bdb/bdb_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ enum {
BDB_CALLBACK_SERIALCHECK,
BDB_CALLBACK_ADMIN_APPSOCK,
BDB_CALLBACK_SYNCMODE,
BDB_CALLBACK_NODEUP_DRTEST
};

enum { BDB_REPFAIL_NET, BDB_REPFAIL_TIMEOUT, BDB_REPFAIL_RMTBDB };
Expand Down Expand Up @@ -401,6 +402,9 @@ typedef void (*UNDOSHADOWFP)(struct bdb_osql_log *);
/* Callback to return sync type */
typedef int (*SYNCMODE)(bdb_state_type *);

/* Callback to dr-test aware rtcpu */
typedef int (*NODEUP_DRTEST)(bdb_state_type *, const char *hode, int *isdrtest);

typedef int (*BDB_CALLBACK_FP)();
bdb_callback_type *bdb_callback_create(void);
void bdb_callback_set(bdb_callback_type *bdb_callback, int callback_type,
Expand Down
1 change: 1 addition & 0 deletions bdb/bdb_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,7 @@ struct bdb_callback_tag {
NODEDOWNFP nodedown_rtn;
SERIALCHECK serialcheck_rtn;
SYNCMODE syncmode_rtn;
NODEUP_DRTEST nodeup_drtest_rtn;
};

struct waiting_for_lsn {
Expand Down
4 changes: 4 additions & 0 deletions bdb/callback.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ void bdb_callback_set(bdb_callback_type *bdb_callback, int callback_type,
bdb_callback->syncmode_rtn = (SYNCMODE)callback_rtn;
break;

case BDB_CALLBACK_NODEUP_DRTEST:
bdb_callback->nodeup_drtest_rtn = (NODEUP_DRTEST)callback_rtn;
break;

default:
break;
}
Expand Down
42 changes: 23 additions & 19 deletions bdb/rep.c
Original file line number Diff line number Diff line change
Expand Up @@ -2563,24 +2563,22 @@ static void got_new_seqnum_from_node(bdb_state_type *bdb_state,
/* new LSN from node: we may need to make the node coherent */
Pthread_mutex_lock(&(bdb_state->coherent_state_lock));
struct hostinfo *m = retrieve_hostinfo(bdb_state->repinfo->master_host_interned);
int nodeup = 0, is_drtest = 0;

if (change_coherency) {
if (h->coherent_state == STATE_INCOHERENT ||
h->coherent_state == STATE_INCOHERENT_WAIT) {
if (bdb_state->callback->nodeup_rtn) {
if ((bdb_state->callback->nodeup_rtn(bdb_state, host))) {
if (bdb_state->callback->nodeup_drtest_rtn) {
if ((nodeup = bdb_state->callback->nodeup_drtest_rtn(bdb_state, host, &is_drtest)) || is_drtest != 0) {
rc = bdb_wait_for_seqnum_from_node_nowait_int(
bdb_state, &m->seqnum, hostinterned);
if (rc == 0) {
/* prevent a node from becoming coherent for at least
* downgrade_penalty seconds after an event that would
* delay commits (the last downgrade) */
if (downgrade_penalty &&
(gettimeofday_ms() - h->last_downgrade_time) <=
downgrade_penalty) {
set_coherent_state(bdb_state, hostinterned,
STATE_INCOHERENT_WAIT, __func__,
__LINE__);
if ((!nodeup && is_drtest) ||
(downgrade_penalty && (gettimeofday_ms() - h->last_downgrade_time) <= downgrade_penalty)) {
set_coherent_state(bdb_state, hostinterned, STATE_INCOHERENT_WAIT, __func__, __LINE__);
} else {
/* dont send here under lock */
set_coherent_state(bdb_state, hostinterned, STATE_COHERENT,
Expand Down Expand Up @@ -2870,8 +2868,9 @@ static int bdb_wait_for_seqnum_from_node_int(bdb_state_type *bdb_state,
if (fakeincoherent) {
node_is_rtcpu = 1;
}
if (bdb_state->callback->nodeup_rtn)
if (!(bdb_state->callback->nodeup_rtn(bdb_state, host->str)))
int is_drtest = 0;
if (bdb_state->callback->nodeup_drtest_rtn)
if (!(bdb_state->callback->nodeup_drtest_rtn(bdb_state, host->str, &is_drtest)))
node_is_rtcpu = 1;

/* dont wait if it's in a skipped state */
Expand All @@ -2892,21 +2891,26 @@ static int bdb_wait_for_seqnum_from_node_int(bdb_state_type *bdb_state,
Pthread_mutex_lock(&(bdb_state->coherent_state_lock));
if (h->coherent_state == STATE_COHERENT ||
h->coherent_state == STATE_INCOHERENT_WAIT) {
if (h->coherent_state == STATE_COHERENT)

int newstate = is_drtest ? STATE_INCOHERENT_WAIT : STATE_INCOHERENT;
if (h->coherent_state == STATE_COHERENT) {
defer_commits(bdb_state, host->str, __func__);
h->last_downgrade_time = gettimeofday_ms();
set_coherent_state(bdb_state, host, STATE_INCOHERENT, __func__,
__LINE__);
bdb_state->repinfo->skipsinceepoch = comdb2_time_epoch();
}
if (h->coherent_state != newstate) {
h->last_downgrade_time = gettimeofday_ms();
set_coherent_state(bdb_state, host, newstate, __func__, __LINE__);
bdb_state->repinfo->skipsinceepoch = comdb2_time_epoch();
}
}

Pthread_mutex_unlock(&(bdb_state->coherent_state_lock));

if (bdb_state->attr->wait_for_seqnum_trace) {
logmsg(LOGMSG_USER, PR_LSN " %s became incoherent, not waiting\n",
PARM_LSN(seqnum->lsn), host->str);
if (!is_drtest) {
if (bdb_state->attr->wait_for_seqnum_trace) {
logmsg(LOGMSG_USER, PR_LSN " %s became incoherent, not waiting\n", PARM_LSN(seqnum->lsn), host->str);
}
return -2;
}
return -2;
}

Pthread_mutex_lock(&(bdb_state->seqnum_info->lock));
Expand Down
6 changes: 3 additions & 3 deletions db/fdb_boots.c
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ static char *_get_node_initial(int nnodes, char **nodes, int *lcl,
*lcl_nnodes = 0;
*rescpu_nnodes = 0;
for (i = 0; i < nnodes; i++) {
if (!machine_is_up(nodes[i])) {
if (!machine_is_up(nodes[i], NULL)) {
continue;
}

Expand Down Expand Up @@ -255,7 +255,7 @@ static char *_get_node_next(int nnodes, char **nodes, int *lcl, char *arg,
break;

/* ignore rtcpu */
if (!machine_is_up(nodes[i])) {
if (!machine_is_up(nodes[i], NULL)) {
continue;
}

Expand Down Expand Up @@ -548,7 +548,7 @@ int fdb_get_rescpu_nodes(fdb_location_t *loc, int *locals)

rescpued = 0;
for (i = 0; i < loc->nnodes; i++) {
if (machine_is_up(loc->nodes[i])) {
if (machine_is_up(loc->nodes[i], NULL)) {
rescpued++;

if (loc->lcl[i] && locals)
Expand Down
15 changes: 12 additions & 3 deletions db/glue.c
Original file line number Diff line number Diff line change
Expand Up @@ -3123,6 +3123,15 @@ int serial_check_callback(char *tbname, int idxnum, void *key, int keylen,

int getroom_callback(void *dummy, const char *host) { return machine_dc(host); }

static int nodeup_drtest_callback(void *bdb_handle, const char *host, int *is_drtest)
{
extern char *tcmtest_routecpu_down_node;
if (host == tcmtest_routecpu_down_node) {
return 0;
}
return machine_is_up(host, is_drtest);
}

/* callback to report whether node is up or down through rtcpu */
static int nodeup_callback(void *bdb_handle, const char *host)
{
Expand All @@ -3135,7 +3144,7 @@ int is_node_up(const char *host)
if (host == tcmtest_routecpu_down_node) {
return 0;
}
return machine_is_up(host);
return machine_is_up(host, NULL);
}

/* callback to set dynamically configurable election settings */
Expand Down Expand Up @@ -3701,8 +3710,8 @@ int open_bdb_env(struct dbenv *dbenv)
bdb_callback_set(dbenv->bdb_callback, BDB_CALLBACK_WHOISMASTER,
new_master_callback);
bdb_callback_set(dbenv->bdb_callback, BDB_CALLBACK_NODEUP, nodeup_callback);
bdb_callback_set(dbenv->bdb_callback, BDB_CALLBACK_GETROOM,
getroom_callback);
bdb_callback_set(dbenv->bdb_callback, BDB_CALLBACK_NODEUP_DRTEST, nodeup_drtest_callback);
bdb_callback_set(dbenv->bdb_callback, BDB_CALLBACK_GETROOM, getroom_callback);
bdb_callback_set(dbenv->bdb_callback, BDB_CALLBACK_APPSOCK,
appsock_callback);
bdb_callback_set(dbenv->bdb_callback, BDB_CALLBACK_ADMIN_APPSOCK,
Expand Down
54 changes: 54 additions & 0 deletions db/process_message.c
Original file line number Diff line number Diff line change
Expand Up @@ -2973,6 +2973,60 @@ int process_command(struct dbenv *dbenv, char *line, int lline, int st)
}
logmsg(LOGMSG_WARN, "machine_cache requires find, add or dump\n");
return -1;

} else if (tokcmp(tok, ltok, "fakedr") == 0) {
/* Message-traps to verify behavior for drtesting */
/*
* fakedr add <node>
* fakedr del <node>
* fakedr dump
*/
tok = segtok(line, lline, &st, &ltok);

/* Add */
if (tokcmp(tok, ltok, "add") == 0) {

/* Host */
tok = segtok(line, lline, &st, &ltok);
if (!ltok) {
logmsg(LOGMSG_WARN, "machine_cluster add requires host & cluster\n");
return -1;
}
char *host = alloca(ltok + 1);
tokcpy(tok, ltok, host);

void add_fake_drtest(const char *host);
add_fake_drtest(host);
return 0;
}

/* Del */
if (tokcmp(tok, ltok, "del") == 0) {

/* Host */
tok = segtok(line, lline, &st, &ltok);
if (!ltok) {
logmsg(LOGMSG_WARN, "machine_cluster add requires host & cluster\n");
return -1;
}
char *host = alloca(ltok + 1);
tokcpy(tok, ltok, host);

void del_fake_drtest(const char *host);
del_fake_drtest(host);
return 0;
}

/* Dump */
if (tokcmp(tok, ltok, "dump") == 0) {
void dump_fake_drtest();
dump_fake_drtest();
return 0;
}

logmsg(LOGMSG_WARN, "fakedr requires add, del or dump\n");
return -1;

} else if (tokcmp(tok, ltok, "machine_cluster") == 0) {

/* machine_cluster add <machine-name> <cluster-name>
Expand Down
2 changes: 1 addition & 1 deletion net/net.c
Original file line number Diff line number Diff line change
Expand Up @@ -2977,7 +2977,7 @@ char *net_get_osql_node(netinfo_type *netinfo_ptr)
continue;

/* is rtcpu-ed? */
if (machine_is_up(ptr->host) != 1)
if (machine_is_up(ptr->host, NULL) != 1)
continue;

if (nnodes >= REPMAX)
Expand Down
10 changes: 10 additions & 0 deletions tests/rtcpu_drtest.test/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
ifeq ($(TESTSROOTDIR),)
include ../testcase.mk
else
include $(TESTSROOTDIR)/testcase.mk
endif
export CHECK_DB_AT_FINISH=0
ifeq ($(TEST_TIMEOUT),)
export TEST_TIMEOUT=4m
endif

2 changes: 2 additions & 0 deletions tests/rtcpu_drtest.test/README
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Verify that we wait-for-seqnum nodes which are dr-testing. This verifies correct
behavior for replicant_retry_on_not_durable 1.
2 changes: 2 additions & 0 deletions tests/rtcpu_drtest.test/lrl.options
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
replicant_retry_on_not_durable 1
forbid_remote_admin 0
47 changes: 47 additions & 0 deletions tests/rtcpu_drtest.test/runit
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#!/usr/bin/env bash
bash -n "$0" | exit 1

source ${TESTSROOTDIR}/tools/runit_common.sh
source ${TESTSROOTDIR}/tools/cluster_utils.sh

[[ -z "$CLUSTER" ]] && failexit "This test requires a CLUSTER"

export MASTER=$(get_master)

# Create a table
$CDB2SQL_EXE $DBNAME $CDB2_OPTIONS default "create table t1(a int)"

# Mark all nodes as offline (pretend we are dr-testing them)
for node in $CLUSTER ; do
if [[ "$node" != $MASTER ]]; then
# Tell every other node that this node is offline
for n in $CLUSTER ; do
$CDB2SQL_EXE --admin $DBNAME $CDB2_OPTIONS --host $n "exec procedure sys.cmd.send(\"fakedr add $node\")"
done
fi
done

# Make sure we can insert records
j=0
while [[ $j -lt 10 ]]; do
$CDB2SQL_EXE $DBNAME $CDB2_OPTIONS --host $MASTER "insert into t1 select * from generate_series(1, 1000)"
let j=j+1
done

# Verify that the master has the other nodes listed as 'incoherent_wait'
$CDB2SQL_EXE $DBNAME $CDB2_OPTIONS --host $MASTER "select * from comdb2_cluster"
cnt=$($CDB2SQL_EXE --tabs $DBNAME $CDB2_OPTIONS --host $MASTER "select count(*) from comdb2_cluster where is_master='N' and coherent_state='coherent'")

[[ "$cnt" == 0 ]] || failexit "Expected all non-master nodes to be INCOHERENT_WAIT"

# Mark all nodes as online again for test cleanup
for node in $CLUSTER ; do
if [[ "$node" != $MASTER ]]; then
# Tell every other node that this node is offline
for n in $CLUSTER ; do
$CDB2SQL_EXE --admin $DBNAME $CDB2_OPTIONS --host $n "exec procedure sys.cmd.send(\"fakedr del $node\")"
done
fi
done

echo "Success!"
1 change: 1 addition & 0 deletions tests/tunables.test/t00_all_tunables.expected
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@
(name='disable_exit_on_thread_error', description='don't exit on thread errors', type='BOOLEAN', value='OFF', read_only='N')
(name='disable_inplace_blob_optimization', description='Disables 'enable_inplace_blob_optimization'', type='BOOLEAN', value='OFF', read_only='Y')
(name='disable_inplace_blobs', description='Disables 'enable_inplace_blobs'', type='BOOLEAN', value='OFF', read_only='Y')
(name='disable_legacy_queues', description='Disable legacy queues and route messages through comdb2 queue buddy. (Default: on)', type='BOOLEAN', value='ON', read_only='N')
(name='disable_lowpri_snapisol', description='Disables 'enable_lowpri_snapisol'', type='BOOLEAN', value='ON', read_only='Y')
(name='disable_new_si_overhead', description='return immediately in several new snapisol functions', type='BOOLEAN', value='OFF', read_only='N')
(name='disable_osql_blob_optimization', description='Disables 'enable_osql_blob_optimization'', type='BOOLEAN', value='OFF', read_only='Y')
Expand Down
Loading