Skip to content

Commit

Permalink
Add single thread support to z_api_matching_test
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Feb 4, 2025
1 parent 9002655 commit a718a8a
Showing 1 changed file with 89 additions and 10 deletions.
99 changes: 89 additions & 10 deletions tests/z_api_matching_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,34 @@ static int SUBSCRIBER_TESTS_COUNT = 3;
typedef enum { NONE, MATCH, UNMATCH, DROP } context_state_t;

typedef struct context_t {
#if Z_FEATURE_MULTI_THREAD == 1
z_owned_condvar_t cv;
z_owned_mutex_t m;
#else
z_loaned_session_t* s1;
z_loaned_session_t* s2;
#endif
context_state_t state;
} context_t;

static void _context_init(context_t* c) {
#if Z_FEATURE_MULTI_THREAD == 1
z_condvar_init(&c->cv);
z_mutex_init(&c->m);
#endif
c->state = NONE;
}

static void _context_drop(context_t* c) {
#if Z_FEATURE_MULTI_THREAD == 1
z_condvar_drop(z_condvar_move(&c->cv));
z_mutex_drop(z_mutex_move(&c->m));
#else
(void)c;
#endif
}

#if Z_FEATURE_MULTI_THREAD == 1
static void _context_wait(context_t* c, context_state_t state, unsigned long timeout_s) {
z_mutex_lock(z_mutex_loan_mut(&c->m));
if (c->state != state) {
Expand All @@ -78,17 +90,42 @@ static void _context_wait(context_t* c, context_state_t state, unsigned long tim
c->state = NONE;
z_mutex_unlock(z_mutex_loan_mut(&c->m));
}
#else
static void _context_wait(context_t* c, context_state_t state, unsigned long timeout_s) {
unsigned long tm = timeout_s * 1000;
while (c->state == NONE && tm > 0) {
zp_read(c->s1, NULL);
zp_send_keep_alive(c->s1, NULL);
zp_read(c->s2, NULL);
zp_send_keep_alive(c->s2, NULL);
z_sleep_ms(100);
tm -= 100;
}
if (tm <= 0) {
fprintf(stderr, "Timeout waiting for state %d\n", state);
assert(false);
}
if (c->state != state) {
fprintf(stderr, "Expected state %d, got %d\n", state, c->state);
}
c->state = NONE;
}
#endif

static void _context_notify(context_t* c, context_state_t state) {
#if Z_FEATURE_MULTI_THREAD == 1
z_mutex_lock(z_mutex_loan_mut(&c->m));
#endif
if (c->state != NONE) {
fprintf(stderr, "State already set %d\n", c->state);
assert(false);
}
c->state = state;
fprintf(stderr, "State recieved %d\n", state);
#if Z_FEATURE_MULTI_THREAD == 1
z_condvar_signal(z_condvar_loan_mut(&c->cv));
z_mutex_unlock(z_mutex_loan_mut(&c->m));
#endif
}

#define assert_ok(x) \
Expand Down Expand Up @@ -127,10 +164,15 @@ void test_matching_publisher_sub(bool background) {
assert_ok(z_open(&s1, z_config_move(&c1), NULL));
assert_ok(z_open(&s2, z_config_move(&c2), NULL));

#if Z_FEATURE_MULTI_THREAD == 1
assert_ok(zp_start_read_task(z_loan_mut(s1), NULL));
assert_ok(zp_start_read_task(z_loan_mut(s2), NULL));
assert_ok(zp_start_lease_task(z_loan_mut(s1), NULL));
assert_ok(zp_start_lease_task(z_loan_mut(s2), NULL));
#else
context.s1 = z_loan_mut(s1);
context.s2 = z_loan_mut(s2);
#endif

z_owned_publisher_t pub;
assert_ok(z_declare_publisher(z_session_loan(&s1), &pub, z_view_keyexpr_loan(&k_pub), NULL));
Expand Down Expand Up @@ -168,10 +210,12 @@ void test_matching_publisher_sub(bool background) {
if (!background) {
z_matching_listener_drop(z_matching_listener_move(&matching_listener));
}
#if Z_FEATURE_MULTI_THREAD == 1
assert_ok(zp_stop_read_task(z_loan_mut(s1)));
assert_ok(zp_stop_read_task(z_loan_mut(s2)));
assert_ok(zp_stop_lease_task(z_loan_mut(s1)));
assert_ok(zp_stop_lease_task(z_loan_mut(s2)));
#endif

z_session_drop(z_session_move(&s1));
z_session_drop(z_session_move(&s2));
Expand All @@ -196,10 +240,15 @@ void test_matching_querier_sub(bool background) {
assert_ok(z_open(&s1, z_config_move(&c1), NULL));
assert_ok(z_open(&s2, z_config_move(&c2), NULL));

#if Z_FEATURE_MULTI_THREAD == 1
assert_ok(zp_start_read_task(z_loan_mut(s1), NULL));
assert_ok(zp_start_read_task(z_loan_mut(s2), NULL));
assert_ok(zp_start_lease_task(z_loan_mut(s1), NULL));
assert_ok(zp_start_lease_task(z_loan_mut(s2), NULL));
#else
context.s1 = z_loan_mut(s1);
context.s2 = z_loan_mut(s2);
#endif

z_owned_querier_t querier;
assert_ok(z_declare_querier(z_session_loan(&s1), &querier, z_view_keyexpr_loan(&k_querier), NULL));
Expand Down Expand Up @@ -238,24 +287,36 @@ void test_matching_querier_sub(bool background) {
z_matching_listener_drop(z_matching_listener_move(&matching_listener));
}

#if Z_FEATURE_MULTI_THREAD == 1
assert_ok(zp_stop_read_task(z_loan_mut(s1)));
assert_ok(zp_stop_read_task(z_loan_mut(s2)));
assert_ok(zp_stop_lease_task(z_loan_mut(s1)));
assert_ok(zp_stop_lease_task(z_loan_mut(s2)));
#endif

z_session_drop(z_session_move(&s1));
z_session_drop(z_session_move(&s2));

_context_drop(&context);
}

static void _check_publisher_status(z_owned_publisher_t* pub, bool expected) {
static void _check_publisher_status(z_owned_publisher_t* pub, z_loaned_session_t* s1, z_loaned_session_t* s2,
bool expected) {
z_matching_status_t status;
status.matching = !expected;
z_clock_t clock = z_clock_now();
while (status.matching != expected && z_clock_elapsed_s(&clock) < DEFAULT_TIMEOUT_S) {
assert_ok(z_publisher_get_matching_status(z_publisher_loan(pub), &status));
z_sleep_ms(100);
#if Z_FEATURE_MULTI_THREAD == 1
(void)s1;
(void)s2;
#else
zp_read(s1, NULL);
zp_send_keep_alive(s1, NULL);
zp_read(s2, NULL);
zp_send_keep_alive(s2, NULL);
#endif
}
if (status.matching != expected) {
fprintf(stderr, "Expected matching status %d, got %d\n", expected, status.matching);
Expand All @@ -278,16 +339,18 @@ void test_matching_publisher_get(void) {
assert_ok(z_open(&s1, z_config_move(&c1), NULL));
assert_ok(z_open(&s2, z_config_move(&c2), NULL));

#if Z_FEATURE_MULTI_THREAD == 1
assert_ok(zp_start_read_task(z_loan_mut(s1), NULL));
assert_ok(zp_start_read_task(z_loan_mut(s2), NULL));
assert_ok(zp_start_lease_task(z_loan_mut(s1), NULL));
assert_ok(zp_start_lease_task(z_loan_mut(s2), NULL));
#endif

z_owned_publisher_t pub;
assert_ok(z_declare_publisher(z_session_loan(&s1), &pub, z_view_keyexpr_loan(&k_pub), NULL));
z_sleep_s(1);

_check_publisher_status(&pub, false);
_check_publisher_status(&pub, z_loan_mut(s1), z_loan_mut(s2), false);

z_owned_subscriber_t sub_wrong;
z_owned_closure_sample_t callback_wrong;
Expand All @@ -296,38 +359,50 @@ void test_matching_publisher_get(void) {
z_closure_sample_move(&callback_wrong), NULL));
z_sleep_s(1);

_check_publisher_status(&pub, false);
_check_publisher_status(&pub, z_loan_mut(s1), z_loan_mut(s2), false);

z_owned_subscriber_t sub;
z_owned_closure_sample_t callback;
z_closure_sample(&callback, NULL, NULL, NULL);
assert_ok(z_declare_subscriber(z_session_loan(&s2), &sub, z_view_keyexpr_loan(&k_sub),
z_closure_sample_move(&callback), NULL));

_check_publisher_status(&pub, true);
_check_publisher_status(&pub, z_loan_mut(s1), z_loan_mut(s2), true);

z_subscriber_drop(z_subscriber_move(&sub));

_check_publisher_status(&pub, false);
_check_publisher_status(&pub, z_loan_mut(s1), z_loan_mut(s2), false);

z_publisher_drop(z_publisher_move(&pub));
z_subscriber_drop(z_subscriber_move(&sub_wrong));

#if Z_FEATURE_MULTI_THREAD == 1
assert_ok(zp_stop_read_task(z_loan_mut(s1)));
assert_ok(zp_stop_read_task(z_loan_mut(s2)));
assert_ok(zp_stop_lease_task(z_loan_mut(s1)));
assert_ok(zp_stop_lease_task(z_loan_mut(s2)));
#endif

z_session_drop(z_session_move(&s1));
z_session_drop(z_session_move(&s2));
}

static void _check_querier_status(z_owned_querier_t* querier, bool expected) {
static void _check_querier_status(z_owned_querier_t* querier, z_loaned_session_t* s1, z_loaned_session_t* s2,
bool expected) {
z_matching_status_t status;
status.matching = !expected;
z_clock_t clock = z_clock_now();
while (status.matching != expected && z_clock_elapsed_s(&clock) < DEFAULT_TIMEOUT_S) {
assert_ok(z_querier_get_matching_status(z_querier_loan(querier), &status));
#if Z_FEATURE_MULTI_THREAD == 1
(void)s1;
(void)s2;
#else
zp_read(s1, NULL);
zp_send_keep_alive(s1, NULL);
zp_read(s2, NULL);
zp_send_keep_alive(s2, NULL);
#endif
z_sleep_ms(100);
}
if (status.matching != expected) {
Expand All @@ -351,16 +426,18 @@ void test_matching_querier_get(void) {
assert_ok(z_open(&s1, z_config_move(&c1), NULL));
assert_ok(z_open(&s2, z_config_move(&c2), NULL));

#if Z_FEATURE_MULTI_THREAD == 1
assert_ok(zp_start_read_task(z_loan_mut(s1), NULL));
assert_ok(zp_start_read_task(z_loan_mut(s2), NULL));
assert_ok(zp_start_lease_task(z_loan_mut(s1), NULL));
assert_ok(zp_start_lease_task(z_loan_mut(s2), NULL));
#endif

z_owned_querier_t querier;
assert_ok(z_declare_querier(z_session_loan(&s1), &querier, z_view_keyexpr_loan(&k_querier), NULL));
z_sleep_s(1);

_check_querier_status(&querier, false);
_check_querier_status(&querier, z_loan_mut(s1), z_loan_mut(s2), false);

z_owned_queryable_t queryable_wrong;
z_owned_closure_query_t callback_wrong;
Expand All @@ -369,27 +446,29 @@ void test_matching_querier_get(void) {
z_closure_query_move(&callback_wrong), NULL));
z_sleep_s(1);

_check_querier_status(&querier, false);
_check_querier_status(&querier, z_loan_mut(s1), z_loan_mut(s2), false);

z_owned_queryable_t queryable;
z_owned_closure_query_t callback;
z_closure_query(&callback, NULL, NULL, NULL);
assert_ok(z_declare_queryable(z_session_loan(&s2), &queryable, z_view_keyexpr_loan(&k_sub),
z_closure_query_move(&callback), NULL));

_check_querier_status(&querier, true);
_check_querier_status(&querier, z_loan_mut(s1), z_loan_mut(s2), true);

z_queryable_drop(z_queryable_move(&queryable));

_check_querier_status(&querier, false);
_check_querier_status(&querier, z_loan_mut(s1), z_loan_mut(s2), false);

z_querier_drop(z_querier_move(&querier));
z_queryable_drop(z_queryable_move(&queryable_wrong));

#if Z_FEATURE_MULTI_THREAD == 1
assert_ok(zp_stop_read_task(z_loan_mut(s1)));
assert_ok(zp_stop_read_task(z_loan_mut(s2)));
assert_ok(zp_stop_lease_task(z_loan_mut(s1)));
assert_ok(zp_stop_lease_task(z_loan_mut(s2)));
#endif

z_session_drop(z_session_move(&s1));
z_session_drop(z_session_move(&s2));
Expand Down

0 comments on commit a718a8a

Please sign in to comment.