Skip to content

Commit

Permalink
remove unnecessary dependencies between plugins
Browse files Browse the repository at this point in the history
remove unnecessary dependencies between plugins
upgrade go-micro.dev/v4 to v4.2.1
  • Loading branch information
xpunch authored Oct 23, 2021
1 parent f96b48d commit af3cfa0
Show file tree
Hide file tree
Showing 206 changed files with 740 additions and 671 deletions.
239 changes: 239 additions & 0 deletions broker/memory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
// Package memory provides a memory broker
package broker

import (
"context"
"errors"
"math/rand"
"sync"
"time"

"github.com/google/uuid"
"go-micro.dev/v4/logger"
maddr "go-micro.dev/v4/util/addr"
mnet "go-micro.dev/v4/util/net"
)

type memoryBroker struct {
opts Options

addr string
sync.RWMutex
connected bool
Subscribers map[string][]*memorySubscriber
}

type memoryEvent struct {
opts Options
topic string
err error
message interface{}
}

type memorySubscriber struct {
id string
topic string
exit chan bool
handler Handler
opts SubscribeOptions
}

func (m *memoryBroker) Options() Options {
return m.opts
}

func (m *memoryBroker) Address() string {
return m.addr
}

func (m *memoryBroker) Connect() error {
m.Lock()
defer m.Unlock()

if m.connected {
return nil
}

// use 127.0.0.1 to avoid scan of all network interfaces
addr, err := maddr.Extract("127.0.0.1")
if err != nil {
return err
}
i := rand.Intn(20000)
// set addr with port
addr = mnet.HostPort(addr, 10000+i)

m.addr = addr
m.connected = true

return nil
}

func (m *memoryBroker) Disconnect() error {
m.Lock()
defer m.Unlock()

if !m.connected {
return nil
}

m.connected = false

return nil
}

func (m *memoryBroker) Init(opts ...Option) error {
for _, o := range opts {
o(&m.opts)
}
return nil
}

func (m *memoryBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
m.RLock()
if !m.connected {
m.RUnlock()
return errors.New("not connected")
}

subs, ok := m.Subscribers[topic]
m.RUnlock()
if !ok {
return nil
}

var v interface{}
if m.opts.Codec != nil {
buf, err := m.opts.Codec.Marshal(msg)
if err != nil {
return err
}
v = buf
} else {
v = msg
}

p := &memoryEvent{
topic: topic,
message: v,
opts: m.opts,
}

for _, sub := range subs {
if err := sub.handler(p); err != nil {
p.err = err
if eh := m.opts.ErrorHandler; eh != nil {
eh(p)
continue
}
return err
}
}

return nil
}

func (m *memoryBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
m.RLock()
if !m.connected {
m.RUnlock()
return nil, errors.New("not connected")
}
m.RUnlock()

var options SubscribeOptions
for _, o := range opts {
o(&options)
}

sub := &memorySubscriber{
exit: make(chan bool, 1),
id: uuid.New().String(),
topic: topic,
handler: handler,
opts: options,
}

m.Lock()
m.Subscribers[topic] = append(m.Subscribers[topic], sub)
m.Unlock()

go func() {
<-sub.exit
m.Lock()
var newSubscribers []*memorySubscriber
for _, sb := range m.Subscribers[topic] {
if sb.id == sub.id {
continue
}
newSubscribers = append(newSubscribers, sb)
}
m.Subscribers[topic] = newSubscribers
m.Unlock()
}()

return sub, nil
}

func (m *memoryBroker) String() string {
return "memory"
}

func (m *memoryEvent) Topic() string {
return m.topic
}

func (m *memoryEvent) Message() *Message {
switch v := m.message.(type) {
case *Message:
return v
case []byte:
msg := &Message{}
if err := m.opts.Codec.Unmarshal(v, msg); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("[memory]: failed to unmarshal: %v\n", err)
}
return nil
}
return msg
}

return nil
}

func (m *memoryEvent) Ack() error {
return nil
}

func (m *memoryEvent) Error() error {
return m.err
}

func (m *memorySubscriber) Options() SubscribeOptions {
return m.opts
}

func (m *memorySubscriber) Topic() string {
return m.topic
}

func (m *memorySubscriber) Unsubscribe() error {
m.exit <- true
return nil
}

