Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Acto to run with user provided Kubernetes cluster #399

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion acto/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from acto.engine import Acto, apply_testcase
from acto.input.input import DeterministicInputModel
from acto.kubernetes_engine import base, kind, provided

Check warning on line 13 in acto/__main__.py

View workflow job for this annotation

GitHub Actions / coverage-report

Missing coverage

Missing coverage on line 13
from acto.lib.operator_config import OperatorConfig
from acto.post_process.post_diff_test import PostDiffTest
from acto.utils.error_handler import handle_excepthook, thread_excepthook
Expand Down Expand Up @@ -133,11 +134,28 @@

apply_testcase_f = apply_testcase

# Select the Kubernetes engine: use the user-provided cluster when the
# operator config carries a `self_provided` section, otherwise use kind.
kubernetes_engine: base.KubernetesEngine
if config.kubernetes_engine.self_provided:
    kubernetes_engine = provided.ProvidedKubernetesEngine(
        acto_namespace=0,
        feature_gates=config.kubernetes_engine.feature_gates,
        num_nodes=config.num_nodes,
        version=config.kubernetes_version,
        # The config field is named `self_provided`; the kubernetes_engine
        # config has no `provided` attribute, so reading it would raise
        # AttributeError as soon as this branch is taken.
        provided=config.kubernetes_engine.self_provided,
    )
else:
    kubernetes_engine = kind.Kind(
        acto_namespace=0,
        feature_gates=config.kubernetes_engine.feature_gates,
        num_nodes=config.num_nodes,
        version=config.kubernetes_version,
    )

