diff --git a/services/horizon/internal/db2/history/key_value.go b/services/horizon/internal/db2/history/key_value.go index a2a170a4b1..f9c49b04a6 100644 --- a/services/horizon/internal/db2/history/key_value.go +++ b/services/horizon/internal/db2/history/key_value.go @@ -3,9 +3,12 @@ package history import ( "context" "database/sql" + "fmt" "strconv" + "strings" sq "github.com/Masterminds/squirrel" + "github.com/stellar/go/support/errors" ) @@ -18,6 +21,7 @@ const ( stateInvalid = "exp_state_invalid" offerCompactionSequence = "offer_compaction_sequence" liquidityPoolCompactionSequence = "liquidity_pool_compaction_sequence" + lookupTableReapOffsetSuffix = "_reap_offset" ) // GetLastLedgerIngestNonBlocking works like GetLastLedgerIngest but @@ -203,6 +207,42 @@ func (q *Q) getValueFromStore(ctx context.Context, key string, forUpdate bool) ( return value, nil } +type KeyValuePair struct { + Key string `db:"key"` + Value string `db:"value"` +} + +func (q *Q) getLookupTableReapOffsets(ctx context.Context) (map[string]int64, error) { + keys := make([]string, 0, len(historyLookupTables)) + for table := range historyLookupTables { + keys = append(keys, table+lookupTableReapOffsetSuffix) + } + offsets := map[string]int64{} + var pairs []KeyValuePair + query := sq.Select("key", "value"). + From("key_value_store"). + Where(map[string]interface{}{ + "key": keys, + }) + err := q.Select(ctx, &pairs, query) + for _, pair := range pairs { + table := strings.TrimSuffix(pair.Key, lookupTableReapOffsetSuffix) + if _, ok := historyLookupTables[table]; !ok { + return nil, fmt.Errorf("invalid key: %s", pair.Key) + } + offset, err := strconv.ParseInt(pair.Value, 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid offset: %s", pair.Value) + } + offsets[table] = offset + } + return offsets, err +} + +func (q *Q) updateLookupTableReapOffset(ctx context.Context, table string, offset int64) error { + return q.updateValueInStore(ctx, table+lookupTableReapOffsetSuffix, strconv.FormatInt(offset, 10)) +} + // updateValueInStore updates a value for a given key in KV store func (q *Q) updateValueInStore(ctx context.Context, key, value string) error { query := sq.Insert("key_value_store"). diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index 02c0edf143..90face3129 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -282,7 +282,7 @@ type IngestionQ interface { NewTradeBatchInsertBuilder() TradeBatchInsertBuilder RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis, roundingSlippageFilter int) error RebuildTradeAggregationBuckets(ctx context.Context, fromLedger, toLedger uint32, roundingSlippageFilter int) error - ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]LookupTableReapResult, error) + ReapLookupTables(ctx context.Context) (map[string]LookupTableReapResult, error) CreateAssets(ctx context.Context, assets []xdr.Asset, batchSize int) (map[string]Asset, error) QTransactions QTrustLines @@ -981,7 +981,7 @@ type LookupTableReapResult struct { // which aren't used (orphaned), i.e. history entries for them were reaped. // This method must be executed inside ingestion transaction. Otherwise it may // create invalid state in lookup and history tables. -func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) ( +func (q *Q) ReapLookupTables(ctx context.Context) ( map[string]LookupTableReapResult, error, ) { @@ -989,72 +989,15 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) ( return nil, errors.New("cannot be called outside of an ingestion transaction") } + offsets, err := q.getLookupTableReapOffsets(ctx) + if err != nil { + return nil, fmt.Errorf("could not obtain offsets: %w", err) + } + const batchSize = 1000 results := map[string]LookupTableReapResult{} - for table, historyTables := range map[string][]tableObjectFieldPair{ - "history_accounts": { - { - name: "history_transaction_participants", - objectField: "history_account_id", - }, - - { - name: "history_effects", - objectField: "history_account_id", - }, - { - name: "history_operation_participants", - objectField: "history_account_id", - }, - { - name: "history_trades", - objectField: "base_account_id", - }, - { - name: "history_trades", - objectField: "counter_account_id", - }, - }, - "history_assets": { - { - name: "history_trades", - objectField: "base_asset_id", - }, - { - name: "history_trades", - objectField: "counter_asset_id", - }, - { - name: "history_trades_60000", - objectField: "base_asset_id", - }, - { - name: "history_trades_60000", - objectField: "counter_asset_id", - }, - }, - "history_claimable_balances": { - { - name: "history_transaction_claimable_balances", - objectField: "history_claimable_balance_id", - }, - { - name: "history_operation_claimable_balances", - objectField: "history_claimable_balance_id", - }, - }, - "history_liquidity_pools": { - { - name: "history_transaction_liquidity_pools", - objectField: "history_liquidity_pool_id", - }, - { - name: "history_operation_liquidity_pools", - objectField: "history_liquidity_pool_id", - }, - }, - } { + for table, historyTables := range historyLookupTables { startTime := time.Now() query := constructReapLookupTablesQuery(table, historyTables, batchSize, offsets[table]) @@ -1077,6 +1020,10 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) ( return nil, errors.Wrapf(err, "error running query: %s", query) } + if err = q.updateLookupTableReapOffset(ctx, table, newOffset); err != nil { + return nil, fmt.Errorf("error updating offset: %w", err) + } + rows, err := res.RowsAffected() if err != nil { return nil, errors.Wrapf(err, "error running RowsAffected after query: %s", query) @@ -1091,6 +1038,70 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) ( return results, nil } +var historyLookupTables = map[string][]tableObjectFieldPair{ + "history_accounts": { + { + name: "history_transaction_participants", + objectField: "history_account_id", + }, + + { + name: "history_effects", + objectField: "history_account_id", + }, + { + name: "history_operation_participants", + objectField: "history_account_id", + }, + { + name: "history_trades", + objectField: "base_account_id", + }, + { + name: "history_trades", + objectField: "counter_account_id", + }, + }, + "history_assets": { + { + name: "history_trades", + objectField: "base_asset_id", + }, + { + name: "history_trades", + objectField: "counter_asset_id", + }, + { + name: "history_trades_60000", + objectField: "base_asset_id", + }, + { + name: "history_trades_60000", + objectField: "counter_asset_id", + }, + }, + "history_claimable_balances": { + { + name: "history_transaction_claimable_balances", + objectField: "history_claimable_balance_id", + }, + { + name: "history_operation_claimable_balances", + objectField: "history_claimable_balance_id", + }, + }, + "history_liquidity_pools": { + { + name: "history_transaction_liquidity_pools", + objectField: "history_liquidity_pool_id", + }, + { + name: "history_operation_liquidity_pools", + objectField: "history_liquidity_pool_id", + }, + }, +} + // constructReapLookupTablesQuery creates a query like (using history_claimable_balances // as an example): // diff --git a/services/horizon/internal/db2/history/reap_test.go b/services/horizon/internal/db2/history/reap_test.go index 0f033c3629..0b209cf7d7 100644 --- a/services/horizon/internal/db2/history/reap_test.go +++ b/services/horizon/internal/db2/history/reap_test.go @@ -52,7 +52,7 @@ func TestReapLookupTables(t *testing.T) { err = q.Begin(tt.Ctx) tt.Require.NoError(err) - results, err := q.ReapLookupTables(tt.Ctx, nil) + results, err := q.ReapLookupTables(tt.Ctx) tt.Require.NoError(err) err = q.Commit() diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 1a54e6843c..508da0c7da 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -253,7 +253,6 @@ type system struct { runStateVerificationOnLedger func(uint32) bool - reapOffsetByTable map[string]int64 maxLedgerPerFlush uint32 reaper *Reaper @@ -369,7 +368,6 @@ func NewSystem(config Config) (System, error) { config.ReapConfig, config.HistorySession, ), - reapOffsetByTable: map[string]int64{}, } system.initMetrics() @@ -843,7 +841,7 @@ func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) { defer cancel() reapStart := time.Now() - results, err := s.historyQ.ReapLookupTables(ctx, s.reapOffsetByTable) + results, err := s.historyQ.ReapLookupTables(ctx) if err != nil { log.WithError(err).Warn("Error reaping lookup tables") return @@ -860,7 +858,6 @@ func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) { for table, result := range results { totalDeleted += result.RowsDeleted reapLog = reapLog.WithField(table, result) - s.reapOffsetByTable[table] = result.Offset s.Metrics().RowsReapedByLookupTable.With(prometheus.Labels{"table": table}).Observe(float64(result.RowsDeleted)) s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": table}).Observe(result.Duration.Seconds()) }