Skip to content

Commit

Permalink
record offsets in db
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirms committed Jul 26, 2024
1 parent d81c37d commit e86edf4
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 70 deletions.
40 changes: 40 additions & 0 deletions services/horizon/internal/db2/history/key_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package history
import (
"context"
"database/sql"
"fmt"
"strconv"
"strings"

sq "github.com/Masterminds/squirrel"

"github.com/stellar/go/support/errors"
)

Expand All @@ -18,6 +21,7 @@ const (
stateInvalid = "exp_state_invalid"
offerCompactionSequence = "offer_compaction_sequence"
liquidityPoolCompactionSequence = "liquidity_pool_compaction_sequence"
lookupTableReapOffsetSuffix = "_reap_offset"
)

// GetLastLedgerIngestNonBlocking works like GetLastLedgerIngest but
Expand Down Expand Up @@ -203,6 +207,42 @@ func (q *Q) getValueFromStore(ctx context.Context, key string, forUpdate bool) (
return value, nil
}

type KeyValuePair struct {
Key string `db:"key"`
Value string `db:"value"`
}

func (q *Q) getLookupTableReapOffsets(ctx context.Context) (map[string]int64, error) {
keys := make([]string, 0, len(historyLookupTables))
for table := range historyLookupTables {
keys = append(keys, table+lookupTableReapOffsetSuffix)
}
offsets := map[string]int64{}
var pairs []KeyValuePair
query := sq.Select("key", "value").
From("key_value_store").
Where(map[string]interface{}{
"key": keys,
})
err := q.Select(ctx, &pairs, query)
for _, pair := range pairs {
table := strings.TrimSuffix(pair.Key, lookupTableReapOffsetSuffix)
if _, ok := historyLookupTables[table]; !ok {
return nil, fmt.Errorf("invalid key: %s", pair.Key)
}
offset, err := strconv.ParseInt(pair.Value, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid offset: %s", pair.Value)
}
offsets[table] = offset
}
return offsets, err
}

func (q *Q) updateLookupTableReapOffset(ctx context.Context, table string, offset int64) error {
return q.updateValueInStore(ctx, table+lookupTableReapOffsetSuffix, strconv.FormatInt(offset, 10))
}

