From ffa3c8e2c2138140cb34aa79f71db6934d638667 Mon Sep 17 00:00:00 2001 From: Christian Stewart Date: Fri, 12 May 2023 15:52:57 -0700 Subject: [PATCH] refactor: changes to controllerbus Signed-off-by: Christian Stewart --- daemon/api/api.go | 2 +- examples/nats-host/main.go | 4 ++-- examples/nats-host/nats-single/main.go | 2 +- go.mod | 2 +- go.sum | 4 ++-- hack/go.mod | 2 +- hack/go.sum | 4 ++-- http/dir-lookup-http-handler.go | 2 +- link/establish-link-ex.go | 2 +- link/open-stream-ex.go | 2 +- link/open-stream-with-link-ex.go | 2 +- peer/directive_ex.go | 7 +++++-- pubsub/controller/tracked-link.go | 2 +- pubsub/dir-build-channel-subscription.go | 2 +- sim/simulate/assertions.go | 2 +- sim/simulate/peer.go | 2 +- sim/tests/bifrost/pubsub_nats_test.go | 4 ++-- testbed/testbed.go | 4 ++-- transport/controller/controller.go | 2 +- transport/inproc/inproc_test.go | 4 ++-- 20 files changed, 30 insertions(+), 27 deletions(-) diff --git a/daemon/api/api.go b/daemon/api/api.go index 935c697b..1030ab7a 100644 --- a/daemon/api/api.go +++ b/daemon/api/api.go @@ -62,7 +62,7 @@ func (a *API) ForwardStreams( plCtx, a.bus, peer.NewGetPeer(targetPeerID), - false, + nil, reqCtxCancel, ) if err != nil { diff --git a/examples/nats-host/main.go b/examples/nats-host/main.go index 6e5dec37..2ad9ac20 100644 --- a/examples/nats-host/main.go +++ b/examples/nats-host/main.go @@ -128,7 +128,7 @@ func runNatsExample(c *cli.Context) error { ctx, lp2tb.Bus, pubsub.NewBuildChannelSubscription(channelID, lp2tb.PrivKey), - true, + bus.ReturnWhenIdle(), nil, ) if err != nil { @@ -143,7 +143,7 @@ func runNatsExample(c *cli.Context) error { ctx, lp0tb.Bus, pubsub.NewBuildChannelSubscription(channelID, lp0tb.PrivKey), - true, + bus.ReturnWhenIdle(), nil, ) if err != nil { diff --git a/examples/nats-host/nats-single/main.go b/examples/nats-host/nats-single/main.go index 1f2ef319..7b090313 100644 --- a/examples/nats-host/nats-single/main.go +++ b/examples/nats-host/nats-single/main.go @@ -93,7 +93,7 @@ func runNatsExample(c *cli.Context) error { ctx, lp0tb.Bus, pubsub.NewBuildChannelSubscription(channelID, lp0tb.PrivKey), - false, + nil, nil, ) if err != nil { diff --git a/go.mod b/go.mod index 67b8bb3c..e6f4eb33 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/aperturerobotics/bifrost go 1.19 require ( - github.com/aperturerobotics/controllerbus v0.27.0 // latest + github.com/aperturerobotics/controllerbus v0.27.1-0.20230512225029-073bdff7defe // latest github.com/aperturerobotics/entitygraph v0.4.0 github.com/aperturerobotics/starpc v0.19.1 // latest ) diff --git a/go.sum b/go.sum index 4390a3fb..6ea19612 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,8 @@ github.com/aperturerobotics/bifrost-nats-client v1.10.1-0.20200831103200-24c3d04 github.com/aperturerobotics/bifrost-nats-client v1.10.1-0.20200831103200-24c3d0464e58/go.mod h1:ougcjYEZDYV8pVtaNbA5sgYDukkYHyKtSsW/T3B13j0= github.com/aperturerobotics/bifrost-nats-server/v2 v2.1.8-0.20221228081037-b7c2df0c151f h1:bmScByQNGDPPy9T+zdwu816XaCbFtD5UDyqZMRiHJ80= github.com/aperturerobotics/bifrost-nats-server/v2 v2.1.8-0.20221228081037-b7c2df0c151f/go.mod h1:kIcZtLpq4UIZzOqduYLm1mYU1nuMBtN6XuDCtQ21QT8= -github.com/aperturerobotics/controllerbus v0.27.0 h1:5bUa31IBXOZGToNwpBh/uYjc/Gxrk5z2494VmAkd37Q= -github.com/aperturerobotics/controllerbus v0.27.0/go.mod h1:/fYwB1oNk5PvouAKR/NaXlkaUkf+YsgT2mwn7eV9s+A= +github.com/aperturerobotics/controllerbus v0.27.1-0.20230512225029-073bdff7defe h1:W96P4j9c5k9qdsdX8pbM4uQum1+6xayymBGNZSP1M68= +github.com/aperturerobotics/controllerbus v0.27.1-0.20230512225029-073bdff7defe/go.mod h1:/fYwB1oNk5PvouAKR/NaXlkaUkf+YsgT2mwn7eV9s+A= github.com/aperturerobotics/entitygraph v0.4.0 h1:V73zfiFrF6f8tBQ6Lg3YoptciPMFvfO0XySFRODWEuo= github.com/aperturerobotics/entitygraph v0.4.0/go.mod h1:k3re4TkpTDmCA2PVkZ/bGdq5GSG/OFLkeviainSQKFQ= github.com/aperturerobotics/logrus v1.9.1-0.20221224130652-ff61cbb763af h1:nRveANX8PK0j+6M+d6Hyl45W+fQpg6BDBfcrrFMbbrw= diff --git a/hack/go.mod b/hack/go.mod index b09ca184..bc852dbf 100644 --- a/hack/go.mod +++ b/hack/go.mod @@ -40,7 +40,7 @@ require ( github.com/OpenPeeDeeP/depguard v1.1.1 // indirect github.com/alexkohler/prealloc v1.0.0 // indirect github.com/alingse/asasalint v0.0.11 // indirect - github.com/aperturerobotics/controllerbus v0.27.0 // indirect + github.com/aperturerobotics/controllerbus v0.27.1-0.20230512225029-073bdff7defe // indirect github.com/aperturerobotics/entitygraph v0.4.0 // indirect github.com/aperturerobotics/timestamp v0.7.2 // indirect github.com/aperturerobotics/ts-proto-common-types v0.2.0 // indirect diff --git a/hack/go.sum b/hack/go.sum index 57467c96..6704ae7f 100644 --- a/hack/go.sum +++ b/hack/go.sum @@ -69,8 +69,8 @@ github.com/alexkohler/prealloc v1.0.0 h1:Hbq0/3fJPQhNkN0dR95AVrr6R7tou91y0uHG5pO github.com/alexkohler/prealloc v1.0.0/go.mod h1:VetnK3dIgFBBKmg0YnD9F9x6Icjd+9cvfHR56wJVlKE= github.com/alingse/asasalint v0.0.11 h1:SFwnQXJ49Kx/1GghOFz1XGqHYKp21Kq1nHad/0WQRnw= github.com/alingse/asasalint v0.0.11/go.mod h1:nCaoMhw7a9kSJObvQyVzNTPBDbNpdocqrSP7t/cW5+I= -github.com/aperturerobotics/controllerbus v0.27.0 h1:5bUa31IBXOZGToNwpBh/uYjc/Gxrk5z2494VmAkd37Q= -github.com/aperturerobotics/controllerbus v0.27.0/go.mod h1:/fYwB1oNk5PvouAKR/NaXlkaUkf+YsgT2mwn7eV9s+A= +github.com/aperturerobotics/controllerbus v0.27.1-0.20230512225029-073bdff7defe h1:W96P4j9c5k9qdsdX8pbM4uQum1+6xayymBGNZSP1M68= +github.com/aperturerobotics/controllerbus v0.27.1-0.20230512225029-073bdff7defe/go.mod h1:/fYwB1oNk5PvouAKR/NaXlkaUkf+YsgT2mwn7eV9s+A= github.com/aperturerobotics/entitygraph v0.4.0 h1:V73zfiFrF6f8tBQ6Lg3YoptciPMFvfO0XySFRODWEuo= github.com/aperturerobotics/entitygraph v0.4.0/go.mod h1:k3re4TkpTDmCA2PVkZ/bGdq5GSG/OFLkeviainSQKFQ= github.com/aperturerobotics/goprotowrap v0.3.0 h1:JLQ/1zVRdm2qwfeK7bPxWPg5fpTLlzovDUom0vBgcgM= diff --git a/http/dir-lookup-http-handler.go b/http/dir-lookup-http-handler.go index b0fa21c7..19bd46ae 100644 --- a/http/dir-lookup-http-handler.go +++ b/http/dir-lookup-http-handler.go @@ -72,7 +72,7 @@ func ExLookupFirstHTTPHandler( ctx, b, NewLookupHTTPHandler(handlerURL, clientID), - returnIfIdle, + bus.ReturnIfIdle(returnIfIdle), valDisposeCb, nil, ) diff --git a/link/establish-link-ex.go b/link/establish-link-ex.go index 246fb78b..fe40e323 100644 --- a/link/establish-link-ex.go +++ b/link/establish-link-ex.go @@ -21,7 +21,7 @@ func EstablishLinkWithPeerEx( NewEstablishLinkWithPeer( localPeerID, remotePeerID, ), - returnIfIdle, + bus.ReturnIfIdle(returnIfIdle), nil, nil, ) diff --git a/link/open-stream-ex.go b/link/open-stream-ex.go index da54d7d5..9052d3be 100644 --- a/link/open-stream-ex.go +++ b/link/open-stream-ex.go @@ -36,7 +36,7 @@ func OpenStreamWithPeerEx( transportID, openOpts, ), - false, + nil, nil, nil, ) diff --git a/link/open-stream-with-link-ex.go b/link/open-stream-with-link-ex.go index f2d90557..ddc5f9a9 100644 --- a/link/open-stream-with-link-ex.go +++ b/link/open-stream-with-link-ex.go @@ -28,7 +28,7 @@ func OpenStreamViaLinkEx( openOpts, transportID, ), - false, + nil, nil, nil, ) diff --git a/peer/directive_ex.go b/peer/directive_ex.go index 2b6e7e1f..28750d54 100644 --- a/peer/directive_ex.go +++ b/peer/directive_ex.go @@ -9,7 +9,6 @@ import ( // GetPeerWithID gets a peer. // If peer ID is empty, selects any peer. -// returnIfIdle if set, will return if the directive becomes idle. // valDisposeCallback is called when the value is no longer valid. // valDisposeCallback can be nil. func GetPeerWithID( @@ -19,11 +18,15 @@ func GetPeerWithID( returnIfIdle bool, valDisposeCallback func(), ) (Peer, directive.Instance, directive.Reference, error) { + var idleCb bus.ExecIdleCallback + if returnIfIdle { + idleCb = bus.ReturnWhenIdle() + } return bus.ExecWaitValue[Peer]( ctx, b, NewGetPeer(peerIDConstraint), - returnIfIdle, + idleCb, valDisposeCallback, nil, ) diff --git a/pubsub/controller/tracked-link.go b/pubsub/controller/tracked-link.go index c95e3123..af24b803 100644 --- a/pubsub/controller/tracked-link.go +++ b/pubsub/controller/tracked-link.go @@ -41,7 +41,7 @@ func (t *trackedLink) trackLink(ctx context.Context) error { stream.OpenOpts{Reliable: true, Encrypted: true}, t.lnk.GetTransportUUID(), ), - false, + nil, nil, nil, ) diff --git a/pubsub/dir-build-channel-subscription.go b/pubsub/dir-build-channel-subscription.go index eb3140ea..9bf1af1b 100644 --- a/pubsub/dir-build-channel-subscription.go +++ b/pubsub/dir-build-channel-subscription.go @@ -55,7 +55,7 @@ func ExBuildChannelSubscription( ctx, b, NewBuildChannelSubscription(channelID, privKey), - returnIfIdle, + bus.ReturnIfIdle(returnIfIdle), valDisposeCallback, nil, ) diff --git a/sim/simulate/assertions.go b/sim/simulate/assertions.go index bce83776..2058c40f 100644 --- a/sim/simulate/assertions.go +++ b/sim/simulate/assertions.go @@ -25,7 +25,7 @@ func TestConnectivity(ctx context.Context, px0, px1 *Peer) error { 0, stream.OpenOpts{Reliable: true, Encrypted: true}, ), - false, + nil, nil, ) if err != nil { diff --git a/sim/simulate/peer.go b/sim/simulate/peer.go index 4c337717..867c80f4 100644 --- a/sim/simulate/peer.go +++ b/sim/simulate/peer.go @@ -94,7 +94,7 @@ func newPeer(ctx context.Context, le *logrus.Entry, gp *graph.Peer) (*Peer, erro np.ctx, np.testbed.Bus, resolver.NewLoadControllerWithConfig(&configset_controller.Config{}), - false, + nil, nil, ) if err != nil { diff --git a/sim/tests/bifrost/pubsub_nats_test.go b/sim/tests/bifrost/pubsub_nats_test.go index 294c0057..179cb8d6 100644 --- a/sim/tests/bifrost/pubsub_nats_test.go +++ b/sim/tests/bifrost/pubsub_nats_test.go @@ -82,7 +82,7 @@ func TestPubsubNATS(t *testing.T) { ctx, lp1tb.Bus, pubsub.NewBuildChannelSubscription(channelID, lp1tb.PrivKey), - false, + nil, nil, ) if err != nil { @@ -97,7 +97,7 @@ func TestPubsubNATS(t *testing.T) { ctx, lp0tb.Bus, pubsub.NewBuildChannelSubscription(channelID, lp0tb.PrivKey), - false, + nil, nil, ) if err != nil { diff --git a/testbed/testbed.go b/testbed/testbed.go index 0c9632b7..935ebbc7 100644 --- a/testbed/testbed.go +++ b/testbed/testbed.go @@ -93,7 +93,7 @@ func NewTestbed(ctx context.Context, le *logrus.Entry, opts TestbedOpts) (*Testb ctx, t.Bus, resolver.NewLoadControllerWithConfig(peerConfig), - false, + nil, nil, ) if err != nil { @@ -108,7 +108,7 @@ func NewTestbed(ctx context.Context, le *logrus.Entry, opts TestbedOpts) (*Testb ctx, t.Bus, resolver.NewLoadControllerWithConfig(&stream_echo.Config{}), - false, + nil, nil, ) if err != nil { diff --git a/transport/controller/controller.go b/transport/controller/controller.go index db6f5249..e3bdf1e1 100644 --- a/transport/controller/controller.go +++ b/transport/controller/controller.go @@ -333,7 +333,7 @@ func (c *Controller) HandleIncomingStream( WithField("remote-peer", lnk.GetRemotePeer().Pretty()). Debug("accepted stream") dir := link.NewHandleMountedStream(pid, c.localPeerID, mstrm.GetPeerID()) - dval, _, dref, err := bus.ExecOneOff(ctx, c.bus, dir, false, nil) + dval, _, dref, err := bus.ExecOneOff(ctx, c.bus, dir, nil, nil) if err != nil { le.WithError(err).Warn("error retrieving stream handler for stream") strm.Close() diff --git a/transport/inproc/inproc_test.go b/transport/inproc/inproc_test.go index d7058f89..ee6538cf 100644 --- a/transport/inproc/inproc_test.go +++ b/transport/inproc/inproc_test.go @@ -99,7 +99,7 @@ func TestEstablishLink(t *testing.T) { ctx, tb2.Bus, link.NewEstablishLinkWithPeer("", peerId1), - false, + nil, nil, ) if err != nil { @@ -122,7 +122,7 @@ func TestEstablishLink(t *testing.T) { 0, stream.OpenOpts{Reliable: true, Encrypted: true}, ), - false, + nil, nil, ) if err != nil {