Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): Replace fluentd deployment with statefulset #370

Merged
118 changes: 45 additions & 73 deletions api/turing/cluster/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ type Controller interface {
ApplyIstioVirtualService(ctx context.Context, routerEndpoint *VirtualService) error
DeleteIstioVirtualService(ctx context.Context, svcName string, namespace string) error

// Deployment
DeleteKubernetesDeployment(ctx context.Context, name string, namespace string, ignoreNotFound bool) error
// StatefulSet
DeleteKubernetesStatefulSet(ctx context.Context, name string, namespace string, ignoreNotFound bool) error

// Service
DeployKubernetesService(ctx context.Context, svc *KubernetesService) error
Expand All @@ -82,8 +82,7 @@ type Controller interface {
DeleteSecret(ctx context.Context, secretName string, namespace string, ignoreNotFound bool) error

// PVC
ApplyPersistentVolumeClaim(ctx context.Context, namespace string, pvc *PersistentVolumeClaim) error
DeletePersistentVolumeClaim(ctx context.Context, pvcName string, namespace string, ignoreNotFound bool) error
DeletePVCs(ctx context.Context, listOptions metav1.ListOptions, namespace string, ignoreNotFound bool) error

// Pod
ListPods(ctx context.Context, namespace string, labelSelector string) (*apicorev1.PodList, error)
Expand Down Expand Up @@ -347,34 +346,34 @@ func (c *controller) GetKnativeServiceDesiredReplicas(
return int(*rev.Status.DesiredReplicas), nil
}

// DeployKubernetesService deploys a kubernetes service and deployment
// DeployKubernetesService deploys a kubernetes service and stateful set
func (c *controller) DeployKubernetesService(
ctx context.Context,
svcConf *KubernetesService,
) error {
desiredDeployment, desiredSvc := svcConf.BuildKubernetesServiceConfig()
desiredStatefulSet, desiredSvc := svcConf.BuildKubernetesServiceConfig()

// Deploy deployment
deployments := c.k8sAppsClient.Deployments(svcConf.Namespace)
// Check if deployment already exists. If exists, update it. If not, create.
var existingDeployment *apiappsv1.Deployment
// Deploy stateful set
statefulSets := c.k8sAppsClient.StatefulSets(svcConf.Namespace)
// Check if stateful set already exists. If exists, update it. If not, create.
var existingStatefulSet *apiappsv1.StatefulSet
var err error
existingDeployment, err = deployments.Get(ctx, svcConf.Name, metav1.GetOptions{})
existingStatefulSet, err = statefulSets.Get(ctx, svcConf.Name, metav1.GetOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
// Create new deployment
_, err = deployments.Create(ctx, desiredDeployment, metav1.CreateOptions{})
_, err = statefulSets.Create(ctx, desiredStatefulSet, metav1.CreateOptions{})
} else {
// Unexpected error, return it
return err
}
} else {
// Check for differences between current and new specs
if !k8sDeploymentSemanticEquals(desiredDeployment, existingDeployment) {
if !k8sStatefulSetSemanticEquals(existingStatefulSet, existingStatefulSet) {
// Update the existing service with the new config
existingDeployment.Spec.Template = desiredDeployment.Spec.Template
existingDeployment.ObjectMeta.Labels = desiredDeployment.ObjectMeta.Labels
_, err = deployments.Update(ctx, existingDeployment, metav1.UpdateOptions{})
existingStatefulSet.Spec.Template = desiredStatefulSet.Spec.Template
existingStatefulSet.ObjectMeta.Labels = desiredStatefulSet.ObjectMeta.Labels
_, err = statefulSets.Update(ctx, existingStatefulSet, metav1.UpdateOptions{})
}
}
if err != nil {
Expand Down Expand Up @@ -405,25 +404,25 @@ func (c *controller) DeployKubernetesService(
}

// Wait until deployment ready and return any errors
return c.waitDeploymentReady(ctx, svcConf.Name, svcConf.Namespace)
return c.waitStatefulSetReady(ctx, svcConf.Name, svcConf.Namespace)
}

// DeleteKubernetesDeployment deletes a kubernetes deployment
func (c *controller) DeleteKubernetesDeployment(
// DeleteKubernetesStatefulSet deletes a stateful set
func (c *controller) DeleteKubernetesStatefulSet(
ctx context.Context,
name string,
namespace string,
ignoreNotFound bool,
) error {
deployments := c.k8sAppsClient.Deployments(namespace)
_, err := deployments.Get(ctx, name, metav1.GetOptions{})
statefulSets := c.k8sAppsClient.StatefulSets(namespace)
_, err := statefulSets.Get(ctx, name, metav1.GetOptions{})
if err != nil {
if ignoreNotFound {
return nil
}
return err
}
return deployments.Delete(ctx, name, metav1.DeleteOptions{})
return statefulSets.Delete(ctx, name, metav1.DeleteOptions{})
}

// DeleteKubernetesService deletes a kubernetes service
Expand Down Expand Up @@ -517,44 +516,21 @@ func (c *controller) GetKnativeServiceURL(ctx context.Context, svcName string, n
return url
}

// ApplyPersistentVolumeClaim creates a PVC in the given namespace.
// If the PVC already exists, it will update the existing PVC.
func (c *controller) ApplyPersistentVolumeClaim(
// DeletePVCs deletes all PVCs specified by the given list options in the given namespace.
func (c *controller) DeletePVCs(
ctx context.Context,
namespace string,
pvcCfg *PersistentVolumeClaim,
) error {
pvcs := c.k8sCoreClient.PersistentVolumeClaims(namespace)
existingPVC, err := pvcs.Get(ctx, pvcCfg.Name, metav1.GetOptions{})
pvc := pvcCfg.BuildPersistentVolumeClaim()

// If not exists, create
if err != nil {
_, err := pvcs.Create(ctx, pvc, metav1.CreateOptions{})
return err
}
// If exists, update
existingPVC.Spec.Resources = pvc.Spec.Resources
_, err = pvcs.Update(ctx, existingPVC, metav1.UpdateOptions{})
return err
}

// DeletePersistentVolumeClaim deletes the PVC in the given namespace.
func (c *controller) DeletePersistentVolumeClaim(
ctx context.Context,
pvcName string,
listOptions metav1.ListOptions,
namespace string,
ignoreNotFound bool,
) error {
pvcs := c.k8sCoreClient.PersistentVolumeClaims(namespace)
_, err := pvcs.Get(ctx, pvcName, metav1.GetOptions{})
_, err := c.k8sCoreClient.PersistentVolumeClaims(namespace).List(ctx, listOptions)
if err != nil {
if ignoreNotFound {
return nil
}
return fmt.Errorf("unable to get pvc with name %s: %s", pvcName, err.Error())
return err
}
return pvcs.Delete(ctx, pvcName, metav1.DeleteOptions{})
return c.k8sCoreClient.PersistentVolumeClaims(namespace).DeleteCollection(ctx, metav1.DeleteOptions{}, listOptions)
}

func (c *controller) ListPods(ctx context.Context, namespace string, labelSelector string) (*apicorev1.PodList, error) {
Expand Down Expand Up @@ -813,49 +789,45 @@ func (c *controller) getKnativePodTerminationMessage(ctx context.Context, svcNam
return terminationMessage
}

// waitDeploymentReady waits for the given k8s deployment to become ready, until the
// waitStatefulSetReady waits for the given k8s stateful set to become ready, until the
// default timeout
func (c *controller) waitDeploymentReady(
func (c *controller) waitStatefulSetReady(
ctx context.Context,
deploymentName string,
statefulSetName string,
namespace string,
) error {
// Init ticker to check status every second
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

// Init knative ServicesGetter
deployments := c.k8sAppsClient.Deployments(namespace)
// Init stateful set getter
statefulSets := c.k8sAppsClient.StatefulSets(namespace)

for {
select {
case <-ctx.Done():
return fmt.Errorf("timeout waiting for deployment %s to be ready", deploymentName)
return fmt.Errorf("timeout waiting for stateful set %s to be ready", statefulSetName)
case <-ticker.C:
deployment, err := deployments.Get(ctx, deploymentName, metav1.GetOptions{})
statefulSet, err := statefulSets.Get(ctx, statefulSetName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("unable to get deployment status for %s: %v", deploymentName, err)
return fmt.Errorf("unable to get stateful set status for %s: %v", statefulSetName, err)
}

if deploymentReady(deployment) {
if statefulSetReady(statefulSet) {
// Service is completely ready
return nil
}
}
}
}

func deploymentReady(deployment *apiappsv1.Deployment) bool {
if deployment.Generation <= deployment.Status.ObservedGeneration {
cond := deployment.Status.Conditions[0]
ready := cond.Type == apiappsv1.DeploymentAvailable
if deployment.Spec.Replicas != nil {
func statefulSetReady(statefulSet *apiappsv1.StatefulSet) bool {
if statefulSet.Generation <= statefulSet.Status.ObservedGeneration {
if statefulSet.Spec.Replicas != nil {
// Account for replica surge during updates
ready = ready &&
deployment.Status.ReadyReplicas == *deployment.Spec.Replicas &&
deployment.Status.Replicas == *deployment.Spec.Replicas
return statefulSet.Status.ReadyReplicas == *statefulSet.Spec.Replicas &&
statefulSet.Status.Replicas == *statefulSet.Spec.Replicas
}
return ready
}
return false
}
Expand All @@ -876,10 +848,10 @@ func knServiceSemanticEquals(desiredService, service *knservingv1.Service) bool
equality.Semantic.DeepEqual(desiredService.ObjectMeta.Labels, service.ObjectMeta.Labels)
}

func k8sDeploymentSemanticEquals(desiredDeployment, deployment *apiappsv1.Deployment) bool {
return equality.Semantic.DeepEqual(desiredDeployment.Spec.Template, deployment.Spec.Template) &&
equality.Semantic.DeepEqual(desiredDeployment.ObjectMeta.Labels, deployment.ObjectMeta.Labels) &&
desiredDeployment.Spec.Replicas == deployment.Spec.Replicas
func k8sStatefulSetSemanticEquals(desiredStatefulSet, statefulSet *apiappsv1.StatefulSet) bool {
return equality.Semantic.DeepEqual(desiredStatefulSet.Spec.Template, statefulSet.Spec.Template) &&
equality.Semantic.DeepEqual(desiredStatefulSet.ObjectMeta.Labels, statefulSet.ObjectMeta.Labels) &&
desiredStatefulSet.Spec.Replicas == statefulSet.Spec.Replicas
}

func k8sServiceSemanticEquals(desiredService, service *apicorev1.Service) bool {
Expand Down
Loading
Loading