Skip to content

Commit

Permalink
multi stage boot manager
Browse files Browse the repository at this point in the history
  • Loading branch information
Paulo Pereira committed Sep 29, 2020
1 parent 67f00e9 commit c386666
Show file tree
Hide file tree
Showing 12 changed files with 62 additions and 24 deletions.
2 changes: 1 addition & 1 deletion account/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/google/uuid v1.1.1
github.com/labstack/echo/v4 v4.1.16
github.com/quintans/es-cqrs-bank-transfer/account/shared v0.0.0-00010101000000-000000000000
github.com/quintans/eventstore v0.6.0
github.com/quintans/eventstore v0.7.0
github.com/sirupsen/logrus v1.4.2
)

Expand Down
4 changes: 2 additions & 2 deletions account/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,8 @@ github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R
github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.8 h1:+fpWZdT24pJBiqJdAwYBjPSk+5YmQzYNPYzQsdzLkt8=
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/quintans/eventstore v0.6.0 h1:wkFWkIU2iGb/sGMFQRGl5vcDvIID9o00oPJ/JZZiegg=
github.com/quintans/eventstore v0.6.0/go.mod h1:Q7ZOMnuMRODLUjxera/P2zDF0IGWZuo8qTxfxG/fDsI=
github.com/quintans/eventstore v0.7.0 h1:Otfw3LnXw0af2xEaEETBQQZvi0sHFXpMfc4VRIrsnw0=
github.com/quintans/eventstore v0.7.0/go.mod h1:Q7ZOMnuMRODLUjxera/P2zDF0IGWZuo8qTxfxG/fDsI=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
Expand Down
2 changes: 1 addition & 1 deletion balance/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/kr/pretty v0.2.0 // indirect
github.com/labstack/echo/v4 v4.1.16
github.com/quintans/es-cqrs-bank-transfer/account/shared v0.0.0
github.com/quintans/eventstore v0.6.0
github.com/quintans/eventstore v0.7.0
github.com/quintans/toolkit v0.0.3
github.com/sirupsen/logrus v1.6.0
golang.org/x/sys v0.0.0-20200720211630-cb9d2d5c5666 // indirect
Expand Down
4 changes: 2 additions & 2 deletions balance/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,8 @@ github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R
github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.8 h1:+fpWZdT24pJBiqJdAwYBjPSk+5YmQzYNPYzQsdzLkt8=
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/quintans/eventstore v0.6.0 h1:wkFWkIU2iGb/sGMFQRGl5vcDvIID9o00oPJ/JZZiegg=
github.com/quintans/eventstore v0.6.0/go.mod h1:Q7ZOMnuMRODLUjxera/P2zDF0IGWZuo8qTxfxG/fDsI=
github.com/quintans/eventstore v0.7.0 h1:Otfw3LnXw0af2xEaEETBQQZvi0sHFXpMfc4VRIrsnw0=
github.com/quintans/eventstore v0.7.0/go.mod h1:Q7ZOMnuMRODLUjxera/P2zDF0IGWZuo8qTxfxG/fDsI=
github.com/quintans/goSQL v0.0.0-20171112122952-e93145e9919d/go.mod h1:MkrXLc68zRc6X1w5FRMsbOxoAJQFQjk2Smpuhv+Po9E=
github.com/quintans/goSQL v1.3.0/go.mod h1:P08UBv1bDcutt2DwddZd3M3eXI+z9+VoNLH1neKn/8E=
github.com/quintans/toolkit v0.0.0-20191116152115-d7982be7971f/go.mod h1:PpTakdDnp2jAKvAcnpWCEUg6RsDAdB3mK3jxUvXn1WM=
Expand Down
14 changes: 8 additions & 6 deletions balance/internal/controller/projection_balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ func (p ProjectionBalance) GetName() string {
return domain.ProjectionBalance
}

func (p ProjectionBalance) GetAggregateTypes() []string {
return []string{event.AggregateType_Account}
}

