Skip to content

Commit

Permalink
fix: do not re-search for other channels when doing initial channel s…
Browse files Browse the repository at this point in the history
…earch

When creating a large number of Channels at once, we can end up calling
`ContextImpl::poke(true)` many times in quick succession.  This results
in a flood of UDP broadcasts where we are searching for channels that we
only just sent out the initial search request for.

This can easily lead to packets getting lost and us not receiving a
reply for some Channels.  Moreover, as we keep resending search requests
for Channels, we reschedule them further and further in the future (as
`nSearch` is increased).  After the dust settles and we stop poking,
this can result in a wait of several seconds before a Channel which we
have not found is searched for again.

In this commit we avoid this issue by using a separate bucket to hold
channels waiting for their initial search request.  Rather than poking
`tickSearch` to do the initial search and also resend requests for
outstanding channels, we schedule a new call to `tickSearch` which
will only send the initial search requests.  As such, we will avoid
rebroadcasting search requests for channels we have only just searched
for.

We have promoted the `discover` bool to an enum to distinguish between
the now three different situations `tickSearch` can be called in.
  • Loading branch information
thomasives committed Apr 11, 2023
1 parent c8f28f3 commit 64c480d
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 17 deletions.
55 changes: 40 additions & 15 deletions src/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,9 +340,9 @@ std::shared_ptr<Channel> Channel::build(const std::shared_ptr<ContextImpl>& cont
context->chanByName[namekey] = chan;

if(server.empty()) {
context->searchBuckets[context->currentBucket].push_back(chan);
context->initialSearchBucket.push_back(chan);

context->poke(true);
context->scheduleInitialSearch();

} else { // bypass search and connect so a specific server
chan->forcedServer = forceServer;
Expand Down Expand Up @@ -694,6 +694,19 @@ void ContextImpl::poke(bool force)
throw std::runtime_error("Unable to schedule searchTimer");
}

// Request one tickSearch(SearchKind::initial) pass via tcp_loop.dispatch().
// Repeated calls while a pass is already pending are no-ops, so a burst of
// calls results in a single dispatched pass.
void ContextImpl::scheduleInitialSearch()
{
    // A pass is already pending; tickSearch(SearchKind::initial) drains the
    // whole initialSearchBucket, so nothing more needs to be queued.
    if (initialSearchScheduled)
        return;

    log_debug_printf(setup, "scheduleInitialSearch()%s\n", "");

    // Mark the pass as pending before handing it to the loop; the flag is
    // cleared again at the start of tickSearch(SearchKind::initial).
    initialSearchScheduled = true;

    tcp_loop.dispatch([this]() {
        tickSearch(SearchKind::initial);
    });
}

void ContextImpl::onBeacon(const UDPManager::Beacon& msg)
{
epicsTimeStamp now;
Expand Down Expand Up @@ -958,27 +971,38 @@ void ContextImpl::onSearchS(evutil_socket_t fd, short evt, void *raw)
}
}

