Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

contrib/database/sql: Close DB Stats goroutine on db.Close() #3025

Merged
merged 23 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
8f6d3a0
implement stopchan on contribroutines
mtoffl01 Dec 10, 2024
c7f7fd3
initialize channel at package root
mtoffl01 Dec 10, 2024
1b8bb5f
initialize stop channel at package root
mtoffl01 Dec 10, 2024
f0e844b
remove comment
mtoffl01 Dec 10, 2024
f0f73af
copywrite header
mtoffl01 Dec 10, 2024
bf4f376
Cleanup var declaration
mtoffl01 Dec 11, 2024
9e86acc
lock access to prevent race condition
mtoffl01 Dec 13, 2024
12241c9
added race detection tests to contribroutines_test.go
mtoffl01 Dec 16, 2024
41c3ad9
move stop chan initialization out of pollDBStats
mtoffl01 Dec 16, 2024
2cb3519
Apply stopchan to pgx pollPoolStats
mtoffl01 Dec 16, 2024
37b52f9
move lock outside of once.Do
mtoffl01 Dec 16, 2024
291e014
Merge branch 'main' into mtoff/contrib-routines
mtoffl01 Jan 13, 2025
11f13fa
Hook pollDBStats stop condition into db.Close() invocation
mtoffl01 Jan 13, 2025
42935d6
nits
mtoffl01 Jan 13, 2025
3fc1121
make tracer initialize new contribroutines.stop channel on startup
mtoffl01 Jan 13, 2025
5b116f8
Add reminder: implement stop condition on db.Close for pgx
mtoffl01 Jan 14, 2025
79ee5f5
revamp: tie pollDBStats to lifetime of db connection, only
mtoffl01 Jan 17, 2025
5bb8a4c
remove unused helper fn
mtoffl01 Jan 17, 2025
7b93aa2
nits: cleanup comments, fix undefined code call in metrics_test.go
mtoffl01 Jan 23, 2025
b55e322
Update godoc for tracedConnector.Close
mtoffl01 Jan 27, 2025
1ebd386
Merge branch 'main' into mtoff/contrib-routines
mtoffl01 Jan 31, 2025
8823e71
Merge branch 'main' into mtoff/contrib-routines
kakkoyun Feb 3, 2025
823375a
Merge branch 'main' into mtoff/contrib-routines
darccio Feb 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 20 additions & 13 deletions contrib/database/sql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,27 @@ var interval = 10 * time.Second

// pollDBStats calls (*DB).Stats on the db at a predetermined interval. It pushes the DBStats off to the statsd client.
// the caller should always ensure that db & statsd are non-nil
func pollDBStats(statsd internal.StatsdClient, db *sql.DB) {
func pollDBStats(statsd internal.StatsdClient, db *sql.DB, stop chan struct{}) {
log.Debug("DB stats will be gathered and sent every %v.", interval)
for range time.NewTicker(interval).C {
log.Debug("Reporting DB.Stats metrics...")
stat := db.Stats()
statsd.Gauge(MaxOpenConnections, float64(stat.MaxOpenConnections), []string{}, 1)
statsd.Gauge(OpenConnections, float64(stat.OpenConnections), []string{}, 1)
statsd.Gauge(InUse, float64(stat.InUse), []string{}, 1)
statsd.Gauge(Idle, float64(stat.Idle), []string{}, 1)
statsd.Gauge(WaitCount, float64(stat.WaitCount), []string{}, 1)
statsd.Timing(WaitDuration, stat.WaitDuration, []string{}, 1)
statsd.Gauge(MaxIdleClosed, float64(stat.MaxIdleClosed), []string{}, 1)
statsd.Gauge(MaxIdleTimeClosed, float64(stat.MaxIdleTimeClosed), []string{}, 1)
statsd.Gauge(MaxLifetimeClosed, float64(stat.MaxLifetimeClosed), []string{}, 1)
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
log.Debug("Reporting DB.Stats metrics...")
stat := db.Stats()
statsd.Gauge(MaxOpenConnections, float64(stat.MaxOpenConnections), []string{}, 1)
statsd.Gauge(OpenConnections, float64(stat.OpenConnections), []string{}, 1)
statsd.Gauge(InUse, float64(stat.InUse), []string{}, 1)
statsd.Gauge(Idle, float64(stat.Idle), []string{}, 1)
statsd.Gauge(WaitCount, float64(stat.WaitCount), []string{}, 1)
statsd.Timing(WaitDuration, stat.WaitDuration, []string{}, 1)
statsd.Gauge(MaxIdleClosed, float64(stat.MaxIdleClosed), []string{}, 1)
statsd.Gauge(MaxIdleTimeClosed, float64(stat.MaxIdleTimeClosed), []string{}, 1)
statsd.Gauge(MaxLifetimeClosed, float64(stat.MaxLifetimeClosed), []string{}, 1)
case <-stop:
return
}
}
}

