Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat] Add retry behaviour. #11

Merged
merged 1 commit
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 36 additions & 12 deletions collect_single/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@
import os
import sys
from argparse import Namespace, ArgumentParser
from datetime import timedelta
from itertools import takewhile
from pathlib import Path
from typing import List, Tuple, Dict

from fixcloudutils.logging import setup_logger
from fixcloudutils.util import utc
from redis.asyncio import Redis

from collect_single.collect_and_sync import CollectAndSync
from fixcloudutils.logging import setup_logger

log = logging.getLogger("resoto.coordinator")

Expand All @@ -44,17 +46,36 @@ async def startup(
if args.redis_url.startswith("rediss://") and args.ca_cert:
redis_args["ssl_ca_certs"] = args.ca_cert
async with Redis.from_url(args.redis_url, decode_responses=True, **redis_args) as redis:
collect_and_sync = CollectAndSync(
redis=redis,
tenant_id=args.tenant_id,
account_id=args.account_id,
job_id=args.job_id,
core_args=core_args,
worker_args=worker_args,
push_gateway_url=args.push_gateway_url,
logging_context=logging_context,
)
await collect_and_sync.sync()

async def collect_and_sync(send_on_failed: bool) -> bool:
    # Run one complete collect/sync cycle in a fresh CollectAndSync context.
    # send_on_failed: when True, a result message is emitted even if the cycle fails.
    # Returns True if a result message was sent (see CollectAndSync.sync).
    async with CollectAndSync(
        redis=redis,
        tenant_id=args.tenant_id,
        account_id=args.account_id,
        job_id=args.job_id,
        core_args=core_args,
        worker_args=worker_args,
        push_gateway_url=args.push_gateway_url,
        logging_context=logging_context,
    ) as cas:
        return await cas.sync(send_on_failed)

# --retry-failed-for (a timedelta) enables retrying failed collects until a deadline.
if retry := args.retry_failed_for:
    log.info(f"Collect job with retry enabled for {retry}.")
    has_result = False
    deadline = utc() + retry  # absolute wall-clock cutoff for retries
    while not has_result and utc() < deadline:
        # collect and do not send a message in the failing case
        has_result = await collect_and_sync(False)
        if not has_result:
            log.info("Failed collect with retry enabled. Retrying in 30s.")
            await asyncio.sleep(30)
    # if we come here without a result, collect and also send a message in the failing case
    if not has_result:
        log.info("Last attempt to collect with retry enabled.")
        await collect_and_sync(True)
else:
    # no retry configured: single attempt, always report the outcome
    await collect_and_sync(True)


def main() -> None:
Expand All @@ -80,6 +101,9 @@ def main() -> None:
parser.add_argument("--redis-password", default=os.environ.get("REDIS_PASSWORD"), help="Redis password")
parser.add_argument("--push-gateway-url", help="Prometheus push gateway url")
parser.add_argument("--ca-cert", help="Path to CA cert file")
parser.add_argument(
"--retry-failed-for", type=lambda x: timedelta(seconds=int(x)), help="Seconds to retry failed jobs."
)
parsed = parser.parse_args(coordinator_args)

# setup logging
Expand Down
9 changes: 7 additions & 2 deletions collect_single/collect_and_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ async def post_process(self) -> Tuple[Json, List[str]]:
for name, query in SNAPSHOT_METRICS.items():
res = await self.core_client.time_series_snapshot(name, query, account_id)
log.info(f"Created timeseries snapshot: {name} created {res} entries")
else:
raise ValueError("No account info found. Give up!")

return account_info, messages

Expand Down Expand Up @@ -159,7 +161,7 @@ async def push_metrics(self) -> None:
)
log.info("Metrics pushed to gateway")

async def sync(self) -> None:
async def sync(self, send_on_failed: bool) -> bool:
result_send = False
try:
async with ProcessWrapper(self.core_args, self.logging_context):
Expand Down Expand Up @@ -191,5 +193,8 @@ async def sync(self) -> None:
await asyncio.wait_for(self.send_result_events(True), 600) # wait up to 10 minutes
result_send = True
except Exception as ex:
if not result_send:
log.info(f"Got Exception during sync: {ex}")
if send_on_failed and not result_send:
await asyncio.wait_for(self.send_result_events(False, [str(ex)]), 600) # wait up to 10 minutes
result_send = True
return result_send