From 032619f2587c789d97aefe1bdd97f3b815de798a Mon Sep 17 00:00:00 2001 From: Gabor Retvari Date: Thu, 28 Nov 2024 14:17:13 +0100 Subject: [PATCH] fix: No longer delay config-delete messages in the CDS server So far we have added a 5 sec delay before sending our config deletion messages on the CDS server watchers in order to prevent a race condition where a terminating client receives the config deletion message from the CDS server before it can actually enter the graceful shutdown cycle, which causes the immediate dropping of all active client connections. The delay allowed comfortable time to the client to start graceful shutdown. Unfortunately, the delay also caused unexpected resets in some hard-to-debug situations, where during adding the Gateway API resources there is a transient that causes a (delayed) delete due to a missing API resource, which then resets the full config later when all resources become available. This change removes the delay in the config-delete path of the CDS server, so all config-delete updates are immediately sent out. In order to prevent the race condition, the CDS client in stunnerd ignores delete-config CDS updates all together (but only in stunnerd, the auth-service and stunnerctl still process them): once started, there is no other way to stop a stunnerd pod than to delete the Gateway resource. --- cmd/stunnerctl/main.go | 2 +- cmd/stunnerd/main.go | 12 +- config.go | 8 +- config_test.go | 6 +- pkg/config/cds_test.go | 182 +++++++++++++++++++++++-------- pkg/config/client/cds_api.go | 45 ++++---- pkg/config/client/client.go | 11 +- pkg/config/client/file_client.go | 15 ++- pkg/config/server/config.go | 27 ++--- pkg/config/server/handler.go | 2 +- pkg/config/server/server.go | 42 +++---- 11 files changed, 220 insertions(+), 132 deletions(-) diff --git a/cmd/stunnerctl/main.go b/cmd/stunnerctl/main.go index 41d2bf23..8c15df44 100644 --- a/cmd/stunnerctl/main.go +++ b/cmd/stunnerctl/main.go @@ -173,7 +173,7 @@ func runConfig(_ *cobra.Command, args []string) error { confChan := make(chan *stnrv1.StunnerConfig, 8) if watch { - err := cds.Watch(ctx, confChan) + err := cds.Watch(ctx, confChan, false) if err != nil { close(confChan) return err diff --git a/cmd/stunnerd/main.go b/cmd/stunnerd/main.go index 744d1232..462e79d5 100644 --- a/cmd/stunnerd/main.go +++ b/cmd/stunnerd/main.go @@ -142,8 +142,8 @@ func main() { configOrigin = cdsAddr.Addr } - log.Infof("Watching configuration at origin %q", configOrigin) - if err := st.WatchConfig(ctx, configOrigin, conf); err != nil { + log.Infof("Watching configuration at origin %q (ignoring delete-config updates)", configOrigin) + if err := st.WatchConfig(ctx, configOrigin, conf, true); err != nil { log.Errorf("Could not run config watcher: %s", err.Error()) os.Exit(1) } @@ -195,11 +195,9 @@ func main() { c.Admin.LogLevel = logLevel } - // we have working stunnerd: reconcile log.Debug("Initiating reconciliation") - err := st.Reconcile(c) - log.Trace("Reconciliation ready") - if err != nil { + + if err := st.Reconcile(c); err != nil { if e, ok := err.(stnrv1.ErrRestarted); ok { log.Debugf("Reconciliation ready: %s", e.Error()) } else { @@ -207,6 +205,8 @@ func main() { "(running configuration unchanged): %s", err.Error()) } } + + log.Trace("Reconciliation ready") } } } diff --git a/config.go b/config.go index 601e5039..e39675be 100644 --- a/config.go +++ b/config.go @@ -143,7 +143,7 @@ func (s *Stunner) GetConfig() *stnrv1.StunnerConfig { return &c } -// LoadConfig loads a configuration from an origin. This is a shim wrapper around ConfigOrigin.Load. +// LoadConfig loads a configuration from an origin. This is a shim wrapper around configclient.Load. func (s *Stunner) LoadConfig(origin string) (*stnrv1.StunnerConfig, error) { client, err := client.New(origin, s.name, s.logger) if err != nil { @@ -153,12 +153,12 @@ func (s *Stunner) LoadConfig(origin string) (*stnrv1.StunnerConfig, error) { return client.Load() } -// WatchConfig watches a configuration from an origin. This is a shim wrapper around ConfigOrigin.Watch. -func (s *Stunner) WatchConfig(ctx context.Context, origin string, ch chan<- *stnrv1.StunnerConfig) error { +// WatchConfig watches a configuration from an origin. This is a shim wrapper around configclient.Watch. +func (s *Stunner) WatchConfig(ctx context.Context, origin string, ch chan<- *stnrv1.StunnerConfig, suppressDelete bool) error { client, err := client.New(origin, s.name, s.logger) if err != nil { return err } - return client.Watch(ctx, ch) + return client.Watch(ctx, ch, suppressDelete) } diff --git a/config_test.go b/config_test.go index 96c62086..0a9b247d 100644 --- a/config_test.go +++ b/config_test.go @@ -154,7 +154,7 @@ func TestStunnerConfigFileWatcher(t *testing.T) { defer cancel() url := "file://" + file - err = stunner.WatchConfig(ctx, url, conf) + err = stunner.WatchConfig(ctx, url, conf, false) assert.NoError(t, err, "creating config watcher") // nothing should happen here: wait a bit so that the watcher has comfortable time to start @@ -269,7 +269,7 @@ func TestStunnerConfigFileWatcherMultiVersion(t *testing.T) { defer cancel() url := "file://" + file - err = stunner.WatchConfig(ctx, url, conf) + err = stunner.WatchConfig(ctx, url, conf, false) assert.NoError(t, err, "creating config watcher") // nothing should happen here: wait a bit so that the watcher has comfortable time to start @@ -410,7 +410,7 @@ func TestStunnerConfigPollerMultiVersion(t *testing.T) { defer close(conf) log.Debug("init config poller") - assert.NoError(t, stunner.WatchConfig(ctx, origin, conf), "creating config poller") + assert.NoError(t, stunner.WatchConfig(ctx, origin, conf, true), "creating config poller") c2, ok := <-conf assert.True(t, ok, "config emitted") diff --git a/pkg/config/cds_test.go b/pkg/config/cds_test.go index c8519181..50a178d0 100644 --- a/pkg/config/cds_test.go +++ b/pkg/config/cds_test.go @@ -51,9 +51,7 @@ func TestServerLoad(t *testing.T) { testLog := logger.NewLogger("test") // suppress deletions - deleteDelay := server.ConfigDeletionUpdateDelay - server.ConfigDeletionUpdateDelay = 0 - defer func() { server.ConfigDeletionUpdateDelay = deleteDelay }() + server.SuppressConfigDeletion = true ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -144,11 +142,6 @@ func TestServerPoll(t *testing.T) { logger := logger.NewLoggerFactory(stunnerLogLevel) testLog := logger.NewLogger("test") - // suppress deletions - deleteDelay := server.ConfigDeletionUpdateDelay - server.ConfigDeletionUpdateDelay = 0 - defer func() { server.ConfigDeletionUpdateDelay = deleteDelay }() - ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -178,15 +171,15 @@ func TestServerPoll(t *testing.T) { defer close(ch3) go func() { - err = client1.Poll(ctx, ch1) + err = client1.Poll(ctx, ch1, false) assert.NoError(t, err, "client 1 cancelled") }() go func() { - err = client2.Poll(ctx, ch2) + err = client2.Poll(ctx, ch2, false) assert.NoError(t, err, "client 2 cancelled") }() go func() { - err = client3.Poll(ctx, ch2) + err = client3.Poll(ctx, ch2, false) assert.NoError(t, err, "client 3 cancelled") }() @@ -253,11 +246,6 @@ func TestServerWatch(t *testing.T) { logger := logger.NewLoggerFactory(stunnerLogLevel) testLog := logger.NewLogger("test") - // suppress deletions - deleteDelay := server.ConfigDeletionUpdateDelay - server.ConfigDeletionUpdateDelay = 0 - defer func() { server.ConfigDeletionUpdateDelay = deleteDelay }() - serverCtx, serverCancel := context.WithCancel(context.Background()) testCDSAddr := getRandCDSAddr() @@ -285,11 +273,11 @@ func TestServerWatch(t *testing.T) { clientCtx, clientCancel := context.WithCancel(context.Background()) defer clientCancel() - err = client1.Watch(clientCtx, ch1) + err = client1.Watch(clientCtx, ch1, false) assert.NoError(t, err, "client 1 watch") - err = client2.Watch(clientCtx, ch2) + err = client2.Watch(clientCtx, ch2, false) assert.NoError(t, err, "client 2 watch") - err = client3.Watch(clientCtx, ch3) + err = client3.Watch(clientCtx, ch3, false) assert.NoError(t, err, "client 3 watch") s := watchConfig(ch1, 150*time.Millisecond) @@ -435,11 +423,6 @@ func TestServerWatchBootstrap(t *testing.T) { logger := logger.NewLoggerFactory(stunnerLogLevel) testLog := logger.NewLogger("test") - // suppress deletions - deleteDelay := server.ConfigDeletionUpdateDelay - server.ConfigDeletionUpdateDelay = 0 - defer func() { server.ConfigDeletionUpdateDelay = deleteDelay }() - serverCtx, serverCancel := context.WithCancel(context.Background()) defer serverCancel() @@ -479,7 +462,7 @@ func TestServerWatchBootstrap(t *testing.T) { clientCtx, clientCancel := context.WithCancel(context.Background()) defer clientCancel() - err = client1.Watch(clientCtx, ch1) + err = client1.Watch(clientCtx, ch1, false) assert.NoError(t, err, "client 1 watch") s := watchConfig(ch1, 1500*time.Millisecond) @@ -534,11 +517,6 @@ func TestServerAPI(t *testing.T) { logger := logger.NewLoggerFactory(stunnerLogLevel) testLog := logger.NewLogger("test") - // suppress deletions - deleteDelay := server.ConfigDeletionUpdateDelay - server.ConfigDeletionUpdateDelay = time.Millisecond - defer func() { server.ConfigDeletionUpdateDelay = deleteDelay }() - serverCtx, serverCancel := context.WithCancel(context.Background()) testCDSAddr := getRandCDSAddr() @@ -570,13 +548,13 @@ func TestServerAPI(t *testing.T) { clientCtx, clientCancel := context.WithCancel(context.Background()) defer clientCancel() - err = client1.Watch(clientCtx, ch1) + err = client1.Watch(clientCtx, ch1, false) assert.NoError(t, err, "client 1 watch") - err = client2.Watch(clientCtx, ch2) + err = client2.Watch(clientCtx, ch2, false) assert.NoError(t, err, "client 2 watch") - err = client3.Watch(clientCtx, ch3) + err = client3.Watch(clientCtx, ch3, false) assert.NoError(t, err, "client 3 watch") - err = client4.Watch(clientCtx, ch4) + err = client4.Watch(clientCtx, ch4, false) assert.NoError(t, err, "client 4 watch") s := watchConfig(ch1, 50*time.Millisecond) @@ -877,6 +855,9 @@ func TestServerAPI(t *testing.T) { s = watchConfig(ch4, 50*time.Millisecond) assert.Nil(t, s) + // switch config deletions on + server.SuppressConfigDeletion = false + testLog.Debug("--------------------------------") testLog.Debug("Update1: ns1/gw1 + ns3/gw1 ") testLog.Debug("--------------------------------") @@ -953,8 +934,11 @@ func TestServerAPI(t *testing.T) { assert.NotNil(t, s1) s2 = watchConfig(ch2, 50*time.Millisecond) assert.NotNil(t, s2) - assert.True(t, s1.DeepEqual(sc1), "deepeq") - assert.True(t, client.IsConfigDeleted(s2), "deepeq") // deleted! + // we do not know the order + assert.True(t, s1.DeepEqual(sc1) || s2.DeepEqual(sc1), "config-deepeq") + assert.True(t, client.IsConfigDeleted(s1) || client.IsConfigDeleted(s2), "deleted") // deleted + // assert.True(t, s1.DeepEqual(sc1), "deepeq") + // assert.True(t, client.IsConfigDeleted(s2), "deepeq") // deleted! // no config from client3 watch s = watchConfig(ch3, 50*time.Millisecond) @@ -974,14 +958,12 @@ func TestClientReconnect(t *testing.T) { zlogger := zapr.NewLogger(z) log := zlogger.WithName("tester") - // suppress deletions - deleteDelay := server.ConfigDeletionUpdateDelay - server.ConfigDeletionUpdateDelay = 0 - defer func() { server.ConfigDeletionUpdateDelay = deleteDelay }() - logger := logger.NewLoggerFactory(stunnerLogLevel) testLog := logger.NewLogger("test") + // suppress deletions + server.SuppressConfigDeletion = true + serverCtx, serverCancel := context.WithCancel(context.Background()) defer serverCancel() @@ -1002,7 +984,7 @@ func TestClientReconnect(t *testing.T) { clientCtx, clientCancel := context.WithCancel(context.Background()) defer clientCancel() - err = client1.Watch(clientCtx, ch1) + err = client1.Watch(clientCtx, ch1, false) assert.NoError(t, err, "client 1 watch") s := watchConfig(ch1, 150*time.Millisecond) @@ -1055,10 +1037,8 @@ func TestServerUpdate(t *testing.T) { logger := logger.NewLoggerFactory(stunnerLogLevel) testLog := logger.NewLogger("test") - // make sure deletions are suppressed - deleteDelay := server.ConfigDeletionUpdateDelay - server.ConfigDeletionUpdateDelay = 0 - defer func() { server.ConfigDeletionUpdateDelay = deleteDelay }() + // suppress deletions + server.SuppressConfigDeletion = true serverCtx, serverCancel := context.WithCancel(context.Background()) defer serverCancel() @@ -1141,6 +1121,112 @@ func TestServerUpdate(t *testing.T) { assert.True(t, sc2.DeepEqual(tcpC), "deepeq") } +// Test various combinations of server-side "drop-delete" (server.SuppressConfigDeletion=true) and +// client-side "drop-delete" (client.Watch(..., suppressDelete=true)). +func TestDeleteConfigAPI(t *testing.T) { + zc := zap.NewProductionConfig() + zc.Level = zap.NewAtomicLevelAt(testerLogLevel) + z, err := zc.Build() + assert.NoError(t, err, "logger created") + zlogger := zapr.NewLogger(z) + log := zlogger.WithName("tester") + + logger := logger.NewLoggerFactory(stunnerLogLevel) + testLog := logger.NewLogger("test") + + saved := server.SuppressConfigDeletion + server.SuppressConfigDeletion = false + serverCtx, serverCancel := context.WithCancel(context.Background()) + defer serverCancel() + + testCDSAddr := getRandCDSAddr() + testLog.Debugf("create server on %s", testCDSAddr) + srv := server.New(testCDSAddr, nil, log) + assert.NotNil(t, srv, "server") + err = srv.Start(serverCtx) + assert.NoError(t, err, "start") + + testLog.Debug("create client") + c, err := client.New(testCDSAddr, "ns1/gw1", logger) + assert.NoError(t, err, "client") + + ch := make(chan *stnrv1.StunnerConfig, 8) + defer close(ch) + + for _, testCase := range []struct { + name string + serverDropDel, clientDropDel bool + tester func(t *testing.T) + }{ + { + name: "server sends delete - client handles delete", + serverDropDel: false, + clientDropDel: false, + tester: func(t *testing.T) { + conf := watchConfig(ch, 50*time.Millisecond) + assert.NotNil(t, conf, "config") + assert.True(t, client.IsConfigDeleted(conf)) + }, + }, + { + name: "server suppresses delete - client handles delete", + serverDropDel: true, + clientDropDel: false, + tester: func(t *testing.T) { + conf := watchConfig(ch, 50*time.Millisecond) + assert.Nil(t, conf, "config") + }, + }, + { + name: "server sends delete - client suppresses delete", + serverDropDel: false, + clientDropDel: true, + tester: func(t *testing.T) { + conf := watchConfig(ch, 50*time.Millisecond) + assert.Nil(t, conf, "config") + }, + }, + { + name: "server suppresses delete - client suppresses delete", + serverDropDel: true, + clientDropDel: true, + tester: func(t *testing.T) { + conf := watchConfig(ch, 50*time.Millisecond) + assert.Nil(t, conf, "config") + }, + }, + } { + testLog.Debugf("------------------------- %s ----------------------", testCase.name) + + server.SuppressConfigDeletion = testCase.serverDropDel + + clientCtx, clientCancel := context.WithCancel(context.Background()) + err = c.Watch(clientCtx, ch, testCase.clientDropDel) + assert.NoError(t, err, "client watch") + + conf := watchConfig(ch, 25*time.Millisecond) + assert.Nil(t, conf, "noconfig") + + testLog.Trace("Adding config") + testConf := testConfig("ns1/gw1", "realm1") + err = srv.UpdateConfig([]server.Config{testConf}) + assert.NoError(t, err, "update") + + conf = watchConfig(ch, 50*time.Millisecond) + assert.NotNil(t, conf) + assert.Equal(t, *testConf.Config, *conf) + + testLog.Trace("Deleting config") + err = srv.UpdateConfig([]server.Config{}) + assert.NoError(t, err, "update") + testCase.tester(t) + + clientCancel() + } + + server.SuppressConfigDeletion = saved +} + // func TestServerPatcher(t *testing.T) { // zc := zap.NewProductionConfig() // zc.Level = zap.NewAtomicLevelAt(testerLogLevel) @@ -1206,7 +1292,7 @@ func TestServerUpdate(t *testing.T) { // ch1 := make(chan *stnrv1.StunnerConfig, 8) // defer close(ch1) -// err = watcher1.Watch(watchCtx, ch1) +// err = watcher1.Watch(watchCtx, ch1, false) // assert.NoError(t, err, "client watch") // s := watchConfig(ch1, 100*time.Millisecond) @@ -1219,7 +1305,7 @@ func TestServerUpdate(t *testing.T) { // ch2 := make(chan *stnrv1.StunnerConfig, 8) // defer close(ch2) -// err = watcher2.Watch(watchCtx, ch2) +// err = watcher2.Watch(watchCtx, ch2, false) // assert.NoError(t, err, "client watch") // s = watchConfig(ch2, 100*time.Millisecond) diff --git a/pkg/config/client/cds_api.go b/pkg/config/client/cds_api.go index 39103e82..e017abe4 100644 --- a/pkg/config/client/cds_api.go +++ b/pkg/config/client/cds_api.go @@ -36,10 +36,12 @@ type CdsApi interface { // Get loads the config(s) from the API endpoint. Get(ctx context.Context) ([]*stnrv1.StunnerConfig, error) // Watch watches config(s) from the API endpoint of a CDS server. If the server is not - // available watch will retry, and if the connection goes away it will create a new one. - Watch(ctx context.Context, ch chan<- *stnrv1.StunnerConfig) error + // available watch will retry, and if the connection goes away it will create a new one. If + // set, the suppressDelete instructs the API to ignore config delete updates from the + // server. + Watch(ctx context.Context, ch chan<- *stnrv1.StunnerConfig, suppressDelete bool) error // Poll creates a one-shot config watcher without the retry mechanincs of Watch. - Poll(ctx context.Context, ch chan<- *stnrv1.StunnerConfig) error + Poll(ctx context.Context, ch chan<- *stnrv1.StunnerConfig, suppressDelete bool) error logging.LeveledLogger } @@ -98,14 +100,14 @@ func (a *AllConfigsAPI) Get(ctx context.Context) ([]*stnrv1.StunnerConfig, error return decodeConfigList(r.Body) } -func (a *AllConfigsAPI) Watch(ctx context.Context, ch chan<- *stnrv1.StunnerConfig) error { +func (a *AllConfigsAPI) Watch(ctx context.Context, ch chan<- *stnrv1.StunnerConfig, suppressDelete bool) error { a.Debugf("WATCH: watching all configs from CDS server %s", a.wsURI) - return watch(ctx, a, ch) + return watch(ctx, a, ch, suppressDelete) } -func (a *AllConfigsAPI) Poll(ctx context.Context, ch chan<- *stnrv1.StunnerConfig) error { +func (a *AllConfigsAPI) Poll(ctx context.Context, ch chan<- *stnrv1.StunnerConfig, suppressDelete bool) error { a.Debugf("POLL: polling all configs from CDS server %s", a.wsURI) - return poll(ctx, a, ch) + return poll(ctx, a, ch, suppressDelete) } // ConfigsNamespaceAPI is the API for listing all configs in a namespace. @@ -163,16 +165,16 @@ func (a *ConfigsNamespaceAPI) Get(ctx context.Context) ([]*stnrv1.StunnerConfig, return decodeConfigList(r.Body) } -func (a *ConfigsNamespaceAPI) Watch(ctx context.Context, ch chan<- *stnrv1.StunnerConfig) error { +func (a *ConfigsNamespaceAPI) Watch(ctx context.Context, ch chan<- *stnrv1.StunnerConfig, suppressDelete bool) error { a.Debugf("WATCH: watching all configs in namespace %s from CDS server %s", a.namespace, a.wsURI) - return watch(ctx, a, ch) + return watch(ctx, a, ch, suppressDelete) } -func (a *ConfigsNamespaceAPI) Poll(ctx context.Context, ch chan<- *stnrv1.StunnerConfig) error { +func (a *ConfigsNamespaceAPI) Poll(ctx context.Context, ch chan<- *stnrv1.StunnerConfig, suppressDelete bool) error { a.Debugf("POLL: polling all configs in namespace %s from CDS server %s", a.namespace, a.wsURI) - return poll(ctx, a, ch) + return poll(ctx, a, ch, suppressDelete) } type ConfigNamespaceNameAPI struct { @@ -231,22 +233,22 @@ func (a *ConfigNamespaceNameAPI) Get(ctx context.Context) ([]*stnrv1.StunnerConf return decodeConfig(r.Body) } -func (a *ConfigNamespaceNameAPI) Watch(ctx context.Context, ch chan<- *stnrv1.StunnerConfig) error { +func (a *ConfigNamespaceNameAPI) Watch(ctx context.Context, ch chan<- *stnrv1.StunnerConfig, suppressDelete bool) error { a.Debugf("WATCH: watching config for gateway %s/%s from CDS server %s", a.namespace, a.name, a.wsURI) - return watch(ctx, a, ch) + return watch(ctx, a, ch, suppressDelete) } -func (a *ConfigNamespaceNameAPI) Poll(ctx context.Context, ch chan<- *stnrv1.StunnerConfig) error { +func (a *ConfigNamespaceNameAPI) Poll(ctx context.Context, ch chan<- *stnrv1.StunnerConfig, suppressDelete bool) error { a.Debugf("POLL: polling config for gateway %s/%s from CDS server %s", a.namespace, a.name, a.wsURI) - return poll(ctx, a, ch) + return poll(ctx, a, ch, suppressDelete) } -func watch(ctx context.Context, a CdsApi, ch chan<- *stnrv1.StunnerConfig) error { +func watch(ctx context.Context, a CdsApi, ch chan<- *stnrv1.StunnerConfig, suppressDelete bool) error { go func() { for { - if err := poll(ctx, a, ch); err != nil { + if err := poll(ctx, a, ch, suppressDelete); err != nil { _, wsuri := a.Endpoint() a.Errorf("failed to init CDS watcher (url: %s): %s", wsuri, err.Error()) } else { @@ -265,7 +267,7 @@ func watch(ctx context.Context, a CdsApi, ch chan<- *stnrv1.StunnerConfig) error // //////////// // API workers // //////////// -func poll(ctx context.Context, a CdsApi, ch chan<- *stnrv1.StunnerConfig) error { +func poll(ctx context.Context, a CdsApi, ch chan<- *stnrv1.StunnerConfig, suppressDelete bool) error { _, url := a.Endpoint() a.Tracef("poll: trying to open connection to CDS server at %s", url) @@ -354,6 +356,11 @@ func poll(ctx context.Context, a CdsApi, ch chan<- *stnrv1.StunnerConfig) error continue } + if suppressDelete && IsConfigDeleted(c) { + a.Infof("Ignoring delete configuration update from %q", url) + continue + } + a.Debugf("new config received from %q: %q", url, c.String()) ch <- c @@ -363,7 +370,7 @@ func poll(ctx context.Context, a CdsApi, ch chan<- *stnrv1.StunnerConfig) error // wait fo cancel for { defer func() { - a.Infof("closing connection for client %s", conn.RemoteAddr().String()) + a.Infof("closing connection to server %s", conn.RemoteAddr().String()) conn.WriteMessage(websocket.CloseMessage, []byte{}) //nolint:errcheck conn.Close() closePinger <- struct{}{} diff --git a/pkg/config/client/client.go b/pkg/config/client/client.go index f1c2a863..d54a843d 100644 --- a/pkg/config/client/client.go +++ b/pkg/config/client/client.go @@ -33,12 +33,13 @@ var ( type Client interface { // Load grabs a new configuration from the config client. Load() (*stnrv1.StunnerConfig, error) - // Watch grabs new configs from a config origin (config file or CDS server) and returns - // them on the channel. The context cancels the watcher. If the origin is not available - // watch will retry. - Watch(ctx context.Context, ch chan<- *stnrv1.StunnerConfig) error + // Watch listens to new configs from a config origin (config file or CDS server) and + // returns them on the given channel. The context cancels the watcher. If the origin is not + // available watch will retry. If set, the suppressDelete flag instructs the client to + // ignore delete config (essentially zero-configs) from the origin. + Watch(ctx context.Context, ch chan<- *stnrv1.StunnerConfig, suppressDelete bool) error // Poll creates a one-shot config watcher without the retry mechanincs of Watch. - Poll(ctx context.Context, ch chan<- *stnrv1.StunnerConfig) error + Poll(ctx context.Context, ch chan<- *stnrv1.StunnerConfig, suppressDelete bool) error fmt.Stringer } diff --git a/pkg/config/client/file_client.go b/pkg/config/client/file_client.go index 3f0e7b2c..0ead3bfc 100644 --- a/pkg/config/client/file_client.go +++ b/pkg/config/client/file_client.go @@ -66,15 +66,15 @@ func (w *ConfigFileClient) Load() (*stnrv1.StunnerConfig, error) { // Watch watches a configuration file for changes. If no file exists at the given path, it will // periodically retry until the file appears. -func (w *ConfigFileClient) Watch(ctx context.Context, ch chan<- *stnrv1.StunnerConfig) error { +func (w *ConfigFileClient) Watch(ctx context.Context, ch chan<- *stnrv1.StunnerConfig, suppressDelete bool) error { if w.configFile == "" { - return errors.New("Uninitialized config file path") + return errors.New("uninitialized config file path") } go func() { for { // try to watch - if err := w.Poll(ctx, ch); err != nil { + if err := w.Poll(ctx, ch, suppressDelete); err != nil { w.log.Warnf("Error loading config file %q: %s", w.configFile, err.Error()) } else { @@ -92,7 +92,7 @@ func (w *ConfigFileClient) Watch(ctx context.Context, ch chan<- *stnrv1.StunnerC // Poll watches the config file and emits new configs on the specified channel. Returns an error if // further action is needed (tryWatchConfig is to be started) or nil on normal exit. -func (w *ConfigFileClient) Poll(ctx context.Context, ch chan<- *stnrv1.StunnerConfig) error { +func (w *ConfigFileClient) Poll(ctx context.Context, ch chan<- *stnrv1.StunnerConfig, suppressDelete bool) error { w.log.Tracef("configWatcher") // create a new watcher @@ -160,7 +160,12 @@ func (w *ConfigFileClient) Poll(ctx context.Context, ch chan<- *stnrv1.StunnerCo // suppress repeated events if c.DeepEqual(&prev) { - w.log.Debugf("Ignoring recurrent notify event for the same config file") + w.log.Debug("Ignoring recurrent notify event for the same config file") + continue + } + + if suppressDelete && IsConfigDeleted(c) { + w.log.Info("Ignoring deleted configuration") continue } diff --git a/pkg/config/server/config.go b/pkg/config/server/config.go index 9b0d3ca2..159fcd61 100644 --- a/pkg/config/server/config.go +++ b/pkg/config/server/config.go @@ -28,24 +28,19 @@ func (s *Server) UpsertConfig(id string, c *stnrv1.StunnerConfig) { s.configCh <- Config{Id: id, Config: cpy} } -// DeleteConfig removes a config from the client. Theoretically, this should send the client a -// zero-config immediately. However, in order to avoid that a client being removed and entering the +// DeleteConfig removes a config from clients by sending a zero-config. Clients may decide to +// ignore the delete operation by (1) using client.IsConfigDeleted() to identify whether a config +// is being deleted and (2) selectively ignoring config delete updates based on the result. This is +// needed, e.g., in stunnerd, in order to avoid that a client being removed and entering the // graceful shutdown cycle receive a zeroconfig and abruprly kill all listeners with all active -// connections allocated to them, we actually delay sending the zeroconfig with a configurable time -// (default is 5 sec, but a zero delay will suppress sending the zero-config all together). This -// should allow the client comfortable time to enter the grafeul shutdown cycle. Note that clients -// should stop actively reconciling config updates once they initiated graceful shutdown for this -// to work. +// connections allocated to them. func (s *Server) DeleteConfig(id string) { s.configs.Delete(id) - if ConfigDeletionUpdateDelay == 0 { - s.log.Info("Suppressing config update for deleted config", "client", id) + if SuppressConfigDeletion { + s.log.Info("Suppressing config update for deleted config", "config-id", id) return } - s.log.Info("Delaying config update for deleted config", "client", id, - "delay", ConfigDeletionUpdateDelay) - s.deleteCh <- Config{Id: id, Config: client.ZeroConfig(id)} } @@ -60,11 +55,11 @@ func (s *Server) UpdateConfig(newConfigs []Config) error { for _, newC := range newConfigs { if oldC.Id == newC.Id { if !oldC.Config.DeepEqual(newC.Config) { - s.log.V(2).Info("Updating config", "client", newC.Id, "config", + s.log.V(2).Info("Updating config", "config-id", newC.Id, "config", newC.Config.String()) s.UpsertConfig(newC.Id, newC.Config) } else { - s.log.V(2).Info("Config not updated", "client", newC.Id, + s.log.V(2).Info("Config unchanged", "config-id", newC.Id, "old-config", oldC.Config.String(), "new-config", newC.Config.String()) } @@ -74,7 +69,7 @@ func (s *Server) UpdateConfig(newConfigs []Config) error { } if !found { - s.log.V(2).Info("Removing config", "client", oldC.Id) + s.log.V(2).Info("Removing config", "config-id", oldC.Id) s.DeleteConfig(oldC.Id) } } @@ -89,7 +84,7 @@ func (s *Server) UpdateConfig(newConfigs []Config) error { } if !found { - s.log.V(2).Info("Adding config", "client", newC.Id, "config", newC.Config) + s.log.V(2).Info("Adding config", "config-id", newC.Id, "config", newC.Config) s.UpsertConfig(newC.Id, newC.Config) } } diff --git a/pkg/config/server/handler.go b/pkg/config/server/handler.go index c2e5892d..f97e81c0 100644 --- a/pkg/config/server/handler.go +++ b/pkg/config/server/handler.go @@ -66,7 +66,7 @@ func (s *Server) GetV1ConfigNamespaceName(ctx context.Context, request api.GetV1 id := fmt.Sprintf("%s/%s", namespace, name) c := s.configs.Get(id) if c == nil { - s.log.V(1).Info("GetV1ConfigNamespaceName: Config not found", "client", id) + s.log.V(1).Info("GetV1ConfigNamespaceName: Config not found", "config-id", id) return api.GetV1ConfigNamespaceName404JSONResponse{ Code: http.StatusNotFound, Message: fmt.Sprintf("Config not found for ID %q", id), diff --git a/pkg/config/server/server.go b/pkg/config/server/server.go index f554cf52..f986bdfe 100644 --- a/pkg/config/server/server.go +++ b/pkg/config/server/server.go @@ -5,9 +5,9 @@ import ( "context" "encoding/json" "errors" + "fmt" "net" "net/http" - "time" "github.com/go-logr/logr" "github.com/gorilla/mux" @@ -18,10 +18,9 @@ import ( ) var ( - // ConfigDeletionUpdateDelay is the delay between deleting a config from the server and - // sending the corresponing zero-config to the client. Set this to zero to suppress sending - // the zero-config all together. - ConfigDeletionUpdateDelay = 5 * time.Second + // SuppressConfigDeletion allows the server to suppress config deletions all together. Used + // mostly for testing. + SuppressConfigDeletion = false ) // Server is a generic config discovery server implementation. @@ -60,11 +59,15 @@ func (s *Server) Start(ctx context.Context) error { handler := api.NewStrictHandler(s, []api.StrictMiddlewareFunc{s.WSUpgradeMiddleware}) api.HandlerFromMux(handler, s.router) s.Server = &http.Server{Addr: s.addr, Handler: s.router} + l, err := net.Listen("tcp", s.addr) + if err != nil { + return fmt.Errorf("CDS server failed to listen: %w", err) + } go func() { s.log.Info("Starting CDS server", "address", s.addr, "patch", s.patch != nil) - err := s.ListenAndServe() + err := s.Serve(l) if err != nil { if errors.Is(err, net.ErrClosed) || errors.Is(err, http.ErrServerClosed) { s.log.Info("Closing config discovery server") @@ -83,21 +86,11 @@ func (s *Server) Start(ctx context.Context) error { for { select { case c := <-s.configCh: + s.log.V(2).Info("Sending config update event", "config-id", c.Id) s.broadcastConfig(c) - case c := <-s.deleteCh: - s.log.V(2).Info("Initiating deleyed config deletion", "id", c.Id) - - go func() { - select { - case <-ctx.Done(): - return - case <-time.After(ConfigDeletionUpdateDelay): - s.configCh <- Config{Id: c.Id, Config: c.Config} - return - } - }() - + s.log.V(2).Info("Sending config delete event", "config-id", c.Id) + s.broadcastConfig(c) case <-ctx.Done(): return } @@ -135,9 +128,10 @@ func (s *Server) GetConnTrack() *ConnTrack { // RemoveClient forcefully closes a client connection. This is used mainly for testing. func (s *Server) RemoveClient(id string) { - if c := s.conns.Get(id); c != nil { - s.log.V(1).Info("Forcefully removing client connection", "client", id) - s.closeConn(c) + if conn := s.conns.Get(id); conn != nil { + s.log.V(1).Info("Forcefully removing client connection", "config-id", id, + "client", conn.RemoteAddr().String()) + s.closeConn(conn) } } @@ -156,6 +150,7 @@ func (s *Server) handleConn(reqCtx context.Context, wsConn *websocket.Conn, oper // drop anything we receive _, _, err := conn.ReadMessage() if err != nil { + s.closeConn(conn) return } } @@ -211,8 +206,7 @@ func (s *Server) sendConfig(conn *Conn, e *stnrv1.StunnerConfig) { return } - s.log.V(2).Info("Sending configuration to client", "client", conn.Id(), - "config", c.String()) + s.log.V(2).Info("Sending configuration to client", "client", conn.Id()) if err := conn.WriteMessage(websocket.TextMessage, json); err != nil { s.log.Error(err, "Error sending config update", "client", conn.Id())