Skip to content

Commit

Permalink
Wait for microcluster to be running before starting node config controller (#291)
Browse files Browse the repository at this point in the history

* wait for microcluster to be running before starting node config controller
* adjust error logs in node configuration controller
  • Loading branch information
neoaggelos authored Apr 4, 2024
1 parent 225c500 commit 01d9247
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 33 deletions.
40 changes: 38 additions & 2 deletions src/k8s/pkg/k8sd/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@ package app
import (
"context"
"fmt"
"sync"
"time"

"github.com/canonical/k8s/pkg/k8sd/api"
"github.com/canonical/k8s/pkg/k8sd/controllers"
"github.com/canonical/k8s/pkg/k8sd/database"
"github.com/canonical/k8s/pkg/snap"
"github.com/canonical/k8s/pkg/utils/k8s"
"github.com/canonical/microcluster/config"
"github.com/canonical/microcluster/microcluster"
"github.com/canonical/microcluster/state"
)

// Config defines configuration for the k8sd app.
Expand All @@ -27,6 +32,11 @@ type Config struct {
type App struct {
microCluster *microcluster.MicroCluster
snap snap.Snap

// readyWg is used to denote that the microcluster node is now running
readyWg sync.WaitGroup

nodeConfigController *controllers.NodeConfigurationController
}

// New initializes a new microcluster instance from configuration.
Expand All @@ -44,10 +54,21 @@ func New(ctx context.Context, cfg Config) (*App, error) {
return nil, fmt.Errorf("failed to create microcluster app: %w", err)
}

return &App{
app := &App{
microCluster: cluster,
snap: cfg.Snap,
}, nil
}
app.readyWg.Add(1)

app.nodeConfigController = controllers.NewNodeConfigurationController(
cfg.Snap,
app.readyWg.Wait,
func() (*k8s.Client, error) {
return k8s.NewClient(cfg.Snap.KubernetesNodeRESTClientGetter("kube-system"))
},
)

return app, nil
}

// Run starts the microcluster node and waits until it terminates.
Expand Down Expand Up @@ -80,3 +101,18 @@ func (a *App) Run(customHooks *config.Hooks) error {
}
return nil
}

// markNodeReady polls the microcluster database until it is open, then
// releases readyWg so that waiters (e.g. the node configuration controller)
// can proceed. The poll interval is 3 seconds.
//
// NOTE: if ctx is cancelled before the database opens, readyWg is never
// released; waiters must therefore be bound to the same context lifetime.
func (a *App) markNodeReady(ctx context.Context, s *state.State) {
	for !s.Database.IsOpen() {
		select {
		case <-ctx.Done():
			return
		case <-time.After(3 * time.Second):
		}
	}
	a.readyWg.Done()
}
28 changes: 6 additions & 22 deletions src/k8s/pkg/k8sd/app/hooks_start.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,17 @@
package app

import (
"context"
"time"

"github.com/canonical/k8s/pkg/k8sd/controllers"
"github.com/canonical/k8s/pkg/utils/k8s"
"github.com/canonical/microcluster/state"
)

// onStart is the microcluster start hook. It launches the app's background
// tasks: a goroutine that releases readyWg once the microcluster database is
// open, and (when configured) the node configuration controller.
//
// The visible span interleaved the pre- and post-commit versions of this
// function (diff residue); this is the reconstructed post-commit body.
func (a *App) onStart(s *state.State) error {
	// start a goroutine to mark the node as running
	go a.markNodeReady(s.Context, s)

	// start node config controller
	if a.nodeConfigController != nil {
		go a.nodeConfigController.Run(s.Context)
	}

	return nil
}
38 changes: 31 additions & 7 deletions src/k8s/pkg/k8sd/controllers/node_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,48 @@ import (
)

type NodeConfigurationController struct {
snap snap.Snap
createK8sClient func(ctx context.Context) *k8s.Client
snap snap.Snap
waitReady func()
newK8sClient func() (*k8s.Client, error)
}

func NewNodeConfigurationController(snap snap.Snap, createK8sClient func(ctx context.Context) *k8s.Client) *NodeConfigurationController {
func NewNodeConfigurationController(snap snap.Snap, waitReady func(), newK8sClient func() (*k8s.Client, error)) *NodeConfigurationController {
return &NodeConfigurationController{
snap: snap,
createK8sClient: createK8sClient,
snap: snap,
waitReady: waitReady,
newK8sClient: newK8sClient,
}
}

// retryNewK8sClient repeatedly attempts to create a Kubernetes client,
// sleeping 3 seconds between attempts, until one succeeds or ctx is
// cancelled (in which case ctx.Err() is returned).
func (c *NodeConfigurationController) retryNewK8sClient(ctx context.Context) (*k8s.Client, error) {
	for {
		if client, err := c.newK8sClient(); err == nil {
			return client, nil
		}

		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-time.After(3 * time.Second):
		}
	}
}

func (c *NodeConfigurationController) Run(ctx context.Context) {
client := c.createK8sClient(ctx)
// wait for microcluster node to be ready
c.waitReady()

client, err := c.retryNewK8sClient(ctx)
if err != nil {
log.Println(fmt.Errorf("failed to create a Kubernetes client: %w", err))
}

for {
if err := client.WatchConfigMap(ctx, "kube-system", "k8sd-config", func(configMap *v1.ConfigMap) error { return c.reconcile(ctx, configMap) }); err != nil {
// This also can fail during bootstrapping/start up when api-server is not ready
// So the watch requests get connection refused replies
log.Println(fmt.Errorf("error while watching configmap: %w", err))
log.Println(fmt.Errorf("failed to watch configmap: %w", err))
}

select {
Expand Down
4 changes: 2 additions & 2 deletions src/k8s/pkg/k8sd/controllers/node_configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ func TestConfigPropagation(t *testing.T) {
watcher := watch.NewFake()
clientset.PrependWatchReactor("configmaps", k8stesting.DefaultWatchReactor(watcher, nil))

configController := NewNodeConfigurationController(s, func(ctx context.Context) *k8s.Client {
return &k8s.Client{Interface: clientset}
configController := NewNodeConfigurationController(s, func() {}, func() (*k8s.Client, error) {
return &k8s.Client{Interface: clientset}, nil
})

go configController.Run(ctx)
Expand Down

0 comments on commit 01d9247

Please sign in to comment.