Skip to content

Commit

Permalink
WIP: prometheus_exporter iteration 8
Browse files Browse the repository at this point in the history
- Substantial cleanup and cut on code duplication
- Fix linting and style
- Metric labels moved to default constants, leave the door opened for
  the possibility of customizing them
- Retrofit what we learned back to the playbook metrics
- Re-enable playbook metrics
  • Loading branch information
dmsimard committed Jun 20, 2023
1 parent 2d7cd30 commit 7558a6f
Showing 1 changed file with 114 additions and 150 deletions.
264 changes: 114 additions & 150 deletions ara/cli/prometheus.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

import logging
import os
import sys
import time
from collections import defaultdict
Expand All @@ -15,226 +14,193 @@
from ara.clients.utils import get_client

try:
from prometheus_client import Gauge, Histogram, Summary, start_http_server
from prometheus_client import Gauge, Summary, start_http_server

HAS_PROMETHEUS_CLIENT = True
except ImportError:
HAS_PROMETHEUS_CLIENT = False

# Where possible and relevant, apply these labels to the metrics so we can write prometheus
# queries to filter and aggregate by these properties
# TODO: make configurable
DEFAULT_PLAYBOOK_LABELS = [
"ansible_version",
"client_version",
"controller",
"name",
"path",
"python_version",
"server_version",
"status",
"updated",
"user",
]
DEFAULT_TASK_LABELS = ["action", "name", "path", "playbook", "status", "updated"]
DEFAULT_HOST_LABELS = ["name", "playbook", "updated"]


# TODO: This method should be more flexible and live in a library
def get_search_results(client, kind, limit, created_after):
"""
kind: string, one of ["playbooks", "hosts", "tasks"]
limit: int, the number of items to return per page
created_after: string, a date formatted as such: 2020-01-31T15:45:36.737000Z
"""
query = f"/api/v1/{kind}?order=-id&limit={limit}"
if created_after is not None:
query += f"&created_after={created_after}"

response = client.get(query)
items = response["results"]

# Iterate through multiple pages of results if necessary
while response["next"]:
# For example:
# "next": "https://demo.recordsansible.org/api/v1/playbooks?limit=1000&offset=2000",
uri = response["next"].replace(client.endpoint, "")
response = client.get(uri)
items.extend(response["results"])

return items


class AraPlaybookCollector(object):
def __init__(self, client, log):
def __init__(self, client, log, limit, labels=DEFAULT_PLAYBOOK_LABELS):
self.client = client
self.log = log
self.limit = limit
self.labels = labels

self.metrics = {
"total": Gauge("ara_playbooks_total", "Total number of playbooks recorded by ara"),
"completed": Gauge("ara_playbooks_completed", "Completed Ansible playbooks", labels),
"expired": Gauge("ara_playbooks_expired", "Expired Ansible playbooks", labels),
"failed": Gauge("ara_playbooks_failed", "Failed Ansible playbooks", labels),
"range": Gauge("ara_playbooks_range", "Limit metric collection to the N most recent playbooks"),

"playbooks": Histogram(
"ara_playbooks",
"Ansible playbooks recorded by ara",
[
"timestamp",
"path",
"status",
"plays",
"tasks",
"results",
"hosts",
"files",
"records",
"ansible_version",
"python_version",
"client_version",
"server_version",
],
buckets=(0.1, 1.0, 5.0, 10.0, 30.0, 60.0, 90.0, 120.0, 300.0, 600.0, 1200.0, 1800.0, 3600.0, float("inf"))
),
"running": Gauge("ara_playbooks_running", "Running Ansible playbooks", labels),
"total": Gauge("ara_playbooks_total", "Total number of playbooks recorded by ara"),
"duration": Summary("ara_playbooks_duration", "Duration (in seconds) of playbooks recorded by ara", labels),
}

def collect_metrics(self, created_after=None, limit=1000):
self.log.info("collecting playbook metrics...")
self.metrics["range"].set(limit)

if created_after is None:
query = self.client.get(f"/api/v1/playbooks?order=-id&limit={limit}")
else:
query = self.client.get(f"/api/v1/playbooks?order=-id&limit={limit}&created_after={created_after}")
playbooks = query["results"]

# Iterate through multiple pages of results if necessary
while query["next"]:
# For example:
# "next": "https://demo.recordsansible.org/api/v1/playbooks?limit=1000&offset=2000",
uri = query["next"].replace(self.client.endpoint, "")
query = self.client.get(uri)
playbooks.extend(query["results"])

playbooks = get_search_results(self.client, "playbooks", self.limit, created_after)
# Save the most recent timestamp so we only scrape beyond it next time
if playbooks:
created_after = cli_utils.increment_timestamp(playbooks[0]["created"])
self.log.info(f"parsing metrics for {len(playbooks)} playbooks...")
self.log.info(f"updating metrics for {len(playbooks)} playbooks...")

