From 0213732905b8498db0398c1ab7fb160c4c5d5426 Mon Sep 17 00:00:00 2001 From: Berkay Tekin Oz Date: Fri, 15 Mar 2024 10:56:58 +0300 Subject: [PATCH] Initial work for node config propagation --- src/k8s/go.mod | 1 + src/k8s/go.sum | 2 + src/k8s/pkg/component/dns.go | 2 +- src/k8s/pkg/component/gateway.go | 2 +- src/k8s/pkg/component/ingress.go | 2 +- src/k8s/pkg/component/loadbalancer.go | 4 +- src/k8s/pkg/k8sd/api/cluster_config.go | 20 +++++ src/k8s/pkg/k8sd/api/cluster_remove.go | 2 +- src/k8s/pkg/k8sd/api/impl/k8sd.go | 2 +- src/k8s/pkg/k8sd/api/worker.go | 2 +- src/k8s/pkg/k8sd/app/app.go | 4 + src/k8s/pkg/k8sd/app/cluster_util.go | 2 +- src/k8s/pkg/k8sd/app/hooks_bootstrap.go | 19 +++++ src/k8s/pkg/k8sd/app/hooks_join.go | 2 +- src/k8s/pkg/k8sd/app/hooks_start.go | 16 ++++ .../k8sd/controllers/node_configuration.go | 79 +++++++++++++++++++ src/k8s/pkg/k8sd/database/util_test.go | 3 + src/k8s/pkg/k8sd/types/node_config.go | 7 ++ src/k8s/pkg/snap/interface.go | 3 +- src/k8s/pkg/snap/mock/mock.go | 58 +++++++------- src/k8s/pkg/snap/snap.go | 11 +++ src/k8s/pkg/utils/k8s/client.go | 6 +- src/k8s/pkg/utils/k8s/configmap.go | 42 ++++++++++ 23 files changed, 250 insertions(+), 41 deletions(-) create mode 100644 src/k8s/pkg/k8sd/app/hooks_start.go create mode 100644 src/k8s/pkg/k8sd/controllers/node_configuration.go create mode 100644 src/k8s/pkg/k8sd/types/node_config.go create mode 100644 src/k8s/pkg/utils/k8s/configmap.go diff --git a/src/k8s/go.mod b/src/k8s/go.mod index effc3bd48..59588a879 100644 --- a/src/k8s/go.mod +++ b/src/k8s/go.mod @@ -6,6 +6,7 @@ require ( github.com/canonical/go-dqlite v1.21.0 github.com/canonical/lxd v0.0.0-20231002162033-38796399c135 github.com/canonical/microcluster v0.0.0-20240122235408-1525f8ea8d7a + github.com/mitchellh/mapstructure v1.5.0 github.com/moby/sys/mountinfo v0.7.1 github.com/onsi/gomega v1.30.0 github.com/pelletier/go-toml v1.9.5 diff --git a/src/k8s/go.sum b/src/k8s/go.sum index b359eb714..97640975f 100644 --- a/src/k8s/go.sum +++ b/src/k8s/go.sum @@ -464,6 +464,8 @@ github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0Qu github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= +github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= diff --git a/src/k8s/pkg/component/dns.go b/src/k8s/pkg/component/dns.go index 8ace87143..019a075a8 100644 --- a/src/k8s/pkg/component/dns.go +++ b/src/k8s/pkg/component/dns.go @@ -81,7 +81,7 @@ func UpdateDNSComponent(ctx context.Context, s snap.Snap, isRefresh bool, cluste } } - client, err := k8s.NewClient(s) + client, err := k8s.NewClient(s.KubernetesRESTClientGetter("")) if err != nil { return "", "", fmt.Errorf("failed to create kubernetes client: %w", err) } diff --git a/src/k8s/pkg/component/gateway.go b/src/k8s/pkg/component/gateway.go index 95d565ab3..e4e236865 100644 --- a/src/k8s/pkg/component/gateway.go +++ b/src/k8s/pkg/component/gateway.go @@ -38,7 +38,7 @@ func UpdateGatewayComponent(ctx context.Context, s snap.Snap, isRefresh bool) er return fmt.Errorf("failed to enable gateway component: %w", err) } - client, err := k8s.NewClient(s) + client, err := k8s.NewClient(s.KubernetesRESTClientGetter("")) if err != nil { return fmt.Errorf("failed to create kubernetes client: %w", err) } diff --git a/src/k8s/pkg/component/ingress.go b/src/k8s/pkg/component/ingress.go index 77d482a8e..c2600cc13 100644 --- a/src/k8s/pkg/component/ingress.go +++ b/src/k8s/pkg/component/ingress.go @@ -30,7 +30,7 @@ func UpdateIngressComponent(ctx context.Context, s snap.Snap, isRefresh bool, de return fmt.Errorf("failed to enable ingress component: %w", err) } - client, err := k8s.NewClient(s) + client, err := k8s.NewClient(s.KubernetesRESTClientGetter("")) if err != nil { return fmt.Errorf("failed to create kubernetes client: %w", err) } diff --git a/src/k8s/pkg/component/loadbalancer.go b/src/k8s/pkg/component/loadbalancer.go index b4e560538..2159a093d 100644 --- a/src/k8s/pkg/component/loadbalancer.go +++ b/src/k8s/pkg/component/loadbalancer.go @@ -40,7 +40,7 @@ func UpdateLoadBalancerComponent(ctx context.Context, s snap.Snap, isRefresh boo } // Wait for cilium CRDs to be available. - k8sClient, err := k8s.NewClient(s) + k8sClient, err := k8s.NewClient(s.KubernetesRESTClientGetter("")) if err != nil { return fmt.Errorf("failed to create k8s client: %w", err) } @@ -104,7 +104,7 @@ func UpdateLoadBalancerComponent(ctx context.Context, s snap.Snap, isRefresh boo } } - client, err := k8s.NewClient(s) + client, err := k8s.NewClient(s.KubernetesRESTClientGetter("")) if err != nil { return fmt.Errorf("failed to create kubernetes client: %w", err) } diff --git a/src/k8s/pkg/k8sd/api/cluster_config.go b/src/k8s/pkg/k8sd/api/cluster_config.go index 88df041d6..4cb9cf50a 100644 --- a/src/k8s/pkg/k8sd/api/cluster_config.go +++ b/src/k8s/pkg/k8sd/api/cluster_config.go @@ -9,12 +9,14 @@ import ( "net/http" api "github.com/canonical/k8s/api/v1" + "github.com/mitchellh/mapstructure" "github.com/canonical/k8s/pkg/component" "github.com/canonical/k8s/pkg/k8sd/database" "github.com/canonical/k8s/pkg/k8sd/types" "github.com/canonical/k8s/pkg/snap" "github.com/canonical/k8s/pkg/utils" + "github.com/canonical/k8s/pkg/utils/k8s" "github.com/canonical/k8s/pkg/utils/vals" "github.com/canonical/lxd/lxd/response" "github.com/canonical/microcluster/state" @@ -155,6 +157,7 @@ func putClusterConfig(s *state.State, r *http.Request) response.Response { if err != nil { return response.InternalError(fmt.Errorf("failed to reconcile dns: %w", err)) } + if err := s.Database.Transaction(r.Context(), func(ctx context.Context, tx *sql.Tx) error { if err := database.SetClusterConfig(ctx, tx, types.ClusterConfig{ Kubelet: types.Kubelet{ @@ -167,6 +170,23 @@ func putClusterConfig(s *state.State, r *http.Request) response.Response { }); err != nil { return response.InternalError(fmt.Errorf("database transaction to update cluster configuration failed: %w", err)) } + + var data map[string]string + if err := mapstructure.Decode(types.NodeConfig{ + ClusterDNS: dnsIP, + ClusterDomain: newConfig.Kubelet.ClusterDomain, + }, &data); err != nil { + return response.InternalError(fmt.Errorf("failed to encode node config: %w", err)) + } + + client, err := k8s.NewClient(snap.KubernetesRESTClientGetter("")) + if err != nil { + return response.InternalError(fmt.Errorf("failed to create kubernetes client: %w", err)) + } + + if _, err := client.UpdateConfigMap(r.Context(), "kube-system", "k8sd-config", data); err != nil { + return response.InternalError(fmt.Errorf("failed to update node config: %w", err)) + } } if req.Config.LocalStorage != nil { diff --git a/src/k8s/pkg/k8sd/api/cluster_remove.go b/src/k8s/pkg/k8sd/api/cluster_remove.go index 7d93752f4..b80671dc1 100644 --- a/src/k8s/pkg/k8sd/api/cluster_remove.go +++ b/src/k8s/pkg/k8sd/api/cluster_remove.go @@ -44,7 +44,7 @@ func postClusterRemove(m *microcluster.MicroCluster, s *state.State, r *http.Req } if isWorker { // For worker nodes, we need to manually cleanup the kubernetes node and db entry. - c, err := k8s.NewClient(snap) + c, err := k8s.NewClient(snap.KubernetesRESTClientGetter("")) if err != nil { return response.InternalError(fmt.Errorf("failed to create k8s client: %w", err)) } diff --git a/src/k8s/pkg/k8sd/api/impl/k8sd.go b/src/k8s/pkg/k8sd/api/impl/k8sd.go index c7bf2a4b4..cf50bc194 100644 --- a/src/k8s/pkg/k8sd/api/impl/k8sd.go +++ b/src/k8s/pkg/k8sd/api/impl/k8sd.go @@ -27,7 +27,7 @@ func GetClusterStatus(ctx context.Context, s *state.State) (apiv1.ClusterStatus, return apiv1.ClusterStatus{}, fmt.Errorf("failed to get user-facing cluster config: %w", err) } - client, err := k8s.NewClient(snap) + client, err := k8s.NewClient(snap.KubernetesRESTClientGetter("")) if err != nil { return apiv1.ClusterStatus{}, fmt.Errorf("failed to create k8s client: %w", err) } diff --git a/src/k8s/pkg/k8sd/api/worker.go b/src/k8s/pkg/k8sd/api/worker.go index 6970cb50b..41561f022 100644 --- a/src/k8s/pkg/k8sd/api/worker.go +++ b/src/k8s/pkg/k8sd/api/worker.go @@ -45,7 +45,7 @@ func postWorkerInfo(s *state.State, r *http.Request) response.Response { } snap := snap.SnapFromContext(s.Context) - client, err := k8s.NewClient(snap) + client, err := k8s.NewClient(snap.KubernetesRESTClientGetter("")) if err != nil { return response.InternalError(fmt.Errorf("failed to create kubernetes client: %w", err)) } diff --git a/src/k8s/pkg/k8sd/app/app.go b/src/k8s/pkg/k8sd/app/app.go index 09c1fa122..849d5e629 100644 --- a/src/k8s/pkg/k8sd/app/app.go +++ b/src/k8s/pkg/k8sd/app/app.go @@ -61,6 +61,7 @@ func (a *App) Run(customHooks *config.Hooks) error { OnBootstrap: onBootstrap, PostJoin: onPostJoin, PreRemove: onPreRemove, + OnStart: onStart, } if customHooks != nil { if customHooks.OnBootstrap != nil { @@ -72,6 +73,9 @@ func (a *App) Run(customHooks *config.Hooks) error { if customHooks.PreRemove != nil { hooks.PreRemove = customHooks.PreRemove } + if customHooks.OnStart != nil { + hooks.OnStart = customHooks.OnStart + } } err := a.MicroCluster.Start(api.Endpoints(a.MicroCluster), database.SchemaExtensions, hooks) if err != nil { diff --git a/src/k8s/pkg/k8sd/app/cluster_util.go b/src/k8s/pkg/k8sd/app/cluster_util.go index 770566177..f623aebcb 100644 --- a/src/k8s/pkg/k8sd/app/cluster_util.go +++ b/src/k8s/pkg/k8sd/app/cluster_util.go @@ -84,7 +84,7 @@ func startControlPlaneServices(ctx context.Context, snap snap.Snap, datastore st func waitApiServerReady(ctx context.Context, snap snap.Snap) error { // Wait for API server to come up - client, err := k8s.NewClient(snap) + client, err := k8s.NewClient(snap.KubernetesRESTClientGetter("")) if err != nil { return fmt.Errorf("failed to create Kubernetes client: %w", err) } diff --git a/src/k8s/pkg/k8sd/app/hooks_bootstrap.go b/src/k8s/pkg/k8sd/app/hooks_bootstrap.go index 98ad33ef9..55e648b69 100644 --- a/src/k8s/pkg/k8sd/app/hooks_bootstrap.go +++ b/src/k8s/pkg/k8sd/app/hooks_bootstrap.go @@ -21,8 +21,10 @@ import ( "github.com/canonical/k8s/pkg/snap" snaputil "github.com/canonical/k8s/pkg/snap/util" "github.com/canonical/k8s/pkg/utils" + "github.com/canonical/k8s/pkg/utils/k8s" "github.com/canonical/k8s/pkg/utils/vals" "github.com/canonical/microcluster/state" + "github.com/mitchellh/mapstructure" ) // onBootstrap is called after we bootstrap the first cluster node. @@ -305,6 +307,23 @@ func onBootstrapControlPlane(s *state.State, initConfig map[string]string) error }); err != nil { return fmt.Errorf("database transaction to update cluster configuration failed: %w", err) } + + var data map[string]string + if err := mapstructure.Decode(types.NodeConfig{ + ClusterDNS: dnsIP, + ClusterDomain: cfg.Kubelet.ClusterDomain, + }, &data); err != nil { + return fmt.Errorf("failed to encode node config: %w", err) + } + + client, err := k8s.NewClient(snap.KubernetesRESTClientGetter("")) + if err != nil { + return fmt.Errorf("failed to create kubernetes client: %w", err) + } + + if _, err := client.UpdateConfigMap(s.Context, "kube-system", "k8sd-config", data); err != nil { + return fmt.Errorf("failed to update node configs: %w", err) + } } if cfg.LocalStorage.Enabled != nil { diff --git a/src/k8s/pkg/k8sd/app/hooks_join.go b/src/k8s/pkg/k8sd/app/hooks_join.go index 5bfb916ee..1333ff5cf 100644 --- a/src/k8s/pkg/k8sd/app/hooks_join.go +++ b/src/k8s/pkg/k8sd/app/hooks_join.go @@ -164,7 +164,7 @@ func onPreRemove(s *state.State, force bool) error { default: } - c, err := k8s.NewClient(snap) + c, err := k8s.NewClient(snap.KubernetesRESTClientGetter("")) if err != nil { return fmt.Errorf("failed to create Kubernetes client: %w", err) } diff --git a/src/k8s/pkg/k8sd/app/hooks_start.go b/src/k8s/pkg/k8sd/app/hooks_start.go new file mode 100644 index 000000000..9a781effc --- /dev/null +++ b/src/k8s/pkg/k8sd/app/hooks_start.go @@ -0,0 +1,16 @@ +package app + +import ( + "github.com/canonical/k8s/pkg/k8sd/controllers" + "github.com/canonical/k8s/pkg/snap" + "github.com/canonical/microcluster/state" +) + +func onStart(s *state.State) error { + snap := snap.SnapFromContext(s.Context) + + configController := controllers.NewNodeConfigurationController() + go configController.Run(s.Context, snap) + + return nil +} diff --git a/src/k8s/pkg/k8sd/controllers/node_configuration.go b/src/k8s/pkg/k8sd/controllers/node_configuration.go new file mode 100644 index 000000000..21c842dc6 --- /dev/null +++ b/src/k8s/pkg/k8sd/controllers/node_configuration.go @@ -0,0 +1,79 @@ +package controllers + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/canonical/k8s/pkg/k8sd/types" + "github.com/canonical/k8s/pkg/snap" + snaputil "github.com/canonical/k8s/pkg/snap/util" + "github.com/canonical/k8s/pkg/utils/k8s" + "github.com/mitchellh/mapstructure" + v1 "k8s.io/api/core/v1" +) + +type NodeConfigurationController struct { +} + +func NewNodeConfigurationController() *NodeConfigurationController { + return &NodeConfigurationController{} +} + +func (c *NodeConfigurationController) Run(ctx context.Context, snap snap.Snap) error { + for { + select { + case <-ctx.Done(): + return nil + case <-time.After(3 * time.Second): + default: + } + + client, err := k8s.NewClient(snap.KubernetesNodeRESTClientGetter("kube-system")) + if err != nil { + // This fails when the node is not bootstrapped or joined + log.Println(fmt.Errorf("failed to create kubernetes node client: %w", err)) + continue + } + + if err := client.WatchConfigMap(ctx, "kube-system", "k8sd-config", func(configMap *v1.ConfigMap) error { return c.reconcile(ctx, snap, configMap) }); err != nil { + // This also can fail during bootstrapping/start up when api-server is not ready + // So the watch requests get connection refused replies + log.Println(fmt.Errorf("error while watching configmap: %w", err)) + } + } +} + +func (c *NodeConfigurationController) reconcile(ctx context.Context, snap snap.Snap, configMap *v1.ConfigMap) error { + var nodeConfig types.NodeConfig + if err := mapstructure.Decode(configMap.Data, &nodeConfig); err != nil { + return fmt.Errorf("failed to decode node config: %w", err) + } + + var kubeletUpdateMap map[string]string = make(map[string]string) + var kubeletDeleteList []string + + if nodeConfig.ClusterDNS != "" { + kubeletUpdateMap["--cluster-dns"] = nodeConfig.ClusterDNS + } else { + kubeletDeleteList = append(kubeletDeleteList, "--cluster-dns") + } + + if nodeConfig.ClusterDomain != "" { + kubeletUpdateMap["--cluster-domain"] = nodeConfig.ClusterDomain + } + + mustRestartKubelet, err := snaputil.UpdateServiceArguments(snap, "kubelet", kubeletUpdateMap, kubeletDeleteList) + if err != nil { + return fmt.Errorf("failed to update kubelet arguments: %w", err) + } + + if mustRestartKubelet { + if err := snap.RestartService(ctx, "kubelet"); err != nil { + return fmt.Errorf("failed to restart kubelet to apply node configuration: %w", err) + } + } + + return nil +} diff --git a/src/k8s/pkg/k8sd/database/util_test.go b/src/k8s/pkg/k8sd/database/util_test.go index df3877c1c..e9e23985d 100644 --- a/src/k8s/pkg/k8sd/database/util_test.go +++ b/src/k8s/pkg/k8sd/database/util_test.go @@ -70,6 +70,9 @@ func WithDB(t *testing.T, f func(context.Context, DB)) { databaseCh <- s.Database return nil }, + OnStart: func(s *state.State) error { + return nil + }, }) }() diff --git a/src/k8s/pkg/k8sd/types/node_config.go b/src/k8s/pkg/k8sd/types/node_config.go new file mode 100644 index 000000000..957abe2af --- /dev/null +++ b/src/k8s/pkg/k8sd/types/node_config.go @@ -0,0 +1,7 @@ +package types + +type NodeConfig struct { + CloudProvider string `mapstructure:"cloud-provider,omitempty"` + ClusterDNS string `mapstructure:"cluster-dns,omitempty"` + ClusterDomain string `mapstructure:"cluster-domain,omitempty"` +} diff --git a/src/k8s/pkg/snap/interface.go b/src/k8s/pkg/snap/interface.go index 81b51fb0d..8f3d76abb 100644 --- a/src/k8s/pkg/snap/interface.go +++ b/src/k8s/pkg/snap/interface.go @@ -47,7 +47,8 @@ type Snap interface { Components() map[string]types.Component // available components - KubernetesRESTClientGetter(namespace string) genericclioptions.RESTClientGetter // admin kubernetes client + KubernetesRESTClientGetter(namespace string) genericclioptions.RESTClientGetter // admin kubernetes client + KubernetesNodeRESTClientGetter(namespace string) genericclioptions.RESTClientGetter // node kubernetes client K8sDqliteClient(ctx context.Context) (*dqlite.Client, error) // go-dqlite client for k8s-dqlite } diff --git a/src/k8s/pkg/snap/mock/mock.go b/src/k8s/pkg/snap/mock/mock.go index 787c7c8f6..4f1d79b6a 100644 --- a/src/k8s/pkg/snap/mock/mock.go +++ b/src/k8s/pkg/snap/mock/mock.go @@ -10,33 +10,34 @@ import ( ) type Mock struct { - Strict bool - OnLXD bool - OnLXDErr error - UID int - GID int - KubernetesConfigDir string - KubernetesPKIDir string - EtcdPKIDir string - KubeletRootDir string - CNIConfDir string - CNIBinDir string - CNIPlugins []string - CNIPluginsBinary string - ContainerdConfigDir string - ContainerdExtraConfigDir string - ContainerdRegistryConfigDir string - ContainerdRootDir string - ContainerdSocketDir string - ContainerdStateDir string - K8sdStateDir string - K8sDqliteStateDir string - ServiceArgumentsDir string - ServiceExtraConfigDir string - LockFilesDir string - Components map[string]types.Component - KubernetesRESTClientGetter genericclioptions.RESTClientGetter - K8sDqliteClient *dqlite.Client + Strict bool + OnLXD bool + OnLXDErr error + UID int + GID int + KubernetesConfigDir string + KubernetesPKIDir string + EtcdPKIDir string + KubeletRootDir string + CNIConfDir string + CNIBinDir string + CNIPlugins []string + CNIPluginsBinary string + ContainerdConfigDir string + ContainerdExtraConfigDir string + ContainerdRegistryConfigDir string + ContainerdRootDir string + ContainerdSocketDir string + ContainerdStateDir string + K8sdStateDir string + K8sDqliteStateDir string + ServiceArgumentsDir string + ServiceExtraConfigDir string + LockFilesDir string + Components map[string]types.Component + KubernetesRESTClientGetter genericclioptions.RESTClientGetter + KubernetesNodeRESTClientGetter genericclioptions.RESTClientGetter + K8sDqliteClient *dqlite.Client } // Snap is a mock implementation for snap.Snap. @@ -151,6 +152,9 @@ func (s *Snap) Components() map[string]types.Component { func (s *Snap) KubernetesRESTClientGetter(namespace string) genericclioptions.RESTClientGetter { return s.Mock.KubernetesRESTClientGetter } +func (s *Snap) KubernetesNodeRESTClientGetter(namespace string) genericclioptions.RESTClientGetter { + return s.Mock.KubernetesNodeRESTClientGetter +} func (s *Snap) K8sDqliteClient(context.Context) (*dqlite.Client, error) { return s.Mock.K8sDqliteClient, nil } diff --git a/src/k8s/pkg/snap/snap.go b/src/k8s/pkg/snap/snap.go index f51e43d3b..08ac4d6ef 100644 --- a/src/k8s/pkg/snap/snap.go +++ b/src/k8s/pkg/snap/snap.go @@ -10,6 +10,7 @@ import ( "github.com/canonical/k8s/pkg/client/dqlite" "github.com/canonical/k8s/pkg/k8sd/types" "github.com/canonical/k8s/pkg/utils" + "github.com/canonical/k8s/pkg/utils/vals" "github.com/moby/sys/mountinfo" "gopkg.in/yaml.v2" "k8s.io/cli-runtime/pkg/genericclioptions" @@ -238,6 +239,16 @@ func (s *snap) KubernetesRESTClientGetter(namespace string) genericclioptions.RE return flags } +func (s *snap) KubernetesNodeRESTClientGetter(namespace string) genericclioptions.RESTClientGetter { + flags := &genericclioptions.ConfigFlags{ + KubeConfig: vals.Pointer(path.Join(s.KubernetesConfigDir(), "kubelet.conf")), + } + if namespace != "" { + flags.Namespace = &namespace + } + return flags +} + func (s *snap) K8sDqliteClient(ctx context.Context) (*dqlite.Client, error) { client, err := dqlite.NewClient(ctx, dqlite.ClientOpts{ ClusterYAML: path.Join(s.snapCommonDir, "var", "lib", "k8s-dqlite", "cluster.yaml"), diff --git a/src/k8s/pkg/utils/k8s/client.go b/src/k8s/pkg/utils/k8s/client.go index d43daad54..5e761cb4b 100644 --- a/src/k8s/pkg/utils/k8s/client.go +++ b/src/k8s/pkg/utils/k8s/client.go @@ -3,7 +3,7 @@ package k8s import ( "fmt" - "github.com/canonical/k8s/pkg/snap" + "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/client-go/kubernetes" ) @@ -12,8 +12,8 @@ type Client struct { kubernetes.Interface } -func NewClient(snap snap.Snap) (*Client, error) { - config, err := snap.KubernetesRESTClientGetter("").ToRESTConfig() +func NewClient(restClientGetter genericclioptions.RESTClientGetter) (*Client, error) { + config, err := restClientGetter.ToRESTConfig() if err != nil { return nil, fmt.Errorf("failed to build Kubernetes REST config: %w", err) } diff --git a/src/k8s/pkg/utils/k8s/configmap.go b/src/k8s/pkg/utils/k8s/configmap.go new file mode 100644 index 000000000..20b9e78eb --- /dev/null +++ b/src/k8s/pkg/utils/k8s/configmap.go @@ -0,0 +1,42 @@ +package k8s + +import ( + "context" + "fmt" + "log" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + applyv1 "k8s.io/client-go/applyconfigurations/core/v1" +) + +func (c *Client) WatchConfigMap(ctx context.Context, namespace string, name string, reconcile func(configMap *v1.ConfigMap) error) error { + w, err := c.CoreV1().ConfigMaps(namespace).Watch(ctx, metav1.SingleObject(metav1.ObjectMeta{Name: name})) + if err != nil { + return fmt.Errorf("failed to watch configmap, namespace: %s name: %s: %w", namespace, name, err) + } + defer w.Stop() + for { + select { + case <-ctx.Done(): + return nil + case evt := <-w.ResultChan(): + if evt.Object != nil { + configMap := evt.Object.(*v1.ConfigMap) + + if err := reconcile(configMap); err != nil { + log.Println(fmt.Errorf("failed to reconcile configmap, namespace: %s name: %s: %w", namespace, name, err)) + } + } + } + } +} + +func (c *Client) UpdateConfigMap(ctx context.Context, namespace string, name string, data map[string]string) (*v1.ConfigMap, error) { + opts := applyv1.ConfigMap(name, namespace).WithData(data) + configmap, err := c.CoreV1().ConfigMaps(namespace).Apply(ctx, opts, metav1.ApplyOptions{FieldManager: "ck-k8s-client"}) + if err != nil { + return nil, fmt.Errorf("failed to update configmap, namespace: %s name: %s: %w", namespace, name, err) + } + return configmap, nil +}