void ContextImpl::tickSearch(bool discover)
void ContextImpl::tickSearch(SearchKind kind)
{
// If !discover, then this is a discovery ping.
// If kind == SearchKind::discover, then this is a discovery ping.
// these are really empty searches with must-reply set.
// So if !discover, then we should not be modifying any internal state
{
//
// If kind == SearchKind::initial we are sending the first search request
// for the channels in initialSearchBucket, and not resending requests for
// channels in the searchBuckets.
//
// If kind == SearchKind::check then we may have been poked.
if (kind == SearchKind::check) {
Guard G(pokeLock);
poked = false;
} else if (kind == SearchKind::initial) {
initialSearchScheduled = false;
}

auto idx = currentBucket;
if(!discover)
if(kind == SearchKind::check)
currentBucket = (currentBucket+1u)%searchBuckets.size();

log_debug_printf(io, "Search tick %zu\n", idx);

decltype (searchBuckets)::value_type bucket;
if(!discover)
if (kind == SearchKind::initial) {
initialSearchBucket.swap(bucket);
} else if(kind == SearchKind::check) {
searchBuckets[idx].swap(bucket);
}

while(!bucket.empty() || discover) {
while(!bucket.empty() || kind == SearchKind::discover) {
// when 'discover' we only loop once

searchMsg.resize(0x10000);
Expand All @@ -991,7 +1015,8 @@ void ContextImpl::tickSearch(bool discover)
// flags and reserved.
// initially flags[7] is cleared (bcast)
auto pflags = M.save();
to_wire(M, uint8_t(discover ? pva_search_flags::MustReply : 0u)); // must-reply to discovery, ignore regular negative search
to_wire(M, uint8_t(kind == SearchKind::discover ?
pva_search_flags::MustReply : 0u)); // must-reply to discovery, ignore regular negative search
to_wire(M, uint8_t(0u));
to_wire(M, uint16_t(0u));

Expand All @@ -1004,7 +1029,7 @@ void ContextImpl::tickSearch(bool discover)
auto pport = M.save();
to_wire(M, uint16_t(searchRxPort));

if(discover) {
if(kind == SearchKind::discover) {
to_wire(M, uint8_t(0u));

} else {
Expand All @@ -1019,7 +1044,7 @@ void ContextImpl::tickSearch(bool discover)

bool payload = false;
while(!bucket.empty()) {
assert(!discover);
assert(kind != SearchKind::discover);

auto chan = bucket.front().lock();
if(!chan || chan->state!=Channel::Searching) {
Expand Down Expand Up @@ -1076,7 +1101,7 @@ void ContextImpl::tickSearch(bool discover)
}
assert(M.good());

if(!payload && !discover)
if(!payload && kind != SearchKind::discover)
break;

{
Expand Down Expand Up @@ -1144,18 +1169,18 @@ void ContextImpl::tickSearch(bool discover)
// fail silently, will retry
}

if(discover)
if(kind == SearchKind::discover)
break;
}

if(event_add(searchTimer.get(), &bucketInterval))
if(kind != SearchKind::initial && event_add(searchTimer.get(), &bucketInterval))
log_err_printf(setup, "Error re-enabling search timer on\n%s", "");
}

void ContextImpl::tickSearchS(evutil_socket_t fd, short evt, void *raw)
{
try {
static_cast<ContextImpl*>(raw)->tickSearch(false);
static_cast<ContextImpl*>(raw)->tickSearch(SearchKind::check);
}catch(std::exception& e){
log_exc_printf(io, "Unhandled error in search timer callback: %s\n", e.what());
}
Expand Down
2 changes: 1 addition & 1 deletion src/clientdiscover.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ std::shared_ptr<Operation> DiscoverBuilder::exec()
if(first && ping) {
log_debug_printf(setup, "Starting Discover%s", "\n");

context->tickSearch(true);
context->tickSearch(ContextImpl::SearchKind::discover);
}
});

Expand Down
12 changes: 11 additions & 1 deletion src/clientimpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,10 @@ struct ContextImpl : public std::enable_shared_from_this<ContextImpl>
epicsTimeStamp lastPoke{};
bool poked = false;

// unlike `poke`, `scheduleInitialSearch` is only ever called from the
// tcp_loop so this does not need to be guarded by a mutex
bool initialSearchScheduled = false;

// map: endpoint+proto -> Beaconer
typedef std::pair<SockAddr, std::string> BeaconServer;
struct BeaconInfo {
Expand All @@ -287,6 +291,9 @@ struct ContextImpl : public std::enable_shared_from_this<ContextImpl>
std::vector<std::pair<SockEndpoint, bool>> searchDest;

size_t currentBucket = 0u;
// Channels where we have yet to send out an initial search request
std::list<std::weak_ptr<Channel>> initialSearchBucket;
// Channels where we are waiting for a search response
std::vector<std::list<std::weak_ptr<Channel>>> searchBuckets;

std::list<std::unique_ptr<UDPListener> > beaconRx;
Expand Down Expand Up @@ -330,9 +337,12 @@ struct ContextImpl : public std::enable_shared_from_this<ContextImpl>

void onBeacon(const UDPManager::Beacon& msg);

void scheduleInitialSearch();

bool onSearch(evutil_socket_t fd);
static void onSearchS(evutil_socket_t fd, short evt, void *raw);
void tickSearch(bool discover);
enum class SearchKind { discover, initial, check };
void tickSearch(SearchKind kind);
static void tickSearchS(evutil_socket_t fd, short evt, void *raw);
void tickBeaconClean();
static void tickBeaconCleanS(evutil_socket_t fd, short evt, void *raw);
Expand Down

0 comments on commit 64c480d

Please sign in to comment.