for playbook in playbooks:
self.metrics["total"].inc()

# Gather the values of each label so we can attach them to our metrics
labels = {label: playbook[label] for label in self.labels}
self.metrics[playbook["status"]].labels(**labels).inc()

# The API returns a duration in string format, convert it back to seconds
# so we can use it as a value for the metric.
if playbook["duration"] is not None:
duration = datetime.strptime(playbook["duration"], "%H:%M:%S.%f").time()
seconds = duration.hour * 3600 + duration.minute * 60 + duration.second + duration.microsecond / 1000000
# TODO: parse_timedelta throws an exception for playbooks that last longer than a day
# That was meant to be fixed in https://github.com/ansible-community/ara/commit/db8243c3af938ece12c9cd59dd7fe4d9a711b76d
try:
seconds = cli_utils.parse_timedelta(playbook["duration"])
except ValueError:
seconds = 0
else:
seconds = 0
self.metrics["duration"].labels(**labels).observe(seconds)

self.metrics["playbooks"].labels(
timestamp=playbook["created"],
path=playbook["path"],
status=playbook["status"],
ansible_version=playbook["ansible_version"],
python_version=playbook["python_version"],
client_version=playbook["client_version"],
server_version=playbook["server_version"],
plays=playbook["items"]["plays"],
tasks=playbook["items"]["tasks"],
results=playbook["items"]["results"],
hosts=playbook["items"]["hosts"],
files=playbook["items"]["files"],
records=playbook["items"]["records"]
).observe(seconds)

self.log.info("finished updating playbook metrics.")
return (self.metrics, created_after)
return created_after


class AraTaskCollector(object):
def __init__(self, client, log, limit):
def __init__(self, client, log, limit, labels=DEFAULT_TASK_LABELS):
self.client = client
self.log = log
self.limit = limit
self.labels = labels

labels = ["name", "playbook", "status", "path", "action", "duration", "updated", "results"]
self.metrics = {
"total": Gauge(f"ara_tasks_total", f"Number of tasks recorded by ara in prometheus"),
"range": Gauge(f"ara_tasks_range", f"Limit metric collection to the N most recent tasks"),
"completed": Gauge("ara_tasks_completed", "Completed Ansible tasks", labels),
"expired": Gauge("ara_tasks_expired", "Expired Ansible tasks", labels),
"failed": Gauge("ara_tasks_failed", "Failed Ansible tasks", labels),
"range": Gauge("ara_tasks_range", "Limit metric collection to the N most recent tasks"),
"running": Gauge("ara_tasks_running", "Running Ansible tasks", labels),
"expired": Gauge("ara_tasks_expired", "Expired Ansible tasks", labels),
"total": Gauge("ara_tasks_total", "Number of tasks recorded by ara in prometheus"),
"duration": Summary(
"ara_tasks_duration",
"Duration, in seconds, of playbook tasks recorded by ara",
labels
)
"ara_tasks_duration", "Duration, in seconds, of playbook tasks recorded by ara", labels
),
}

def collect_metrics(self, created_after=None):
self.metrics["range"].set(self.limit)

query = f"/api/v1/tasks?order=-id&limit={self.limit}"
if created_after is not None:
query += f"&created_after={created_after}"

response = self.client.get(query)
tasks = response["results"]

# Iterate through multiple pages of results if necessary
while response["next"]:
# For example:
# "next": "https://demo.recordsansible.org/api/v1/tasks?limit=1000&offset=2000",
uri = response["next"].replace(self.client.endpoint, "")
response = self.client.get(uri)
tasks.extend(response["results"])

tasks = get_search_results(self.client, "tasks", self.limit, created_after)
# Save the most recent timestamp so we only scrape beyond it next time
if tasks:
# Save the most recent timestamp so we only scrape beyond it next time
created_after = cli_utils.increment_timestamp(tasks[0]["created"])
self.log.info(f"updating metrics for {len(tasks)} tasks...")

for task in tasks:
self.metrics["total"].inc()

self.metrics[task["status"]].labels(
name=task["name"],
playbook=task["playbook"],
status=task["status"],
path=task["path"],
action=task["action"],
duration=task["duration"],
updated=task["updated"],
results=task["items"]["results"],
).inc()
# Gather the values of each label so we can attach them to our metrics
labels = {label: task[label] for label in self.labels}
self.metrics[task["status"]].labels(**labels).inc()

# The API returns a duration in string format, convert it back to seconds
# so we can use it as a value for the metric.
if task["duration"] is not None:
duration = datetime.strptime(task["duration"], "%H:%M:%S.%f")
delta = timedelta(
hours=duration.hour,
minutes=duration.minute,
seconds=duration.second,
microseconds=duration.microsecond
)
seconds = delta.total_seconds()
# TODO: parse_timedelta throws an exception for tasks that last longer than a day
# That was meant to be fixed in https://github.com/ansible-community/ara/commit/db8243c3af938ece12c9cd59dd7fe4d9a711b76d
try:
seconds = cli_utils.parse_timedelta(task["duration"])
except ValueError:
seconds = 0
else:
seconds = 0

