Skip to content

Commit

Permalink
go: sqle: dolt_gc: Add DOLT_GC_SAFEPOINT_CONTROLLER_CHOICE env variab…
Browse files Browse the repository at this point in the history
…le to control dolt_gc safepoint behavior.

This is a short-term setting which will allow choosing the session-aware
gc safepoint behavior, instead of the legacy behavior which kills all
in-flight connections when performing a GC.
  • Loading branch information
reltuk committed Jan 28, 2025
1 parent bdc8ff1 commit fc3217e
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 42 deletions.
4 changes: 4 additions & 0 deletions go/libraries/doltcore/dconfig/envvars.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,8 @@ const (
EnvDbNameReplace = "DOLT_DBNAME_REPLACE"
EnvDoltRootHost = "DOLT_ROOT_HOST"
EnvDoltRootPassword = "DOLT_ROOT_PASSWORD"

// If set, must be "kill_connections" or "session_aware"
// Will go away after session_aware is made default-and-only.
EnvGCSafepointControllerChoice = "DOLT_GC_SAFEPOINT_CONTROLLER_CHOICE"
)
32 changes: 22 additions & 10 deletions go/libraries/doltcore/sqle/dprocedures/dolt_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,19 @@ const (
cmdSuccess = 0
)

var useSessionAwareSafepointController bool

func init() {
if os.Getenv(dconfig.EnvDisableGcProcedure) != "" {
DoltGCFeatureFlag = false
}
if choice := os.Getenv(dconfig.EnvGCSafepointControllerChoice); choice != "" {
if choice == "session_aware" {
useSessionAwareSafepointController = true
} else if choice != "kill_connections" {
panic("Invalid value for " + dconfig.EnvGCSafepointControllerChoice + ". must be session_aware or kill_connections")
}
}
}

var DoltGCFeatureFlag = true
Expand Down Expand Up @@ -162,8 +171,8 @@ type sessionAwareSafepointController struct {
callCtx *sql.Context
origEpoch int

waiter *dsess.GCSafepointWaiter
keeper func(hash.Hash) bool
waiter *dsess.GCSafepointWaiter
keeper func(hash.Hash) bool
}

func (sc *sessionAwareSafepointController) visit(ctx context.Context, sess *dsess.DoltSession) error {
Expand Down Expand Up @@ -261,14 +270,17 @@ func (impl *DoltGCProcedure) doGC(ctx *sql.Context, args []string) (int, error)
}

