Skip to content

Commit

Permalink
Make sure link accepts are not single threaded. Fixes #1669
Browse files Browse the repository at this point in the history
  • Loading branch information
plorenz committed Dec 22, 2023
1 parent 76c1544 commit de5c195
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 47 deletions.
2 changes: 1 addition & 1 deletion controller/handler_ctrl/circuit_confirmation.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (self *circuitConfirmationHandler) checkCircuitMaxIdle(circuit *network.Cir
return
}

log.Infof("removing idle circuit, idle time of %s exceedes max idle time of %s",
log.Infof("removing idle circuit, idle time of %s exceeds max idle time of %s",
time.Duration(idleTime).String(), service.MaxIdleTime.String())

if err := self.n.RemoveCircuit(circuit.Id, true); err != nil {
Expand Down
20 changes: 4 additions & 16 deletions controller/handler_ctrl/router_link.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ package handler_ctrl
import (
"github.com/michaelquigley/pfxlog"
"github.com/openziti/channel/v2"
"github.com/openziti/ziti/controller/network"
"github.com/openziti/ziti/common/pb/ctrl_pb"
"github.com/openziti/ziti/controller/network"
"google.golang.org/protobuf/proto"
)

Expand All @@ -46,23 +46,11 @@ func (h *routerLinkHandler) HandleReceive(msg *channel.Message, ch channel.Chann
return
}

go h.HandleLinks(ch, link)
go h.HandleLinks(link)
}

func (h *routerLinkHandler) HandleLinks(ch channel.Channel, links *ctrl_pb.RouterLinks) {
log := pfxlog.ContextLogger(ch.Label()).WithField("routerId", ch.Id())

func (h *routerLinkHandler) HandleLinks(links *ctrl_pb.RouterLinks) {
for _, link := range links.Links {
linkLog := log.WithField("linkId", link.Id).
WithField("destRouterId", link.DestRouterId)

created, err := h.network.NotifyExistingLink(link.Id, link.LinkProtocol, link.DialAddress, h.r, link.DestRouterId)
if err != nil {
linkLog.WithError(err).Error("unexpected error adding router reported link")
} else if created {
linkLog.Info("router reported link added")
} else {
linkLog.Info("router reported link already known")
}
h.network.NotifyExistingLink(link.Id, link.LinkProtocol, link.DialAddress, h.r, link.DestRouterId)
}
}
14 changes: 11 additions & 3 deletions controller/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,19 +381,27 @@ func (network *Network) DisconnectRouter(r *Router) {
network.routerChanged <- r
}

func (network *Network) NotifyExistingLink(id, linkProtocol, dialAddress string, srcRouter *Router, dstRouterId string) (bool, error) {
func (network *Network) NotifyExistingLink(id, linkProtocol, dialAddress string, srcRouter *Router, dstRouterId string) {
log := pfxlog.Logger().
WithField("routerId", srcRouter.Id).
WithField("linkId", id).
WithField("destRouterId", dstRouterId)

dst := network.Routers.getConnected(dstRouterId)
if dst == nil {
network.NotifyLinkIdEvent(id, event.LinkFromRouterDisconnectedDest)
return false, errors.New("destination router not connected")
log.Error("destination router not connected")
return
}

link, created := network.linkController.routerReportedLink(id, linkProtocol, dialAddress, srcRouter, dst)
if created {
network.NotifyLinkEvent(link, event.LinkFromRouterNew)
log.Info("router reported link added")
} else {
network.NotifyLinkEvent(link, event.LinkFromRouterKnown)
log.Info("router reported link already known")
}
return created, nil
}

func (network *Network) LinkConnected(msg *ctrl_pb.LinkConnected) error {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/openziti/agent v1.0.16
github.com/openziti/channel/v2 v2.0.111
github.com/openziti/channel/v2 v2.0.112-0.20231222194626-a043d90d77c2
github.com/openziti/edge-api v0.26.7
github.com/openziti/foundation/v2 v2.0.35
github.com/openziti/identity v1.0.68
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -562,8 +562,8 @@ github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYr
github.com/openzipkin/zipkin-go v0.1.1/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTmOf0Erfk+hxe8=
github.com/openziti/agent v1.0.16 h1:9Saji+8hFE1NpzP2XzDhsVJbCrDlhixoLHfOpFt5Z+U=
github.com/openziti/agent v1.0.16/go.mod h1:zfm53+PVWoGFzjGGgQdKby5749G6VRYHe+eQJmoVKy4=
github.com/openziti/channel/v2 v2.0.111 h1:ZZDyUUFcyshitXjUqAMjdAKbaDMpgV7oX1Jp1I35Rc4=
github.com/openziti/channel/v2 v2.0.111/go.mod h1:abw0qwT0MzWvh1eI2P6D6CD17PRHL8EEo1d3DHCyCdM=
github.com/openziti/channel/v2 v2.0.112-0.20231222194626-a043d90d77c2 h1:oTLxSyWGIob3x1P8Qgcr3dmn7YLMUgYj52as9KAfe9o=
github.com/openziti/channel/v2 v2.0.112-0.20231222194626-a043d90d77c2/go.mod h1:abw0qwT0MzWvh1eI2P6D6CD17PRHL8EEo1d3DHCyCdM=
github.com/openziti/dilithium v0.3.3 h1:PLgQ6PMNLSTzCFbX/h98cmudgz/cU6TmjdSv5NAPD8k=
github.com/openziti/dilithium v0.3.3/go.mod h1:vsCjI2AU/hon9e+dLhUFbCNGesJDj2ASgkySOcpmvjo=
github.com/openziti/edge-api v0.26.7 h1:dHLH7+O+Yp3HPmhgAbvq8z93EcZDypiSOHm/PVVUCoc=
Expand Down
35 changes: 14 additions & 21 deletions router/xlink_transport/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,22 @@ import (
"fmt"
"github.com/michaelquigley/pfxlog"
"github.com/openziti/channel/v2"
fabricMetrics "github.com/openziti/ziti/common/metrics"
"github.com/openziti/ziti/router/xlink"
"github.com/openziti/identity"
"github.com/openziti/metrics"
"github.com/openziti/transport/v2"
fabricMetrics "github.com/openziti/ziti/common/metrics"
"github.com/openziti/ziti/router/xlink"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"io"
"sync"
"time"
)

type listener struct {
id *identity.TokenId
config *listenerConfig
listener channel.UnderlayListener
listener io.Closer
accepter xlink.Acceptor
bindHandlerFactory BindHandlerFactory
tcfg transport.Configuration
Expand All @@ -46,18 +47,17 @@ type listener struct {

func (self *listener) Listen() error {
config := channel.ListenerConfig{
ConnectOptions: self.config.options.ConnectOptions,
TransportConfig: self.tcfg,
PoolConfigurator: fabricMetrics.GoroutinesPoolMetricsConfigF(self.metricsRegistry, "pool.listener.link"),
ConnectOptions: self.config.options.ConnectOptions,
TransportConfig: self.tcfg,
PoolConfigurator: fabricMetrics.GoroutinesPoolMetricsConfigF(self.metricsRegistry, "pool.listener.link"),
ConnectionHandlers: []channel.ConnectionHandler{&ConnectionHandler{self.id}},
}
listener := channel.NewClassicListener(self.id, self.config.bind, config)

self.listener = listener
connectionHandler := &ConnectionHandler{self.id}
if err := self.listener.Listen(connectionHandler); err != nil {
var err error
if self.listener, err = channel.NewClassicListenerF(self.id, self.config.bind, config, self.acceptNewUnderlay); err != nil {
return fmt.Errorf("error listening (%w)", err)
}
go self.acceptLoop()

go self.cleanupExpiredPartialLinks()
return nil
}
Expand Down Expand Up @@ -86,16 +86,9 @@ func (self *listener) GetLocalBinding() string {
return self.config.bindInterface
}

func (self *listener) acceptLoop() {
for {
_, err := channel.NewChannelWithTransportConfiguration("link", self.listener, self, self.config.options, self.tcfg)
if err != nil && errors.Is(err, channel.ListenerClosedError) {
logrus.Errorf("link underlay acceptor closed")
return
} else if err != nil {
logrus.Errorf("error creating link underlay (%v)", err)
continue
}
func (self *listener) acceptNewUnderlay(underlay channel.Underlay) {
if _, err := channel.NewChannelWithUnderlay("link", underlay, self, self.config.options); err != nil {
logrus.WithError(err).Error("error creating link channel")
}
}

Expand Down
2 changes: 1 addition & 1 deletion zititest/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/google/uuid v1.5.0
github.com/michaelquigley/pfxlog v0.6.10
github.com/openziti/agent v1.0.16
github.com/openziti/channel/v2 v2.0.111
github.com/openziti/channel/v2 v2.0.112-0.20231222194626-a043d90d77c2
github.com/openziti/fablab v0.5.32
github.com/openziti/foundation/v2 v2.0.35
github.com/openziti/identity v1.0.68
Expand Down
4 changes: 2 additions & 2 deletions zititest/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -574,8 +574,8 @@ github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYr
github.com/openzipkin/zipkin-go v0.1.1/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTmOf0Erfk+hxe8=
github.com/openziti/agent v1.0.16 h1:9Saji+8hFE1NpzP2XzDhsVJbCrDlhixoLHfOpFt5Z+U=
github.com/openziti/agent v1.0.16/go.mod h1:zfm53+PVWoGFzjGGgQdKby5749G6VRYHe+eQJmoVKy4=
github.com/openziti/channel/v2 v2.0.111 h1:ZZDyUUFcyshitXjUqAMjdAKbaDMpgV7oX1Jp1I35Rc4=
github.com/openziti/channel/v2 v2.0.111/go.mod h1:abw0qwT0MzWvh1eI2P6D6CD17PRHL8EEo1d3DHCyCdM=
github.com/openziti/channel/v2 v2.0.112-0.20231222194626-a043d90d77c2 h1:oTLxSyWGIob3x1P8Qgcr3dmn7YLMUgYj52as9KAfe9o=
github.com/openziti/channel/v2 v2.0.112-0.20231222194626-a043d90d77c2/go.mod h1:abw0qwT0MzWvh1eI2P6D6CD17PRHL8EEo1d3DHCyCdM=
github.com/openziti/dilithium v0.3.3 h1:PLgQ6PMNLSTzCFbX/h98cmudgz/cU6TmjdSv5NAPD8k=
github.com/openziti/dilithium v0.3.3/go.mod h1:vsCjI2AU/hon9e+dLhUFbCNGesJDj2ASgkySOcpmvjo=
github.com/openziti/edge-api v0.26.7 h1:dHLH7+O+Yp3HPmhgAbvq8z93EcZDypiSOHm/PVVUCoc=
Expand Down

0 comments on commit de5c195

Please sign in to comment.