Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into nicktobey/lazy-load
Browse files Browse the repository at this point in the history
  • Loading branch information
nicktobey committed Jan 30, 2025
2 parents d27b998 + 337671a commit b1aa361
Show file tree
Hide file tree
Showing 54 changed files with 1,596 additions and 785 deletions.
3 changes: 3 additions & 0 deletions go/cmd/dolt/commands/engine/sqlengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,9 @@ func (se *SqlEngine) GetUnderlyingEngine() *gms.Engine {

func (se *SqlEngine) Close() error {
if se.engine != nil {
if se.engine.Analyzer.Catalog.BinlogReplicaController != nil {
dblr.DoltBinlogReplicaController.Close()
}
return se.engine.Close()
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion go/cmd/dolt/doltversion/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@
package doltversion

const (
Version = "1.47.2"
Version = "1.48.0"
)
2 changes: 1 addition & 1 deletion go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ require (
github.com/cespare/xxhash/v2 v2.2.0
github.com/creasty/defaults v1.6.0
github.com/dolthub/flatbuffers/v23 v23.3.3-dh.2
github.com/dolthub/go-mysql-server v0.19.1-0.20250124213954-8a1af52235d7
github.com/dolthub/go-mysql-server v0.19.1-0.20250128182847-3f5bb8c52cd8
github.com/dolthub/gozstd v0.0.0-20240423170813-23a2903bca63
github.com/dolthub/swiss v0.1.0
github.com/esote/minmaxheap v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,8 @@ github.com/dolthub/fslock v0.0.3 h1:iLMpUIvJKMKm92+N1fmHVdxJP5NdyDK5bK7z7Ba2s2U=
github.com/dolthub/fslock v0.0.3/go.mod h1:QWql+P17oAAMLnL4HGB5tiovtDuAjdDTPbuqx7bYfa0=
github.com/dolthub/go-icu-regex v0.0.0-20241215010122-db690dd53c90 h1:Sni8jrP0sy/w9ZYXoff4g/ixe+7bFCZlfCqXKJSU+zM=
github.com/dolthub/go-icu-regex v0.0.0-20241215010122-db690dd53c90/go.mod h1:ylU4XjUpsMcvl/BKeRRMXSH7e7WBrPXdSLvnRJYrxEA=
github.com/dolthub/go-mysql-server v0.19.1-0.20250124213954-8a1af52235d7 h1:DjirOAU+gMlWqr3Ut9PsVT5iqdirAcLr84Cbbi60Kis=
github.com/dolthub/go-mysql-server v0.19.1-0.20250124213954-8a1af52235d7/go.mod h1:jYEJ8tNkA7K3k39X8iMqaX3MSMmViRgh222JSLHDgVc=
github.com/dolthub/go-mysql-server v0.19.1-0.20250128182847-3f5bb8c52cd8 h1:eEGYHOC5Ft+56yPaH26gsdbonrZ2EiTwQLy8Oj3TAFE=
github.com/dolthub/go-mysql-server v0.19.1-0.20250128182847-3f5bb8c52cd8/go.mod h1:jYEJ8tNkA7K3k39X8iMqaX3MSMmViRgh222JSLHDgVc=
github.com/dolthub/gozstd v0.0.0-20240423170813-23a2903bca63 h1:OAsXLAPL4du6tfbBgK0xXHZkOlos63RdKYS3Sgw/dfI=
github.com/dolthub/gozstd v0.0.0-20240423170813-23a2903bca63/go.mod h1:lV7lUeuDhH5thVGDCKXbatwKy2KW80L4rMT46n+Y2/Q=
github.com/dolthub/ishell v0.0.0-20240701202509-2b217167d718 h1:lT7hE5k+0nkBdj/1UOSFwjWpNxf+LCApbRHgnCA17XE=
Expand Down
105 changes: 93 additions & 12 deletions go/libraries/doltcore/env/actions/tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/dolthub/dolt/go/store/datas"
)

const DefaultPageSize = 100

type TagProps struct {
TaggerName string
TaggerEmail string
Expand Down Expand Up @@ -97,7 +99,31 @@ func DeleteTagsOnDB(ctx context.Context, ddb *doltdb.DoltDB, tagNames ...string)
return nil
}

// IterResolvedTags iterates over tags in dEnv.DoltDB(ctx) from newest to oldest, resolving the tag to a commit and calling cb().
// IterUnresolvedTags iterates over tags in dEnv.DoltDB, and calls cb() for each with an unresolved Tag.
func IterUnresolvedTags(ctx context.Context, ddb *doltdb.DoltDB, cb func(tag *doltdb.TagResolver) (stop bool, err error)) error {
tagRefs, err := ddb.GetTags(ctx)
if err != nil {
return err
}

tagResolvers, err := ddb.GetTagResolvers(ctx, tagRefs)
if err != nil {
return err
}

for _, tagResolver := range tagResolvers {
stop, err := cb(&tagResolver)
if err != nil {
return err
}
if stop {
break
}
}
return nil
}

// IterResolvedTags iterates over tags in dEnv.DoltDB from newest to oldest, resolving the tag to a commit and calling cb().
func IterResolvedTags(ctx context.Context, ddb *doltdb.DoltDB, cb func(tag *doltdb.Tag) (stop bool, err error)) error {
tagRefs, err := ddb.GetTags(ctx)

Expand All @@ -109,7 +135,7 @@ func IterResolvedTags(ctx context.Context, ddb *doltdb.DoltDB, cb func(tag *dolt
for _, r := range tagRefs {
tr, ok := r.(ref.TagRef)
if !ok {
return fmt.Errorf("doltDB.GetTags() returned non-tag DoltRef")
return fmt.Errorf("DoltDB.GetTags() returned non-tag DoltRef")
}

tag, err := ddb.ResolveTag(ctx, tr)
Expand Down Expand Up @@ -138,26 +164,81 @@ func IterResolvedTags(ctx context.Context, ddb *doltdb.DoltDB, cb func(tag *dolt
return nil
}

// IterUnresolvedTags iterates over tags in dEnv.DoltDB, and calls cb() for each with an unresovled Tag.
func IterUnresolvedTags(ctx context.Context, ddb *doltdb.DoltDB, cb func(tag *doltdb.TagResolver) (stop bool, err error)) error {
// IterResolvedTagsPaginated iterates over tags in dEnv.DoltDB in their default lexicographical order, resolving the tag to a commit and calling cb().
// Returns the next tag name if there are more results available.
func IterResolvedTagsPaginated(ctx context.Context, ddb *doltdb.DoltDB, startTag string, cb func(tag *doltdb.Tag) (stop bool, err error)) (string, error) {
// tags returned here are sorted lexicographically
tagRefs, err := ddb.GetTags(ctx)
if err != nil {
return err
return "", err
}

tagResolvers, err := ddb.GetTagResolvers(ctx, tagRefs)
if err != nil {
return err
// find starting index based on start tag
startIdx := 0
if startTag != "" {
for i, tr := range tagRefs {
if tr.GetPath() == startTag {
startIdx = i + 1 // start after the given tag
break
}
}
}

for _, tagResolver := range tagResolvers {
stop, err := cb(&tagResolver)
// get page of results
endIdx := startIdx + DefaultPageSize
if endIdx > len(tagRefs) {
endIdx = len(tagRefs)
}

pageTagRefs := tagRefs[startIdx:endIdx]

// resolve tags for this page
for _, tr := range pageTagRefs {
tag, err := ddb.ResolveTag(ctx, tr.(ref.TagRef))
if err != nil {
return err
return "", err
}

stop, err := cb(tag)
if err != nil {
return "", err
}

if stop {
break
}
}
return nil

// return next tag name if there are more results
if endIdx < len(tagRefs) {
lastTag := pageTagRefs[len(pageTagRefs)-1]
return lastTag.GetPath(), nil
}

return "", nil
}

// VisitResolvedTag iterates over tags in ddb until the given tag name is found, then calls cb() with the resolved tag.
func VisitResolvedTag(ctx context.Context, ddb *doltdb.DoltDB, tagName string, cb func(tag *doltdb.Tag) error) error {
tagRefs, err := ddb.GetTags(ctx)
if err != nil {
return err
}

for _, r := range tagRefs {
tr, ok := r.(ref.TagRef)
if !ok {
return fmt.Errorf("DoltDB.GetTags() returned non-tag DoltRef")
}

if tr.GetPath() == tagName {
tag, err := ddb.ResolveTag(ctx, tr)
if err != nil {
return err
}
return cb(tag)
}
}

return doltdb.ErrTagNotFound
}
150 changes: 150 additions & 0 deletions go/libraries/doltcore/env/actions/tag_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Copyright 2025 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package actions

import (
"context"
"fmt"
"sort"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
"github.com/dolthub/dolt/go/store/types"
)

const (
testHomeDir = "/user/bheni"
workingDir = "/user/bheni/datasets/addresses"
credsDir = "creds"

configFile = "config.json"
GlobalConfigFile = "config_global.json"
)

func testHomeDirFunc() (string, error) {
return testHomeDir, nil
}

func createTestEnv() (*env.DoltEnv, *filesys.InMemFS) {
initialDirs := []string{testHomeDir, workingDir}
initialFiles := map[string][]byte{}

fs := filesys.NewInMemFS(initialDirs, initialFiles, workingDir)
dEnv := env.Load(context.Background(), testHomeDirFunc, fs, doltdb.InMemDoltDB, "test")

return dEnv, fs
}

func TestVisitResolvedTag(t *testing.T) {
dEnv, _ := createTestEnv()
ctx := context.Background()

// Initialize repo
err := dEnv.InitRepo(ctx, types.Format_Default, "test user", "[email protected]", "main")
require.NoError(t, err)

// Create a tag
tagName := "test-tag"
tagMsg := "test tag message"
err = CreateTag(ctx, dEnv, tagName, "main", TagProps{TaggerName: "test user", TaggerEmail: "[email protected]", Description: tagMsg})
require.NoError(t, err)

// Visit the tag and verify its properties
var foundTag *doltdb.Tag
err = VisitResolvedTag(ctx, dEnv.DoltDB, tagName, func(tag *doltdb.Tag) error {
foundTag = tag
return nil
})
require.NoError(t, err)
require.NotNil(t, foundTag)
require.Equal(t, tagName, foundTag.Name)
require.Equal(t, tagMsg, foundTag.Meta.Description)

// Test visiting non-existent tag
err = VisitResolvedTag(ctx, dEnv.DoltDB, "non-existent-tag", func(tag *doltdb.Tag) error {
return nil
})
require.Equal(t, doltdb.ErrTagNotFound, err)
}

func TestIterResolvedTagsPaginated(t *testing.T) {
dEnv, _ := createTestEnv()
ctx := context.Background()

// Initialize repo
err := dEnv.InitRepo(ctx, types.Format_Default, "test user", "[email protected]", "main")
require.NoError(t, err)

expectedTagNames := make([]string, DefaultPageSize*2)
// Create multiple tags with different timestamps
tagNames := make([]string, DefaultPageSize*2)
for i := range tagNames {
tagName := fmt.Sprintf("tag-%d", i)
err = CreateTag(ctx, dEnv, tagName, "main", TagProps{
TaggerName: "test user",
TaggerEmail: "[email protected]",
Description: fmt.Sprintf("test tag %s", tagName),
})
time.Sleep(2 * time.Millisecond)
require.NoError(t, err)
tagNames[i] = tagName
expectedTagNames[i] = tagName
}

// Sort expected tag names to ensure they are in the correct order
sort.Strings(expectedTagNames)

// Test first page
var foundTags []string
pageToken, err := IterResolvedTagsPaginated(ctx, dEnv.DoltDB, "", func(tag *doltdb.Tag) (bool, error) {
foundTags = append(foundTags, tag.Name)
return false, nil
})
require.NoError(t, err)
require.NotEmpty(t, pageToken) // Should have next page
require.Equal(t, DefaultPageSize, len(foundTags)) // Default page size tags returned
require.Equal(t, expectedTagNames[:DefaultPageSize], foundTags)

// Test second page
var secondPageTags []string
nextPageToken, err := IterResolvedTagsPaginated(ctx, dEnv.DoltDB, pageToken, func(tag *doltdb.Tag) (bool, error) {
secondPageTags = append(secondPageTags, tag.Name)
return false, nil
})

require.NoError(t, err)
require.Empty(t, nextPageToken) // Should be no more pages
require.Equal(t, DefaultPageSize, len(secondPageTags)) // Remaining tags
require.Equal(t, expectedTagNames[DefaultPageSize:], secondPageTags)

// Verify all tags were found
allFoundTags := append(foundTags, secondPageTags...)
require.Equal(t, len(tagNames), len(allFoundTags))
require.Equal(t, expectedTagNames, allFoundTags)

// Test early termination
var earlyTermTags []string
_, err = IterResolvedTagsPaginated(ctx, dEnv.DoltDB, "", func(tag *doltdb.Tag) (bool, error) {
earlyTermTags = append(earlyTermTags, tag.Name)
return true, nil // Stop after first tag
})
require.NoError(t, err)
require.Equal(t, 1, len(earlyTermTags))
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ func persistReplicaRunningState(ctx *sql.Context, state replicaRunningState) err

// loadReplicationConfiguration loads the replication configuration for default channel ("") from
// the "mysql" database, |mysqlDb|.
func loadReplicationConfiguration(_ *sql.Context, mysqlDb *mysql_db.MySQLDb) (*mysql_db.ReplicaSourceInfo, error) {
func loadReplicationConfiguration(ctx *sql.Context, mysqlDb *mysql_db.MySQLDb) (*mysql_db.ReplicaSourceInfo, error) {
sql.SessionCommandBegin(ctx.Session)
defer sql.SessionCommandEnd(ctx.Session)
rd := mysqlDb.Reader()
defer rd.Close()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ func TestBinlogPrimary_ReplicaRestart(t *testing.T) {
// Restart the MySQL replica and reconnect to the Dolt primary
prevPrimaryDatabase := primaryDatabase
var err error
mySqlPort, mySqlProcess, err = startMySqlServer(testDir)
mySqlPort, mySqlProcess, err = startMySqlServer(t, testDir)
require.NoError(t, err)
replicaDatabase = primaryDatabase
primaryDatabase = prevPrimaryDatabase
Expand Down Expand Up @@ -1042,7 +1042,7 @@ func outputReplicaApplierStatus(t *testing.T) {
newRows, err := replicaDatabase.Queryx("select * from performance_schema.replication_applier_status_by_worker;")
require.NoError(t, err)
allNewRows := readAllRowsIntoMaps(t, newRows)
fmt.Printf("\n\nreplication_applier_status_by_worker: %v\n", allNewRows)
t.Logf("\n\nreplication_applier_status_by_worker: %v\n", allNewRows)
}

// outputShowReplicaStatus prints out replica status information. This is useful for debugging
Expand All @@ -1052,7 +1052,7 @@ func outputShowReplicaStatus(t *testing.T) {
newRows, err := replicaDatabase.Queryx("show replica status;")
require.NoError(t, err)
allNewRows := readAllRowsIntoMaps(t, newRows)
fmt.Printf("\n\nSHOW REPLICA STATUS: %v\n", allNewRows)
t.Logf("\n\nSHOW REPLICA STATUS: %v\n", allNewRows)
}

// copyMap returns a copy of the specified map |m|.
Expand Down Expand Up @@ -1098,7 +1098,7 @@ func waitForReplicaToReconnect(t *testing.T) {
func mustRestartDoltPrimaryServer(t *testing.T) {
var err error
prevReplicaDatabase := replicaDatabase
doltPort, doltProcess, err = startDoltSqlServer(testDir, nil)
doltPort, doltProcess, err = startDoltSqlServer(t, testDir, nil)
require.NoError(t, err)
primaryDatabase = replicaDatabase
replicaDatabase = prevReplicaDatabase
Expand All @@ -1109,7 +1109,7 @@ func mustRestartDoltPrimaryServer(t *testing.T) {
func mustRestartMySqlReplicaServer(t *testing.T) {
var err error
prevPrimaryDatabase := primaryDatabase
mySqlPort, mySqlProcess, err = startMySqlServer(testDir)
mySqlPort, mySqlProcess, err = startMySqlServer(t, testDir)
require.NoError(t, err)
replicaDatabase = primaryDatabase
primaryDatabase = prevPrimaryDatabase
Expand Down
Loading

0 comments on commit b1aa361

Please sign in to comment.