From fc3217e92ccedf808e191e846d983702b9f85dcd Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Tue, 28 Jan 2025 15:46:43 -0800 Subject: [PATCH] go: sqle: dolt_gc: Add DOLT_GC_SAFEPOINT_CONTROLLER_CHOICE env variable to control dolt_gc safepoint behavior. This is a short-term setting which will allow choosing the session-aware gc safepoint behavior, instead of the legacy behavior which kills all in-flight connections when performing a GC. --- go/libraries/doltcore/dconfig/envvars.go | 4 + .../doltcore/sqle/dprocedures/dolt_gc.go | 32 ++++-- .../sqle/dsess/gc_safepoint_controller.go | 1 + .../concurrent_gc_test.go | 97 +++++++++++++------ 4 files changed, 92 insertions(+), 42 deletions(-) diff --git a/go/libraries/doltcore/dconfig/envvars.go b/go/libraries/doltcore/dconfig/envvars.go index f9df29d7e99..9c7d11f8e4b 100755 --- a/go/libraries/doltcore/dconfig/envvars.go +++ b/go/libraries/doltcore/dconfig/envvars.go @@ -45,4 +45,8 @@ const ( EnvDbNameReplace = "DOLT_DBNAME_REPLACE" EnvDoltRootHost = "DOLT_ROOT_HOST" EnvDoltRootPassword = "DOLT_ROOT_PASSWORD" + + // If set, must be "kill_connections" or "session_aware" + // Will go away after session_aware is made default-and-only. + EnvGCSafepointControllerChoice = "DOLT_GC_SAFEPOINT_CONTROLLER_CHOICE" ) diff --git a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go index 5869c25656c..2b674635ad9 100644 --- a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go +++ b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go @@ -37,10 +37,19 @@ const ( cmdSuccess = 0 ) +var useSessionAwareSafepointController bool + func init() { if os.Getenv(dconfig.EnvDisableGcProcedure) != "" { DoltGCFeatureFlag = false } + if choice := os.Getenv(dconfig.EnvGCSafepointControllerChoice); choice != "" { + if choice == "session_aware" { + useSessionAwareSafepointController = true + } else if choice != "kill_connections" { + panic("Invalid value for " + dconfig.EnvGCSafepointControllerChoice + ". must be session_aware or kill_connections") + } + } } var DoltGCFeatureFlag = true @@ -162,8 +171,8 @@ type sessionAwareSafepointController struct { callCtx *sql.Context origEpoch int - waiter *dsess.GCSafepointWaiter - keeper func(hash.Hash) bool + waiter *dsess.GCSafepointWaiter + keeper func(hash.Hash) bool } func (sc *sessionAwareSafepointController) visit(ctx context.Context, sess *dsess.DoltSession) error { @@ -261,14 +270,17 @@ func (impl *DoltGCProcedure) doGC(ctx *sql.Context, args []string) (int, error) } var sc types.GCSafepointController - sc = killConnectionsSafepointController{ - origEpoch: origepoch, - callCtx: ctx, - } - sc = &sessionAwareSafepointController{ - origEpoch: origepoch, - callCtx: ctx, - controller: impl.gcSafepointController, + if useSessionAwareSafepointController { + sc = &sessionAwareSafepointController{ + origEpoch: origepoch, + callCtx: ctx, + controller: impl.gcSafepointController, + } + } else { + sc = killConnectionsSafepointController{ + origEpoch: origepoch, + callCtx: ctx, + } } err = ddb.GC(ctx, mode, sc) if err != nil { diff --git a/go/libraries/doltcore/sqle/dsess/gc_safepoint_controller.go b/go/libraries/doltcore/sqle/dsess/gc_safepoint_controller.go index ff7a5ab1689..dc99301d792 100644 --- a/go/libraries/doltcore/sqle/dsess/gc_safepoint_controller.go +++ b/go/libraries/doltcore/sqle/dsess/gc_safepoint_controller.go @@ -144,6 +144,7 @@ func (w *GCSafepointWaiter) Wait(ctx context.Context) error { } var closedCh = make(chan struct{}) + func init() { close(closedCh) } diff --git a/integration-tests/go-sql-server-driver/concurrent_gc_test.go b/integration-tests/go-sql-server-driver/concurrent_gc_test.go index c6a3c5e6c27..23f0706c53c 100644 --- a/integration-tests/go-sql-server-driver/concurrent_gc_test.go +++ b/integration-tests/go-sql-server-driver/concurrent_gc_test.go @@ -31,22 +31,43 @@ import ( ) func TestConcurrentGC(t *testing.T) { - var gct gcTest - gct.numThreads = 8 - gct.duration = 10 * time.Second + var base = gcTest { + numThreads: 8, + duration: 10 * time.Second, + } t.Run("NoCommits", func(t *testing.T) { - gct.run(t) + var base = base + base.commit = false + t.Run("KillConnections", func(t *testing.T) { + var gct = base + gct.run(t) + }) + t.Run("SessionAware", func(t *testing.T) { + var gct = base + gct.sessionAware = true + gct.run(t) + }) }) - gct.commit = true t.Run("WithCommits", func(t *testing.T) { - gct.run(t) + var base = base + base.commit = true + t.Run("KillConnections", func(t *testing.T) { + var gct = base + gct.run(t) + }) + t.Run("SessionAware", func(t *testing.T) { + var gct = base + gct.sessionAware = true + gct.run(t) + }) }) } type gcTest struct { - numThreads int - duration time.Duration - commit bool + numThreads int + duration time.Duration + commit bool + sessionAware bool } func (gct gcTest) createDB(t *testing.T, ctx context.Context, db *sql.DB) { @@ -69,13 +90,19 @@ func (gct gcTest) createDB(t *testing.T, ctx context.Context, db *sql.DB) { func (gct gcTest) doUpdate(t *testing.T, ctx context.Context, db *sql.DB, i int) error { conn, err := db.Conn(ctx) - if err != nil { + if gct.sessionAware { + if !assert.NoError(t, err) { + return nil + } + } else if err != nil { t.Logf("err in Conn: %v", err) return nil } defer conn.Close() _, err = conn.ExecContext(ctx, "update vals set val = val+1 where id = ?", i) - if err != nil { + if gct.sessionAware { + assert.NoError(t, err) + } else if err != nil { if !assert.NotContains(t, err.Error(), "dangling ref") { return err } @@ -89,7 +116,9 @@ func (gct gcTest) doUpdate(t *testing.T, ctx context.Context, db *sql.DB, i int) } if gct.commit { _, err = conn.ExecContext(ctx, fmt.Sprintf("call dolt_commit('-am', 'increment vals id = %d')", i)) - if err != nil { + if gct.sessionAware { + assert.NoError(t, err) + } else if err != nil { if !assert.NotContains(t, err.Error(), "dangling ref") { return err } @@ -107,30 +136,27 @@ func (gct gcTest) doUpdate(t *testing.T, ctx context.Context, db *sql.DB, i int) func (gct gcTest) doGC(t *testing.T, ctx context.Context, db *sql.DB) error { conn, err := db.Conn(ctx) - if err != nil { + if gct.sessionAware { + if !assert.NoError(t, err) { + return nil + } + } else if err != nil { t.Logf("err in Conn for dolt_gc: %v", err) return nil } - defer func() { - // After calling dolt_gc, the connection is bad. Remove it from the connection pool. - conn.Raw(func(_ any) error { - return sqldriver.ErrBadConn - }) - }() + if !gct.sessionAware { + defer func() { + // After calling dolt_gc, the connection is bad. Remove it from the connection pool. + conn.Raw(func(_ any) error { + return sqldriver.ErrBadConn + }) + }() + } else { + defer conn.Close() + } b := time.Now() _, err = conn.ExecContext(ctx, "call dolt_gc()") - if err != nil { - if !assert.NotContains(t, err.Error(), "dangling ref") { - return err - } - if !assert.NotContains(t, err.Error(), "is unexpected noms value") { - return err - } - if !assert.NotContains(t, err.Error(), "interface conversion: types.Value is nil") { - return err - } - t.Logf("err in Exec dolt_gc: %v", err) - } else { + if assert.NoError(t, err) { t.Logf("successful dolt_gc took %v", time.Since(b)) } return nil @@ -184,7 +210,14 @@ func (gct gcTest) run(t *testing.T) { repo, err := rs.MakeRepo("concurrent_gc_test") require.NoError(t, err) - server := MakeServer(t, repo, &driver.Server{}) + srvSettings := &driver.Server{} + if gct.sessionAware { + srvSettings.Envs = append(srvSettings.Envs, "DOLT_GC_SAFEPOINT_CONTROLLER_CHOICE=session_aware") + } else { + srvSettings.Envs = append(srvSettings.Envs, "DOLT_GC_SAFEPOINT_CONTROLLER_CHOICE=kill_connections") + } + + server := MakeServer(t, repo, srvSettings) server.DBName = "concurrent_gc_test" db, err := server.DB(driver.Connection{User: "root"})