diff --git a/src/link.c b/src/link.c index 29d8ae3..a7d6680 100644 --- a/src/link.c +++ b/src/link.c @@ -638,6 +638,8 @@ static void uninstall_service_relay(struct link *link, int64_t service_id) switch (service_relay->state) { case relay_state_unsynced: + LIST_REMOVE(service_relay, entry); + relay_destroy(service_relay); break; case relay_state_syncing: service_relay->pending_unsync = true; @@ -693,6 +695,8 @@ static void uninstall_sub_relay(struct link *link, int64_t sub_id) switch (sub_relay->state) { case relay_state_unsynced: + LIST_REMOVE(sub_relay, entry); + relay_destroy(sub_relay); break; case relay_state_syncing: sub_relay->pending_unsync = true; diff --git a/test/paf_testcases.c b/test/paf_testcases.c index 37f060e..96b9e37 100644 --- a/test/paf_testcases.c +++ b/test/paf_testcases.c @@ -803,6 +803,96 @@ TESTCASE_F(paf, match_with_most_servers_down, REQUIRE_NO_LOCAL_PORT_BIND) return UTEST_SUCCESS; } +TESTCASE(paf, interleaved_subscribe_unsubscribe) +{ + struct paf_context *context = paf_attach(ts_domain_name); + + bool server_started = false; + + const double test_time = 3; + + const int max_pubs = 100; + int64_t pub_ids[max_pubs]; + int num_pubs = 0; + + const int max_subs = 100; + int64_t sub_ids[max_subs]; + int num_subs = 0; + + double deadline = ut_ftime(CLOCK_MONOTONIC) + test_time; + + struct paf_props *props = paf_props_create(); + paf_props_add_str(props, "name", "foo"); + + struct hits hits = {}; + + do { + bool publish = num_pubs < max_pubs && tu_randbool(); + + if (publish) { + int64_t pub_id = paf_publish(context, props); + pub_ids[num_pubs++] = pub_id; + } + + bool unpublish = num_pubs > 0 && tu_randbool(); + + if (unpublish) { + int last_idx = num_pubs - 1; + int idx = tu_randint(0, last_idx); + + paf_unpublish(context, pub_ids[idx]); + + if (idx != last_idx) + pub_ids[idx] = pub_ids[last_idx]; + + num_pubs--; + } + + bool subscribe = num_subs < max_subs && tu_randbool(); + + if (subscribe) { + int64_t sub_id = + paf_subscribe(context, NULL, count_match_cb, &hits); + sub_ids[num_subs++] = sub_id; + } + + bool unsubscribe = num_subs > 0 && tu_randbool(); + + if (unsubscribe) { + int last_idx = num_subs - 1; + int idx = tu_randint(0, last_idx); + + paf_unsubscribe(context, sub_ids[idx]); + + if (idx != last_idx) + sub_ids[idx] = sub_ids[last_idx]; + + num_subs--; + } + + if (tu_randbool()) + CHKNOERR(paf_process(context)); + + if (tu_randbool()) + tu_msleep(tu_randint(1, 5)); + + if (tu_randint(0, 99) == 0) { + if (server_started) + CHKNOERR(ts_stop_servers()); + else + CHKNOERR(ts_start_servers()); + + server_started = !server_started; + } + } while (hits.appeared == 0 || ut_ftime(CLOCK_MONOTONIC) < deadline); + + paf_props_destroy(props); + + paf_close(context); + + return UTEST_SUCCESS; +} + #define NAME "*name*" #define VALUE "()<>;"