Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: micro/go-micro
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v5.3.0
Choose a base ref
...
head repository: micro/go-micro
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: master
Choose a head ref
  • 7 commits
  • 11 files changed
  • 5 contributors

Commits on Jul 8, 2024

  1. update readme

    asim committed Jul 8, 2024
    Copy the full SHA
    9a7cd8c View commit details

Commits on Jul 12, 2024

  1. Fix data race (#2721)

    asim authored Jul 12, 2024
    Copy the full SHA
    4c34451 View commit details

Commits on Jul 15, 2024

  1. Update README.md (#2722)

    asim authored Jul 15, 2024
    Copy the full SHA
    1c6c1ff View commit details

Commits on Jul 23, 2024

  1. Better connection pool handling (#2725)

    * [fix] etcd config source prefix issue (#2389)
    
    * http transport data race issue (#2436)
    
    * [fix] #2431 http transport data race issue
    
    * [feature] Ability to close connection while receiving.
    Ability to send messages while receiving.
    Icreased r channel limit to 100 to more fluently communication.
    Do not dropp sent request if r channel is full.
    
    * [fix] Use pool connection close timeout
    
    * [fix] replace Close with private function
    
    * [fix] Do not close the transport client twice in stream connection , the transport client is closed in the rpc codec
    
    * [fix] tests
    
    ---------
    
    Co-authored-by: Johnson C <chengqiaosheng@gmail.com>
    Ak-Army and xpunch authored Jul 23, 2024
    Copy the full SHA
    0433e98 View commit details

Commits on Aug 26, 2024

  1. go mod tidy (#2730)

    morya authored Aug 26, 2024
    Copy the full SHA
    b318b7f View commit details

Commits on Oct 28, 2024

  1. invalidate service if node was not updated (#2736)

    Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
    butonic authored Oct 28, 2024
    Copy the full SHA
    14a1791 View commit details

Commits on Nov 15, 2024

  1. update ttl for updated nodes only (#2740)

    Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
    butonic authored Nov 15, 2024
    Copy the full SHA
    e032a6a View commit details
Showing with 137 additions and 57 deletions.
  1. +2 −10 README.md
  2. +20 −9 client/options.go
  3. +5 −1 client/rpc_client.go
  4. +8 −0 cmd/cmd.go
  5. +4 −0 config/loader/memory/memory.go
  6. +0 −1 go.mod
  7. +0 −2 go.sum
  8. +28 −1 registry/cache/cache.go
  9. +57 −27 util/pool/default.go
  10. +3 −3 util/pool/default_test.go
  11. +10 −3 util/pool/options.go
12 changes: 2 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
@@ -19,8 +19,8 @@ Go Micro abstracts away the details of distributed systems. Here are the main fe
- **Dynamic Config** - Load and hot reload dynamic config from anywhere. The config interface provides a way to load application
level config from any source such as env vars, file, etcd. You can merge the sources and even define fallbacks.

- **Data Storage** - A simple data store interface to read, write and delete records. It includes support for memory, file and
CockroachDB by default. State and persistence becomes a core requirement beyond prototyping and Micro looks to build that into the framework.
- **Data Storage** - A simple data store interface to read, write and delete records. It includes support for many storage backends
in the plugins repo. State and persistence becomes a core requirement beyond prototyping and Micro looks to build that into the framework.

- **Service Discovery** - Automatic service registration and name resolution. Service discovery is at the core of micro service
development. When service A needs to speak to service B it needs the location of that service. The default discovery mechanism is
@@ -40,12 +40,6 @@ Go Micro abstracts away the details of distributed systems. Here are the main fe
- **Async Messaging** - PubSub is built in as a first class citizen for asynchronous communication and event driven architectures.
Event notifications are a core pattern in micro service development. The default messaging system is a HTTP event message broker.

- **Event Streaming** - PubSub is great for async notifications but for more advanced use cases event streaming is preferred. Offering
persistent storage, consuming from offsets and acking. Go Micro includes support for NATS Jetstream and Redis streams.

- **Synchronization** - Distributed systems are often built in an eventually consistent manner. Support for distributed locking and
leadership are built in as a Sync interface. When using an eventually consistent database or scheduling use the Sync interface.

- **Pluggable Interfaces** - Go Micro makes use of Go interfaces for each distributed system abstraction. Because of this these interfaces
are pluggable and allows Go Micro to be runtime agnostic. You can plugin any underlying technology.

@@ -111,6 +105,4 @@ curl -XPOST \
http://localhost:8080
```

See the [examples](https://github.com/go-micro/examples) for detailed information on usage.


29 changes: 20 additions & 9 deletions client/options.go
Original file line number Diff line number Diff line change
@@ -27,6 +27,8 @@ var (
DefaultPoolSize = 100
// DefaultPoolTTL sets the connection pool ttl.
DefaultPoolTTL = time.Minute
// DefaultPoolCloseTimeout sets the connection pool colse timeout.
DefaultPoolCloseTimeout = time.Second
)

// Options are the Client options.
@@ -63,8 +65,9 @@ type Options struct {
Wrappers []Wrapper

// Connection Pool
PoolSize int
PoolTTL time.Duration
PoolSize int
PoolTTL time.Duration
PoolCloseTimeout time.Duration
}

// CallOptions are options used to make calls to a server.
@@ -140,13 +143,14 @@ func NewOptions(options ...Option) Options {
ConnectionTimeout: DefaultConnectionTimeout,
DialTimeout: transport.DefaultDialTimeout,
},
PoolSize: DefaultPoolSize,
PoolTTL: DefaultPoolTTL,
Broker: broker.DefaultBroker,
Selector: selector.DefaultSelector,
Registry: registry.DefaultRegistry,
Transport: transport.DefaultTransport,
Logger: logger.DefaultLogger,
PoolSize: DefaultPoolSize,
PoolTTL: DefaultPoolTTL,
PoolCloseTimeout: DefaultPoolCloseTimeout,
Broker: broker.DefaultBroker,
Selector: selector.DefaultSelector,
Registry: registry.DefaultRegistry,
Transport: transport.DefaultTransport,
Logger: logger.DefaultLogger,
}

for _, o := range options {
@@ -191,6 +195,13 @@ func PoolTTL(d time.Duration) Option {
}
}

// PoolCloseTimeout sets the connection pool close timeout.
func PoolCloseTimeout(d time.Duration) Option {
return func(o *Options) {
o.PoolCloseTimeout = d
}
}

// Registry to find nodes for a given service.
func Registry(r registry.Registry) Option {
return func(o *Options) {
6 changes: 5 additions & 1 deletion client/rpc_client.go
Original file line number Diff line number Diff line change
@@ -46,6 +46,7 @@ func newRPCClient(opt ...Option) Client {
pool.Size(opts.PoolSize),
pool.TTL(opts.PoolTTL),
pool.Transport(opts.Transport),
pool.CloseTimeout(opts.PoolCloseTimeout),
)

rc := &rpcClient{
@@ -148,7 +149,10 @@ func (r *rpcClient) call(

c, err := r.pool.Get(address, dOpts...)
if err != nil {
return merrors.InternalServerError("go.micro.client", "connection error: %v", err)
if c == nil {
return merrors.InternalServerError("go.micro.client", "connection error: %v", err)
}
logger.Log(log.ErrorLevel, "failed to close pool", err)
}

seq := atomic.AddUint64(&r.seq, 1) - 1
8 changes: 8 additions & 0 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
@@ -565,6 +565,14 @@ func (c *cmd) Before(ctx *cli.Context) error {
clientOpts = append(clientOpts, client.PoolTTL(d))
}

if t := ctx.String("client_pool_close_timeout"); len(t) > 0 {
d, err := time.ParseDuration(t)
if err != nil {
return fmt.Errorf("failed to parse client_pool_close_timeout: %v", t)
}
clientOpts = append(clientOpts, client.PoolCloseTimeout(d))
}

// We have some command line opts for the server.
// Lets set it up
if len(serverOpts) > 0 {
4 changes: 4 additions & 0 deletions config/loader/memory/memory.go
Original file line number Diff line number Diff line change
@@ -40,6 +40,7 @@ type updateValue struct {
}

type watcher struct {
sync.Mutex
value reader.Value
reader reader.Reader
version atomic.Value
@@ -427,6 +428,9 @@ func (w *watcher) Next() (*loader.Snapshot, error) {
}

func (w *watcher) Stop() error {
w.Lock()
defer w.Unlock()

select {
case <-w.exit:
default:
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -4,7 +4,6 @@ go 1.18

require (
github.com/bitly/go-simplejson v0.5.0
github.com/ef-ds/deque v1.0.4
github.com/fsnotify/fsnotify v1.6.0
github.com/golang/protobuf v1.5.4
github.com/google/uuid v1.3.0
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -8,8 +8,6 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/ef-ds/deque v1.0.4 h1:iFAZNmveMT9WERAkqLJ+oaABF9AcVQ5AjXem/hroniI=
github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
29 changes: 28 additions & 1 deletion registry/cache/cache.go
Original file line number Diff line number Diff line change
@@ -42,6 +42,7 @@ type cache struct {
sg singleflight.Group
cache map[string][]*registry.Service
ttls map[string]time.Time
nttls map[string]map[string]time.Time // node ttls
watched map[string]bool

// used to stop the cache
@@ -94,6 +95,17 @@ func (c *cache) isValid(services []*registry.Service, ttl time.Time) bool {
return false
}

// a node did not get updated
for _, s := range services {
for _, n := range s.Nodes {
nttl := c.nttls[s.Name][n.Id]
if time.Since(nttl) > 0 {
delete(c.nttls, s.Name)
return false
}
}
}

// ok
return true
}
@@ -115,6 +127,7 @@ func (c *cache) del(service string) {
// otherwise delete entries
delete(c.cache, service)
delete(c.ttls, service)
delete(c.nttls, service)
}

func (c *cache) get(service string) ([]*registry.Service, error) {
@@ -128,7 +141,7 @@ func (c *cache) get(service string) ([]*registry.Service, error) {
// make a copy
cp := util.Copy(services)

// got services && within ttl so return cache
// got services, nodes && within ttl so return cache
if c.isValid(cp, ttl) {
c.RUnlock()
// return services
@@ -163,6 +176,9 @@ func (c *cache) get(service string) ([]*registry.Service, error) {
// cache results
cp := util.Copy(services)
c.Lock()
for _, s := range services {
c.updateNodeTTLs(service, s.Nodes)
}
c.set(service, services)
c.Unlock()

@@ -199,6 +215,15 @@ func (c *cache) set(service string, services []*registry.Service) {
c.ttls[service] = time.Now().Add(c.opts.TTL)
}

func (c *cache) updateNodeTTLs(name string, nodes []*registry.Node) {
if c.nttls[name] == nil {
c.nttls[name] = make(map[string]time.Time)
}
for _, node := range nodes {
c.nttls[name][node.Id] = time.Now().Add(c.opts.TTL)
}
}

func (c *cache) update(res *registry.Result) {
if res == nil || res.Service == nil {
return
@@ -239,6 +264,7 @@ func (c *cache) update(res *registry.Result) {

switch res.Action {
case "create", "update":
c.updateNodeTTLs(res.Service.Name, res.Service.Nodes)
if service == nil {
c.set(res.Service.Name, append(services, res.Service))
return
@@ -483,6 +509,7 @@ func New(r registry.Registry, opts ...Option) Cache {
watchedRunning: make(map[string]bool),
cache: make(map[string][]*registry.Service),
ttls: make(map[string]time.Time),
nttls: make(map[string]map[string]time.Time),
exit: make(chan bool),
}
}
Loading