func NewMemoryBroker(opts ...Option) Broker {
options := Options{
Context: context.Background(),
}

rand.Seed(time.Now().UnixNano())
for _, o := range opts {
o(&options)
}

return &memoryBroker{
opts: options,
Subscribers: make(map[string][]*memorySubscriber),
}
}
50 changes: 50 additions & 0 deletions broker/memory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package broker_test

import (
"fmt"
"testing"

"go-micro.dev/v4/broker"
)

func TestMemoryBroker(t *testing.T) {
b := broker.NewMemoryBroker()

if err := b.Connect(); err != nil {
t.Fatalf("Unexpected connect error %v", err)
}

topic := "test"
count := 10

fn := func(p broker.Event) error {
return nil
}

sub, err := b.Subscribe(topic, fn)
if err != nil {
t.Fatalf("Unexpected error subscribing %v", err)
}

for i := 0; i < count; i++ {
message := &broker.Message{
Header: map[string]string{
"foo": "bar",
"id": fmt.Sprintf("%d", i),
},
Body: []byte(`hello world`),
}

if err := b.Publish(topic, message); err != nil {
t.Fatalf("Unexpected error publishing %d", i)
}
}

if err := sub.Unsubscribe(); err != nil {
t.Fatalf("Unexpected error unsubscribing from %s: %v", topic, err)
}

if err := b.Disconnect(); err != nil {
t.Fatalf("Unexpected connect error %v", err)
}
}
34 changes: 20 additions & 14 deletions examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,29 @@ module github.com/asim/go-micro/examples/v4

go 1.16

replace (
github.com/asim/go-micro/plugins/client/grpc/v4 => ../plugins/client/grpc
github.com/asim/go-micro/plugins/transport/grpc/v4 => ../plugins/transport/grpc
go-micro.dev/v4 => ../../go-micro
)

require (
github.com/asim/go-micro/plugins/client/http/v4 v4.0.0-20211013123517-8cad88edae00
github.com/asim/go-micro/plugins/config/encoder/toml/v4 v4.0.0-20211013123517-8cad88edae00
github.com/asim/go-micro/plugins/config/encoder/yaml/v4 v4.0.0-20211013123517-8cad88edae00
github.com/asim/go-micro/plugins/config/source/grpc/v4 v4.0.0-20211013123517-8cad88edae00
github.com/asim/go-micro/plugins/server/http/v4 v4.0.0-20211013123517-8cad88edae00
github.com/asim/go-micro/plugins/wrapper/select/roundrobin/v4 v4.0.0-20211013123517-8cad88edae00
github.com/asim/go-micro/plugins/wrapper/select/shard/v4 v4.0.0-20211013123517-8cad88edae00
github.com/gin-gonic/gin v1.7.0
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/asim/go-micro/plugins/client/http/v4 v4.0.0-20211022143028-f96b48dad9f9
github.com/asim/go-micro/plugins/config/encoder/toml/v4 v4.0.0-20211022143028-f96b48dad9f9
github.com/asim/go-micro/plugins/config/encoder/yaml/v4 v4.0.0-20211022143028-f96b48dad9f9
github.com/asim/go-micro/plugins/config/source/grpc/v4 v4.0.0-20211022143028-f96b48dad9f9
github.com/asim/go-micro/plugins/server/http/v4 v4.0.0-20211022143028-f96b48dad9f9
github.com/asim/go-micro/plugins/wrapper/select/roundrobin/v4 v4.0.0-20211022143028-f96b48dad9f9
github.com/asim/go-micro/plugins/wrapper/select/shard/v4 v4.0.0-20211022143028-f96b48dad9f9
github.com/gin-gonic/gin v1.7.4
github.com/golang/glog v1.0.0
github.com/golang/protobuf v1.5.2
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/pborman/uuid v1.2.1
github.com/urfave/cli/v2 v2.3.0
go-micro.dev/v4 v4.1.1-0.20211013123517-8cad88edae00
golang.org/x/net v0.0.0-20210614182718-04defd469f4e
google.golang.org/genproto v0.0.0-20210624195500-8bfb893ecb84
google.golang.org/grpc v1.38.0
google.golang.org/protobuf v1.26.0
go-micro.dev/v4 v4.1.0
golang.org/x/net v0.0.0-20211020060615-d418f374d309
google.golang.org/genproto v0.0.0-20211021150943-2b146023228c
google.golang.org/grpc v1.41.0
google.golang.org/protobuf v1.27.1
)
Loading

0 comments on commit af3cfa0

Please sign in to comment.