From a2c878729734d9ae1dbc7973f44fe91fdf069af2 Mon Sep 17 00:00:00 2001 From: Mahdi Khanzadi Date: Sun, 7 Apr 2024 18:38:38 +0200 Subject: [PATCH] introduce command and event bus --- backend/domain/commandbus.go | 11 ++++ backend/domain/errors.go | 8 +++ backend/domain/eventbus.go | 11 ++++ backend/domain/{domain.go => mailer.go} | 6 -- .../infrastructure/commandbus/commandbus.go | 46 ++++++++++++++ .../commandbus/commandbus_test.go | 44 +++++++++++++ backend/infrastructure/eventbus/eventbus.go | 47 ++++++++++++++ .../infrastructure/eventbus/eventbus_test.go | 62 +++++++++++++++++++ 8 files changed, 229 insertions(+), 6 deletions(-) create mode 100644 backend/domain/commandbus.go create mode 100644 backend/domain/errors.go create mode 100644 backend/domain/eventbus.go rename backend/domain/{domain.go => mailer.go} (62%) create mode 100644 backend/infrastructure/commandbus/commandbus.go create mode 100644 backend/infrastructure/commandbus/commandbus_test.go create mode 100644 backend/infrastructure/eventbus/eventbus.go create mode 100644 backend/infrastructure/eventbus/eventbus_test.go diff --git a/backend/domain/commandbus.go b/backend/domain/commandbus.go new file mode 100644 index 00000000..5523a7de --- /dev/null +++ b/backend/domain/commandbus.go @@ -0,0 +1,11 @@ +package domain + +// CommandHandler provides command handler logic +type CommandHandler interface { + Handle(command any) +} + +type CommandBus interface { + Register(command any, handler CommandHandler) + Execute(command any) +} diff --git a/backend/domain/errors.go b/backend/domain/errors.go new file mode 100644 index 00000000..4176d05c --- /dev/null +++ b/backend/domain/errors.go @@ -0,0 +1,8 @@ +package domain + +import "errors" + +var ( + ErrNotExists = errors.New("not exists") +) + diff --git a/backend/domain/eventbus.go b/backend/domain/eventbus.go new file mode 100644 index 00000000..13a32bc2 --- /dev/null +++ b/backend/domain/eventbus.go @@ -0,0 +1,11 @@ +package domain + +// EventHandler provides event handler logic +type EventHandler interface { + Handle(event any) +} + +type EventBus interface { + Subscribe(event any, handler EventHandler) + Publish(event any) +} diff --git a/backend/domain/domain.go b/backend/domain/mailer.go similarity index 62% rename from backend/domain/domain.go rename to backend/domain/mailer.go index 2f8bf412..63920af6 100644 --- a/backend/domain/domain.go +++ b/backend/domain/mailer.go @@ -1,11 +1,5 @@ package domain -import "errors" - -var ( - ErrNotExists = errors.New("not exists") -) - type Mailer interface { SendMail(from string, to string, subject string, body []byte) error } diff --git a/backend/infrastructure/commandbus/commandbus.go b/backend/infrastructure/commandbus/commandbus.go new file mode 100644 index 00000000..93e3a6e1 --- /dev/null +++ b/backend/infrastructure/commandbus/commandbus.go @@ -0,0 +1,46 @@ +package commandbus + +import ( + "reflect" + "sync" + + "github.com/khanzadimahdi/testproject/domain" +) + +type HandlerFunc func(command any) + +func (h HandlerFunc) Handle(command any) { + h(command) +} + +type bus struct { + lock sync.Mutex + handlers map[reflect.Type]domain.CommandHandler +} + +var _ domain.CommandBus = New() + +func New() *bus { + return &bus{ + handlers: make(map[reflect.Type]domain.CommandHandler), + } +} + +func (b *bus) Register(command any, handler domain.CommandHandler) { + b.lock.Lock() + defer b.lock.Unlock() + + b.handlers[reflect.TypeOf(command)] = handler +} + +func (b *bus) Execute(command any) { + b.lock.Lock() + defer b.lock.Unlock() + + handler, ok := b.handlers[reflect.TypeOf(command)] + if !ok { + return + } + + handler.Handle(command) +} diff --git a/backend/infrastructure/commandbus/commandbus_test.go b/backend/infrastructure/commandbus/commandbus_test.go new file mode 100644 index 00000000..ae8b9f57 --- /dev/null +++ b/backend/infrastructure/commandbus/commandbus_test.go @@ -0,0 +1,44 @@ +package commandbus + +import ( + "bytes" + "crypto/rand" + "testing" +) + +func TestBus(t *testing.T) { + b := New() + + var counter int + + payload := make([]byte, 100) + if _, err := rand.Read(payload); err != nil { + t.Fatal("unexpected error", err) + } + + command := &FakeCommand{Payload: payload} + + handler := func(command any) { + counter++ + + cmd, ok := command.(*FakeCommand) + if !ok { + t.Error("invalid command") + } + + if !bytes.Equal(payload, cmd.Payload) { + t.Error("command payload is not valid") + } + } + + b.Register(&FakeCommand{}, HandlerFunc(handler)) + b.Execute(command) + + if counter != 1 { + t.Errorf("command handler should be invoked once but invoked %d", counter) + } +} + +type FakeCommand struct { + Payload []byte +} diff --git a/backend/infrastructure/eventbus/eventbus.go b/backend/infrastructure/eventbus/eventbus.go new file mode 100644 index 00000000..379440c4 --- /dev/null +++ b/backend/infrastructure/eventbus/eventbus.go @@ -0,0 +1,47 @@ +package eventbus + +import ( + "reflect" + "sync" + + "github.com/khanzadimahdi/testproject/domain" +) + +type HandlerFunc func(event any) + +func (h HandlerFunc) Handle(event any) { + h(event) +} + +type bus struct { + lock sync.Mutex + handlers map[reflect.Type][]domain.EventHandler +} + +var _ domain.EventBus = New() + +func New() *bus { + return &bus{ + handlers: make(map[reflect.Type][]domain.EventHandler), + } +} + +func (b *bus) Subscribe(event any, handler domain.EventHandler) { + t := reflect.TypeOf(event) + + b.lock.Lock() + defer b.lock.Unlock() + + b.handlers[t] = append(b.handlers[t], handler) +} + +func (b *bus) Publish(event any) { + t := reflect.TypeOf(event) + + b.lock.Lock() + defer b.lock.Unlock() + + for i := range b.handlers[t] { + b.handlers[t][i].Handle(event) + } +} diff --git a/backend/infrastructure/eventbus/eventbus_test.go b/backend/infrastructure/eventbus/eventbus_test.go new file mode 100644 index 00000000..2717186f --- /dev/null +++ b/backend/infrastructure/eventbus/eventbus_test.go @@ -0,0 +1,62 @@ +package eventbus + +import ( + "bytes" + "crypto/rand" + "testing" +) + +func TestBus(t *testing.T) { + b := New() + + payload := make([]byte, 100) + if _, err := rand.Read(payload); err != nil { + t.Fatal("unexpected error", err) + } + + command := &FakeEvent{Payload: payload} + + var counter1 int + handler1 := func(command any) { + counter1++ + + cmd, ok := command.(*FakeEvent) + if !ok { + t.Error("invalid command") + } + + if !bytes.Equal(payload, cmd.Payload) { + t.Error("command payload is not valid") + } + } + + var counter2 int + handler2 := func(command any) { + counter2++ + + cmd, ok := command.(*FakeEvent) + if !ok { + t.Error("invalid command") + } + + if !bytes.Equal(payload, cmd.Payload) { + t.Error("command payload is not valid") + } + } + + b.Subscribe(&FakeEvent{}, HandlerFunc(handler1)) + b.Subscribe(&FakeEvent{}, HandlerFunc(handler2)) + b.Publish(command) + + if counter1 != 1 { + t.Errorf("event handler-1 should be invoked once but invoked %d", counter1) + } + + if counter2 != 1 { + t.Errorf("event handler-2 should be invoked once but invoked %d", counter2) + } +} + +type FakeEvent struct { + Payload []byte +}