Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nonblocking callback #459

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 174 additions & 66 deletions ara/plugins/callback/ara_default.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
import os
import socket
import sys
import threading
from concurrent.futures import ThreadPoolExecutor
from queue import Queue

from ansible import __version__ as ANSIBLE_VERSION, constants as C
from ansible.parsing.ajson import AnsibleJSONEncoder
Expand Down Expand Up @@ -253,18 +255,27 @@
)


class CallbackModule(CallbackBase):
class CallbackSend:
"""
Saves data from an Ansible run into a database
Same as the one defined in ansible.executor.task_queue_manager;
should we just import it?
"""

CALLBACK_VERSION = 2.0
CALLBACK_TYPE = "awesome"
CALLBACK_NAME = "ara_default"
def __init__(self, method_name, *args, **kwargs):
self.method_name = method_name
self.args = args
self.kwargs = kwargs

def __init__(self):
super().__init__()
self.log = logging.getLogger("ara.plugins.callback.default")

class AraQueueDone:
    """Marker message: AraWorker.run() returns when it takes an instance of this off the queue."""


class AraWorker:
def __init__(self, queue: Queue):
self.log = logging.getLogger("ara.plugins.callback.worker")
self.log.info("NEW ARA")
self.queue = queue
self.localhost_hostname = None
# These are configured in self.set_options
self.client = None
Expand Down Expand Up @@ -296,60 +307,14 @@ def __init__(self):
self.delegation_cache = {}
self.warned_about_host_length = []

def set_options(self, task_keys=None, var_options=None, direct=None):
    """
    Load the plugin options and build the API client.

    Copies the recording-related options onto the instance, rewrites the
    default ".ansible/tmp" ignore pattern to the configured local tmpdir,
    creates self.client from the api_* options and caps callback_threads at 4.
    """
    super().set_options(task_keys=task_keys, var_options=var_options, direct=direct)

    # Options that are stored on the instance under their own name
    plain_options = (
        "argument_labels",
        "default_labels",
        "ignored_facts",
        "ignored_arguments",
        "ignored_files",
        "localhost_as_hostname",
        "localhost_as_hostname_format",
        "record_controller",
        "record_user",
    )
    for option in plain_options:
        setattr(self, option, self.get_option(option))

    # The intent for the ignored_files default value is to ignore the ansible local tmpdir but the path
    # can be changed by the user's configuration so retrieve that and use it instead.
    # https://github.com/ansible-community/ara/issues/385
    for position, entry in enumerate(self.ignored_files):
        if entry == ".ansible/tmp":
            self.ignored_files[position] = os.path.dirname(C.config.get_config_value("DEFAULT_LOCAL_TMP"))
            break

    api_names = ("client", "server", "timeout", "username", "password", "cert", "key", "ca", "insecure")
    api = {name: self.get_option("api_" + name) for name in api_names}

    # A CA bundle takes precedence; otherwise verification is on unless api_insecure is set
    verify = api["ca"] if api["ca"] else not api["insecure"]

    self.client = client_utils.get_client(
        client=api["client"],
        endpoint=api["server"],
        timeout=api["timeout"],
        username=api["username"],
        password=api["password"],
        cert=api["cert"],
        key=api["key"],
        verify=verify,
    )

    # TODO: Consider un-hardcoding this and plumbing pool_maxsize to requests.adapters.HTTPAdapter.
    # In the meantime default to 4 so we don't go above requests.adapters.DEFAULT_POOLSIZE.
    # Otherwise we can hit "urllib3.connectionpool: Connection pool is full"
    requested_threads = self.get_option("callback_threads")
    self.callback_threads = 4 if requested_threads > 4 else requested_threads