var sc types.GCSafepointController
sc = killConnectionsSafepointController{
origEpoch: origepoch,
callCtx: ctx,
}
sc = &sessionAwareSafepointController{
origEpoch: origepoch,
callCtx: ctx,
controller: impl.gcSafepointController,
if useSessionAwareSafepointController {
sc = &sessionAwareSafepointController{
origEpoch: origepoch,
callCtx: ctx,
controller: impl.gcSafepointController,
}
} else {
sc = killConnectionsSafepointController{
origEpoch: origepoch,
callCtx: ctx,
}
}
err = ddb.GC(ctx, mode, sc)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func (w *GCSafepointWaiter) Wait(ctx context.Context) error {
}

var closedCh = make(chan struct{})

func init() {
close(closedCh)
}
Expand Down
97 changes: 65 additions & 32 deletions integration-tests/go-sql-server-driver/concurrent_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,43 @@ import (
)

func TestConcurrentGC(t *testing.T) {
var gct gcTest
gct.numThreads = 8
gct.duration = 10 * time.Second
var base = gcTest {
numThreads: 8,
duration: 10 * time.Second,
}
t.Run("NoCommits", func(t *testing.T) {
gct.run(t)
var base = base
base.commit = false
t.Run("KillConnections", func(t *testing.T) {
var gct = base
gct.run(t)
})
t.Run("SessionAware", func(t *testing.T) {
var gct = base
gct.sessionAware = true
gct.run(t)
})
})
gct.commit = true
t.Run("WithCommits", func(t *testing.T) {
gct.run(t)
var base = base
base.commit = true
t.Run("KillConnections", func(t *testing.T) {
var gct = base
gct.run(t)
})
t.Run("SessionAware", func(t *testing.T) {
var gct = base
gct.sessionAware = true
gct.run(t)
})
})
}

type gcTest struct {
numThreads int
duration time.Duration
commit bool
numThreads int
duration time.Duration
commit bool
sessionAware bool
}

func (gct gcTest) createDB(t *testing.T, ctx context.Context, db *sql.DB) {
Expand All @@ -69,13 +90,19 @@ func (gct gcTest) createDB(t *testing.T, ctx context.Context, db *sql.DB) {

func (gct gcTest) doUpdate(t *testing.T, ctx context.Context, db *sql.DB, i int) error {
conn, err := db.Conn(ctx)
if err != nil {
if gct.sessionAware {
if !assert.NoError(t, err) {
return nil
}
} else if err != nil {
t.Logf("err in Conn: %v", err)
return nil
}
defer conn.Close()
_, err = conn.ExecContext(ctx, "update vals set val = val+1 where id = ?", i)
if err != nil {
if gct.sessionAware {
assert.NoError(t, err)
} else if err != nil {
if !assert.NotContains(t, err.Error(), "dangling ref") {
return err
}
Expand All @@ -89,7 +116,9 @@ func (gct gcTest) doUpdate(t *testing.T, ctx context.Context, db *sql.DB, i int)
}
if gct.commit {
_, err = conn.ExecContext(ctx, fmt.Sprintf("call dolt_commit('-am', 'increment vals id = %d')", i))
if err != nil {
if gct.sessionAware {
assert.NoError(t, err)
} else if err != nil {
if !assert.NotContains(t, err.Error(), "dangling ref") {
return err
}
Expand All @@ -107,30 +136,27 @@ func (gct gcTest) doUpdate(t *testing.T, ctx context.Context, db *sql.DB, i int)

func (gct gcTest) doGC(t *testing.T, ctx context.Context, db *sql.DB) error {
conn, err := db.Conn(ctx)
if err != nil {
if gct.sessionAware {
if !assert.NoError(t, err) {
return nil
}
} else if err != nil {
t.Logf("err in Conn for dolt_gc: %v", err)
return nil
}
defer func() {
// After calling dolt_gc, the connection is bad. Remove it from the connection pool.
conn.Raw(func(_ any) error {
return sqldriver.ErrBadConn
})
}()
if !gct.sessionAware {
defer func() {
// After calling dolt_gc, the connection is bad. Remove it from the connection pool.
conn.Raw(func(_ any) error {
return sqldriver.ErrBadConn
})
}()
} else {
defer conn.Close()
}
b := time.Now()
_, err = conn.ExecContext(ctx, "call dolt_gc()")
if err != nil {
if !assert.NotContains(t, err.Error(), "dangling ref") {
return err
}
if !assert.NotContains(t, err.Error(), "is unexpected noms value") {
return err
}
if !assert.NotContains(t, err.Error(), "interface conversion: types.Value is nil") {
return err
}
t.Logf("err in Exec dolt_gc: %v", err)
} else {
if assert.NoError(t, err) {
t.Logf("successful dolt_gc took %v", time.Since(b))
}
return nil
Expand Down Expand Up @@ -184,7 +210,14 @@ func (gct gcTest) run(t *testing.T) {
repo, err := rs.MakeRepo("concurrent_gc_test")
require.NoError(t, err)

server := MakeServer(t, repo, &driver.Server{})
srvSettings := &driver.Server{}
if gct.sessionAware {
srvSettings.Envs = append(srvSettings.Envs, "DOLT_GC_SAFEPOINT_CONTROLLER_CHOICE=session_aware")
} else {
srvSettings.Envs = append(srvSettings.Envs, "DOLT_GC_SAFEPOINT_CONTROLLER_CHOICE=kill_connections")
}

server := MakeServer(t, repo, srvSettings)
server.DBName = "concurrent_gc_test"

db, err := server.DB(driver.Connection{User: "root"})
Expand Down

0 comments on commit fc3217e

Please sign in to comment.