Commit 073faae: Fix
Signed-off-by: Tyler Gu <[email protected]>
tylergu committed Nov 15, 2023
1 parent b2597da commit 073faae
Showing 43 changed files with 1,519 additions and 438 deletions.
3 changes: 3 additions & 0 deletions acto/__main__.py
@@ -3,12 +3,15 @@
import json
import logging
import os
import random
import signal
import sys
import threading
import time
from datetime import datetime

random.seed(0)

start_time = time.time()
workdir_path = 'testrun-%s' % datetime.now().strftime('%Y-%m-%d-%H-%M')

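A note on the new random.seed(0) call, which runs at import time: it makes Python's global random module deterministic, so any choices made through it later in the run repeat across invocations, as long as they are made in the same order. A minimal, stand-alone illustration (not part of the commit):

import random

random.seed(0)
first = [random.randint(0, 9) for _ in range(5)]

random.seed(0)
second = [random.randint(0, 9) for _ in range(5)]

# Re-seeding with the same value reproduces the same sequence.
assert first == second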
289 changes: 80 additions & 209 deletions acto/deploy.py
@@ -1,237 +1,108 @@
import json
import time

import kubernetes

import acto.exception
from acto.lib.operator_config import DeployMethod
from acto.kubectl_client.kubectl import KubectlClient
from acto.lib.operator_config import DeployConfig
import acto.utils as utils
from acto.common import *
from acto.constant import CONST
from acto.utils import get_thread_logger
from acto.utils.preprocess import add_acto_label

CONST = CONST()


class Deploy:

'''Class for different deploying methods
TODO: @Tyler: This class needs to be refactored.
The deploy is better to be stateful, storing the kubernetes client and kubectl client
Currently the context and kubeconfig are passed to each method.
The `kubectl` function is currently copied to this file temporarily,
the plan is to switch to use the KubectlClient class
'''

def __init__(self, deploy_method: DeployMethod, path: str, init_yaml=None):
self.path = path
self.init_yaml = init_yaml
self.deploy_method = deploy_method
self.wait = 20 # sec

def undeploy(self, context: dict, apiclient):
logger = get_thread_logger(with_prefix=False)
corev1Api = kubernetes.client.CoreV1Api(apiclient)
try:
corev1Api.delete_namespace(name=context['namespace'])
except Exception as e:
logger.error(e)

while True:
nss = corev1Api.list_namespace().items
for ns in nss:
if ns.metadata.name == context['namespace']:
logger.info('Namespace %s still exists' %
context['namespace'])
time.sleep(5)
continue

logger.info('Namespace %s deleted' % context['namespace'])

def deploy(self, kubeconfig: str, context_name: str, namespace: str):
# XXX: context param is temporary, need to figure out why rabbitmq complains about namespace
pass

def deploy_with_retry(self, kubeconfig: str, context_name: str, namespace: str, retry_count=3):
logger = get_thread_logger(with_prefix=False)
while retry_count > 0:
try:
return self.deploy(kubeconfig, context_name, namespace)
except Exception as e:
logger.warn(e)
logger.info(
"deploy() failed. Double wait = " + str(self.wait))
self.wait = self.wait * 2
retry_count -= 1

def check_status(self, kubeconfig: str, context_name: str):
'''
We need to make sure operator to be ready before applying test cases, because Acto would
crash later when running oracle if operator hasn't been ready
'''
logger = get_thread_logger(with_prefix=False)

apiclient = kubernetes_client(kubeconfig, context_name)

logger.debug('Waiting for all pods to be ready')
time.sleep(10)
pod_ready = False
for tick in range(600):
# check if all pods are ready
pods = kubernetes.client.CoreV1Api(
apiclient).list_pod_for_all_namespaces().items

all_pods_ready = True
for pod in pods:
if pod.status.phase == 'Succeeded':
continue
if not utils.is_pod_ready(pod):
all_pods_ready = False

if all_pods_ready:
logger.info('Operator ready')
pod_ready = True
break

time.sleep(5)
logger.info('All pods took %d seconds to get ready' % (tick * 5))
if not pod_ready:
logger.error("Some pods failed to be ready within timeout")
return False
else:
return True

def new(self):
if self.deploy_method is DeployMethod.HELM:
return Helm(self.deploy_method, self.path, self.init_yaml)
elif self.deploy_method is DeployMethod.YAML:
return Yaml(self.deploy_method, self.path, self.init_yaml)
elif self.deploy_method is DeployMethod.KUSTOMIZE:
return Kustomize(self.deploy_method, self.path, self.init_yaml)
else:
raise acto.exception.UnknownDeployMethodError


def wait_for_pod_ready(apiclient: kubernetes.client.ApiClient):
logger = get_thread_logger(with_prefix=True)
logger.debug('Waiting for all pods to be ready')
time.sleep(5)
pod_ready = False
for tick in range(600):
# check if all pods are ready
pods = kubernetes.client.CoreV1Api(
apiclient).list_pod_for_all_namespaces().items

all_pods_ready = True
for pod in pods:
if pod.status.phase == 'Succeeded':
continue
if not utils.is_pod_ready(pod):
all_pods_ready = False

if all_pods_ready:
logger.info('Operator ready')
pod_ready = True
break
time.sleep(5)

logger.info('All pods took %d seconds to get ready' % (tick * 5))
if not pod_ready:
logger.error("Some pods failed to be ready within timeout")
return False
return True

class Helm(Deploy):

def deploy(self, context: dict, context_name: str) -> bool:
logger = get_thread_logger(with_prefix=False)

context['namespace'] = CONST.ACTO_NAMESPACE
if self.init_yaml:
kubectl(['apply', '--server-side', '-f',
self.init_yaml], context_name)
helm(['dependency', 'build', self.path], context_name)
helm([
'install', 'acto-test-operator', '--create-namespace', self.path, '--wait', '--timeout',
'3m', '-n', context['namespace']
], context_name)

# use a counter to wait for 2 min (thus 24 below, since each wait is 5s)
counter = 0
while not self.check_status(context, context_name):
if counter > 24:
logger.fatal(
'Helm chart deployment failed to be ready within timeout')
return False
time.sleep(5)

# TODO: Return True if deploy successfully
else:
return True

def check_status(self, context, context_name: str) -> bool:
logger = get_thread_logger(with_prefix=False)

helm_ls_result = helm(
['list', '-o', 'json', '--all-namespaces', '--all'], context_name)
try:
helm_release = json.loads(helm_ls_result.stdout)[0]
except Exception as e:
logger.error('Failed to get helm chart\'s status: %s' % e)
quit()
if helm_release["status"] != "deployed":
return False
else:
return True


class Yaml(Deploy):

def deploy(self, kubeconfig: str, context_name: str, namespace: str):
# TODO: We cannot specify namespace ACTO_NAMESPACE here.
# rabbitMQ operator will report the error message
'''
the namespace from the provided object "rabbitmq-system" does not
match the namespace "acto-namespace". You must pass '--namespace=rabbitmq-system' to perform this operation.
'''
ret = utils.create_namespace(
kubernetes_client(kubeconfig, context_name), namespace)
if self.init_yaml:
kubectl(['apply', '--server-side', '-f', self.init_yaml], kubeconfig=kubeconfig,
context_name=context_name)
self.check_status(kubeconfig=kubeconfig,
context_name=context_name)
kubectl(['apply', '--server-side', '-f', self.path, '-n', namespace], kubeconfig=kubeconfig,
context_name=context_name)
self.check_status(kubeconfig=kubeconfig,
context_name=context_name)
add_acto_label(kubernetes_client(kubeconfig, context_name), namespace)
self.check_status(kubeconfig=kubeconfig,
context_name=context_name)
time.sleep(20)

# TODO: Return True if deploy successfully
print_event('Operator deployed')
return True


class Kustomize(Deploy):

def deploy(self, kubeconfig, context_name, namespace):
if self.init_yaml:
kubectl(['apply', '--server-side', '-f', self.init_yaml], kubeconfig=kubeconfig,
context_name=context_name)
self.check_status(kubeconfig=kubeconfig,
context_name=context_name)
kubectl(['apply', '--server-side', '-k', self.path, '-n', namespace], kubeconfig=kubeconfig,
context_name=context_name)
self.check_status(kubeconfig=kubeconfig,
context_name=context_name)


def kubectl(args: list,
kubeconfig: str,
context_name: str,
capture_output=False,
text=False) -> subprocess.CompletedProcess:
logger = get_thread_logger(with_prefix=True)

cmd = ['kubectl']
cmd.extend(args)

if kubeconfig:
cmd.extend(['--kubeconfig', kubeconfig])
else:
raise Exception('Kubeconfig is not set')

if context_name == None:
logger.error('Missing context name for kubectl')
cmd.extend(['--context', context_name])

if capture_output:
p = subprocess.run(cmd, capture_output=capture_output, text=text)
else:
p = subprocess.run(cmd, stdout=subprocess.DEVNULL)
return p


class Deploy():

def __init__(self, deploy_config: DeployConfig) -> None:
self._deploy_config = deploy_config

self._operator_yaml: str = None
for step in self._deploy_config.steps:
if step.apply and step.apply.operator:
self._operator_yaml = step.apply.file
break
else:
raise Exception('No operator yaml found in deploy config')

@property
def operator_yaml(self) -> str:
return self._operator_yaml

def deploy(self,
kubeconfig: str,
context_name: str,
kubectl_client: KubectlClient,
namespace: str):
logger = get_thread_logger(with_prefix=True)
print_event('Deploying operator...')
api_client = kubernetes_client(kubeconfig, context_name)

ret = utils.create_namespace(api_client, namespace)
if ret == None:
logger.error('Failed to create namespace')

# Run the steps in the deploy config one by one
for step in self._deploy_config.steps:
if step.apply:
# Apply the yaml file and then wait for the pod to be ready
kubectl_client.kubectl(
['apply', '--server-side', '-f', step.apply.file, '-n', namespace,
'--context', context_name])
if not wait_for_pod_ready(api_client):
logger.error('Failed to deploy operator')
return False
elif step.wait:
# Simply wait for the specified duration
time.sleep(step.wait.duration)

# Add acto label to the operator pod
add_acto_label(api_client, namespace)
if not wait_for_pod_ready(api_client):
logger.error('Failed to deploy operator')
return False

print_event('Operator deployed')
return True

def deploy_with_retry(self,
kubeconfig: str,
context_name: str,
kubectl_client: KubectlClient,
namespace: str,
retry_count: int = 3):
logger = get_thread_logger(with_prefix=True)
for i in range(retry_count):
if self.deploy(kubeconfig, context_name, kubectl_client, namespace):
return True
else:
logger.error('Failed to deploy operator, retrying...')
return False
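The old Deploy docstring above planned for a stateful, refactored deploy path, and this commit delivers it; the acto/engine.py hunks below show how it is called. The following is a minimal sketch of that call sequence, not code from the commit itself: operator_config, kubeconfig, context_name, and namespace stand in for values the engine already holds at this point.

from acto.deploy import Deploy
from acto.kubectl_client.kubectl import KubectlClient

def deploy_operator(operator_config, kubeconfig: str, context_name: str, namespace: str) -> bool:
    """Sketch of the flow engine.py follows after this commit (illustrative only)."""
    # The Deploy object now takes the whole DeployConfig (operator_config.deploy)
    # instead of (method, file, init) followed by a .new() factory call.
    deploy = Deploy(operator_config.deploy)

    # kubectl invocations go through the KubectlClient helper rather than the
    # removed module-level kubectl() wrapper.
    kubectl_client = KubectlClient(kubeconfig, context_name)

    # deploy_with_retry() retries deploy() a few times and returns a bool.
    return deploy.deploy_with_retry(kubeconfig,
                                    context_name,
                                    kubectl_client=kubectl_client,
                                    namespace=namespace)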
14 changes: 9 additions & 5 deletions acto/engine.py
@@ -247,9 +247,11 @@ def run(self, errors: List[RunResult], mode: str = InputModel.NORMAL):
apiclient = kubernetes_client(self.kubeconfig, self.context_name)
self.cluster.load_images(self.images_archive, self.cluster_name)
trial_k8s_bootstrap_time = time.time()
kubectl_client = KubectlClient(self.kubeconfig, self.context_name)
deployed = self.deploy.deploy_with_retry(self.kubeconfig,
self.context_name,
kubectl_client=kubectl_client,
namespace=self.context['namespace'])
if not deployed:
logger.info('Not deployed. Try again!')
continue
@@ -624,8 +626,7 @@ def __init__(self,
logger.error('Failed to read seed yaml, aborting')
quit()

deploy = Deploy(operator_config.deploy.method, operator_config.deploy.file,
operator_config.deploy.init).new()
deploy = Deploy(operator_config.deploy)

if cluster_runtime == "KIND":
cluster = kind.Kind(acto_namespace=acto_namespace,
@@ -783,11 +784,14 @@ def __learn(self, context_file, helper_crd, analysis_only=False):

while True:
self.cluster.restart_cluster('learn', learn_kubeconfig)
namespace = get_yaml_existing_namespace(self.deploy.path) or CONST.ACTO_NAMESPACE
namespace = get_yaml_existing_namespace(
self.deploy.operator_yaml) or CONST.ACTO_NAMESPACE
self.context['namespace'] = namespace
kubectl_client = KubectlClient(learn_kubeconfig, learn_context_name)
deployed = self.deploy.deploy_with_retry(learn_kubeconfig,
learn_context_name,
kubectl_client=kubectl_client,
namespace=namespace)
if deployed:
break
apiclient = kubernetes_client(learn_kubeconfig, learn_context_name)
(Diffs for the remaining 40 changed files are not shown here.)