Skip to content

Commit

Permalink
Software Architecture in Go: Extensibility - Part 3
Browse files Browse the repository at this point in the history
Extract Kafka to start adding new Message Brokers, rename some types to
make them more intentional.
  • Loading branch information
MarioCarrion committed Jan 19, 2025
1 parent ca983d5 commit e553966
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 116 deletions.
12 changes: 9 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,17 @@ Originally added as part of [Building Microservices In Go: Containerization with
* Engine: **27.4.0**, and
* Compose: **v2.31.0-desktop.2**

Use the `docker compose` normal instructions to `build` or `run`, for example:
Run the `docker compose` commands in the following form:

```
docker compose build
docker compose up
docker compose -f compose.yml -f compose.kafka.yml <command>
```

For example:

```
docker compose -f compose.yml -f compose.kafka.yml build
docker compose -f compose.yml -f compose.kafka.yml up
```

Once you `up` all the containers you can access the Swagger UI at http://127.0.0.1:9234/static/swagger-ui/ .
Expand Down
2 changes: 2 additions & 0 deletions cmd/internal/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/MarioCarrion/todo-api/internal/envvar"
)

// KafkaProducer is the producer implementation of Kafka.
type KafkaProducer struct {
Producer *kafka.Producer
Topic string
Expand Down Expand Up @@ -34,6 +35,7 @@ func NewKafkaProducer(conf *envvar.Configuration) (*KafkaProducer, error) {
}, nil
}

// KafkaConsumer is the consumer implementation of Kafka.
type KafkaConsumer struct {
Consumer *kafka.Consumer
}
Expand Down
40 changes: 40 additions & 0 deletions cmd/rest-server/kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package main

import (
cmdinternal "github.com/MarioCarrion/todo-api/cmd/internal"
"github.com/MarioCarrion/todo-api/internal"
"github.com/MarioCarrion/todo-api/internal/envvar"
"github.com/MarioCarrion/todo-api/internal/kafka"
"github.com/MarioCarrion/todo-api/internal/service"
)

// KafkaMessageBroker represents Kafka as a Message Broker. It pairs the
// low-level producer (owned here and released via Close) with the
// task-event publisher the service layer consumes.
type KafkaMessageBroker struct {
	producer  *cmdinternal.KafkaProducer         // underlying Kafka producer; closed in Close
	publisher service.TaskMessageBrokerPublisher // task publisher built on top of producer
}

// NewMessageBroker initializes a new Kafka Broker.
//
// It builds the underlying Kafka producer from the provided configuration
// and wires up the task publisher on top of it; any producer-construction
// failure is wrapped and returned.
func NewMessageBroker(conf *envvar.Configuration) (MessageBrokerPublisher, error) { //nolint: ireturn
	p, err := cmdinternal.NewKafkaProducer(conf)
	if err != nil {
		return nil, internal.WrapErrorf(err, internal.ErrorCodeUnknown, "internal.NewKafkaProducer")
	}

	broker := &KafkaMessageBroker{
		producer:  p,
		publisher: kafka.NewTask(p.Producer, p.Topic),
	}

	return broker, nil
}

// Publisher returns the task publisher backed by this Kafka broker,
// wired up during initialization in NewMessageBroker.
func (m *KafkaMessageBroker) Publisher() service.TaskMessageBrokerPublisher { //nolint: ireturn
	return m.publisher
}