def run(self):
    """
    Consume callback messages from self.queue until an AraQueueDone arrives.

    Every other message is expected to carry method_name, args and kwargs
    (as CallbackSend does); the method of that name is looked up on this
    worker and called with those arguments.

    A failing handler must not take the worker thread down with it: the
    failure is logged and the loop keeps going, otherwise every message
    still in the queue (including the AraQueueDone marker) would never be
    consumed and anything blocked in queue.join() would wait forever.
    """
    while True:
        message = self.queue.get()
        if isinstance(message, AraQueueDone):
            self.queue.task_done()
            return
        try:
            handler = getattr(self, message.method_name)
            handler(*message.args, **message.kwargs)
        except Exception:
            self.log.exception(
                "ARA worker failed while handling %s", getattr(message, "method_name", repr(message))
            )
            # NOTE(review): the handlers visible in this file call queue.task_done()
            # as their last statement, so a handler that raised is assumed not to
            # have marked its message done yet - confirm for the remaining handlers.
            try:
                self.queue.task_done()
            except ValueError:
                # The message had already been accounted for; nothing left to settle.
                pass

def _submit_thread(self, threadpool, func, *args, **kwargs):
# Manages whether or not the function should be threaded to keep things DRY
Expand Down Expand Up @@ -425,7 +390,8 @@ def v2_playbook_on_start(self, playbook):
# Record the playbook file
self._submit_thread("global", self._get_or_create_file, path, content)

return self.playbook
# return self.playbook
self.queue.task_done()

def v2_playbook_on_play_start(self, play):
self.log.debug("v2_playbook_on_play_start")
Expand Down Expand Up @@ -466,7 +432,10 @@ def v2_playbook_on_play_start(self, play):
self._submit_thread("global", self._set_playbook_labels, labels)

# Record all the files involved in the play
for path in play._loader._FILE_CACHE.keys():
# make a list of the keys and iterate that for thread safety
# avoiding `RuntimeError: dictionary changed size during iteration`
play_files = list(play._loader._FILE_CACHE.keys())
for path in play_files:
# The cache can be pre-populated with files that aren't relevant to the playbook report
# If there are matches that should be ignored here, don't record them at all
ignored = False
Expand All @@ -492,7 +461,8 @@ def v2_playbook_on_play_start(self, play):
started=datetime.datetime.now(datetime.timezone.utc).isoformat(),
)

return self.play
# return self.play
self.queue.task_done()

def v2_playbook_on_handler_task_start(self, task):
self.log.debug("v2_playbook_on_handler_task_start")
Expand Down Expand Up @@ -522,16 +492,19 @@ def v2_playbook_on_task_start(self, task, is_conditional, handler=False):
# Get task
self.task = self._get_or_create_task(task, task_file["id"], lineno, handler)

return self.task
# return self.task
self.queue.task_done()

def v2_runner_on_start(self, host, task):
self.log.debug("v2_runner_on_start")
# v2_runner_on_start was added in 2.8 so this doesn't get run for Ansible 2.7 and below.
self.result_started[host.get_name()] = datetime.datetime.now(datetime.timezone.utc).isoformat()
self.queue.task_done()

def v2_runner_on_ok(self, result, **kwargs):
self.log.debug("v2_runner_on_ok")
self._submit_thread("task", self._load_result, result, "ok", **kwargs)
self.queue.task_done()

def v2_runner_on_unreachable(self, result, **kwargs):
self.log.debug("v2_runner_on_unreachable")
Expand All @@ -544,6 +517,7 @@ def v2_runner_on_unreachable(self, result, **kwargs):
)
self.task = self.task_cache[task_uuid]
self._submit_thread("task", self._load_result, result, "unreachable", **kwargs)
self.queue.task_done()

def v2_runner_on_failed(self, result, **kwargs):
self.log.debug("v2_runner_on_failed")
Expand All @@ -556,21 +530,26 @@ def v2_runner_on_failed(self, result, **kwargs):
)
self.task = self.task_cache[task_uuid]
self._submit_thread("task", self._load_result, result, "failed", **kwargs)
self.queue.task_done()

def v2_runner_on_skipped(self, result, **kwargs):
self.log.debug("v2_runner_on_skipped")
self._submit_thread("task", self._load_result, result, "skipped", **kwargs)
self.queue.task_done()