start_time = datetime.now()
acto = Acto(
workdir_path=args.workdir_path,
operator_config=config,
cluster_runtime="KIND",
kubernetes_engine=kubernetes_engine,
context_file=context_cache,
helper_crd=args.helper_crd,
num_workers=args.num_workers,
Expand Down
4 changes: 2 additions & 2 deletions acto/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import random
import re
import string
from typing import Any, Sequence, Tuple, TypeAlias, Union
from typing import Any, Optional, Sequence, Tuple, TypeAlias, Union

import deepdiff.model as deepdiff_model
import kubernetes
Expand Down Expand Up @@ -391,7 +391,7 @@ def translate_op(input_op: str):


def kubernetes_client(
kubeconfig: str, context_name: str
kubeconfig: str, context_name: Optional[str]
) -> kubernetes.client.ApiClient:
"""Create a kubernetes client from kubeconfig and context name"""
return kubernetes.config.kube_config.new_client_from_config(
Expand Down
43 changes: 16 additions & 27 deletions acto/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from acto.input.testplan import TestGroup
from acto.input.value_with_schema import ValueWithSchema, attach_schema_to_value
from acto.kubectl_client import KubectlClient
from acto.kubernetes_engine import base, kind
from acto.kubernetes_engine import base
from acto.lib.operator_config import OperatorConfig
from acto.oracle_handle import OracleHandle
from acto.result import (
Expand Down Expand Up @@ -149,12 +149,20 @@
prev_pods = prev_system_state["pod"]

for k, v in curr_pods.items():
if "owner_reference" in v["metadata"] and v["metadata"]["owner_reference"] is not None and ["owner_references"][0]["kind"] == "Job":
if (
"owner_reference" in v["metadata"]
and v["metadata"]["owner_reference"] is not None
and v["metadata"]["owner_references"][0]["kind"] == "Job"
):
continue
curr_system_state[k] = v

for k, v in prev_pods.items():
if "owner_reference" in v["metadata"] and v["metadata"]["owner_reference"] is not None and ["owner_references"][0]["kind"] == "Job":
if (

Check warning on line 161 in acto/engine.py

View workflow job for this annotation

GitHub Actions / coverage-report

Missing coverage

Missing coverage on lines 152-161
"owner_reference" in v["metadata"]
and v["metadata"]["owner_reference"] is not None
and v["metadata"]["owner_references"][0]["kind"] == "Job"
):
continue
prev_system_state[k] = v

Expand Down Expand Up @@ -255,7 +263,7 @@
runner_t: type,
checker_t: type,
wait_time: int,
custom_on_init: Optional[Callable],
custom_on_init: Optional[list[Callable]],
custom_checker: Optional[type[CheckerInterface]],
workdir: str,
cluster: base.KubernetesEngine,
Expand Down Expand Up @@ -805,7 +813,7 @@
self,
workdir_path: str,
operator_config: OperatorConfig,
cluster_runtime: str,
kubernetes_engine: base.KubernetesEngine,
context_file: str,
helper_crd: Optional[str],
num_workers: int,
Expand Down Expand Up @@ -833,26 +841,7 @@

deploy = Deploy(operator_config.deploy)

if cluster_runtime == "KIND":
cluster = kind.Kind(
acto_namespace=acto_namespace,
feature_gates=operator_config.kubernetes_engine.feature_gates,
num_nodes=operator_config.num_nodes,
version=operator_config.kubernetes_version,
)
else:
logger.warning(
"Cluster Runtime %s is not supported, defaulted to use kind",
cluster_runtime,
)
cluster = kind.Kind(
acto_namespace=acto_namespace,
feature_gates=operator_config.kubernetes_engine.feature_gates,
num_nodes=operator_config.num_nodes,
version=operator_config.kubernetes_version,
)

self.cluster = cluster
self.cluster = kubernetes_engine
self.deploy = deploy
self.operator_config = operator_config
self.crd_name = operator_config.crd_name
Expand Down Expand Up @@ -889,7 +878,7 @@
self.sequence_base = 0

self.custom_oracle: Optional[type[CheckerInterface]] = None
self.custom_on_init: Optional[Callable] = None
self.custom_on_init: Optional[list[Callable]] = None
if operator_config.custom_oracle is not None:
module = importlib.import_module(operator_config.custom_oracle)
if hasattr(module, "CUSTOM_CHECKER") and issubclass(
Expand Down
11 changes: 8 additions & 3 deletions acto/kubernetes_engine/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import kubernetes

from acto.constant import CONST
from acto.lib.operator_config import SelfProvidedKubernetesConfig
from acto.utils import get_thread_logger

KubernetesEnginePostHookType = Callable[[kubernetes.client.ApiClient], None]
Expand All @@ -22,13 +23,17 @@ def __init__(
feature_gates: Optional[dict[str, bool]] = None,
num_nodes=1,
version="",
provided: Optional[SelfProvidedKubernetesConfig] = None,
) -> None:
"""Constructor for KubernetesEngine

Args:
acto_namespace: the namespace of the acto
posthooks: a list of posthooks to be executed after the cluster is created
feature_gates: a list of feature gates to be enabled
feature_gates: a list of Kubernetes feature gates to be enabled
num_nodes: the number of nodes in the cluster
version: the version of Kubernetes
provided: the configuration for a self-provided Kubernetes engine
"""

@abstractmethod
Expand Down Expand Up @@ -91,10 +96,10 @@ def restart_cluster(self, name: str, kubeconfig: str):
continue
break

def get_node_list(self, name: str):
def get_node_list(self, name: str) -> list[str]:
"""Fetch the name of worker nodes inside a cluster
Args:
1. name: name of the cluster name
name: name of the cluster name
"""
_ = get_thread_logger(with_prefix=False)

Expand Down
8 changes: 4 additions & 4 deletions acto/kubernetes_engine/kind.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
import subprocess
import time
from typing import Any, Dict, List, Optional
from typing import Any, Optional

import kubernetes
import yaml
Expand All @@ -19,8 +19,8 @@
def __init__(
self,
acto_namespace: int,
posthooks: List[base.KubernetesEnginePostHookType] = None,
feature_gates: Dict[str, bool] = None,
posthooks: Optional[list[base.KubernetesEnginePostHookType]] = None,
feature_gates: Optional[dict[str, bool]] = None,
num_nodes=1,
version: Optional[str] = None,
):
Expand Down Expand Up @@ -140,7 +140,7 @@

if self._posthooks:
for posthook in self._posthooks:
posthook(apiclient=apiclient)
posthook(apiclient)

Check warning on line 143 in acto/kubernetes_engine/kind.py

View workflow job for this annotation

GitHub Actions / coverage-report

Missing coverage

Missing coverage on line 143

def load_images(self, images_archive_path: str, name: str):
logging.info("Loading preload images")
Expand Down
113 changes: 113 additions & 0 deletions acto/kubernetes_engine/provided.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import logging
import subprocess
from typing import Optional

import kubernetes

from acto.common import kubernetes_client, print_event
from acto.constant import CONST

from . import base


class ProvidedKubernetesEngine(base.KubernetesEngine):
"""KubernetesEngine for user-provided k8s cluster

Configuration for user-provided k8s cluster is very limited,
as it is assumed that the user has already set up the cluster.
Everything needs to be deployed in the ACTO_NAMESPACE to provide the
necessary isolation.
"""

def __init__(
    self,
    acto_namespace: int,
    posthooks: Optional[list[base.KubernetesEnginePostHookType]] = None,
    feature_gates: Optional[dict[str, bool]] = None,
    num_nodes: int = 1,
    version: Optional[str] = None,
    provided: Optional[base.SelfProvidedKubernetesConfig] = None,
):
    """Constructor for ProvidedKubernetesEngine

    Args:
        acto_namespace: accepted for interface compatibility; not used here
        posthooks: a list of posthooks stored for execution after connecting
        feature_gates: not supported; an error is logged if any are given
        num_nodes: not supported; an error is logged if it is not 1
        version: not supported; an error is logged if it is given
        provided: the kubeconfig path and context of the user's cluster

    Raises:
        ValueError: if `provided` is None
    """
    self._posthooks = posthooks

    # Cluster-shaping options cannot be honoured on a cluster we did not
    # create; each one that was set is reported, in this fixed order.
    unsupported_options = (
        (bool(feature_gates), "Feature gates are not supported in provided k8s"),
        (num_nodes != 1, "num_nodes is not supported in provided k8s"),
        (bool(version), "version is not supported in provided k8s"),
    )
    for was_set, message in unsupported_options:
        if was_set:
            logging.error(message)

    if provided is None:
        raise ValueError("Missing configuration for provided k8s")

    self._kube_config = provided.kube_config
    self._kube_context = provided.kube_context

def get_context_name(self, cluster_name: str) -> str:
    """Returns the kubecontext of the user-provided cluster

    The context stored at construction time is returned as-is.

    Args:
        cluster_name: name of the cluster; not used to derive the context
    """
    kube_context = self._kube_context
    return kube_context

Check warning on line 51 in acto/kubernetes_engine/provided.py

View workflow job for this annotation

GitHub Actions / coverage-report

Missing coverage

Missing coverage on line 51

def create_cluster(self, name: str, kubeconfig: str):
    """Connects to the already-existing cluster and runs the posthooks

    No cluster is created. The kubeconfig path and context stored on this
    engine are used to load the configuration and build an API client.

    Args:
        name: name of the cluster; not used by this engine
        kubeconfig: path of the config file for cluster; not used, the
            stored kubeconfig path is used instead

    Raises:
        Exception: whatever loading the kubeconfig or building the client
            raised, re-raised unchanged
    """
    print_event("Connecting to a user-provided Kubernetes cluster...")

    try:
        kubernetes.config.load_kube_config(
            config_file=self._kube_config, context=self._kube_context
        )
        apiclient = kubernetes_client(self._kube_config, self._kube_context)
    except Exception:
        logging.debug("Incorrect kube config file:")
        # NOTE(review): this writes the raw kubeconfig to the debug log;
        # confirm that is acceptable, since kubeconfigs may hold credentials.
        try:
            with open(self._kube_config, encoding="utf-8") as f:
                logging.debug(f.read())
        except OSError as read_error:
            # A missing or unreadable kubeconfig is a likely reason for the
            # failure above; do not let the diagnostic dump mask it.
            logging.debug(
                "Unable to read kube config file %s: %s",
                self._kube_config,
                read_error,
            )
        # Bare raise keeps the original exception and traceback intact.
        raise

    if self._posthooks:
        for posthook in self._posthooks:
            posthook(apiclient)

def load_images(self, images_archive_path: str, name: str):
    """Preloads an image archive with `kind load image-archive`

    Args:
        images_archive_path: path of the image archive; when None there is
            nothing to load and the method returns after logging a warning
        name: name of the cluster, passed to the command as `--name`
    """
    # NOTE(review): this shells out to `kind`, which presumably only works
    # when the provided cluster is itself a kind cluster - confirm.
    logging.info("Loading preload images")
    cmd = ["kind", "load", "image-archive"]
    if images_archive_path is None:
        logging.warning(
            "No image to preload, we at least should have operator image"
        )
        # Without an archive the command line would contain None and
        # subprocess.run would fail instead of skipping the preload.
        return

    if name is not None:
        cmd.extend(["--name", name])
    else:
        logging.error("Missing cluster name for kind load")

    p = subprocess.run(cmd + [images_archive_path], check=False)
    if p.returncode != 0:
        logging.error("Failed to preload images archive")

def delete_cluster(self, name: str, kubeconfig: str):
    """Cluster deletion via deleting the acto-namespace

    The cluster itself is left in place; only the namespace named by
    CONST.ACTO_NAMESPACE is deleted, with foreground propagation.

    Args:
        name: name of the cluster; only used in the log message
        kubeconfig: path of the config file for cluster; not used, the
            stored kubeconfig path and context are used instead
    """
    logging.info("Deleting cluster %s", name)
    core_api = kubernetes.client.CoreV1Api(
        kubernetes_client(self._kube_config, self._kube_context)
    )
    core_api.delete_namespace(
        CONST.ACTO_NAMESPACE,
        propagation_policy="Foreground",
    )

def get_node_list(self, name: str) -> list[str]:
    """Returns an empty list, as nodes are not looked up for a
    user-provided cluster

    Args:
        name: name of the cluster; not used
    """
    no_nodes: list[str] = []
    return no_nodes
39 changes: 38 additions & 1 deletion acto/lib/operator_config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Optional
from typing import Any, Optional

import pydantic
from typing_extensions import Self
Expand Down Expand Up @@ -134,12 +134,49 @@ class AnalysisConfig(pydantic.BaseModel, extra="forbid"):
)


class SelfProvidedKubernetesConfig(pydantic.BaseModel, extra="forbid"):
    """Connection settings for a Kubernetes cluster supplied by the user

    Both fields are required and unknown keys are rejected.
    """

    # `...` marks the field as required, same as omitting the default.
    kube_config: str = pydantic.Field(
        ...,
        description="Path to the kubeconfig file for the Kubernetes cluster",
    )
    kube_context: str = pydantic.Field(
        ...,
        description="Context name for the Kubernetes cluster",
    )


class KubernetesEngineConfig(pydantic.BaseModel, extra="forbid"):
    """Configuration for Kubernetes

    `num_nodes`, `kubernetes_version` and `feature_gates` are rejected when
    a `self_provided` cluster is configured.
    """

    num_nodes: int = pydantic.Field(
        description="Number of workers in the Kubernetes cluster", default=4
    )
    kubernetes_version: str = pydantic.Field(
        default="v1.28.0", description="Kubernetes version"
    )
    # Optional because the default is None (no feature gates configured).
    feature_gates: Optional[dict[str, bool]] = pydantic.Field(
        description="Kubernetes feature gates to set, keyed by gate name",
        default=None,
    )
    self_provided: Optional[SelfProvidedKubernetesConfig] = pydantic.Field(
        default=None,
        description="Configuration for self-provided Kubernetes engine",
    )

    @pydantic.model_validator(mode="before")
    @classmethod
    def check_self_provided(cls, data: Any) -> Any:
        """Check if the self-provided Kubernetes engine is valid

        Runs on the raw input before field validation, so it can tell
        explicitly given keys apart from defaults.

        Args:
            data: the raw input; only inspected when it is a dict

        Returns:
            The input data, unchanged

        Raises:
            ValueError: if a self-provided cluster is configured together
                with num_nodes, kubernetes_version or feature_gates
        """
        # An explicit `self_provided: null` means "not self-provided" and
        # must not forbid the cluster-shaping options.
        if not isinstance(data, dict) or data.get("self_provided") is None:
            return data

        cluster_options = ("num_nodes", "kubernetes_version", "feature_gates")
        if any(option in data for option in cluster_options):
            raise ValueError(
                "num_nodes, kubernetes_version, and feature_gates "
                "are not supported in self-provided Kubernetes engine"
            )
        return data


class OperatorConfig(pydantic.BaseModel, extra="forbid"):
Expand Down
Loading
Loading