diff --git a/build/dipdup.yml b/build/dipdup.yml index d2a3d46..44b5a12 100644 --- a/build/dipdup.yml +++ b/build/dipdup.yml @@ -19,6 +19,9 @@ datasources: node: url: ${STARKNET_NODE_URL} rps: ${STARKNET_NODE_RPS:-5} + fallback: + url: ${STARKNET_FALLBACK_NODE_URL} + rps: ${STARKNET_FALLBACK_NODE_RPS:-1} database: kind: postgres diff --git a/pkg/indexer/receiver/receiver.go b/pkg/indexer/receiver/receiver.go index 0a36ea3..0262186 100644 --- a/pkg/indexer/receiver/receiver.go +++ b/pkg/indexer/receiver/receiver.go @@ -26,6 +26,7 @@ type Result struct { // Receiver - type Receiver struct { api API + fallbackAPI API result chan Result pool *workerpool.Pool[uint64] processing map[uint64]struct{} @@ -60,6 +61,11 @@ func NewReceiver(cfg config.Config, ds map[string]ddConfig.DataSource) (*Receive wg: new(sync.WaitGroup), } + fallbackDs, ok := ds["fallback"] + if ok { + receiver.fallbackAPI = NewNode(fallbackDs) + } + if receiver.timeout == 0 { receiver.timeout = 10 * time.Second } @@ -131,6 +137,7 @@ func (r *Receiver) worker(ctx context.Context, height uint64) { go func(mx *sync.Mutex, wg *sync.WaitGroup) { defer wg.Done() + api := r.api for { select { case <-ctx.Done(): @@ -138,13 +145,15 @@ func (r *Receiver) worker(ctx context.Context, height uint64) { default: } - response, err := r.api.TraceBlock(ctx, blockId) + response, err := api.TraceBlock(ctx, blockId) if err != nil { if errors.Is(err, context.Canceled) { return } r.log.Err(err).Uint64("height", height).Msg("get block traces request") time.Sleep(time.Second) + r.log.Warn().Msg("trying fallback node...") + api = r.fallbackAPI continue }