Skip to content

Commit

Permalink
Fix: indexer module
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky committed Aug 29, 2024
1 parent 423616e commit d1e730e
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 36 deletions.
32 changes: 3 additions & 29 deletions cmd/metadata/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ type Indexer struct {

client *grpc.Client
storage postgres.Storage
input *modules.Input
state *models.State
subscription grpc.Subscription
subId uint64
Expand All @@ -53,7 +52,6 @@ func NewIndexer(cfg Metadata, datasources map[string]config.DataSource, pg postg
storage: pg,
BaseModule: modules.New("Indexer"),
state: new(models.State),
input: modules.NewInput(InputName),
wg: new(sync.WaitGroup),
}
indexer.channel = NewChannel(pg, indexer.state)
Expand All @@ -64,7 +62,7 @@ func NewIndexer(cfg Metadata, datasources map[string]config.DataSource, pg postg
indexer.filler = filler
indexer.receiver = NewReceiver(cfg.Receiver, pg.TokenMetadata, ipfsNode)

indexer.CreateInput(InputName)
indexer.CreateInputWithCapacity(InputName, 1024*16)
indexer.CreateOutput(OutputName)

return indexer, nil
Expand All @@ -89,11 +87,6 @@ func (indexer *Indexer) Start(ctx context.Context) {
indexer.receiver.Start(ctx)
}

// Name -
func (indexer *Indexer) Name() string {
return "starknet_metadata_indexer"
}

// Subscribe -
func (indexer *Indexer) Subscribe(ctx context.Context, subscriptions map[string]grpc.Subscription) error {
s, ok := subscriptions["metadata"]
Expand Down Expand Up @@ -132,16 +125,6 @@ func (indexer *Indexer) init(ctx context.Context) error {
}
}

// Input - returns input by name
func (indexer *Indexer) Input(name string) (*modules.Input, error) {
switch name {
case InputName:
return indexer.input, nil
default:
return nil, errors.Wrap(modules.ErrUnknownInput, name)
}
}

func (indexer *Indexer) listen(ctx context.Context) {
defer indexer.wg.Done()

Expand All @@ -159,7 +142,7 @@ func (indexer *Indexer) listen(ctx context.Context) {

case msg, ok := <-input.Listen():
if !ok {
continue
return
}

switch typ := msg.(type) {
Expand Down Expand Up @@ -192,7 +175,7 @@ func (indexer *Indexer) reconnectThread(ctx context.Context) {
}
}

func (indexer *Indexer) resubscribe(ctx context.Context, id uint64) error {
func (indexer *Indexer) resubscribe(ctx context.Context, _ uint64) error {
for !indexer.channel.IsEmpty() {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -239,11 +222,6 @@ func (indexer *Indexer) actualFilters(ctx context.Context, sub *grpc.Subscriptio
return nil
}

// Output - returns output by name
func (indexer *Indexer) Output(name string) (*modules.Output, error) {
return nil, errors.Wrap(modules.ErrUnknownOutput, name)
}

// Unsubscribe -
func (indexer *Indexer) Unsubscribe(ctx context.Context) error {
log.Info().Uint64("id", indexer.subId).Msg("unsubscribing...")
Expand All @@ -269,9 +247,5 @@ func (indexer *Indexer) Close() error {
return err
}

if err := indexer.input.Close(); err != nil {
return err
}

return nil
}
8 changes: 3 additions & 5 deletions cmd/metadata/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ import (
"github.com/dipdup-io/starknet-metadata/internal/storage"
"github.com/dipdup-io/starknet-metadata/internal/storage/postgres"
"github.com/dipdup-net/go-lib/hasura"
"github.com/dipdup-net/indexer-sdk/pkg/modules"
grpcSDK "github.com/dipdup-net/indexer-sdk/pkg/modules/grpc"
"github.com/dipdup-net/indexer-sdk/pkg/modules/printer"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -82,7 +80,7 @@ func main() {
return
}

views, err := createViews(ctx, pg)
views, err := createViews(pg)
if err != nil {
log.Panic().Err(err).Msg("create views")
return
Expand Down Expand Up @@ -117,8 +115,8 @@ func main() {
return
}

if err := modules.Connect(client, indexer, grpc.OutputMessages, printer.InputName); err != nil {
log.Panic().Err(err).Msg("module connect")
if err := indexer.AttachTo(client, grpc.OutputMessages, InputName); err != nil {
log.Panic().Err(err).Msg("connect indexer to grpc module")
return
}

Expand Down
3 changes: 1 addition & 2 deletions cmd/metadata/views.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package main

import (
"context"
"fmt"
"os"
"strings"

"github.com/dipdup-io/starknet-metadata/internal/storage/postgres"
)

func createViews(ctx context.Context, strg postgres.Storage) ([]string, error) {
func createViews(strg postgres.Storage) ([]string, error) {
files, err := os.ReadDir("views")
if err != nil {
return nil, err
Expand Down

0 comments on commit d1e730e

Please sign in to comment.