Skip to content

Commit

Permalink
Initial work for node config propagation
Browse files Browse the repository at this point in the history
  • Loading branch information
berkayoz committed Mar 27, 2024
1 parent d4bc2d8 commit 0213732
Show file tree
Hide file tree
Showing 23 changed files with 250 additions and 41 deletions.
1 change: 1 addition & 0 deletions src/k8s/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/canonical/go-dqlite v1.21.0
github.com/canonical/lxd v0.0.0-20231002162033-38796399c135
github.com/canonical/microcluster v0.0.0-20240122235408-1525f8ea8d7a
github.com/mitchellh/mapstructure v1.5.0
github.com/moby/sys/mountinfo v0.7.1
github.com/onsi/gomega v1.30.0
github.com/pelletier/go-toml v1.9.5
Expand Down
2 changes: 2 additions & 0 deletions src/k8s/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,8 @@ github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0Qu
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ=
github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
Expand Down
2 changes: 1 addition & 1 deletion src/k8s/pkg/component/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func UpdateDNSComponent(ctx context.Context, s snap.Snap, isRefresh bool, cluste
}
}

client, err := k8s.NewClient(s)
client, err := k8s.NewClient(s.KubernetesRESTClientGetter(""))
if err != nil {
return "", "", fmt.Errorf("failed to create kubernetes client: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion src/k8s/pkg/component/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func UpdateGatewayComponent(ctx context.Context, s snap.Snap, isRefresh bool) er
return fmt.Errorf("failed to enable gateway component: %w", err)
}

client, err := k8s.NewClient(s)
client, err := k8s.NewClient(s.KubernetesRESTClientGetter(""))
if err != nil {
return fmt.Errorf("failed to create kubernetes client: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion src/k8s/pkg/component/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func UpdateIngressComponent(ctx context.Context, s snap.Snap, isRefresh bool, de
return fmt.Errorf("failed to enable ingress component: %w", err)
}

client, err := k8s.NewClient(s)
client, err := k8s.NewClient(s.KubernetesRESTClientGetter(""))
if err != nil {
return fmt.Errorf("failed to create kubernetes client: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions src/k8s/pkg/component/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func UpdateLoadBalancerComponent(ctx context.Context, s snap.Snap, isRefresh boo
}

// Wait for cilium CRDs to be available.
k8sClient, err := k8s.NewClient(s)
k8sClient, err := k8s.NewClient(s.KubernetesRESTClientGetter(""))
if err != nil {
return fmt.Errorf("failed to create k8s client: %w", err)
}
Expand Down Expand Up @@ -104,7 +104,7 @@ func UpdateLoadBalancerComponent(ctx context.Context, s snap.Snap, isRefresh boo
}
}

client, err := k8s.NewClient(s)
client, err := k8s.NewClient(s.KubernetesRESTClientGetter(""))
if err != nil {
return fmt.Errorf("failed to create kubernetes client: %w", err)
}
Expand Down
20 changes: 20 additions & 0 deletions src/k8s/pkg/k8sd/api/cluster_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
"net/http"

api "github.com/canonical/k8s/api/v1"
"github.com/mitchellh/mapstructure"

"github.com/canonical/k8s/pkg/component"
"github.com/canonical/k8s/pkg/k8sd/database"
"github.com/canonical/k8s/pkg/k8sd/types"
"github.com/canonical/k8s/pkg/snap"
"github.com/canonical/k8s/pkg/utils"
"github.com/canonical/k8s/pkg/utils/k8s"
"github.com/canonical/k8s/pkg/utils/vals"
"github.com/canonical/lxd/lxd/response"
"github.com/canonical/microcluster/state"
Expand Down Expand Up @@ -155,6 +157,7 @@ func putClusterConfig(s *state.State, r *http.Request) response.Response {
if err != nil {
return response.InternalError(fmt.Errorf("failed to reconcile dns: %w", err))
}

if err := s.Database.Transaction(r.Context(), func(ctx context.Context, tx *sql.Tx) error {
if err := database.SetClusterConfig(ctx, tx, types.ClusterConfig{
Kubelet: types.Kubelet{
Expand All @@ -167,6 +170,23 @@ func putClusterConfig(s *state.State, r *http.Request) response.Response {
}); err != nil {
return response.InternalError(fmt.Errorf("database transaction to update cluster configuration failed: %w", err))
}

var data map[string]string
if err := mapstructure.Decode(types.NodeConfig{
ClusterDNS: dnsIP,
ClusterDomain: newConfig.Kubelet.ClusterDomain,
}, &data); err != nil {
return response.InternalError(fmt.Errorf("failed to encode node config: %w", err))
}

client, err := k8s.NewClient(snap.KubernetesRESTClientGetter(""))
if err != nil {
return response.InternalError(fmt.Errorf("failed to create kubernetes client: %w", err))
}

if _, err := client.UpdateConfigMap(r.Context(), "kube-system", "k8sd-config", data); err != nil {
return response.InternalError(fmt.Errorf("failed to update node config: %w", err))
}
}

if req.Config.LocalStorage != nil {
Expand Down
2 changes: 1 addition & 1 deletion src/k8s/pkg/k8sd/api/cluster_remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func postClusterRemove(m *microcluster.MicroCluster, s *state.State, r *http.Req
}
if isWorker {
// For worker nodes, we need to manually cleanup the kubernetes node and db entry.
c, err := k8s.NewClient(snap)
c, err := k8s.NewClient(snap.KubernetesRESTClientGetter(""))
if err != nil {
return response.InternalError(fmt.Errorf("failed to create k8s client: %w", err))
}
Expand Down
2 changes: 1 addition & 1 deletion src/k8s/pkg/k8sd/api/impl/k8sd.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func GetClusterStatus(ctx context.Context, s *state.State) (apiv1.ClusterStatus,
return apiv1.ClusterStatus{}, fmt.Errorf("failed to get user-facing cluster config: %w", err)
}

client, err := k8s.NewClient(snap)
client, err := k8s.NewClient(snap.KubernetesRESTClientGetter(""))
if err != nil {
return apiv1.ClusterStatus{}, fmt.Errorf("failed to create k8s client: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion src/k8s/pkg/k8sd/api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func postWorkerInfo(s *state.State, r *http.Request) response.Response {
}

snap := snap.SnapFromContext(s.Context)
client, err := k8s.NewClient(snap)
client, err := k8s.NewClient(snap.KubernetesRESTClientGetter(""))
if err != nil {
return response.InternalError(fmt.Errorf("failed to create kubernetes client: %w", err))
}
Expand Down
4 changes: 4 additions & 0 deletions src/k8s/pkg/k8sd/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func (a *App) Run(customHooks *config.Hooks) error {
OnBootstrap: onBootstrap,
PostJoin: onPostJoin,
PreRemove: onPreRemove,
OnStart: onStart,
}
if customHooks != nil {
if customHooks.OnBootstrap != nil {
Expand All @@ -72,6 +73,9 @@ func (a *App) Run(customHooks *config.Hooks) error {
if customHooks.PreRemove != nil {
hooks.PreRemove = customHooks.PreRemove
}
if customHooks.OnStart != nil {
hooks.OnStart = customHooks.OnStart
}
}
err := a.MicroCluster.Start(api.Endpoints(a.MicroCluster), database.SchemaExtensions, hooks)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion src/k8s/pkg/k8sd/app/cluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func startControlPlaneServices(ctx context.Context, snap snap.Snap, datastore st
func waitApiServerReady(ctx context.Context, snap snap.Snap) error {

// Wait for API server to come up
client, err := k8s.NewClient(snap)
client, err := k8s.NewClient(snap.KubernetesRESTClientGetter(""))
if err != nil {
return fmt.Errorf("failed to create Kubernetes client: %w", err)
}
Expand Down
19 changes: 19 additions & 0 deletions src/k8s/pkg/k8sd/app/hooks_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import (
"github.com/canonical/k8s/pkg/snap"
snaputil "github.com/canonical/k8s/pkg/snap/util"
"github.com/canonical/k8s/pkg/utils"
"github.com/canonical/k8s/pkg/utils/k8s"
"github.com/canonical/k8s/pkg/utils/vals"
"github.com/canonical/microcluster/state"
"github.com/mitchellh/mapstructure"
)

// onBootstrap is called after we bootstrap the first cluster node.
Expand Down Expand Up @@ -305,6 +307,23 @@ func onBootstrapControlPlane(s *state.State, initConfig map[string]string) error
}); err != nil {
return fmt.Errorf("database transaction to update cluster configuration failed: %w", err)
}

var data map[string]string
if err := mapstructure.Decode(types.NodeConfig{
ClusterDNS: dnsIP,
ClusterDomain: cfg.Kubelet.ClusterDomain,
}, &data); err != nil {
return fmt.Errorf("failed to encode node config: %w", err)
}

client, err := k8s.NewClient(snap.KubernetesRESTClientGetter(""))
if err != nil {
return fmt.Errorf("failed to create kubernetes client: %w", err)
}

if _, err := client.UpdateConfigMap(s.Context, "kube-system", "k8sd-config", data); err != nil {
return fmt.Errorf("failed to update node configs: %w", err)
}
}

if cfg.LocalStorage.Enabled != nil {
Expand Down
2 changes: 1 addition & 1 deletion src/k8s/pkg/k8sd/app/hooks_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func onPreRemove(s *state.State, force bool) error {
default:
}

c, err := k8s.NewClient(snap)
c, err := k8s.NewClient(snap.KubernetesRESTClientGetter(""))
if err != nil {
return fmt.Errorf("failed to create Kubernetes client: %w", err)
}
Expand Down
16 changes: 16 additions & 0 deletions src/k8s/pkg/k8sd/app/hooks_start.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package app

import (
"github.com/canonical/k8s/pkg/k8sd/controllers"
"github.com/canonical/k8s/pkg/snap"
"github.com/canonical/microcluster/state"
)

func onStart(s *state.State) error {
snap := snap.SnapFromContext(s.Context)

configController := controllers.NewNodeConfigurationController()
go configController.Run(s.Context, snap)

return nil
}
79 changes: 79 additions & 0 deletions src/k8s/pkg/k8sd/controllers/node_configuration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package controllers

import (
"context"
"fmt"
"log"
"time"

"github.com/canonical/k8s/pkg/k8sd/types"
"github.com/canonical/k8s/pkg/snap"
snaputil "github.com/canonical/k8s/pkg/snap/util"
"github.com/canonical/k8s/pkg/utils/k8s"
"github.com/mitchellh/mapstructure"
v1 "k8s.io/api/core/v1"
)

type NodeConfigurationController struct {
}

func NewNodeConfigurationController() *NodeConfigurationController {
return &NodeConfigurationController{}
}

func (c *NodeConfigurationController) Run(ctx context.Context, snap snap.Snap) error {
for {
select {
case <-ctx.Done():
return nil
case <-time.After(3 * time.Second):
default:
}

client, err := k8s.NewClient(snap.KubernetesNodeRESTClientGetter("kube-system"))
if err != nil {
// This fails when the node is not bootstrapped or joined
log.Println(fmt.Errorf("failed to create kubernetes node client: %w", err))
continue
}

if err := client.WatchConfigMap(ctx, "kube-system", "k8sd-config", func(configMap *v1.ConfigMap) error { return c.reconcile(ctx, snap, configMap) }); err != nil {
// This also can fail during bootstrapping/start up when api-server is not ready
// So the watch requests get connection refused replies
log.Println(fmt.Errorf("error while watching configmap: %w", err))
}
}
}

func (c *NodeConfigurationController) reconcile(ctx context.Context, snap snap.Snap, configMap *v1.ConfigMap) error {
var nodeConfig types.NodeConfig
if err := mapstructure.Decode(configMap.Data, &nodeConfig); err != nil {
return fmt.Errorf("failed to decode node config: %w", err)
}

var kubeletUpdateMap map[string]string = make(map[string]string)
var kubeletDeleteList []string

if nodeConfig.ClusterDNS != "" {
kubeletUpdateMap["--cluster-dns"] = nodeConfig.ClusterDNS
} else {
kubeletDeleteList = append(kubeletDeleteList, "--cluster-dns")
}

if nodeConfig.ClusterDomain != "" {
kubeletUpdateMap["--cluster-domain"] = nodeConfig.ClusterDomain
}

mustRestartKubelet, err := snaputil.UpdateServiceArguments(snap, "kubelet", kubeletUpdateMap, kubeletDeleteList)
if err != nil {
return fmt.Errorf("failed to update kubelet arguments: %w", err)
}

if mustRestartKubelet {
if err := snap.RestartService(ctx, "kubelet"); err != nil {
return fmt.Errorf("failed to restart kubelet to apply node configuration: %w", err)
}
}

return nil
}
3 changes: 3 additions & 0 deletions src/k8s/pkg/k8sd/database/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ func WithDB(t *testing.T, f func(context.Context, DB)) {
databaseCh <- s.Database
return nil
},
OnStart: func(s *state.State) error {
return nil
},
})
}()

Expand Down
7 changes: 7 additions & 0 deletions src/k8s/pkg/k8sd/types/node_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package types

type NodeConfig struct {
CloudProvider string `mapstructure:"cloud-provider,omitempty"`
ClusterDNS string `mapstructure:"cluster-dns,omitempty"`
ClusterDomain string `mapstructure:"cluster-domain,omitempty"`
}
3 changes: 2 additions & 1 deletion src/k8s/pkg/snap/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ type Snap interface {

Components() map[string]types.Component // available components

KubernetesRESTClientGetter(namespace string) genericclioptions.RESTClientGetter // admin kubernetes client
KubernetesRESTClientGetter(namespace string) genericclioptions.RESTClientGetter // admin kubernetes client
KubernetesNodeRESTClientGetter(namespace string) genericclioptions.RESTClientGetter // node kubernetes client

K8sDqliteClient(ctx context.Context) (*dqlite.Client, error) // go-dqlite client for k8s-dqlite
}
Loading

0 comments on commit 0213732

Please sign in to comment.