Skip to content

Commit

Permalink
Stats purge and prune (#8451)
Browse files Browse the repository at this point in the history
* Stats purge and prune

* [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh

* fix purge test

* [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh

* more fixes

* [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh

* tidy

* better branch handling

* race edits

* try to fix raceg

* try to fix race

* fix lock conflict

---------

Co-authored-by: max-hoffman <[email protected]>
  • Loading branch information
max-hoffman and max-hoffman authored Oct 21, 2024
1 parent 228b3e1 commit 8973ba2
Show file tree
Hide file tree
Showing 12 changed files with 406 additions and 31 deletions.
2 changes: 2 additions & 0 deletions go/libraries/doltcore/sqle/dprocedures/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ var DoltProcedures = []sql.ExternalStoredProcedureDetails{
{Name: "dolt_stats_restart", Schema: statsFuncSchema, Function: statsFunc(statsRestart)},
{Name: "dolt_stats_stop", Schema: statsFuncSchema, Function: statsFunc(statsStop)},
{Name: "dolt_stats_status", Schema: statsFuncSchema, Function: statsFunc(statsStatus)},
{Name: "dolt_stats_prune", Schema: statsFuncSchema, Function: statsFunc(statsPrune)},
{Name: "dolt_stats_purge", Schema: statsFuncSchema, Function: statsFunc(statsPurge)},
}

// stringSchema returns a non-nullable schema with all columns as LONGTEXT.
Expand Down
29 changes: 29 additions & 0 deletions go/libraries/doltcore/sqle/dprocedures/stats_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type AutoRefreshStatsProvider interface {
CancelRefreshThread(string)
StartRefreshThread(*sql.Context, dsess.DoltDatabaseProvider, string, *env.DoltEnv, dsess.SqlDatabase) error
ThreadStatus(string) string
Prune(ctx *sql.Context) error
Purge(ctx *sql.Context) error
}

// statsRestart tries to stop and then start a refresh thread
Expand Down Expand Up @@ -124,3 +126,30 @@ func statsDrop(ctx *sql.Context) (interface{}, error) {
}
return fmt.Sprintf("deleted stats ref for %s", dbName), nil
}

// statsPrune replaces the current disk contents with only the currently
// tracked in memory statistics.
func statsPrune(ctx *sql.Context) (interface{}, error) {
dSess := dsess.DSessFromSess(ctx.Session)
pro, ok := dSess.StatsProvider().(AutoRefreshStatsProvider)
if !ok {
return nil, fmt.Errorf("stats not persisted, cannot purge")
}
if err := pro.Prune(ctx); err != nil {
return "failed to prune stats databases", err
}
return "pruned all stats databases", nil
}

// statsPurge removes the stats database from disk
func statsPurge(ctx *sql.Context) (interface{}, error) {
dSess := dsess.DSessFromSess(ctx.Session)
pro, ok := dSess.StatsProvider().(AutoRefreshStatsProvider)
if !ok {
return nil, fmt.Errorf("stats not persisted, cannot purge")
}
if err := pro.Purge(ctx); err != nil {
return "failed to purged databases", err
}
return "purged all database stats", nil
}
44 changes: 44 additions & 0 deletions go/libraries/doltcore/sqle/enginetest/stats_queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,50 @@ var StatProcTests = []queries.ScriptTest{
},
},
},
{
Name: "test purge",
SetUpScript: []string{
"set @@PERSIST.dolt_stats_auto_refresh_enabled = 0;",
"CREATE table xy (x bigint primary key, y int, z varchar(500), key(y,z));",
"insert into xy values (1, 1, 'a'), (2,1,'a'), (3,1,'a'), (4,2,'b'), (5,2,'b'), (6,3,'c');",
"analyze table xy",
},
Assertions: []queries.ScriptTestAssertion{
{
Query: "select count(*) as cnt from dolt_statistics group by table_name, index_name order by cnt",
Expected: []sql.Row{{1}, {1}},
},
{
Query: "call dolt_stats_purge()",
},
{
Query: "select count(*) from dolt_statistics;",
Expected: []sql.Row{{0}},
},
},
},
{
Name: "test prune",
SetUpScript: []string{
"set @@PERSIST.dolt_stats_auto_refresh_enabled = 0;",
"CREATE table xy (x bigint primary key, y int, z varchar(500), key(y,z));",
"insert into xy values (1, 1, 'a'), (2,1,'a'), (3,1,'a'), (4,2,'b'), (5,2,'b'), (6,3,'c');",
"analyze table xy",
},
Assertions: []queries.ScriptTestAssertion{
{
Query: "select count(*) as cnt from dolt_statistics group by table_name, index_name order by cnt",
Expected: []sql.Row{{1}, {1}},
},
{
Query: "call dolt_stats_prune()",
},
{
Query: "select count(*) from dolt_statistics;",
Expected: []sql.Row{{2}},
},
},
},
}