def v2_runner_item_on_ok(self, result):
self.log.debug("v2_runner_item_on_ok")
self._update_delegation_cache(result)
self.queue.task_done()

def v2_runner_item_on_failed(self, result):
self.log.debug("v2_runner_item_on_failed")
self._update_delegation_cache(result)
self.queue.task_done()

def v2_runner_item_on_skipped(self, result):
self.log.debug("v2_runner_item_on_skipped")
self.queue.task_done()
pass
# result._task.delegate_to can end up being a variable from this hook, don't save it.
# https://github.com/ansible/ansible/issues/75339
Expand All @@ -579,6 +558,7 @@ def v2_runner_item_on_skipped(self, result):
def v2_playbook_on_include(self, included_file):
self.log.debug("v2_playbook_on_include")
# ara has not used this hook before, maybe we can do something with it in the future.
self.queue.task_done()
pass

def v2_playbook_on_stats(self, stats):
Expand All @@ -587,6 +567,7 @@ def v2_playbook_on_stats(self, stats):
self._end_play()
self._load_stats(stats)
self._end_playbook(stats)
self.queue.task_done()

def _end_task(self):
if self.callback_threads:
Expand Down Expand Up @@ -855,3 +836,130 @@ def _get_user(self):
pass

return user


class CallbackModule(CallbackBase):
    """
    Saves data from an Ansible run into a database

    The v2_* hooks on this class do not record anything themselves: each one
    wraps its arguments in a CallbackSend message and puts it on self.queue.
    An AraWorker consumes that queue from a daemon thread (started at the end
    of set_options()) and calls its own method of the same name, so the hooks
    return without waiting for the recording to happen.
    """

    CALLBACK_VERSION = 2.0
    CALLBACK_TYPE = "awesome"
    CALLBACK_NAME = "ara_default"

    def __init__(self):
        super().__init__()
        self.log = logging.getLogger("ara.plugins.callback.default")
        self.queue = Queue()
        # started at the end of set_options()
        self.worker = AraWorker(queue=self.queue)
        self.worker_thread = threading.Thread(target=self.worker.run, daemon=True)

    def __del__(self):
        """Wait for the worker to drain the queue, then release its thread pools."""
        worker_thread = getattr(self, "worker_thread", None)
        if worker_thread is None:
            # __init__ did not get far enough to create the worker; nothing to wait for.
            return

        outstanding = self.queue.qsize()
        if worker_thread.is_alive():
            if outstanding > 0:
                self._display.display("ARA: Waiting for all requests to finish (about %d)..." % outstanding)
            # Join even when qsize() is 0: qsize() does not count the message the
            # worker has already taken off the queue and is still processing.
            self.queue.join()
        elif outstanding > 0:
            # Nothing is consuming the queue (worker never started or already
            # returned), so joining here would block forever.
            self.log.warning("ARA: worker is not running, %d queued callback(s) were not processed", outstanding)

        if worker_thread.is_alive():
            for pool_name in ("global_threads", "task_threads"):
                pool = getattr(self.worker, pool_name, None)
                if pool:
                    pool.shutdown()
        self._display.display("ARA: Done!")

    def set_options(self, task_keys=None, var_options=None, direct=None):
        """
        Load the plugin options, configure the worker with them and start it.

        The recording options and the API client are set as attributes on
        self.worker; the worker thread is started once they are in place.
        """
        super().set_options(task_keys=task_keys, var_options=var_options, direct=direct)

        self.worker.argument_labels = self.get_option("argument_labels")
        self.worker.default_labels = self.get_option("default_labels")
        self.worker.ignored_facts = self.get_option("ignored_facts")
        self.worker.ignored_arguments = self.get_option("ignored_arguments")
        self.worker.ignored_files = self.get_option("ignored_files")
        self.worker.localhost_as_hostname = self.get_option("localhost_as_hostname")
        self.worker.localhost_as_hostname_format = self.get_option("localhost_as_hostname_format")
        self.worker.record_controller = self.get_option("record_controller")
        self.worker.record_user = self.get_option("record_user")

        # The intent for the ignored_files default value is to ignore the ansible local tmpdir but the path
        # can be changed by the user's configuration so retrieve that and use it instead.
        # https://github.com/ansible-community/ara/issues/385
        for index, pattern in enumerate(self.worker.ignored_files):
            if pattern == ".ansible/tmp":
                tmpdir_config = os.path.dirname(C.config.get_config_value("DEFAULT_LOCAL_TMP"))
                self.worker.ignored_files[index] = tmpdir_config
                break

        client = self.get_option("api_client")
        endpoint = self.get_option("api_server")
        timeout = self.get_option("api_timeout")
        username = self.get_option("api_username")
        password = self.get_option("api_password")
        cert = self.get_option("api_cert")
        key = self.get_option("api_key")
        ca = self.get_option("api_ca")
        insecure = self.get_option("api_insecure")

        # A CA bundle takes precedence; otherwise verify unless api_insecure is set
        verify = ca if ca else not insecure

        self.worker.client = client_utils.get_client(
            client=client,
            endpoint=endpoint,
            timeout=timeout,
            username=username,
            password=password,
            cert=cert,
            key=key,
            verify=verify,
        )

        # TODO: Consider un-hardcoding this and plumbing pool_maxsize to requests.adapters.HTTPAdapter.
        # In the meantime default to 4 so we don't go above requests.adapters.DEFAULT_POOLSIZE.
        # Otherwise we can hit "urllib3.connectionpool: Connection pool is full"
        self.worker.callback_threads = min(4, self.get_option("callback_threads"))

        # A Thread can only be started once (a second start() raises RuntimeError);
        # ident stays None until the thread has been started.
        if self.worker_thread.ident is None:
            self.worker_thread.start()

    def _enqueue(self, method_name, *args, **kwargs):
        """Queue a call to the worker method called method_name with the given arguments."""
        self.queue.put(CallbackSend(method_name, *args, **kwargs))

    def v2_playbook_on_start(self, playbook):
        self._enqueue("v2_playbook_on_start", playbook)

    def v2_playbook_on_play_start(self, play):
        self._enqueue("v2_playbook_on_play_start", play)

    def v2_playbook_on_handler_task_start(self, task):
        self._enqueue("v2_playbook_on_handler_task_start", task)

    def v2_playbook_on_task_start(self, task, is_conditional, handler=False):
        self._enqueue("v2_playbook_on_task_start", task, is_conditional, handler)

    def v2_runner_on_start(self, host, task):
        self._enqueue("v2_runner_on_start", host, task)

    def v2_runner_on_ok(self, result, **kwargs):
        self._enqueue("v2_runner_on_ok", result, **kwargs)

    def v2_runner_on_unreachable(self, result, **kwargs):
        self._enqueue("v2_runner_on_unreachable", result, **kwargs)

    def v2_runner_on_failed(self, result, **kwargs):
        self._enqueue("v2_runner_on_failed", result, **kwargs)

    def v2_runner_on_skipped(self, result, **kwargs):
        self._enqueue("v2_runner_on_skipped", result, **kwargs)

    def v2_runner_item_on_ok(self, result):
        self._enqueue("v2_runner_item_on_ok", result)

    def v2_runner_item_on_failed(self, result):
        self._enqueue("v2_runner_item_on_failed", result)

    def v2_runner_item_on_skipped(self, result):
        self._enqueue("v2_runner_item_on_skipped", result)

    def v2_playbook_on_include(self, included_file):
        self._enqueue("v2_playbook_on_include", included_file)

    def v2_playbook_on_stats(self, stats):
        self._enqueue("v2_playbook_on_stats", stats)
        # Stats is the last event the worker handles; the marker tells it to return.
        self.queue.put(AraQueueDone())