diff --git a/src/k8s/pkg/k8sd/app/app.go b/src/k8s/pkg/k8sd/app/app.go
index df373c376..99b61722d 100644
--- a/src/k8s/pkg/k8sd/app/app.go
+++ b/src/k8s/pkg/k8sd/app/app.go
@@ -3,12 +3,17 @@ package app
 import (
 	"context"
 	"fmt"
+	"sync"
+	"time"
 
 	"github.com/canonical/k8s/pkg/k8sd/api"
+	"github.com/canonical/k8s/pkg/k8sd/controllers"
 	"github.com/canonical/k8s/pkg/k8sd/database"
 	"github.com/canonical/k8s/pkg/snap"
+	"github.com/canonical/k8s/pkg/utils/k8s"
 	"github.com/canonical/microcluster/config"
 	"github.com/canonical/microcluster/microcluster"
+	"github.com/canonical/microcluster/state"
 )
 
 // Config defines configuration for the k8sd app.
@@ -27,6 +32,11 @@ type Config struct {
 type App struct {
 	microCluster *microcluster.MicroCluster
 	snap         snap.Snap
+
+	// readyWg is used to denote that the microcluster node is now running
+	readyWg sync.WaitGroup
+
+	nodeConfigController *controllers.NodeConfigurationController
 }
 
 // New initializes a new microcluster instance from configuration.
@@ -44,10 +54,21 @@ func New(ctx context.Context, cfg Config) (*App, error) {
 		return nil, fmt.Errorf("failed to create microcluster app: %w", err)
 	}
 
-	return &App{
+	app := &App{
 		microCluster: cluster,
 		snap:         cfg.Snap,
-	}, nil
+	}
+	app.readyWg.Add(1)
+
+	app.nodeConfigController = controllers.NewNodeConfigurationController(
+		cfg.Snap,
+		app.readyWg.Wait,
+		func() (*k8s.Client, error) {
+			return k8s.NewClient(cfg.Snap.KubernetesNodeRESTClientGetter("kube-system"))
+		},
+	)
+
+	return app, nil
 }
 
 // Run starts the microcluster node and waits until it terminates.
@@ -80,3 +101,21 @@ func (a *App) Run(customHooks *config.Hooks) error {
 	}
 	return nil
 }
+
+func (a *App) markNodeReady(ctx context.Context, s *state.State) {
+	// Done() must run exactly once even if ctx is cancelled first, so that
+	// readyWg waiters (the node configuration controller) are not blocked
+	// forever on shutdown.
+	defer a.readyWg.Done()
+	for {
+		if s.Database.IsOpen() {
+			return
+		}
+
+		select {
+		case <-ctx.Done():
+			return
+		case <-time.After(3 * time.Second):
+		}
+	}
+}
diff --git a/src/k8s/pkg/k8sd/app/hooks_start.go b/src/k8s/pkg/k8sd/app/hooks_start.go
index a5729e21b..23c418ce0 100644
--- a/src/k8s/pkg/k8sd/app/hooks_start.go
+++ b/src/k8s/pkg/k8sd/app/hooks_start.go
@@ -1,33 +1,17 @@
 package app
 
 import (
-	"context"
-	"time"
-
-	"github.com/canonical/k8s/pkg/k8sd/controllers"
-	"github.com/canonical/k8s/pkg/utils/k8s"
 	"github.com/canonical/microcluster/state"
 )
 
 func (a *App) onStart(s *state.State) error {
-	snap := a.Snap()
-
-	configController := controllers.NewNodeConfigurationController(snap, func(ctx context.Context) *k8s.Client {
-		for {
-			select {
-			case <-ctx.Done():
-				return nil
-			case <-time.After(3 * time.Second):
-			}
+	// start a goroutine to mark the node as running
+	go a.markNodeReady(s.Context, s)
 
-			client, err := k8s.NewClient(snap.KubernetesNodeRESTClientGetter("kube-system"))
-			if err != nil {
-				continue
-			}
-			return client
-		}
-	})
-	go configController.Run(s.Context)
+	// start node config controller
+	if a.nodeConfigController != nil {
+		go a.nodeConfigController.Run(s.Context)
+	}
 
 	return nil
 }
diff --git a/src/k8s/pkg/k8sd/controllers/node_configuration.go b/src/k8s/pkg/k8sd/controllers/node_configuration.go
index 50e70b8a8..bf2d718f1 100644
--- a/src/k8s/pkg/k8sd/controllers/node_configuration.go
+++ b/src/k8s/pkg/k8sd/controllers/node_configuration.go
@@ -14,26 +14,53 @@ import (
 )
 
 type NodeConfigurationController struct {
-	snap            snap.Snap
-	createK8sClient func(ctx context.Context) *k8s.Client
+	snap         snap.Snap
+	waitReady    func()
+	newK8sClient func() (*k8s.Client, error)
 }
 
-func NewNodeConfigurationController(snap snap.Snap, createK8sClient func(ctx context.Context) *k8s.Client) *NodeConfigurationController {
+func NewNodeConfigurationController(snap snap.Snap, waitReady func(), newK8sClient func() (*k8s.Client, error)) *NodeConfigurationController {
 	return &NodeConfigurationController{
-		snap:            snap,
-		createK8sClient: createK8sClient,
+		snap:         snap,
+		waitReady:    waitReady,
+		newK8sClient: newK8sClient,
+	}
+}
+
+func (c *NodeConfigurationController) retryNewK8sClient(ctx context.Context) (*k8s.Client, error) {
+	for {
+		client, err := c.newK8sClient()
+		if err == nil {
+			return client, nil
+		}
+
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		case <-time.After(3 * time.Second):
+		}
 	}
 }
 
 func (c *NodeConfigurationController) Run(ctx context.Context) {
-	client := c.createK8sClient(ctx)
+	// wait for microcluster node to be ready
+	c.waitReady()
+
+	client, err := c.retryNewK8sClient(ctx)
+	if err != nil {
+		// retryNewK8sClient only fails when ctx is cancelled; returning here
+		// avoids calling WatchConfigMap on a nil client.
+		log.Println(fmt.Errorf("failed to create a Kubernetes client: %w", err))
+		return
+	}
+
 	for {
 		if err := client.WatchConfigMap(ctx, "kube-system", "k8sd-config", func(configMap *v1.ConfigMap) error {
 			return c.reconcile(ctx, configMap)
 		}); err != nil {
 			// This also can fail during bootstrapping/start up when api-server is not ready
 			// So the watch requests get connection refused replies
-			log.Println(fmt.Errorf("error while watching configmap: %w", err))
+			log.Println(fmt.Errorf("failed to watch configmap: %w", err))
 		}
 
 		select {
diff --git a/src/k8s/pkg/k8sd/controllers/node_configuration_test.go b/src/k8s/pkg/k8sd/controllers/node_configuration_test.go
index 7bccda8e5..e99806690 100644
--- a/src/k8s/pkg/k8sd/controllers/node_configuration_test.go
+++ b/src/k8s/pkg/k8sd/controllers/node_configuration_test.go
@@ -125,8 +125,8 @@ func TestConfigPropagation(t *testing.T) {
 	watcher := watch.NewFake()
 	clientset.PrependWatchReactor("configmaps", k8stesting.DefaultWatchReactor(watcher, nil))
 
-	configController := NewNodeConfigurationController(s, func(ctx context.Context) *k8s.Client {
-		return &k8s.Client{Interface: clientset}
+	configController := NewNodeConfigurationController(s, func() {}, func() (*k8s.Client, error) {
+		return &k8s.Client{Interface: clientset}, nil
 	})
 
 	go configController.Run(ctx)