From 0c352749fd7d67697b72cbff71faa75fc51f89e7 Mon Sep 17 00:00:00 2001
From: Manan Gupta <35839558+GuptaManan100@users.noreply.github.com>
Date: Tue, 28 Jan 2025 20:02:53 +0530
Subject: [PATCH 1/2] [release-21.0] Increase health check buffer size (#17636)

Signed-off-by: Manan Gupta
Signed-off-by: deepthi
---
 go/vt/discovery/healthcheck.go      |  7 ++++-
 go/vt/discovery/healthcheck_test.go | 44 +++++++++++++++++++++++++++++
 2 files changed, 50 insertions(+), 1 deletion(-)

diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go
index 83c688c6a50..72b4214d5a8 100644
--- a/go/vt/discovery/healthcheck.go
+++ b/go/vt/discovery/healthcheck.go
@@ -91,6 +91,9 @@ var (
 	// How much to sleep between each check.
 	waitAvailableTabletInterval = 100 * time.Millisecond
 
+	// Size of channel buffer for each subscriber
+	broadcastChannelBufferSize = 2048
+
 	// HealthCheckCacheTemplate uses healthCheckTemplate with the `HealthCheck Tablet - Cache` title to create the
 	// HTML code required to render the cache of the HealthCheck.
 	HealthCheckCacheTemplate = fmt.Sprintf(healthCheckTemplate, "HealthCheck - Cache")
@@ -624,7 +627,7 @@ func (hc *HealthCheckImpl) recomputeHealthy(key KeyspaceShardTabletType) {
 func (hc *HealthCheckImpl) Subscribe() chan *TabletHealth {
 	hc.subMu.Lock()
 	defer hc.subMu.Unlock()
-	c := make(chan *TabletHealth, 2)
+	c := make(chan *TabletHealth, broadcastChannelBufferSize)
 	hc.subscribers[c] = struct{}{}
 	return c
 }
@@ -643,6 +646,8 @@ func (hc *HealthCheckImpl) broadcast(th *TabletHealth) {
 		select {
 		case c <- th:
 		default:
+			// If the channel is full, we drop the message.
+			log.Warningf("HealthCheck broadcast channel is full, dropping message for %s", topotools.TabletIdent(th.Tablet))
 		}
 	}
 }
diff --git a/go/vt/discovery/healthcheck_test.go b/go/vt/discovery/healthcheck_test.go
index 962200a6a3b..19722641375 100644
--- a/go/vt/discovery/healthcheck_test.go
+++ b/go/vt/discovery/healthcheck_test.go
@@ -1481,6 +1481,50 @@ func TestDebugURLFormatting(t *testing.T) {
 	require.Contains(t, wr.String(), expectedURL, "output missing formatted URL")
 }
 
+// TestConcurrentUpdates tests that concurrent updates from the HealthCheck implementation aren't dropped.
+// Added in response to https://github.com/vitessio/vitess/issues/17629.
+func TestConcurrentUpdates(t *testing.T) {
+	ctx := utils.LeakCheckContext(t)
+	var mu sync.Mutex
+	// reset error counters
+	hcErrorCounters.ResetAll()
+	ts := memorytopo.NewServer(ctx, "cell")
+	defer ts.Close()
+	hc := createTestHc(ctx, ts)
+	// close healthcheck
+	defer hc.Close()
+
+	// Subscribe to the healthcheck
+	// Make the receiver keep track of the updates received.
+	ch := hc.Subscribe()
+	totalCount := 0
+	go func() {
+		for range ch {
+			mu.Lock()
+			totalCount++
+			mu.Unlock()
+			// Simulate a somewhat slow consumer.
+			time.Sleep(100 * time.Millisecond)
+		}
+	}()
+
+	// Run multiple updates really quickly
+	// one after the other.
+	totalUpdates := 10
+	for i := 0; i < totalUpdates; i++ {
+		hc.broadcast(&TabletHealth{})
+	}
+	// Unsubscribe from the healthcheck
+	// and verify we process all the updates eventually.
+	hc.Unsubscribe(ch)
+	defer close(ch)
+	require.Eventuallyf(t, func() bool {
+		mu.Lock()
+		defer mu.Unlock()
+		return totalUpdates == totalCount
+	}, 5*time.Second, 100*time.Millisecond, "expected all updates to be processed")
+}
+
 func tabletDialer(tablet *topodatapb.Tablet, _ grpcclient.FailFast) (queryservice.QueryService, error) {
 	connMapMu.Lock()
 	defer connMapMu.Unlock()

From 5d9f3bd316edc354d9bf6f1e8ee79d94ccf0e2f5 Mon Sep 17 00:00:00 2001
From: deepthi
Date: Wed, 29 Jan 2025 16:30:49 -0800
Subject: [PATCH 2/2] kick CI

Signed-off-by: deepthi