Skip to content

Commit

Permalink
Address handler
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky committed Jan 31, 2025
1 parent 8176896 commit 021461d
Show file tree
Hide file tree
Showing 7 changed files with 213 additions and 147 deletions.
9 changes: 1 addition & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,9 @@ module github.com/celenium-io/celestial-module

go 1.23.2

replace (
github.com/cosmos/cosmos-sdk => github.com/celestiaorg/cosmos-sdk v1.25.1-sdk-v0.46.16
github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1
github.com/syndtr/goleveldb => github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
github.com/tendermint/tendermint => github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35
)
replace github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1

require (
github.com/cosmos/cosmos-sdk v0.46.16
github.com/dipdup-net/go-lib v0.4.7
github.com/dipdup-net/indexer-sdk v0.0.6
github.com/go-testfixtures/testfixtures/v3 v3.14.0
Expand Down Expand Up @@ -47,7 +41,6 @@ require (
github.com/containerd/containerd v1.7.18 // indirect
github.com/containerd/errdefs v0.1.0 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/cosmos/btcutil v1.0.5 // indirect
github.com/cpuguy83/dockercfg v0.3.1 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dipdup-io/workerpool v0.0.4 // indirect
Expand Down
186 changes: 182 additions & 4 deletions go.sum

Large diffs are not rendered by default.

8 changes: 0 additions & 8 deletions pkg/module/address.go

This file was deleted.

84 changes: 0 additions & 84 deletions pkg/module/mock/address.go

This file was deleted.

54 changes: 27 additions & 27 deletions pkg/module/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,35 +10,36 @@ import (
v1 "github.com/celenium-io/celestial-module/pkg/api/v1"
"github.com/celenium-io/celestial-module/pkg/storage"
"github.com/celenium-io/celestial-module/pkg/storage/postgres"
"github.com/cosmos/cosmos-sdk/types/bech32"
"github.com/dipdup-net/go-lib/config"
"github.com/dipdup-net/indexer-sdk/pkg/modules"
sdk "github.com/dipdup-net/indexer-sdk/pkg/storage"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
)

type AddressHandler func(ctx context.Context, address string) (uint64, error)

type Module struct {
modules.BaseModule

celestialsApi celestials.API
address IdByHash
states storage.ICelestialState
celestials storage.ICelestial
tx sdk.Transactable
state storage.CelestialState
celestialsApi celestials.API
addressHandler AddressHandler
states storage.ICelestialState
celestials storage.ICelestial
tx sdk.Transactable
state storage.CelestialState

celestialsDatasource config.DataSource
indexerName string
network string
prefix string
indexPeriod time.Duration
databaseTimeout time.Duration
limit int64
}

func New(
celestialsDatasource config.DataSource,
address IdByHash,
addressHandler AddressHandler,
celestials storage.ICelestial,
state storage.ICelestialState,
tx sdk.Transactable,
Expand All @@ -48,7 +49,6 @@ func New(
) *Module {
module := Module{
BaseModule: modules.New("celestials"),
address: address,
celestials: celestials,
states: state,
tx: tx,
Expand All @@ -59,6 +59,7 @@ func New(
databaseTimeout: time.Minute,
limit: 100,
celestialsDatasource: celestialsDatasource,
addressHandler: addressHandler,
}

for i := range opts {
Expand All @@ -76,6 +77,9 @@ func (m *Module) Close() error {
}

func (m *Module) Start(ctx context.Context) {
if m.addressHandler == nil {
panic("nil address handler")
}
if err := m.getState(ctx); err != nil {
m.Log.Err(err).Msg("state receiving")
return
Expand Down Expand Up @@ -146,6 +150,10 @@ func (m *Module) sync(ctx context.Context) error {
if err != nil {
return errors.Wrap(err, "get changes")
}
log.Info().
Int("changes_count", len(changes.Changes)).
Int64("head", changes.Head).
Msg("received changes")

cids := make(map[string]storage.Celestial)
addressIds := make(map[uint64]struct{})
Expand All @@ -155,22 +163,9 @@ func (m *Module) sync(ctx context.Context) error {
continue
}
m.state.ChangeId = changes.Changes[i].ChangeID

prefix, hash, err := bech32.DecodeAndConvert(changes.Changes[i].Address)
if err != nil {
return errors.Wrapf(err, "decoding address %s", changes.Changes[i].Address)
}
if m.prefix != "" && prefix != m.prefix {
return errors.Errorf("invalid address prefix %s", changes.Changes[i].Address)
}

addressId, err := m.address.IdByHash(ctx, hash)
addressId, err := m.addressHandler(ctx, changes.Changes[i].Address)
if err != nil {
return errors.Wrap(err, "address by hash")
}

if len(addressId) == 0 {
return errors.Errorf("can't find address %s", changes.Changes[i].Address)
return errors.Wrap(err, "address handler")
}

status, err := storage.ParseStatus(changes.Changes[i].Status)
Expand All @@ -179,13 +174,13 @@ func (m *Module) sync(ctx context.Context) error {
}

if status == storage.StatusPRIMARY {
addressIds[addressId[0]] = struct{}{}
addressIds[addressId] = struct{}{}
}

cids[changes.Changes[i].CelestialID] = storage.Celestial{
Id: changes.Changes[i].CelestialID,
ImageUrl: changes.Changes[i].ImageURL,
AddressId: addressId[0],
AddressId: addressId,
ChangeId: changes.Changes[i].ChangeID,
Status: status,
}
Expand All @@ -194,6 +189,11 @@ func (m *Module) sync(ctx context.Context) error {
if err := m.save(ctx, cids, addressIds); err != nil {
return errors.Wrap(err, "save")
}
log.Debug().
Int("changes_count", len(cids)).
Int64("head", m.state.ChangeId).
Msg("saved changes")

end = len(changes.Changes) < int(m.limit)
}

Expand Down
13 changes: 3 additions & 10 deletions pkg/module/module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

celestials "github.com/celenium-io/celestial-module/pkg/api"
celestialsMock "github.com/celenium-io/celestial-module/pkg/api/mock"
"github.com/celenium-io/celestial-module/pkg/module/mock"
"github.com/celenium-io/celestial-module/pkg/storage"
pg "github.com/celenium-io/celestial-module/pkg/storage/postgres"
"github.com/dipdup-net/go-lib/config"
Expand All @@ -32,7 +31,6 @@ type ModuleTestSuite struct {
celestialState *pg.CelestialState
ctrl *gomock.Controller
api *celestialsMock.MockAPI
address *mock.MockIdByHash
}

// SetupSuite -
Expand Down Expand Up @@ -79,7 +77,6 @@ func (s *ModuleTestSuite) SetupSuite() {

s.ctrl = gomock.NewController(s.T())
s.api = celestialsMock.NewMockAPI(s.ctrl)
s.address = mock.NewMockIdByHash(s.ctrl)
}

// TearDownSuite -
Expand Down Expand Up @@ -126,11 +123,6 @@ func (s *ModuleTestSuite) TestSync() {
},
}, nil)

s.address.EXPECT().
IdByHash(gomock.Any(), gomock.Any()).
Return([]uint64{1}, nil).
Times(1)

cfgDs := config.DataSource{
Kind: "celestials",
URL: "base_url",
Expand All @@ -143,13 +135,14 @@ func (s *ModuleTestSuite) TestSync() {

m := New(
cfgDs,
s.address,
func(ctx context.Context, address string) (uint64, error) {
return 1, nil
},
s.celestials,
s.celestialState,
s.storage.Transactable,
testIndexerName,
network,
WithAddressPrefix("celestia"),
WithLimit(10),
)
m.celestialsApi = s.api
Expand Down
6 changes: 0 additions & 6 deletions pkg/module/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,6 @@ import "time"

type ModuleOption func(*Module)

func WithAddressPrefix(prefix string) ModuleOption {
return func(m *Module) {
m.prefix = prefix
}
}

func WithIndexPeriod(period time.Duration) ModuleOption {
return func(m *Module) {
m.indexPeriod = period
Expand Down

0 comments on commit 021461d

Please sign in to comment.