func (p ProjectionBalance) GetResumeEventID(ctx context.Context) (string, error) {
return p.BalanceUsecase.GetLastEventID(ctx)
func (p ProjectionBalance) GetResumeEventIDs(ctx context.Context) (map[string]string, error) {
lastEventID, err := p.BalanceUsecase.GetLastEventID(ctx)
if err != nil {
return nil, err
}
return map[string]string{
event.AggregateType_Account: lastEventID,
}, nil
}

func (p ProjectionBalance) Handler(ctx context.Context, e eventstore.Event) error {
Expand Down
6 changes: 3 additions & 3 deletions balance/internal/domain/usecase/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var (

type BalanceUsecase struct {
BalanceRepository domain.BalanceRepository
Subscriber projection.Subscriber
Notifier projection.Notifier
}

func (b BalanceUsecase) ListAll(ctx context.Context) ([]entity.Balance, error) {
Expand Down Expand Up @@ -108,7 +108,7 @@ func (b BalanceUsecase) RebuildBalance(ctx context.Context) error {
"method": "BalanceUsecase.RebuildBalance",
})
logger.Info("Signalling to STOP the balance projection listener")
err := b.Subscriber.FreezeProjection(ctx, domain.ProjectionBalance)
err := b.Notifier.FreezeProjection(ctx, domain.ProjectionBalance)
if err != nil {
log.WithError(err).Errorf("Error while freezing projection %s", domain.ProjectionBalance)
return err
Expand All @@ -121,7 +121,7 @@ func (b BalanceUsecase) RebuildBalance(ctx context.Context) error {
}

logger.Info("Signalling to START the balance projection listener")
return b.Subscriber.UnfreezeProjection(ctx, domain.ProjectionBalance)
return b.Notifier.UnfreezeProjection(ctx, domain.ProjectionBalance)
}

func (b BalanceUsecase) GetLastEventID(ctx context.Context) (string, error) {
Expand Down
11 changes: 8 additions & 3 deletions balance/internal/infrastructure/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/elastic/go-elasticsearch/v7"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
"github.com/quintans/es-cqrs-bank-transfer/account/shared/event"
controller "github.com/quintans/es-cqrs-bank-transfer/balance/internal/controller"
"github.com/quintans/es-cqrs-bank-transfer/balance/internal/domain/usecase"
"github.com/quintans/es-cqrs-bank-transfer/balance/internal/gateway"
Expand Down Expand Up @@ -83,7 +84,7 @@ func Setup(cfg Config) {

balanceUC := usecase.BalanceUsecase{
BalanceRepository: repo,
Subscriber: natsSub,
Notifier: natsSub,
}

prjCtrl := controller.ProjectionBalance{
Expand All @@ -92,8 +93,12 @@ func Setup(cfg Config) {
manager := projection.NewBootableManager(
prjCtrl,
natsSub,
esRepo,
0, 0, // no partitioning range, meaning we will not use partitioned topic
projection.BootStages{
AggregateTypes: []string{event.AggregateType_Account},
Subscriber: natsSub,
Repository: esRepo,
// no partitioning range, meaning we will not use partitioned topic
},
)
// if we used partitioned topic, we would not need a locker, since each instance would be the only one responsible for a partion range
locker, err := locks.NewRedisLock(cfg.RedisAddresses, "balance", cfg.LockExpiry)
Expand Down
31 changes: 31 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,37 @@ services:
networks:
- my-network

# https://medium.com/@renato.groffe/mongodb-mongo-express-docker-compose-montando-rapidamente-um-ambiente-para-uso-824f25ca6957
mongodb:
image: mongo:latest
environment:
MONGO_INITDB_ROOT_USERNAME: root
MONGO_INITDB_ROOT_PASSWORD: password
ports:
- 27017:27017
volumes:
- mongodb_data:/data/db
networks:
- my-network

mongo-express:
image: mongo-express
ports:
- 8081:8081
environment:
ME_CONFIG_BASICAUTH_USERNAME: root
ME_CONFIG_BASICAUTH_PASSWORD: password
ME_CONFIG_MONGODB_PORT: 27017
ME_CONFIG_MONGODB_ADMINUSERNAME: root
ME_CONFIG_MONGODB_ADMINPASSWORD: password
depends_on:
- mongodb
links:
- mongodb
networks:
- my-network

volumes:
mongodb_data:
data01:
driver: local
2 changes: 1 addition & 1 deletion poller/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/caarlos0/env/v6 v6.3.0
github.com/lib/pq v1.7.0
github.com/nats-io/stan.go v0.7.0
github.com/quintans/eventstore v0.6.0
github.com/quintans/eventstore v0.7.0
github.com/quintans/toolkit v0.0.3
github.com/sirupsen/logrus v1.4.2
)
4 changes: 2 additions & 2 deletions poller/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,8 @@ github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R
github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.8 h1:+fpWZdT24pJBiqJdAwYBjPSk+5YmQzYNPYzQsdzLkt8=
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/quintans/eventstore v0.6.0 h1:wkFWkIU2iGb/sGMFQRGl5vcDvIID9o00oPJ/JZZiegg=
github.com/quintans/eventstore v0.6.0/go.mod h1:Q7ZOMnuMRODLUjxera/P2zDF0IGWZuo8qTxfxG/fDsI=
github.com/quintans/eventstore v0.7.0 h1:Otfw3LnXw0af2xEaEETBQQZvi0sHFXpMfc4VRIrsnw0=
github.com/quintans/eventstore v0.7.0/go.mod h1:Q7ZOMnuMRODLUjxera/P2zDF0IGWZuo8qTxfxG/fDsI=
github.com/quintans/goSQL v0.0.0-20171112122952-e93145e9919d/go.mod h1:MkrXLc68zRc6X1w5FRMsbOxoAJQFQjk2Smpuhv+Po9E=
github.com/quintans/goSQL v1.3.0/go.mod h1:P08UBv1bDcutt2DwddZd3M3eXI+z9+VoNLH1neKn/8E=
github.com/quintans/toolkit v0.0.0-20191116152115-d7982be7971f/go.mod h1:PpTakdDnp2jAKvAcnpWCEUg6RsDAdB3mK3jxUvXn1WM=
Expand Down
2 changes: 1 addition & 1 deletion pusher/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.15
require (
github.com/caarlos0/env/v6 v6.3.0
github.com/nats-io/stan.go v0.7.0
github.com/quintans/eventstore v0.6.0
github.com/quintans/eventstore v0.7.0
github.com/quintans/toolkit v0.0.3
github.com/sirupsen/logrus v1.4.2
)
4 changes: 2 additions & 2 deletions pusher/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -377,8 +377,8 @@ github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R
github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.8 h1:+fpWZdT24pJBiqJdAwYBjPSk+5YmQzYNPYzQsdzLkt8=
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/quintans/eventstore v0.6.0 h1:wkFWkIU2iGb/sGMFQRGl5vcDvIID9o00oPJ/JZZiegg=
github.com/quintans/eventstore v0.6.0/go.mod h1:Q7ZOMnuMRODLUjxera/P2zDF0IGWZuo8qTxfxG/fDsI=
github.com/quintans/eventstore v0.7.0 h1:Otfw3LnXw0af2xEaEETBQQZvi0sHFXpMfc4VRIrsnw0=
github.com/quintans/eventstore v0.7.0/go.mod h1:Q7ZOMnuMRODLUjxera/P2zDF0IGWZuo8qTxfxG/fDsI=
github.com/quintans/goSQL v0.0.0-20171112122952-e93145e9919d/go.mod h1:MkrXLc68zRc6X1w5FRMsbOxoAJQFQjk2Smpuhv+Po9E=
github.com/quintans/goSQL v1.3.0/go.mod h1:P08UBv1bDcutt2DwddZd3M3eXI+z9+VoNLH1neKn/8E=
github.com/quintans/toolkit v0.0.0-20191116152115-d7982be7971f/go.mod h1:PpTakdDnp2jAKvAcnpWCEUg6RsDAdB3mK3jxUvXn1WM=
Expand Down

0 comments on commit c386666

Please sign in to comment.