// updateValueInStore updates a value for a given key in KV store
func (q *Q) updateValueInStore(ctx context.Context, key, value string) error {
query := sq.Insert("key_value_store").
Expand Down
141 changes: 76 additions & 65 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ type IngestionQ interface {
NewTradeBatchInsertBuilder() TradeBatchInsertBuilder
RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis, roundingSlippageFilter int) error
RebuildTradeAggregationBuckets(ctx context.Context, fromLedger, toLedger uint32, roundingSlippageFilter int) error
ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]LookupTableReapResult, error)
ReapLookupTables(ctx context.Context) (map[string]LookupTableReapResult, error)
CreateAssets(ctx context.Context, assets []xdr.Asset, batchSize int) (map[string]Asset, error)
QTransactions
QTrustLines
Expand Down Expand Up @@ -981,80 +981,23 @@ type LookupTableReapResult struct {
// which aren't used (orphaned), i.e. history entries for them were reaped.
// This method must be executed inside ingestion transaction. Otherwise it may
// create invalid state in lookup and history tables.
func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (
func (q *Q) ReapLookupTables(ctx context.Context) (
map[string]LookupTableReapResult,
error,
) {
if q.GetTx() == nil {
return nil, errors.New("cannot be called outside of an ingestion transaction")
}

offsets, err := q.getLookupTableReapOffsets(ctx)
if err != nil {
return nil, fmt.Errorf("could not obtain offsets: %w", err)
}

const batchSize = 1000

results := map[string]LookupTableReapResult{}
for table, historyTables := range map[string][]tableObjectFieldPair{
"history_accounts": {
{
name: "history_transaction_participants",
objectField: "history_account_id",
},

{
name: "history_effects",
objectField: "history_account_id",
},
{
name: "history_operation_participants",
objectField: "history_account_id",
},
{
name: "history_trades",
objectField: "base_account_id",
},
{
name: "history_trades",
objectField: "counter_account_id",
},
},
"history_assets": {
{
name: "history_trades",
objectField: "base_asset_id",
},
{
name: "history_trades",
objectField: "counter_asset_id",
},
{
name: "history_trades_60000",
objectField: "base_asset_id",
},
{
name: "history_trades_60000",
objectField: "counter_asset_id",
},
},
"history_claimable_balances": {
{
name: "history_transaction_claimable_balances",
objectField: "history_claimable_balance_id",
},
{
name: "history_operation_claimable_balances",
objectField: "history_claimable_balance_id",
},
},
"history_liquidity_pools": {
{
name: "history_transaction_liquidity_pools",
objectField: "history_liquidity_pool_id",
},
{
name: "history_operation_liquidity_pools",
objectField: "history_liquidity_pool_id",
},
},
} {
for table, historyTables := range historyLookupTables {
startTime := time.Now()
query := constructReapLookupTablesQuery(table, historyTables, batchSize, offsets[table])

Expand All @@ -1077,6 +1020,10 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (
return nil, errors.Wrapf(err, "error running query: %s", query)
}

if err = q.updateLookupTableReapOffset(ctx, table, newOffset); err != nil {
return nil, fmt.Errorf("error updating offset: %w", err)
}

rows, err := res.RowsAffected()
if err != nil {
return nil, errors.Wrapf(err, "error running RowsAffected after query: %s", query)
Expand All @@ -1091,6 +1038,70 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (
return results, nil
}

var historyLookupTables = map[string][]tableObjectFieldPair{
"history_accounts": {
{
name: "history_transaction_participants",
objectField: "history_account_id",
},

{
name: "history_effects",
objectField: "history_account_id",
},
{
name: "history_operation_participants",
objectField: "history_account_id",
},
{
name: "history_trades",
objectField: "base_account_id",
},
{
name: "history_trades",
objectField: "counter_account_id",
},
},
"history_assets": {
{
name: "history_trades",
objectField: "base_asset_id",
},
{
name: "history_trades",
objectField: "counter_asset_id",
},
{
name: "history_trades_60000",
objectField: "base_asset_id",
},
{
name: "history_trades_60000",
objectField: "counter_asset_id",
},
},
"history_claimable_balances": {
{
name: "history_transaction_claimable_balances",
objectField: "history_claimable_balance_id",
},
{
name: "history_operation_claimable_balances",
objectField: "history_claimable_balance_id",
},
},
"history_liquidity_pools": {
{
name: "history_transaction_liquidity_pools",
objectField: "history_liquidity_pool_id",
},
{
name: "history_operation_liquidity_pools",
objectField: "history_liquidity_pool_id",
},
},
}

// constructReapLookupTablesQuery creates a query like (using history_claimable_balances
// as an example):
//
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/db2/history/reap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestReapLookupTables(t *testing.T) {
err = q.Begin(tt.Ctx)
tt.Require.NoError(err)

results, err := q.ReapLookupTables(tt.Ctx, nil)
results, err := q.ReapLookupTables(tt.Ctx)
tt.Require.NoError(err)

err = q.Commit()
Expand Down
5 changes: 1 addition & 4 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,6 @@ type system struct {

runStateVerificationOnLedger func(uint32) bool

reapOffsetByTable map[string]int64
maxLedgerPerFlush uint32

reaper *Reaper
Expand Down Expand Up @@ -369,7 +368,6 @@ func NewSystem(config Config) (System, error) {
config.ReapConfig,
config.HistorySession,
),
reapOffsetByTable: map[string]int64{},
}

system.initMetrics()
Expand Down Expand Up @@ -843,7 +841,7 @@ func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) {
defer cancel()

reapStart := time.Now()
results, err := s.historyQ.ReapLookupTables(ctx, s.reapOffsetByTable)
results, err := s.historyQ.ReapLookupTables(ctx)
if err != nil {
log.WithError(err).Warn("Error reaping lookup tables")
return
Expand All @@ -860,7 +858,6 @@ func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) {
for table, result := range results {
totalDeleted += result.RowsDeleted
reapLog = reapLog.WithField(table, result)
s.reapOffsetByTable[table] = result.Offset
s.Metrics().RowsReapedByLookupTable.With(prometheus.Labels{"table": table}).Observe(float64(result.RowsDeleted))
s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": table}).Observe(result.Duration.Seconds())
}
Expand Down

0 comments on commit e86edf4

Please sign in to comment.