Skip to content

Commit

Permalink
Micro update (actatum#14)
Browse files Browse the repository at this point in the history
* use nats micro package

* upgrade to go1.21 and use slog for the logger middleware

* move protoc-gen-stormrpc to cmd folder, fix passing of headers to handler function and context, fix endpoint name to pass micro endpoint name validation

* move protoc-gen-stormrpc to cmd folder, fix passing of headers to handler function and context, fix endpoint name to pass micro endpoint name validation

* cleaning up some of the codegen internals, adding some comments

* bump github actions test matrix to go1.21

* bump nats server version, bump protobuf go library version

* trying to track down this data race and determine if its my code or nats.go

* using my fork of nats.go with fix for race condition until it can get merged into upstream

* fix linting errors

* update nats.go dependency to my fork's main branch

* it works
  • Loading branch information
actatum authored Dec 7, 2023
1 parent e3f6b48 commit e2506f7
Show file tree
Hide file tree
Showing 20 changed files with 642 additions and 552 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/actions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ jobs:
test:
strategy:
matrix:
go-version: [1.19.x, 1.20.x]
go-version: [1.21.x]
os: [ubuntu-latest]
runs-on: ${{ matrix.os }}
steps:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jobs:
test:
strategy:
matrix:
go-version: [1.19.x, 1.20.x]
go-version: [1.21.x]
os: [ubuntu-latest]
runs-on: ${{ matrix.os }}
steps:
Expand Down
128 changes: 75 additions & 53 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,49 +6,51 @@
[![Godoc](http://img.shields.io/badge/godoc-reference-blue.svg?style=flat)](https://godoc.org/github.com/actatum/stormrpc)
[![Release](https://img.shields.io/github/release/actatum/stormrpc.svg)](https://github.com/actatum/stormrpc/releases/latest)


StormRPC is an abstraction or wrapper on [`NATS`] Request/Reply messaging capabilities.

It provides some convenient features including:

* **Middleware**
- **Middleware**

Middleware are decorators around `HandlerFunc`s. Some middleware are available within the package including `RequestID`, `Tracing` (via OpenTelemetry) `Logger` and `Recoverer`.

- **Body encoding and decoding**

Marshalling and unmarshalling request bodies to structs. JSON, Protobuf, and Msgpack are supported out of the box.

Middleware are decorators around `HandlerFunc`s. Some middleware are available within the package including `RequestID`, `Tracing` (via OpenTelemetry) `Logger` and `Recoverer`.
* **Body encoding and decoding**
- **Deadline propagation**

Marshalling and unmarshalling request bodies to structs. JSON, Protobuf, and Msgpack are supported out of the box.
* **Deadline propagation**
Request deadlines are propagated from client to server so both ends will stop processing once the deadline has passed.

Request deadlines are propagated from client to server so both ends will stop processing once the deadline has passed.
* **Error propagation**
- **Error propagation**

Responses have an `Error` attribute and these are propagated across the wire without needing to tweak your request/response schemas.
Responses have an `Error` attribute and these are propagated across the wire without needing to tweak your request/response schemas.

## Installation

### Runtime Library

The runtime library package ```github.com/actatum/stormrpc``` contains common types like ```stormrpc.Error```, ```stormrpc.Client``` and ```stormrpc.Server```. If you aren't generating servers and clients from protobuf definitions you only need to import the stormrpc package.
The runtime library package `github.com/actatum/stormrpc` contains common types like `stormrpc.Error`, `stormrpc.Client` and `stormrpc.Server`. If you aren't generating servers and clients from protobuf definitions you only need to import the stormrpc package.

```bash
go get github.com/actatum/stormrpc
```

### Code Generator

You need to install ```go``` and the ```protoc``` compiler on your system. Then, install the protoc plugins ```protoc-gen-stormrpc``` and ```protoc-gen-go``` to generate Go code.
You need to install `go` and the `protoc` compiler on your system. Then, install the protoc plugins `protoc-gen-stormrpc` and `protoc-gen-go` to generate Go code.

```bash
go install github.com/actatum/stormrpc/protoc-gen-stormrpc@latest
go install github.com/actatum/stormrpc/cmd/protoc-gen-stormrpc@latest
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
```

To generate client and server stubs use the following command

```bash
protoc --go_out=$output_dir --stormrpc_out=$output_dir $input_proto_file
```


Code generation examples can be found [here](https://github.com/actatum/stormrpc/tree/main/examples/protogen)

## Basic Usage
Expand All @@ -59,53 +61,73 @@ Code generation examples can be found [here](https://github.com/actatum/stormrpc
package main

import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/actatum/stormrpc"
"github.com/nats-io/nats.go"
"context"
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/actatum/stormrpc"
"github.com/nats-io/nats-server/v2/server"
)

func echo(ctx context.Context, req stormrpc.Request) stormrpc.Response {
var b any
if err := req.Decode(&b); err != nil {
return stormrpc.NewErrorResponse(req.Reply, err)
}
var b any
if err := req.Decode(&b); err != nil {
return stormrpc.NewErrorResponse(req.Reply, err)
}

resp, err := stormrpc.NewResponse(req.Reply, b)
if err != nil {
return stormrpc.NewErrorResponse(req.Reply, err)
}
resp, err := stormrpc.NewResponse(req.Reply, b)
if err != nil {
return stormrpc.NewErrorResponse(req.Reply, err)
}

return resp
return resp
}

func main() {
srv, err := stormrpc.NewServer("echo", nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
srv.Handle("echo", echo)

go func() {
_ = srv.Run()
}()
log.Printf("👋 Listening on %v", srv.Subjects())

done := make(chan os.Signal, 1)
signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)
<-done
log.Printf("💀 Shutting down")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

if err = srv.Shutdown(ctx); err != nil {
log.Fatal(err)
}
ns, err := server.NewServer(&server.Options{
Port: 40897,
})
if err != nil {
log.Fatal(err)
}
ns.Start()
defer func() {
ns.Shutdown()
ns.WaitForShutdown()
}()

if !ns.ReadyForConnections(1 * time.Second) {
log.Fatal("timeout waiting for nats server")
}

srv, err := stormrpc.NewServer(&stormrpc.ServerConfig{
NatsURL: ns.ClientURL(),
Name: "echo",
})
if err != nil {
log.Fatal(err)
}

srv.Handle("echo", echo)

go func() {
_ = srv.Run()
}()
log.Printf("👋 Listening on %v", srv.Subjects())

done := make(chan os.Signal, 1)
signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)
<-done
log.Printf("💀 Shutting down")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

if err = srv.Shutdown(ctx); err != nil {
log.Fatal(err)
}
}
```

Expand Down Expand Up @@ -156,4 +178,4 @@ func main() {
```

[`nats.go`]: https://github.com/nats-io/nats.go
[`NATS`]: https://docs.nats.io/
[`NATS`]: https://docs.nats.io/
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type Client struct {
}

// NewClient returns a new instance of a Client.
func NewClient(natsURL string, opts ...ClientOption) (*Client, error) {
func NewClient(natsURL string, _ ...ClientOption) (*Client, error) {
nc, err := nats.Connect(natsURL)
if err != nil {
return nil, err
Expand Down
48 changes: 25 additions & 23 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestNewClient(t *testing.T) {
}

func TestClient_Do(t *testing.T) {
t.Parallel()
// t.Parallel()

ns, err := server.NewServer(&server.Options{
Port: 41397,
Expand All @@ -69,12 +69,15 @@ func TestClient_Do(t *testing.T) {
return
}

t.Run("deadline exceeded", func(t *testing.T) {
t.Parallel()
clientURL := ns.ClientURL()

t.Run("deadline exceeded", func(t *testing.T) {
timeout := 50 * time.Millisecond
subject := strconv.Itoa(rand.Int())
srv, err := NewServer("test", ns.ClientURL())
srv, err := NewServer(&ServerConfig{
NatsURL: clientURL,
Name: "test",
})
if err != nil {
t.Fatal(err)
}
Expand All @@ -89,7 +92,7 @@ func TestClient_Do(t *testing.T) {
_ = srv.Shutdown(context.Background())
})

client, err := NewClient(ns.ClientURL())
client, err := NewClient(clientURL)
if err != nil {
t.Fatal(err)
}
Expand All @@ -112,11 +115,12 @@ func TestClient_Do(t *testing.T) {
})

t.Run("rpc error", func(t *testing.T) {
t.Parallel()

timeout := 50 * time.Millisecond
subject := strconv.Itoa(rand.Int())
srv, err := NewServer("test", ns.ClientURL())
srv, err := NewServer(&ServerConfig{
NatsURL: clientURL,
Name: "test",
})
if err != nil {
t.Fatal(err)
}
Expand All @@ -130,7 +134,7 @@ func TestClient_Do(t *testing.T) {
_ = srv.Shutdown(context.Background())
})

client, err := NewClient(ns.ClientURL())
client, err := NewClient(clientURL)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -158,11 +162,9 @@ func TestClient_Do(t *testing.T) {
})

t.Run("no servers", func(t *testing.T) {
t.Parallel()

subject := strconv.Itoa(rand.Int())

client, err := NewClient(ns.ClientURL())
client, err := NewClient(clientURL)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -194,9 +196,7 @@ func TestClient_Do(t *testing.T) {
})

t.Run("request option errors", func(t *testing.T) {
t.Parallel()

client, err := NewClient(ns.ClientURL())
client, err := NewClient(clientURL)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -225,11 +225,12 @@ func TestClient_Do(t *testing.T) {
})

t.Run("successful request", func(t *testing.T) {
t.Parallel()

timeout := 50 * time.Millisecond
subject := strconv.Itoa(rand.Int())
srv, err := NewServer("test", ns.ClientURL())
srv, err := NewServer(&ServerConfig{
NatsURL: clientURL,
Name: "test",
})
if err != nil {
t.Fatal(err)
}
Expand All @@ -248,7 +249,7 @@ func TestClient_Do(t *testing.T) {
_ = srv.Shutdown(context.Background())
})

client, err := NewClient(ns.ClientURL())
client, err := NewClient(clientURL)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -276,12 +277,13 @@ func TestClient_Do(t *testing.T) {
})

t.Run("successful request w/headers option", func(t *testing.T) {
t.Parallel()

apiKey := uuid.NewString()
timeout := 50 * time.Millisecond
subject := strconv.Itoa(rand.Int())
srv, err := NewServer("test", ns.ClientURL())
srv, err := NewServer(&ServerConfig{
NatsURL: clientURL,
Name: "test",
})
if err != nil {
t.Fatal(err)
}
Expand All @@ -303,7 +305,7 @@ func TestClient_Do(t *testing.T) {
_ = srv.Shutdown(context.Background())
})

client, err := NewClient(ns.ClientURL())
client, err := NewClient(clientURL)
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions examples/protogen/genproto.sh
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
go install ../../protoc-gen-stormrpc
protoc --go_out=./pb --stormrpc_out=./pb pb/echo.proto
go install ../../cmd/protoc-gen-stormrpc
protoc --go_out=./pb --stormrpc_out=./pb pb/echo.proto
2 changes: 1 addition & 1 deletion examples/protogen/pb/echo.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion examples/protogen/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ func main() {
log.Fatal("timeout waiting for nats server")
}

srv, err := stormrpc.NewServer("echo", ns.ClientURL(), stormrpc.WithErrorHandler(logError))
srv, err := stormrpc.NewServer(&stormrpc.ServerConfig{
NatsURL: ns.ClientURL(),
Name: "echo",
}, stormrpc.WithErrorHandler(logError))
if err != nil {
log.Fatal(err)
}
Expand Down
7 changes: 5 additions & 2 deletions examples/simple/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func main() {
if err != nil {
log.Fatal(err)
}
go ns.Start()
ns.Start()
defer func() {
ns.Shutdown()
ns.WaitForShutdown()
Expand All @@ -43,7 +43,10 @@ func main() {
log.Fatal("timeout waiting for nats server")
}

srv, err := stormrpc.NewServer("echo", ns.ClientURL())
srv, err := stormrpc.NewServer(&stormrpc.ServerConfig{
NatsURL: ns.ClientURL(),
Name: "echo",
})
if err != nil {
log.Fatal(err)
}
Expand Down
Loading

0 comments on commit e2506f7

Please sign in to comment.