fix: ensure connection count is correctly reported (#586)
A recent refactor broke the open connections metric and caused all
counts to be reported as either 1 or 0. This commit updates the test to
protect against making the same mistake and fixes the original problem
(using value semantics instead of pointer semantics).
enocom authored Jun 6, 2024
1 parent 28839d9 commit b640ffb
Showing 2 changed files with 59 additions and 27 deletions.
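
For context on the value-versus-pointer semantics mentioned in the commit message, here is a minimal sketch (not from this repository; the type names are invented) of why storing the counter by value breaks atomic increments once the wrapper struct is copied out of a map, and why holding a *uint64 fixes it:

package main

import (
	"fmt"
	"sync/atomic"
)

// byValue mimics the broken monitoredCache: the counter lives inside the
// struct, so every map lookup hands back a copy with its own counter.
type byValue struct{ openConns uint64 }

// byPointer mimics the fix: every copy shares the same counter.
type byPointer struct{ openConns *uint64 }

func main() {
	vals := map[string]byValue{"inst": {}}
	for i := 0; i < 3; i++ {
		c := vals["inst"] // copy; increments never make it back into the map
		fmt.Println(atomic.AddUint64(&c.openConns, 1)) // prints 1, 1, 1
	}

	var open uint64
	ptrs := map[string]byPointer{"inst": {openConns: &open}}
	for i := 0; i < 3; i++ {
		c := ptrs["inst"] // still a copy, but the pointer targets the shared counter
		fmt.Println(atomic.AddUint64(c.openConns, 1)) // prints 1, 2, 3
	}
}
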
9 changes: 5 additions & 4 deletions dialer.go
@@ -86,7 +86,7 @@ type connectionInfoCache interface {
// monitoredCache is a wrapper around a connectionInfoCache that tracks the
// number of connections to the associated instance.
type monitoredCache struct {
- openConns uint64
+ openConns *uint64
connectionInfoCache
}

@@ -345,13 +345,13 @@ func (d *Dialer) Dial(ctx context.Context, instance string, opts ...DialOption)

latency := time.Since(startTime).Milliseconds()
go func() {
- n := atomic.AddUint64(&cache.openConns, 1)
+ n := atomic.AddUint64(cache.openConns, 1)
trace.RecordOpenConnections(ctx, int64(n), d.dialerID, inst.String())
trace.RecordDialLatency(ctx, instance, d.dialerID, latency)
}()

return newInstrumentedConn(tlsConn, func() {
- n := atomic.AddUint64(&cache.openConns, ^uint64(0))
+ n := atomic.AddUint64(cache.openConns, ^uint64(0))
trace.RecordOpenConnections(context.Background(), int64(n), d.dialerID, inst.String())
}), nil
}
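
An aside on the decrement above: sync/atomic has no subtracting variant of AddUint64, so the connector adds ^uint64(0), the all-ones bit pattern, which wraps the unsigned counter around and subtracts one. This is the idiom documented for atomic.AddUint64; a standalone sketch:

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var open uint64 = 2
	// Adding ^uint64(0) (all bits set) wraps the unsigned counter around,
	// which has the effect of subtracting one.
	fmt.Println(atomic.AddUint64(&open, ^uint64(0))) // prints 1
}
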
@@ -597,7 +597,8 @@ func (d *Dialer) connectionInfoCache(
d.refreshTimeout, d.dialerID,
)
}
- c = monitoredCache{connectionInfoCache: cache}
+ var open uint64
+ c = monitoredCache{openConns: &open, connectionInfoCache: cache}
d.cache[uri] = c
}
}
77 changes: 54 additions & 23 deletions metrics_test.go
@@ -15,6 +15,8 @@ package alloydbconn

import (
"context"
"encoding/json"
"fmt"
"sync"
"testing"
"time"
@@ -26,50 +28,62 @@ import (
)

type spyMetricsExporter struct {
- mu sync.Mutex
- data []*view.Data
+ mu sync.Mutex
+ viewData []*view.Data
}

func (e *spyMetricsExporter) ExportView(vd *view.Data) {
e.mu.Lock()
defer e.mu.Unlock()
- e.data = append(e.data, vd)
+ e.viewData = append(e.viewData, vd)
}

type metric struct {
name string
data view.AggregationData
}

- func (e *spyMetricsExporter) Data() []metric {
+ func (e *spyMetricsExporter) data() []metric {
e.mu.Lock()
defer e.mu.Unlock()
var res []metric
- for _, d := range e.data {
+ for _, d := range e.viewData {
for _, r := range d.Rows {
res = append(res, metric{name: d.View.Name, data: r.Data})
}
}
return res
}

+ // dump marshals a value to JSON for better test reporting
+ func dump[T any](t *testing.T, data T) string {
+ b, err := json.MarshalIndent(data, "", " ")
+ if err != nil {
+ t.Fatal(err)
+ }
+ return fmt.Sprint(string(b))
+ }
+
// wantLastValueMetric ensures the provided metrics include a metric with the
// wanted name and at least one data point.
- func wantLastValueMetric(t *testing.T, wantName string, ms []metric) {
+ func wantLastValueMetric(t *testing.T, wantName string, ms []metric, wantValue int) {
t.Helper()
gotNames := make(map[string]view.AggregationData)
for _, m := range ms {
gotNames[m.name] = m.data
- _, ok := m.data.(*view.LastValueData)
- if m.name == wantName && ok {
+ d, ok := m.data.(*view.LastValueData)
+ if ok && m.name == wantName && d.Value == float64(wantValue) {
return
}
}
t.Fatalf("metric name want = %v with LastValueData, all metrics = %#v", wantName, gotNames)
t.Fatalf(
"want metric LastValueData{name = %q, value = %v}, got metrics = %v",
wantName, wantValue, dump(t, gotNames),
)
}

- // wantDistributionMetric ensures the provided metrics include a metric with the
- // wanted name and at least one data point.
+ // wantDistributionMetric ensures the provided metrics include a metric with
+ // the wanted name and at least one data point.
func wantDistributionMetric(t *testing.T, wantName string, ms []metric) {
t.Helper()
gotNames := make(map[string]view.AggregationData)
Expand All @@ -80,11 +94,14 @@ func wantDistributionMetric(t *testing.T, wantName string, ms []metric) {
return
}
}
t.Fatalf("metric name want = %v with DistributionData, all metrics = %#v", wantName, gotNames)
t.Fatalf(
"metric name want = %v with DistributionData, all metrics = %v",
wantName, dump(t, gotNames),
)
}

- // wantCountMetric ensures the provided metrics include a metric with the wanted
- // name and at least one data point.
+ // wantCountMetric ensures the provided metrics include a metric with the
+ // wanted name and at least one data point.
func wantCountMetric(t *testing.T, wantName string, ms []metric) {
t.Helper()
gotNames := make(map[string]view.AggregationData)
@@ -95,7 +112,10 @@ func wantCountMetric(t *testing.T, wantName string, ms []metric) {
return
}
}
t.Fatalf("metric name want = %v with CountData, all metrics = %#v", wantName, gotNames)
t.Fatalf(
"metric name want = %v with CountData, all metrics = %v",
wantName, dump(t, gotNames),
)
}

func TestDialerWithMetrics(t *testing.T) {
@@ -119,7 +139,9 @@ func TestDialerWithMetrics(t *testing.T) {
t.Fatalf("%v", err)
}
}()
- c, err := alloydbadmin.NewAlloyDBAdminRESTClient(ctx, option.WithHTTPClient(mc), option.WithEndpoint(url))
+ c, err := alloydbadmin.NewAlloyDBAdminRESTClient(
+ ctx, option.WithHTTPClient(mc), option.WithEndpoint(url),
+ )
if err != nil {
t.Fatalf("expected NewClient to succeed, but got error: %v", err)
}
@@ -131,25 +153,34 @@
d.client = c

// dial a good instance
conn, err := d.Dial(ctx, "/projects/my-project/locations/my-region/clusters/my-cluster/instances/my-instance")
conn, err := d.Dial(ctx, testInstanceURI)
if err != nil {
t.Fatalf("expected Dial to succeed, but got error: %v", err)
}
defer conn.Close()
+ // dial a second time to ensure the counter is working
+ conn2, err := d.Dial(ctx, testInstanceURI)
+ if err != nil {
+ t.Fatalf("expected Dial to succeed, but got error: %v", err)
+ }
+ defer conn2.Close()
// dial a bogus instance
_, err = d.Dial(ctx, "/projects/my-project/locations/my-region/clusters/my-cluster/instances/notaninstance")
_, err = d.Dial(ctx,
"projects/my-project/locations/my-region/clusters/"+
"my-cluster/instances/notaninstance",
)
if err == nil {
t.Fatal("expected Dial to fail, but got no error")
}

time.Sleep(100 * time.Millisecond) // allow exporter a chance to run

// success metrics
wantLastValueMetric(t, "alloydbconn/open_connections", spy.Data())
wantDistributionMetric(t, "alloydbconn/dial_latency", spy.Data())
wantCountMetric(t, "alloydbconn/refresh_success_count", spy.Data())
wantLastValueMetric(t, "alloydbconn/open_connections", spy.data(), 2)
wantDistributionMetric(t, "alloydbconn/dial_latency", spy.data())
wantCountMetric(t, "alloydbconn/refresh_success_count", spy.data())

// failure metrics from dialing bogus instance
wantCountMetric(t, "alloydbconn/dial_failure_count", spy.Data())
wantCountMetric(t, "alloydbconn/refresh_failure_count", spy.Data())
wantCountMetric(t, "alloydbconn/dial_failure_count", spy.data())
wantCountMetric(t, "alloydbconn/refresh_failure_count", spy.data())
}
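
A side note on the dump helper added above: it takes a type parameter only so that any value can be marshalled into indented JSON for a readable failure message. A standalone sketch of the same idea, with invented sample data and without the *testing.T plumbing:

package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// dumpJSON mirrors the shape of the test's dump helper.
func dumpJSON[T any](data T) string {
	b, err := json.MarshalIndent(data, "", " ")
	if err != nil {
		log.Fatal(err)
	}
	return string(b)
}

func main() {
	got := map[string]float64{"alloydbconn/open_connections": 2}
	fmt.Println(dumpJSON(got))
	// Output:
	// {
	//  "alloydbconn/open_connections": 2
	// }
}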
