From f631ca2ef75a723a24f8af9336d7580ed2b16454 Mon Sep 17 00:00:00 2001 From: Mantas Sidlauskas Date: Mon, 22 Apr 2024 21:33:40 +0300 Subject: [PATCH 1/2] Add peer provider plugin registration --- common/config/config.go | 10 ++++ common/peerprovider/plugin.go | 87 ++++++++++++++++++++++++++++++ common/peerprovider/plugin_test.go | 63 ++++++++++++++++++++++ 3 files changed, 160 insertions(+) create mode 100644 common/peerprovider/plugin.go create mode 100644 common/peerprovider/plugin_test.go diff --git a/common/config/config.go b/common/config/config.go index 38336e6388f..a7da3370b1f 100644 --- a/common/config/config.go +++ b/common/config/config.go @@ -43,6 +43,8 @@ type ( Config struct { // Ringpop is the ringpop related configuration Ringpop ringpopprovider.Config `yaml:"ringpop"` + // Membership is used to configure peer provider plugin + Membership Membership `yaml:"membership"` // Persistence contains the configuration for cadence datastores Persistence Persistence `yaml:"persistence"` // Log is the logging config @@ -84,6 +86,14 @@ type ( AsyncWorkflowQueues map[string]AsyncWorkflowQueueProvider `yaml:"asyncWorkflowQueues"` } + // Membership holds peer provider configuration. + Membership struct { + Provider PeerProvider `yaml:"provider"` + } + + // PeerProvider is provider config. Contents depends on plugin in use + PeerProvider map[string]*YamlNode + HeaderRule struct { Add bool // if false, matching headers are removed if previously matched. Match *regexp.Regexp diff --git a/common/peerprovider/plugin.go b/common/peerprovider/plugin.go new file mode 100644 index 00000000000..a3805184bd3 --- /dev/null +++ b/common/peerprovider/plugin.go @@ -0,0 +1,87 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package peerprovider + +import ( + "fmt" + + "go.uber.org/yarpc/transport/tchannel" + + "github.com/uber/cadence/common/config" + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/membership" + "github.com/uber/cadence/common/syncmap" +) + +// Container is passed to peer provider plugin +type Container struct { + Service string + // Channel is required by ringpop + Channel tchannel.Channel + Logger log.Logger + Portmap membership.PortMap +} + +type constructorFn func(cfg *config.YamlNode, container Container) (membership.PeerProvider, error) + +var plugins = syncmap.New[string, plugin]() + +type plugin struct { + fn constructorFn + configKey string +} + +type Provider struct { + config config.PeerProvider + container Container +} + +func New(config config.PeerProvider, container Container) *Provider { + return &Provider{ + config: config, + container: container, + } +} + +func Register(configKey string, constructor constructorFn) error { + inserted := plugins.Put(configKey, plugin{ + fn: constructor, + configKey: configKey, + }) + + if !inserted { + return fmt.Errorf("peer provider %q is already registered", configKey) + } + + return nil +} + +func (p *Provider) Provider() (membership.PeerProvider, error) { + for configKey, cfg := range p.config { + if plugin, found := plugins.Get(configKey); found { + return plugin.fn(cfg, p.container) + } + } + + return nil, fmt.Errorf("no configured peer providers found") +} diff --git a/common/peerprovider/plugin_test.go b/common/peerprovider/plugin_test.go new file mode 100644 index 00000000000..f68eb918459 --- /dev/null +++ b/common/peerprovider/plugin_test.go @@ -0,0 +1,63 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package peerprovider + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/uber/cadence/common/config" + "github.com/uber/cadence/common/membership" +) + +func TestRegisterAllowsPluginOnlyOnce(t *testing.T) { + assert.NoError(t, Register("testConfig", func(cfg *config.YamlNode, container Container) (membership.PeerProvider, error) { return nil, nil })) + assert.Error(t, Register("testConfig", + func(cfg *config.YamlNode, container Container) (membership.PeerProvider, error) { return nil, nil }), + "plugin can be registered only once", + ) + +} +func TestProviderRetrunsErrorWhenNoProviderRegistered(t *testing.T) { + a := Provider{ + config: nil, + container: Container{}, + } + p, err := a.Provider() + assert.Nil(t, p) + assert.EqualError(t, err, "no configured peer providers found") +} + +func TestProviderRetrunsErrorWhenNoConfigFound(t *testing.T) { + err := Register("providerName", func(cfg *config.YamlNode, container Container) (membership.PeerProvider, error) { + return nil, nil + }) + assert.NoError(t, err) + ppConfig := config.PeerProvider{ + "configKey": &config.YamlNode{}, + } + p, err := New(ppConfig, Container{}).Provider() + assert.Nil(t, p) + assert.Error(t, err) +} From b7d5a94840619d5ede114cb064207b085ecd82dc Mon Sep 17 00:00:00 2001 From: Mantas Sidlauskas Date: Wed, 24 Jul 2024 16:52:52 +0300 Subject: [PATCH 2/2] Allow only one peer provider to be registered --- common/peerprovider/plugin.go | 21 +++++++++--- common/peerprovider/plugin_test.go | 53 +++++++++++++++++++++--------- 2 files changed, 54 insertions(+), 20 deletions(-) diff --git a/common/peerprovider/plugin.go b/common/peerprovider/plugin.go index a3805184bd3..567708fc372 100644 --- a/common/peerprovider/plugin.go +++ b/common/peerprovider/plugin.go @@ -33,6 +33,8 @@ import ( "github.com/uber/cadence/common/syncmap" ) +const key = "peerprovider" + // Container is passed to peer provider plugin type Container struct { Service string @@ -64,24 +66,33 @@ func New(config config.PeerProvider, container Container) *Provider { } func Register(configKey string, constructor constructorFn) error { - inserted := plugins.Put(configKey, plugin{ + + inserted := plugins.Put(key, plugin{ fn: constructor, configKey: configKey, }) + // only one plugin is allowed to be registered if !inserted { - return fmt.Errorf("peer provider %q is already registered", configKey) + registeredPlugin, _ := plugins.Get(key) + return fmt.Errorf("cannot register %q provider, %q is already registered", configKey, registeredPlugin.configKey) } return nil } func (p *Provider) Provider() (membership.PeerProvider, error) { + registeredPlugin, found := plugins.Get(key) + + if !found { + return nil, fmt.Errorf("no configured peer providers found") + } + for configKey, cfg := range p.config { - if plugin, found := plugins.Get(configKey); found { - return plugin.fn(cfg, p.container) + if configKey == registeredPlugin.configKey { + return registeredPlugin.fn(cfg, p.container) } } - return nil, fmt.Errorf("no configured peer providers found") + return nil, fmt.Errorf("no configuration for %q peer provider found", registeredPlugin.configKey) } diff --git a/common/peerprovider/plugin_test.go b/common/peerprovider/plugin_test.go index f68eb918459..431192984ea 100644 --- a/common/peerprovider/plugin_test.go +++ b/common/peerprovider/plugin_test.go @@ -29,17 +29,12 @@ import ( "github.com/uber/cadence/common/config" "github.com/uber/cadence/common/membership" + "github.com/uber/cadence/common/syncmap" ) -func TestRegisterAllowsPluginOnlyOnce(t *testing.T) { - assert.NoError(t, Register("testConfig", func(cfg *config.YamlNode, container Container) (membership.PeerProvider, error) { return nil, nil })) - assert.Error(t, Register("testConfig", - func(cfg *config.YamlNode, container Container) (membership.PeerProvider, error) { return nil, nil }), - "plugin can be registered only once", - ) - -} func TestProviderRetrunsErrorWhenNoProviderRegistered(t *testing.T) { + // Reset plugins + plugins = syncmap.New[string, plugin]() a := Provider{ config: nil, container: Container{}, @@ -49,15 +44,43 @@ func TestProviderRetrunsErrorWhenNoProviderRegistered(t *testing.T) { assert.EqualError(t, err, "no configured peer providers found") } -func TestProviderRetrunsErrorWhenNoConfigFound(t *testing.T) { - err := Register("providerName", func(cfg *config.YamlNode, container Container) (membership.PeerProvider, error) { +func TestProviderRetrunsErrorWhenPluginAlreadyRegistered(t *testing.T) { + // Reset plugins + plugins = syncmap.New[string, plugin]() + err := Register("provider1", func(cfg *config.YamlNode, container Container) (membership.PeerProvider, error) { return nil, nil }) assert.NoError(t, err) - ppConfig := config.PeerProvider{ - "configKey": &config.YamlNode{}, - } - p, err := New(ppConfig, Container{}).Provider() - assert.Nil(t, p) + err = Register("provider2", func(cfg *config.YamlNode, container Container) (membership.PeerProvider, error) { + return nil, nil + }) assert.Error(t, err) } + +func TestConfigIsPickedUp(t *testing.T) { + // Reset plugins + plugins = syncmap.New[string, plugin]() + + peerProviderConfig := map[string]*config.YamlNode{} + peerProviderConfig["provider1"] = &config.YamlNode{} + + pp := New(peerProviderConfig, Container{}) + err := Register("provider1", func(cfg *config.YamlNode, container Container) (membership.PeerProvider, error) { + return nil, nil + }) + assert.NoError(t, err) + _, err = pp.Provider() + assert.NoError(t, err) +} + +func TestErrorWhenConfigIsNotProvided(t *testing.T) { + // Reset plugins + plugins = syncmap.New[string, plugin]() + pp := New(config.PeerProvider{}, Container{}) + err := Register("provider1", func(cfg *config.YamlNode, container Container) (membership.PeerProvider, error) { + return nil, nil + }) + p, err := pp.Provider() + assert.Nil(t, p) + assert.EqualError(t, err, "no configuration for \"provider1\" peer provider found") +}