Skip to content

Commit

Permalink
go: sqle,doltdb: Move commit hooks from doltdb to sqle.
Browse files Browse the repository at this point in the history
  • Loading branch information
reltuk committed Feb 5, 2025
1 parent 12488b1 commit 0cbff2c
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 52 deletions.
2 changes: 1 addition & 1 deletion go/cmd/dolt/commands/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ func TestCommitHooksNoErrors(t *testing.T) {
t.Error("failed to produce noop hook")
} else {
switch h := hooks[0].(type) {
case *doltdb.LogHook:
case *sqle.LogHook:
default:
t.Errorf("expected LogHook, found: %s", h)
}
Expand Down
5 changes: 3 additions & 2 deletions go/cmd/dolt/commands/sqlserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,14 @@ func TestServerArgs(t *testing.T) {
}

func TestDeprecatedUserPasswordServerArgs(t *testing.T) {
ctx := context.Background()
controller := svcs.NewController()
dEnv, err := sqle.CreateEnvWithSeedData()
require.NoError(t, err)
defer func() {
assert.NoError(t, dEnv.DoltDB.Close())
assert.NoError(t, dEnv.DoltDB(ctx).Close())
}()
err = StartServer(context.Background(), "0.0.0", "dolt sql-server", []string{
err = StartServer(ctx, "0.0.0", "dolt sql-server", []string{
"-H", "localhost",
"-P", "15200",
"-u", "username",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package doltdb
package sqle

import (
"context"
Expand All @@ -23,44 +23,39 @@ import (

"github.com/dolthub/go-mysql-server/sql"

"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/types"
)

type PushOnWriteHook struct {
destDB *DoltDB
destDB *doltdb.DoltDB
tmpDir string
out io.Writer
fmt *types.NomsBinFormat
}

var _ CommitHook = (*PushOnWriteHook)(nil)
var _ doltdb.CommitHook = (*PushOnWriteHook)(nil)

// NewPushOnWriteHook creates a ReplicateHook, parameterizaed by the backup database
// and a local tempfile for pushing
func NewPushOnWriteHook(destDB *DoltDB, tmpDir string) *PushOnWriteHook {
func NewPushOnWriteHook(destDB *doltdb.DoltDB, tmpDir string) *PushOnWriteHook {
return &PushOnWriteHook{
destDB: destDB,
tmpDir: tmpDir,
fmt: destDB.Format(),
}
}

// Execute implements CommitHook, replicates head updates to the destDb field
func (ph *PushOnWriteHook) Execute(ctx context.Context, ds datas.Dataset, db *DoltDB) (func(context.Context) error, error) {
func (ph *PushOnWriteHook) Execute(ctx context.Context, ds datas.Dataset, db *doltdb.DoltDB) (func(context.Context) error, error) {
return nil, pushDataset(ctx, ph.destDB, db, ds, ph.tmpDir)
}

func pushDataset(ctx context.Context, destDB, srcDB *DoltDB, ds datas.Dataset, tmpDir string) error {
func pushDataset(ctx context.Context, destDB, srcDB *doltdb.DoltDB, ds datas.Dataset, tmpDir string) error {
addr, ok := ds.MaybeHeadAddr()
if !ok {
rf, err := ref.Parse(ds.ID())
if err != nil {
return err
}
err = destDB.DeleteBranch(ctx, rf, nil)
// TODO: fix up hack usage.
_, err := doltdb.HackDatasDatabaseFromDoltDB(destDB).Delete(ctx, ds, "")
return err
}

Expand Down Expand Up @@ -100,7 +95,7 @@ func (ph *PushOnWriteHook) SetLogger(ctx context.Context, wr io.Writer) error {

type PushArg struct {
ds datas.Dataset
db *DoltDB
db *doltdb.DoltDB
hash hash.Hash
}

Expand All @@ -116,10 +111,10 @@ const (
asyncPushSyncReplica = "async_push_sync_replica"
)

var _ CommitHook = (*AsyncPushOnWriteHook)(nil)
var _ doltdb.CommitHook = (*AsyncPushOnWriteHook)(nil)

// NewAsyncPushOnWriteHook creates a AsyncReplicateHook
func NewAsyncPushOnWriteHook(bThreads *sql.BackgroundThreads, destDB *DoltDB, tmpDir string, logger io.Writer) (*AsyncPushOnWriteHook, error) {
func NewAsyncPushOnWriteHook(bThreads *sql.BackgroundThreads, destDB *doltdb.DoltDB, tmpDir string, logger io.Writer) (*AsyncPushOnWriteHook, error) {
ch := make(chan PushArg, asyncPushBufferSize)
err := RunAsyncReplicationThreads(bThreads, ch, destDB, tmpDir, logger)
if err != nil {
Expand All @@ -133,7 +128,7 @@ func (*AsyncPushOnWriteHook) ExecuteForWorkingSets() bool {
}

// Execute implements CommitHook, replicates head updates to the destDb field
func (ah *AsyncPushOnWriteHook) Execute(ctx context.Context, ds datas.Dataset, db *DoltDB) (func(context.Context) error, error) {
func (ah *AsyncPushOnWriteHook) Execute(ctx context.Context, ds datas.Dataset, db *doltdb.DoltDB) (func(context.Context) error, error) {
addr, _ := ds.MaybeHeadAddr()
// TODO: Unconditional push here seems dangerous.
ah.ch <- PushArg{ds: ds, db: db, hash: addr}
Expand All @@ -159,15 +154,15 @@ type LogHook struct {
out io.Writer
}

var _ CommitHook = (*LogHook)(nil)
var _ doltdb.CommitHook = (*LogHook)(nil)

// NewLogHook is a noop that logs to a writer when invoked
func NewLogHook(msg []byte) *LogHook {
return &LogHook{msg: msg}
}

// Execute implements CommitHook, writes message to log channel
func (lh *LogHook) Execute(ctx context.Context, ds datas.Dataset, db *DoltDB) (func(context.Context) error, error) {
func (lh *LogHook) Execute(ctx context.Context, ds datas.Dataset, db *doltdb.DoltDB) (func(context.Context) error, error) {
if lh.out != nil {
_, err := lh.out.Write(lh.msg)
return nil, err
Expand All @@ -193,7 +188,7 @@ func (*LogHook) ExecuteForWorkingSets() bool {
return false
}

func RunAsyncReplicationThreads(bThreads *sql.BackgroundThreads, ch chan PushArg, destDB *DoltDB, tmpDir string, logger io.Writer) error {
func RunAsyncReplicationThreads(bThreads *sql.BackgroundThreads, ch chan PushArg, destDB *doltdb.DoltDB, tmpDir string, logger io.Writer) error {
mu := &sync.Mutex{}
var newHeads = make(map[string]PushArg, asyncPushBufferSize)

Expand Down
Loading

0 comments on commit 0cbff2c

Please sign in to comment.