// TestProviderReloadScriptWithEngine runs the test script given with the engine provided.
Expand Down
14 changes: 13 additions & 1 deletion go/libraries/doltcore/sqle/statsnoms/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/table/editor"
"github.com/dolthub/dolt/go/libraries/utils/earl"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/prolly"
"github.com/dolthub/dolt/go/store/types"
Expand Down Expand Up @@ -132,13 +133,24 @@ func (n *NomsStatsDatabase) Close() error {
return n.destDb.DbData().Ddb.Close()
}

func (n *NomsStatsDatabase) Branches() []string {
return n.branches
}

func (n *NomsStatsDatabase) LoadBranchStats(ctx *sql.Context, branch string) error {
statsMap, err := n.destDb.DbData().Ddb.GetStatistics(ctx, branch)
if errors.Is(err, doltdb.ErrNoStatistics) {
return nil
return n.trackBranch(ctx, branch)
} else if errors.Is(err, datas.ErrNoBranchStats) {
return n.trackBranch(ctx, branch)
} else if err != nil {
return err
}
if cnt, err := statsMap.Count(); err != nil {
return err
} else if cnt == 0 {
return n.trackBranch(ctx, branch)
}
doltStats, err := loadStats(ctx, n.sourceDb, statsMap)
if err != nil {
return err
Expand Down
6 changes: 6 additions & 0 deletions go/libraries/doltcore/sqle/statsnoms/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ func (n *NomsStatsDatabase) replaceStats(ctx context.Context, statsMap *prolly.M
}

func deleteIndexRows(ctx context.Context, statsMap *prolly.MutableMap, dStats *statspro.DoltStats) error {
if ctx.Err() != nil {
return ctx.Err()
}
sch := schema.StatsTableDoltSchema
kd, _ := sch.GetMapDescriptors()

Expand Down Expand Up @@ -89,6 +92,9 @@ func deleteIndexRows(ctx context.Context, statsMap *prolly.MutableMap, dStats *s
}

func putIndexRows(ctx context.Context, statsMap *prolly.MutableMap, dStats *statspro.DoltStats) error {
if ctx.Err() != nil {
return ctx.Err()
}
sch := schema.StatsTableDoltSchema
kd, vd := sch.GetMapDescriptors()

Expand Down
2 changes: 1 addition & 1 deletion go/libraries/doltcore/sqle/statspro/auto_refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (p *Provider) InitAutoRefreshWithParams(ctxFactory func(ctx context.Context
defer p.mu.Unlock()

dropDbCtx, dbStatsCancel := context.WithCancel(context.Background())
p.cancelers[dbName] = dbStatsCancel
p.autoCtxCancelers[dbName] = dbStatsCancel

return bThreads.Add(fmt.Sprintf("%s_%s", asyncAutoRefreshStats, dbName), func(ctx context.Context) {
ticker := time.NewTicker(checkInterval + time.Nanosecond)
Expand Down
9 changes: 6 additions & 3 deletions go/libraries/doltcore/sqle/statspro/configure.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,14 @@ func (p *Provider) Load(ctx *sql.Context, fs filesys.Filesys, db dsess.SqlDataba
}

for _, branch := range branches {
err = statsDb.LoadBranchStats(ctx, branch)
if err != nil {
if err = statsDb.LoadBranchStats(ctx, branch); err != nil {
// if branch name is invalid, continue loading rest
// TODO: differentiate bad branch name from other errors
ctx.GetLogger().Errorf("load stats failure: %s\n", err.Error())
ctx.GetLogger().Errorf("load stats init failure: %s\n", err.Error())
continue
}
if err := statsDb.Flush(ctx, branch); err != nil {
ctx.GetLogger().Errorf("load stats flush failure: %s\n", err.Error())
continue
}
}
Expand Down
1 change: 1 addition & 0 deletions go/libraries/doltcore/sqle/statspro/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type Database interface {

SetLatestHash(branch, tableName string, h hash.Hash)
GetLatestHash(branch, tableName string) hash.Hash
Branches() []string
}

// StatsFactory instances construct statistic databases.
Expand Down
Loading

0 comments on commit 8973ba2

Please sign in to comment.