diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index a7efa58f2c..e5f5cc1e95 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -97,20 +97,20 @@ func (s LedgerMetaBackendType) String() string { case LedgerBackendPrecomputed: return "precomputed" default: - return "" + return "" } } type BufferedBackendConfig struct { - BufferSize uint32 `toml:"size"` - NumWorkers uint32 `toml:"num_workers"` - RetryLimit uint32 `toml:"retry_limit"` - RetryWait time.Duration `toml:"retry_wait"` + BufferSize uint32 `toml:"size"` + NumWorkers uint32 `toml:"num_workers"` + RetryLimit uint32 `toml:"retry_limit"` + RetryWait time.Duration `toml:"retry_wait"` } type PrecomputedLedgerMetaConfig struct { - DataStoreConfig datastore.DataStoreConfig `toml:"datastore_config"` - BufferedBackendConfig BufferedBackendConfig `toml:"buffered_backend_config"` + DataStoreConfig datastore.DataStoreConfig `toml:"datastore_config"` + BufferedBackendConfig BufferedBackendConfig `toml:"buffered_backend_config"` } type Config struct { @@ -148,7 +148,7 @@ type Config struct { ReapConfig ReapConfig - PrecomputedMetaConfig PrecomputedLedgerMetaConfig + PrecomputedMetaConfig *PrecomputedLedgerMetaConfig } const ( @@ -297,28 +297,63 @@ func NewSystem(config Config) (System, error) { return nil, errors.Wrap(err, "error creating history archive") } - // the only ingest option is local captive core config - logger := log.WithField("subservice", "stellar-core") - ledgerBackend, err := ledgerbackend.NewCaptive( - ledgerbackend.CaptiveCoreConfig{ - BinaryPath: config.CaptiveCoreBinaryPath, - StoragePath: config.CaptiveCoreStoragePath, - UseDB: config.CaptiveCoreConfigUseDB, - Toml: config.CaptiveCoreToml, - NetworkPassphrase: config.NetworkPassphrase, - HistoryArchiveURLs: config.HistoryArchiveURLs, - CheckpointFrequency: config.CheckpointFrequency, - LedgerHashStore: ledgerbackend.NewHorizonDBLedgerHashStore(config.HistorySession), - Log: logger, - Context: ctx, - UserAgent: fmt.Sprintf("captivecore horizon/%s golang/%s", apkg.Version(), runtime.Version()), - CoreProtocolVersionFn: config.CoreProtocolVersionFn, - CoreBuildVersionFn: config.CoreBuildVersionFn, - }, - ) - if err != nil { + var ledgerBackend ledgerbackend.LedgerBackend + + switch config.LedgerMetaBackendType { + case LedgerBackendCaptiveCore: + logger := log.WithField("subservice", "stellar-core") + ledgerBackend, err = ledgerbackend.NewCaptive( + ledgerbackend.CaptiveCoreConfig{ + BinaryPath: config.CaptiveCoreBinaryPath, + StoragePath: config.CaptiveCoreStoragePath, + UseDB: config.CaptiveCoreConfigUseDB, + Toml: config.CaptiveCoreToml, + NetworkPassphrase: config.NetworkPassphrase, + HistoryArchiveURLs: config.HistoryArchiveURLs, + CheckpointFrequency: config.CheckpointFrequency, + LedgerHashStore: ledgerbackend.NewHorizonDBLedgerHashStore(config.HistorySession), + Log: logger, + Context: ctx, + UserAgent: fmt.Sprintf("captivecore horizon/%s golang/%s", apkg.Version(), runtime.Version()), + CoreProtocolVersionFn: config.CoreProtocolVersionFn, + CoreBuildVersionFn: config.CoreBuildVersionFn, + }, + ) + if err != nil { + cancel() + return nil, errors.Wrap(err, "error creating captive core backend") + } + log.Infof("successfully created ledger backend of type captive core") + case LedgerBackendPrecomputed: + if config.PrecomputedMetaConfig == nil { + cancel() + return nil, errors.New("error creating precomputed buffered backend, precomputed backend config is not present") + } + precompConfig := config.PrecomputedMetaConfig + + dataStore, err := datastore.NewDataStore(ctx, precompConfig.DataStoreConfig) + if err != nil { + cancel() + return nil, errors.Wrapf(err, "error creating datastore from config, %v", precompConfig.DataStoreConfig) + } + + bufferedConfig := ledgerbackend.BufferedStorageBackendConfig{ + LedgerBatchConfig: precompConfig.DataStoreConfig.Schema, + DataStore: dataStore, + BufferSize: precompConfig.BufferedBackendConfig.BufferSize, + NumWorkers: precompConfig.BufferedBackendConfig.NumWorkers, + RetryLimit: precompConfig.BufferedBackendConfig.RetryLimit, + RetryWait: precompConfig.BufferedBackendConfig.RetryWait, + } + + if ledgerBackend, err = ledgerbackend.NewBufferedStorageBackend(ctx, bufferedConfig); err != nil { + cancel() + return nil, errors.Wrapf(err, "error creating buffered storage backend, %v", bufferedConfig) + } + log.Infof("successfully created ledger backend of type buffered storage") + default: cancel() - return nil, errors.Wrap(err, "error creating captive core backend") + return nil, errors.Errorf("unsupported ledger backend type %v", config.LedgerMetaBackendType.String()) } historyQ := &history.Q{config.HistorySession.Clone()}