self.metrics["duration"].labels(
name=task["name"],
playbook=task["playbook"],
status=task["status"],
path=task["path"],
action=task["action"],
duration=task["duration"],
updated=task["updated"],
results=task["items"]["results"],
).observe(seconds)
self.metrics["duration"].labels(**labels).observe(seconds)

return created_after


class AraHostCollector(object):
def __init__(self, client, log, limit):
def __init__(self, client, log, limit, labels=DEFAULT_HOST_LABELS):
self.client = client
self.log = log
self.limit = limit
self.labels = labels

labels = ["name", "playbook", "updated"]
self.metrics = {
"total": Gauge(f"ara_hosts_total", f"Hosts recorded by ara"),
"range": Gauge(f"ara_hosts_range", f"Limit metric collection to the N most recent hosts"),
"changed": Gauge("ara_hosts_changed", "Number of changes on a host", labels),
"failed": Gauge("ara_hosts_failed", "Number of failures on a host", labels),
"ok": Gauge("ara_hosts_ok", "Number of successful tasks without changes on a host", labels),
"range": Gauge("ara_hosts_range", "Limit metric collection to the N most recent hosts"),
"skipped": Gauge("ara_hosts_skipped", "Number of skipped tasks on a host", labels),
"unreachable": Gauge("ara_hosts_unreachable", "Number of unreachable errors on a host", labels)
"total": Gauge("ara_hosts_total", "Hosts recorded by ara"),
"unreachable": Gauge("ara_hosts_unreachable", "Number of unreachable errors on a host", labels),
}

def collect_metrics(self, created_after=None):
self.metrics["range"].set(self.limit)

query = f"/api/v1/hosts?order=-id&limit={self.limit}"
if created_after is not None:
query += f"&created_after={created_after}"

response = self.client.get(query)
hosts = response["results"]

# Iterate through multiple pages of results if necessary
while response["next"]:
# For example:
# "next": "https://demo.recordsansible.org/api/v1/hosts?limit=1000&offset=2000",
uri = response["next"].replace(self.client.endpoint, "")
response = self.client.get(uri)
hosts.extend(response["results"])

hosts = get_search_results(self.client, "hosts", self.limit, created_after)
# Save the most recent timestamp so we only scrape beyond it next time
if hosts:
# Save the most recent timestamp so we only scrape beyond it next time
created_after = cli_utils.increment_timestamp(hosts[0]["created"])
self.log.info(f"updating metrics for {len(hosts)} hosts...")

for host in hosts:
self.metrics["total"].inc()

# Gather the values of each label so we can attach them to our metrics
labels = {label: host[label] for label in self.labels}

# The values of "changed", "failed" and so on are integers so we can
# use them as values for our metric
for status in ["changed", "failed", "ok", "skipped", "unreachable"]:
if host[status]:
self.metrics[status].labels(
name=host["name"],
playbook=host["playbook"],
updated=host["updated"]
).set(host[status])
self.metrics[status].labels(**labels).set(host[status])

return created_after

Expand Down Expand Up @@ -306,24 +272,22 @@ def take_action(self, args):
run_sql_migrations=False,
)

#playbooks = AraPlaybookCollector(client=client, log=self.log)

# Host metrics
# Prepare collectors so we can gather various metrics
playbooks = AraPlaybookCollector(client=client, log=self.log, limit=args.playbook_limit)
hosts = AraHostCollector(client=client, log=self.log, limit=args.host_limit)

# Task metrics
tasks = AraTaskCollector(client=client, log=self.log, limit=args.task_limit)

start_http_server(args.prometheus_port)
self.log.info(f"ara prometheus exporter listening on http://0.0.0.0:{args.prometheus_port}/metrics")

created_after = (datetime.now() - timedelta(days=args.max_days)).isoformat()
self.log.info(f"Backfilling metrics for the last {args.max_days} days since {created_after}... This can take a while based on volume.")
latest = defaultdict(lambda: created_after)
self.log.info(
f"Backfilling metrics for the last {args.max_days} days since {created_after}... This can take a while."
)

latest = defaultdict(lambda: created_after)
while True:
#playbook_metrics, latest["playbook"] = playbooks.collect_metrics(latest["playbook"], limit=args.playbook_limit)

latest["playbooks"] = playbooks.collect_metrics(latest["playbooks"])
latest["hosts"] = hosts.collect_metrics(latest["hosts"])
latest["tasks"] = tasks.collect_metrics(latest["tasks"])

Expand Down

0 comments on commit 7558a6f

Please sign in to comment.