Skip to content

Commit

Permalink
introduce command and event bus
Browse files Browse the repository at this point in the history
  • Loading branch information
khanzadimahdi committed Apr 7, 2024
1 parent 5dc0304 commit a2c8787
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 6 deletions.
11 changes: 11 additions & 0 deletions backend/domain/commandbus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package domain

// CommandHandler provides command handler logic
type CommandHandler interface {
Handle(command any)
}

type CommandBus interface {
Register(command any, handler CommandHandler)
Execute(command any)
}
8 changes: 8 additions & 0 deletions backend/domain/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package domain

import "errors"

var (
ErrNotExists = errors.New("not exists")
)

11 changes: 11 additions & 0 deletions backend/domain/eventbus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package domain

// EventHandler provides event handler logic
type EventHandler interface {
Handle(event any)
}

type EventBus interface {
Subscribe(event any, handler EventHandler)
Publish(event any)
}
6 changes: 0 additions & 6 deletions backend/domain/domain.go → backend/domain/mailer.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
package domain

import "errors"

var (
ErrNotExists = errors.New("not exists")
)

type Mailer interface {
SendMail(from string, to string, subject string, body []byte) error
}
46 changes: 46 additions & 0 deletions backend/infrastructure/commandbus/commandbus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package commandbus

import (
"reflect"
"sync"

"github.com/khanzadimahdi/testproject/domain"
)

type HandlerFunc func(command any)

func (h HandlerFunc) Handle(command any) {
h(command)
}

type bus struct {
lock sync.Mutex
handlers map[reflect.Type]domain.CommandHandler
}

var _ domain.CommandBus = New()

func New() *bus {
return &bus{
handlers: make(map[reflect.Type]domain.CommandHandler),
}
}

func (b *bus) Register(command any, handler domain.CommandHandler) {
b.lock.Lock()
defer b.lock.Unlock()

b.handlers[reflect.TypeOf(command)] = handler
}

func (b *bus) Execute(command any) {
b.lock.Lock()
defer b.lock.Unlock()

handler, ok := b.handlers[reflect.TypeOf(command)]
if !ok {
return
}

handler.Handle(command)
}
44 changes: 44 additions & 0 deletions backend/infrastructure/commandbus/commandbus_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package commandbus

import (
"bytes"
"crypto/rand"
"testing"
)

func TestBus(t *testing.T) {
b := New()

var counter int

payload := make([]byte, 100)
if _, err := rand.Read(payload); err != nil {
t.Fatal("unexpected error", err)
}

command := &FakeCommand{Payload: payload}

handler := func(command any) {
counter++

cmd, ok := command.(*FakeCommand)
if !ok {
t.Error("invalid command")
}

if !bytes.Equal(payload, cmd.Payload) {
t.Error("command payload is not valid")
}
}

b.Register(&FakeCommand{}, HandlerFunc(handler))
b.Execute(command)

if counter != 1 {
t.Errorf("command handler should be invoked once but invoked %d", counter)
}
}

type FakeCommand struct {
Payload []byte
}
47 changes: 47 additions & 0 deletions backend/infrastructure/eventbus/eventbus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package eventbus

import (
"reflect"
"sync"

"github.com/khanzadimahdi/testproject/domain"
)

type HandlerFunc func(event any)

func (h HandlerFunc) Handle(event any) {
h(event)
}

type bus struct {
lock sync.Mutex
handlers map[reflect.Type][]domain.EventHandler
}

var _ domain.EventBus = New()

func New() *bus {
return &bus{
handlers: make(map[reflect.Type][]domain.EventHandler),
}
}

func (b *bus) Subscribe(event any, handler domain.EventHandler) {
t := reflect.TypeOf(event)

b.lock.Lock()
defer b.lock.Unlock()

b.handlers[t] = append(b.handlers[t], handler)
}

func (b *bus) Publish(event any) {
t := reflect.TypeOf(event)

b.lock.Lock()
defer b.lock.Unlock()

for i := range b.handlers[t] {
b.handlers[t][i].Handle(event)
}
}
62 changes: 62 additions & 0 deletions backend/infrastructure/eventbus/eventbus_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package eventbus

import (
"bytes"
"crypto/rand"
"testing"
)

func TestBus(t *testing.T) {
b := New()

payload := make([]byte, 100)
if _, err := rand.Read(payload); err != nil {
t.Fatal("unexpected error", err)
}

command := &FakeEvent{Payload: payload}

var counter1 int
handler1 := func(command any) {
counter1++

cmd, ok := command.(*FakeEvent)
if !ok {
t.Error("invalid command")
}

if !bytes.Equal(payload, cmd.Payload) {
t.Error("command payload is not valid")
}
}

var counter2 int
handler2 := func(command any) {
counter2++

cmd, ok := command.(*FakeEvent)
if !ok {
t.Error("invalid command")
}

if !bytes.Equal(payload, cmd.Payload) {
t.Error("command payload is not valid")
}
}

b.Subscribe(&FakeEvent{}, HandlerFunc(handler1))
b.Subscribe(&FakeEvent{}, HandlerFunc(handler2))
b.Publish(command)

if counter1 != 1 {
t.Errorf("event handler-1 should be invoked once but invoked %d", counter1)
}

if counter2 != 1 {
t.Errorf("event handler-2 should be invoked once but invoked %d", counter2)
}
}

type FakeEvent struct {
Payload []byte
}

0 comments on commit a2c8787

Please sign in to comment.