// Close closes the broker by releasing the underlying Kafka producer.
// The producer's Close reports nothing, so nil is always returned; the
// error result exists to satisfy the MessageBrokerPublisher interface.
func (m *KafkaMessageBroker) Close() error {
	producer := m.producer.Producer
	producer.Close()

	return nil
}
25 changes: 14 additions & 11 deletions cmd/rest-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
esv7 "github.com/elastic/go-elasticsearch/v7"
"github.com/go-chi/chi/v5"
"github.com/go-chi/render"
rv8 "github.com/go-redis/redis/v8"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/riandyrn/otelchi"
"go.uber.org/zap"
Expand All @@ -28,7 +27,6 @@ import (
internaldomain "github.com/MarioCarrion/todo-api/internal"
"github.com/MarioCarrion/todo-api/internal/elasticsearch"
"github.com/MarioCarrion/todo-api/internal/envvar"
"github.com/MarioCarrion/todo-api/internal/kafka"
"github.com/MarioCarrion/todo-api/internal/memcached"
"github.com/MarioCarrion/todo-api/internal/postgresql"
"github.com/MarioCarrion/todo-api/internal/rest"
Expand All @@ -38,6 +36,12 @@ import (
//go:embed static
var content embed.FS

// MessageBrokerPublisher is the contract implemented by every supported
// Message Broker: it exposes the task publisher used by the service layer
// and a Close method for releasing the broker's resources on shutdown.
type MessageBrokerPublisher interface {
	Publisher() service.TaskMessageBrokerPublisher
	Close() error
}

func main() {
var env, address string

Expand Down Expand Up @@ -89,9 +93,9 @@ func run(env, address string) (<-chan error, error) {
return nil, internaldomain.WrapErrorf(err, internaldomain.ErrorCodeUnknown, "internal.NewMemcached")
}

kafka, err := internal.NewKafkaProducer(conf)
msgBroker, err := NewMessageBroker(conf)
if err != nil {
return nil, internaldomain.WrapErrorf(err, internaldomain.ErrorCodeUnknown, "internal.NewKafkaProducer")
return nil, internaldomain.WrapErrorf(err, internaldomain.ErrorCodeUnknown, "NewMessageBroker")
}

//-
Expand Down Expand Up @@ -122,7 +126,7 @@ func run(env, address string) (<-chan error, error) {
Middlewares: []func(next http.Handler) http.Handler{otelchi.Middleware("todo-api-server"), logging},
Logger: logger,
Memcached: memcached,
Kafka: kafka,
MessageBroker: msgBroker,
})
if err != nil {
return nil, internaldomain.WrapErrorf(err, internaldomain.ErrorCodeUnknown, "newServer")
Expand All @@ -146,6 +150,9 @@ func run(env, address string) (<-chan error, error) {
_ = logger.Sync()

pool.Close()
srv.Close()
_ = msgBroker.Close()

stop()
cancel()
close(errC)
Expand Down Expand Up @@ -177,13 +184,11 @@ type serverConfig struct {
Address string
DB *pgxpool.Pool
ElasticSearch *esv7.Client
Kafka *internal.KafkaProducer
RabbitMQ *internal.RabbitMQ
Redis *rv8.Client
Memcached *memcache.Client
Metrics http.Handler
Middlewares []func(next http.Handler) http.Handler
Logger *zap.Logger
MessageBroker MessageBrokerPublisher
}

func newServer(conf serverConfig) (*http.Server, error) {
Expand All @@ -202,9 +207,7 @@ func newServer(conf serverConfig) (*http.Server, error) {
search := elasticsearch.NewTask(conf.ElasticSearch)
msearch := memcached.NewSearchableTask(conf.Memcached, search)

msgBroker := kafka.NewTask(conf.Kafka.Producer, conf.Kafka.Topic)

svc := service.NewTask(conf.Logger, mrepo, msearch, msgBroker)
svc := service.NewTask(conf.Logger, mrepo, msearch, conf.MessageBroker.Publisher())

rest.RegisterOpenAPI(router)
rest.NewTaskHandler(svc).Register(router)
Expand Down
42 changes: 42 additions & 0 deletions compose.common.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Shared service definitions extended by broker-specific compose files
# (e.g. compose.kafka.yml) via the `extends` mechanism; this file is not
# meant to be run on its own.
services:
  # Base REST API server: broker-specific files add the broker env vars
  # and the broker dependency on top of this definition.
  rest-server-common:
    build:
      context: .
      dockerfile: ./dockerfiles/rest-server.Dockerfile
    ports:
      - 9234:9234
    command: rest-server -env /api/env.example
    environment:
      DATABASE_HOST: postgres
      ELASTICSEARCH_URL: http://elasticsearch:9200
      JAEGER_ENDPOINT: http://jaeger:14268/api/traces
      MEMCACHED_HOST: memcached:11211
      VAULT_ADDRESS: http://vault:8300
    depends_on:
      postgres:
        condition: service_healthy
      vault:
        condition: service_started
      prometheus:
        condition: service_started
      jaeger:
        condition: service_started
      elasticsearch:
        condition: service_healthy
      memcached:
        condition: service_healthy
  # Base indexer that consumes task events and writes them to Elasticsearch;
  # broker-specific files supply the Dockerfile and broker configuration.
  elasticsearch-indexer-common:
    build:
      context: .
    command: elasticsearch-indexer -env /api/env.example
    environment:
      ELASTICSEARCH_URL: http://elasticsearch:9200
      JAEGER_ENDPOINT: http://jaeger:14268/api/traces
      VAULT_ADDRESS: http://vault:8300
    depends_on:
      elasticsearch:
        condition: service_healthy
      jaeger:
        condition: service_started
      vault:
        condition: service_started
75 changes: 75 additions & 0 deletions compose.kafka.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Kafka-specific services and infrastructure. Combine with the base file:
#   docker compose -f compose.yml -f compose.kafka.yml <command>
services:
  # REST server extended with Kafka connection settings.
  rest-server-kafka:
    extends:
      file: compose.common.yml
      service: rest-server-common
    build:
      args:
        TAG: kafka
    environment:
      KAFKA_HOST: kafka:29092
      KAFKA_TOPIC: tasks
    depends_on:
      kafka:
        condition: service_healthy
  # Indexer that consumes task events from Kafka into Elasticsearch.
  elasticsearch-indexer-kafka:
    extends:
      file: compose.common.yml
      service: elasticsearch-indexer-common
    build:
      dockerfile: ./dockerfiles/elasticsearch-indexer-kafka.Dockerfile
    environment:
      KAFKA_HOST: kafka:29092
      KAFKA_TOPIC: tasks
    depends_on:
      kafka:
        condition: service_healthy
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.2
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181
    healthcheck:
      test: ["CMD-SHELL", "nc -z localhost 2181 || exit 1"]
      interval: 20s
      timeout: 1s
      retries: 5
  kafka:
    image: confluentinc/cp-kafka:7.6.2
    depends_on:
      zookeeper:
        condition: service_healthy
    ports:
      - 9092:9092
    environment:
      # 29092 is the in-network listener used by the other services;
      # 9092 is exposed for clients running on the host.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_BROKER_ID: 1
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    healthcheck:
      test: kafka-topics --bootstrap-server kafka:29092 --list
      interval: 30s
      timeout: 10s
      retries: 5
  # One-shot job that creates the "tasks" topic once the broker is healthy.
  kafka_setup:
    image: confluentinc/cp-kafka:7.6.2
    # Quoted so YAML does not parse it as the boolean false; the Compose
    # spec requires `restart` to be a string.
    restart: "no"
    depends_on:
      kafka:
        condition: service_healthy
    entrypoint: ["/bin/sh", "-c"]
    command: |
      "
      # blocks until kafka is reachable
      kafka-topics --bootstrap-server kafka:29092 --list
      echo -e 'Creating kafka topics'
      kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic tasks --replication-factor 1 --partitions 1
      echo -e 'Successfully created the following topics:'
      kafka-topics --bootstrap-server kafka:29092 --list
      "
96 changes: 0 additions & 96 deletions compose.yml
Original file line number Diff line number Diff line change
@@ -1,35 +1,5 @@
name: todo-api-microservice
services:
rest-server:
build:
context: .
dockerfile: ./dockerfiles/rest-server.Dockerfile
ports:
- 9234:9234
command: rest-server -env /api/env.example
environment:
DATABASE_HOST: postgres
ELASTICSEARCH_URL: http://elasticsearch:9200
JAEGER_ENDPOINT: http://jaeger:14268/api/traces
MEMCACHED_HOST: memcached:11211
VAULT_ADDRESS: http://vault:8300
KAFKA_HOST: kafka:29092
KAFKA_TOPIC: tasks
depends_on:
postgres:
condition: service_healthy
vault:
condition: service_started
prometheus:
condition: service_started
jaeger:
condition: service_started
elasticsearch:
condition: service_healthy
memcached:
condition: service_healthy
kafka:
condition: service_healthy
prometheus:
image: prom/prometheus:v2.40.7
ports:
Expand Down Expand Up @@ -93,72 +63,6 @@ services:
elasticsearch:
condition: service_healthy
entrypoint: ["curl", "-X", "PUT", "-H", "Content-Type: application/json", "http://elasticsearch:9200/tasks", "-d", "{\"mappings\":{\"properties\":{\"id\":{\"type\":\"keyword\"},\"description\":{\"type\":\"text\"}}}}"]
elasticsearch-indexer-kafka:
build:
dockerfile: ./dockerfiles/elasticsearch-indexer-kafka.Dockerfile
command: elasticsearch-indexer -env /api/env.example
environment:
ELASTICSEARCH_URL: http://elasticsearch:9200
JAEGER_ENDPOINT: http://jaeger:14268/api/traces
VAULT_ADDRESS: http://vault:8300
KAFKA_HOST: kafka:29092
KAFKA_TOPIC: tasks
depends_on:
elasticsearch:
condition: service_healthy
jaeger:
condition: service_started
vault:
condition: service_started
kafka:
condition: service_healthy
zookeeper:
image: confluentinc/cp-zookeeper:7.6.2
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 2181:2181
healthcheck:
test: ["CMD-SHELL", "nc -z localhost 2181 || exit 1"]
interval: 20s
timeout: 1s
retries: 5
kafka:
image: confluentinc/cp-kafka:7.6.2
depends_on:
zookeeper:
condition: service_healthy
ports:
- 9092:9092
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_BROKER_ID: 1
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
healthcheck:
test: kafka-topics --bootstrap-server kafka:29092 --list
interval: 30s
timeout: 10s
retries: 5
kafka_setup:
image: confluentinc/cp-kafka:7.6.2
restart: no
depends_on:
kafka:
condition: service_healthy
entrypoint: ["/bin/sh", "-c"]
command: |
"
# blocks until kafka is reachable
kafka-topics --bootstrap-server kafka:29092 --list
echo -e 'Creating kafka topics'
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic tasks --replication-factor 1 --partitions 1
echo -e 'Successfully created the following topics:'
kafka-topics --bootstrap-server kafka:29092 --list
"
memcached:
image: memcached:1.6.19-alpine3.17
ports:
Expand Down
Loading

0 comments on commit e553966

Please sign in to comment.