Expand Down
23 changes: 23 additions & 0 deletions contrib/database/sql/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@
package sql

import (
"sync"
"testing"

"github.com/DataDog/datadog-go/v5/statsd"
"github.com/lib/pq"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig"
)

Expand Down Expand Up @@ -64,3 +68,22 @@ func TestStatsTags(t *testing.T) {
})
resetGlobalConfig()
}

func TestPollDBStatsStop(t *testing.T) {
driverName := "postgres"
Register(driverName, &pq.Driver{}, WithServiceName("postgres-test"), WithAnalyticsRate(0.2))
defer unregister(driverName)
db, err := Open(driverName, "postgres://postgres:[email protected]:5432/postgres?sslmode=disable")
require.NoError(t, err)
defer db.Close()

var wg sync.WaitGroup
stop := make(chan struct{})
wg.Add(1)
go func() {
defer wg.Done()
pollDBStats(&statsd.NoOpClientDirect{}, db, stop)
}()
close(stop)
wg.Wait()
}
11 changes: 10 additions & 1 deletion contrib/database/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ type tracedConnector struct {
connector driver.Connector
driverName string
cfg *config
dbClose chan struct{}
}

func (t *tracedConnector) Connect(ctx context.Context) (driver.Conn, error) {
Expand Down Expand Up @@ -171,6 +172,13 @@ func (t *tracedConnector) Driver() driver.Driver {
return t.connector.Driver()
}

// Close closes the dbClose channel
// This method will be invoked when DB.Close() is called, which we expect to occur only once: https://cs.opensource.google/go/go/+/refs/tags/go1.23.4:src/database/sql/sql.go;l=918-950
func (t *tracedConnector) Close() error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes me realize we probably had a bug before, since this wraps the original driver.Connector, if the original one implemented Close() error, our traced version didn't (until this PR).

Maybe we should internally call the original connector Close method, in case it implements io.Closer?

(btw this is ok to do outside of this PR, since this issue has been there before).

close(t.dbClose)
kakkoyun marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

// from Go stdlib implementation of sql.Open
type dsnConnector struct {
dsn string
Expand Down Expand Up @@ -208,10 +216,11 @@ func OpenDB(c driver.Connector, opts ...Option) *sql.DB {
connector: c,
driverName: driverName,
cfg: cfg,
dbClose: make(chan struct{}),
}
db := sql.OpenDB(tc)
if cfg.dbStats && cfg.statsdClient != nil {
go pollDBStats(cfg.statsdClient, db)
go pollDBStats(cfg.statsdClient, db, tc.dbClose)
}
return db
}
Expand Down
8 changes: 7 additions & 1 deletion contrib/database/sql/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,12 +281,13 @@ func TestOpenOptions(t *testing.T) {
var tg statsdtest.TestStatsdClient
Register(driverName, &pq.Driver{})
defer unregister(driverName)
_, err := Open(driverName, dsn, withStatsdClient(&tg), WithDBStats())
db, err := Open(driverName, dsn, withStatsdClient(&tg), WithDBStats())
require.NoError(t, err)

// The polling interval has been reduced to 500ms for the sake of this test, so at least one round of `pollDBStats` should be complete in 1s
deadline := time.Now().Add(1 * time.Second)
wantStats := []string{MaxOpenConnections, OpenConnections, InUse, Idle, WaitCount, WaitDuration, MaxIdleClosed, MaxIdleTimeClosed, MaxLifetimeClosed}
var calls1 []string
for {
if time.Now().After(deadline) {
t.Fatalf("Stats not collected in expected interval of %v", interval)
Expand All @@ -300,11 +301,16 @@ func TestOpenOptions(t *testing.T) {
}
}
// all expected stats have been collected; exit out of loop, test should pass
calls1 = calls
break
}
// not all stats have been collected yet, try again in 50ms
time.Sleep(50 * time.Millisecond)
}
// Close DB and assert the no further stats have been collected; db.Close should stop the pollDBStats goroutine.
db.Close()
time.Sleep(50 * time.Millisecond)
assert.Equal(t, calls1, tg.CallNames())
})
}

Expand Down
1 change: 1 addition & 0 deletions contrib/jackc/pgx.v5/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var interval = 10 * time.Second

// pollPoolStats calls (*pgxpool).Stats on the pool at a predetermined interval. It pushes the pool Stats off to the statsd client.
func pollPoolStats(statsd internal.StatsdClient, pool *pgxpool.Pool) {
// TODO: Create stop condition for pgx on db.Close
log.Debug("contrib/jackc/pgx.v5: Traced pool connection found: Pool stats will be gathered and sent every %v.", interval)
for range time.NewTicker(interval).C {
log.Debug("contrib/jackc/pgx.v5: Reporting pgxpool.Stat metrics...")
Expand Down
Loading