Commit 01aac46
sbrunner committed Jan 27, 2025
1 parent 055aeac commit 01aac46
Showing 1 changed file with 117 additions and 88 deletions.
205 changes: 117 additions & 88 deletions github_app_geo_project/scripts/process_queue.py
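This commit renames the queue-polling coroutine to _get_process_one_job and moves the per-job work into a new _process_one_job coroutine that runs as a named asyncio task; the _Run, _WatchDog and _PrometheusWatch loops are likewise started as named tasks instead of bare coroutines. A minimal sketch of the per-job pattern, with illustrative names only (this is not the project's code):

    import asyncio


    async def process_one_job(job_id: int) -> None:
        # Stand-in for the real per-job processing.
        await asyncio.sleep(0.1)
        print(f"job {job_id} processed")


    async def get_process_one_job(job_id: int) -> bool:
        # Pick a job, then run it as a task whose name identifies the job,
        # so it is easy to spot when dumping asyncio.all_tasks().
        await asyncio.create_task(process_one_job(job_id), name=f"Process Job {job_id}")
        # The real coroutine returns a flag telling the caller whether the queue looked empty.
        return False


    if __name__ == "__main__":
        asyncio.run(get_process_one_job(42))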
@@ -609,7 +609,7 @@ def _process_dashboard_issue(


 # Where 2147483647 is the PostgreSQL max int, see: https://www.postgresql.org/docs/current/datatype-numeric.html
-async def _process_one_job(
+async def _get_process_one_job(
     config: dict[str, Any],
     Session: sqlalchemy.orm.sessionmaker[  # pylint: disable=invalid-name,unsubscriptable-object
         sqlalchemy.orm.Session
@@ -664,94 +664,108 @@ async def _process_one_job(
         _LOGGER.debug("Process one job (max priority: %i): Steal long pending job", max_priority)
         return True
 
-    sentry_sdk.set_context("job", {"id": job.id, "event": job.event_name, "module": job.module or "-"})
-
-    # Capture_logs
-    root_logger = logging.getLogger()
-    handler = _Handler(job.id)
-    handler.setFormatter(_Formatter("%(levelname)-5.5s %(pathname)s:%(lineno)d %(funcName)s()"))
-
-    module_data_formatted = utils.format_json(job.module_data)
-    event_data_formatted = utils.format_json(job.event_data)
-    message = module_utils.HtmlMessage(
-        f"<p>module data:</p>{module_data_formatted}<p>event data:</p>{event_data_formatted}"
-    )
-    message.title = f"Start process job '{job.event_name}' id: {job.id}, on {job.owner}/{job.repository} on module: {job.module}, on application {job.application}"
-    root_logger.addHandler(handler)
-    _LOGGER.info(message)
-    _RUNNING_JOBS[job.id] = _JobInfo(
-        job.module or "-", job.event_name, job.repository, job.priority, max_priority
-    )
-    root_logger.removeHandler(handler)
-
-    if make_pending:
-        _LOGGER.info("Make job ID %s pending", job.id)
-        job.status = models.JobStatus.PENDING
-        job.started_at = datetime.datetime.now(tz=datetime.UTC)
-        session.commit()
-        _LOGGER.debug("Process one job (max priority: %i): Make pending", max_priority)
-        return False
-
-    try:
-        job.status = models.JobStatus.PENDING
-        job.started_at = datetime.datetime.now(tz=datetime.UTC)
-        session.commit()
-        _NB_JOBS.labels(models.JobStatus.PENDING.name).set(
-            session.query(models.Queue).filter(models.Queue.status == models.JobStatus.PENDING).count()
-        )
-
-        success = True
-        if not job.module:
-            if job.event_data.get("type") == "event":
-                _process_event(config, job.event_data, session)
-                job.status = models.JobStatus.DONE
-                job.finished_at = datetime.datetime.now(tz=datetime.UTC)
-            elif job.event_name == "dashboard":
-                success = _validate_job(config, job.application, job.event_data)
-                if success:
-                    _LOGGER.info("Process dashboard issue %i", job.id)
-                    _process_dashboard_issue(
-                        config,
-                        session,
-                        job.event_data,
-                        job.application,
-                        job.owner,
-                        job.repository,
-                    )
-                    job.status = models.JobStatus.DONE
-                else:
-                    job.status = models.JobStatus.ERROR
-                job.finished_at = datetime.datetime.now(tz=datetime.UTC)
-            else:
-                _LOGGER.error("Unknown event name: %s", job.event_name)
-                job.status = models.JobStatus.ERROR
-                job.finished_at = datetime.datetime.now(tz=datetime.UTC)
-                success = False
-        else:
-            success = _validate_job(config, job.application, job.event_data)
-            if success:
-                success = await _process_job(
-                    config,
-                    session,
-                    root_logger,
-                    handler,
-                    job,
-                )
-
-    except Exception:  # pylint: disable=broad-exception-caught
-        _LOGGER.exception("Failed to process job id: %s on module: %s.", job.id, job.module or "-")
-        job.log = "\n".join([handler.format(msg) for msg in handler.results])
-    finally:
-        sentry_sdk.set_context("job", {})
-        if job.status == models.JobStatus.PENDING:
-            _LOGGER.error("Job %s finished with pending status", job.id)
-            job.status = models.JobStatus.ERROR
-            job.finished_at = datetime.datetime.now(tz=datetime.UTC)
-        session.commit()
-        _RUNNING_JOBS.pop(job.id)
-
-    _LOGGER.debug("Process one job (max priority: %i): Done", max_priority)
-    return False
+    await asyncio.create_task(
+        _process_one_job(job, session, config, make_pending, max_priority),
+        name=f"Process Job {job.id} - {job.event_name} - {job.module or '-'}",
+    )
+
+    return False
+
+
+async def _process_one_job(
+    job: models.Queue,
+    session: sqlalchemy.orm.Session,
+    config: dict[str, Any],
+    make_pending: bool,
+    max_priority: int,
+) -> None:
+    sentry_sdk.set_context("job", {"id": job.id, "event": job.event_name, "module": job.module or "-"})
+
+    # Capture_logs
+    root_logger = logging.getLogger()
+    handler = _Handler(job.id)
+    handler.setFormatter(_Formatter("%(levelname)-5.5s %(pathname)s:%(lineno)d %(funcName)s()"))
+
+    module_data_formatted = utils.format_json(job.module_data)
+    event_data_formatted = utils.format_json(job.event_data)
+    message = module_utils.HtmlMessage(
+        f"<p>module data:</p>{module_data_formatted}<p>event data:</p>{event_data_formatted}"
+    )
+    message.title = f"Start process job '{job.event_name}' id: {job.id}, on {job.owner}/{job.repository} on module: {job.module}, on application {job.application}"
+    root_logger.addHandler(handler)
+    _LOGGER.info(message)
+    _RUNNING_JOBS[job.id] = _JobInfo(
+        job.module or "-", job.event_name, job.repository, job.priority, max_priority
+    )
+    root_logger.removeHandler(handler)
+
+    if make_pending:
+        _LOGGER.info("Make job ID %s pending", job.id)
+        job.status = models.JobStatus.PENDING
+        job.started_at = datetime.datetime.now(tz=datetime.UTC)
+        session.commit()
+        _LOGGER.debug("Process one job (max priority: %i): Make pending", max_priority)
+        return
+
+    try:
+        job.status = models.JobStatus.PENDING
+        job.started_at = datetime.datetime.now(tz=datetime.UTC)
+        session.commit()
+        _NB_JOBS.labels(models.JobStatus.PENDING.name).set(
+            session.query(models.Queue).filter(models.Queue.status == models.JobStatus.PENDING).count()
+        )
+
+        success = True
+        if not job.module:
+            if job.event_data.get("type") == "event":
+                _process_event(config, job.event_data, session)
+                job.status = models.JobStatus.DONE
+                job.finished_at = datetime.datetime.now(tz=datetime.UTC)
+            elif job.event_name == "dashboard":
+                success = _validate_job(config, job.application, job.event_data)
+                if success:
+                    _LOGGER.info("Process dashboard issue %i", job.id)
+                    _process_dashboard_issue(
+                        config,
+                        session,
+                        job.event_data,
+                        job.application,
+                        job.owner,
+                        job.repository,
+                    )
+                    job.status = models.JobStatus.DONE
+                else:
+                    job.status = models.JobStatus.ERROR
+                job.finished_at = datetime.datetime.now(tz=datetime.UTC)
+            else:
+                _LOGGER.error("Unknown event name: %s", job.event_name)
+                job.status = models.JobStatus.ERROR
+                job.finished_at = datetime.datetime.now(tz=datetime.UTC)
+                success = False
+        else:
+            success = _validate_job(config, job.application, job.event_data)
+            if success:
+                success = await _process_job(
+                    config,
+                    session,
+                    root_logger,
+                    handler,
+                    job,
+                )
+
+    except Exception:  # pylint: disable=broad-exception-caught
+        _LOGGER.exception("Failed to process job id: %s on module: %s.", job.id, job.module or "-")
+        job.log = "\n".join([handler.format(msg) for msg in handler.results])
+    finally:
+        sentry_sdk.set_context("job", {})
+        if job.status == models.JobStatus.PENDING:
+            _LOGGER.error("Job %s finished with pending status", job.id)
+            job.status = models.JobStatus.ERROR
+            job.finished_at = datetime.datetime.now(tz=datetime.UTC)
+        session.commit()
+        _RUNNING_JOBS.pop(job.id)
+
+    _LOGGER.debug("Process one job (max priority: %i): Done", max_priority)
 
 
 class _Run:
@@ -770,15 +784,12 @@ def __init__(
         self.max_priority = max_priority
 
     async def __call__(self, *args: Any, **kwds: Any) -> Any:
-        current_task = asyncio.current_task()
-        if current_task is not None:
-            current_task.set_name(f"Run ({self.max_priority})")
         empty_thread_sleep = int(os.environ.get("GHCI_EMPTY_THREAD_SLEEP", 10))
 
         while True:
             empty = True
             try:
-                empty = await _process_one_job(
+                empty = await _get_process_one_job(
                     self.config,
                     self.Session,
                     no_steal_long_pending=self.end_when_empty,
@@ -811,10 +822,22 @@ async def __call__(self, *args: Any, **kwds: Any) -> Any:
         await asyncio.to_thread(self._watch)
 
     def _watch(self) -> None:
+        cont = 0
         while True:
             _LOGGER.debug("Prometheus watch: alive")
             try:
                 _NB_JOBS.labels("Tasks").set(len(asyncio.all_tasks()))
+                cont += 1
+                if cont % 10 == 0:
+                    message = ["Running tasks:"]
+                    for task in asyncio.all_tasks():
+                        if not task.done():
+                            message.append(task.get_name())
+                            if cont % 100 == 0:
+                                string_io = io.StringIO()
+                                task.print_stack(limit=5, file=string_io)
+                                message.append(string_io.getvalue())
+                    _LOGGER.error("\n".join(message))
             except RuntimeError:
                 pass
             with self.Session() as session:
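The new counter-driven block above lists the names of all still-running asyncio tasks every 10th pass and, every 100th pass, also dumps a short stack for each of them. A self-contained sketch of that technique (illustrative helper, not the project's code; here it runs inside the event loop rather than in a watcher thread):

    import asyncio
    import io


    def dump_running_tasks(with_stacks: bool = False) -> str:
        # Collect the names (and optionally short stacks) of all tasks that
        # are still running in the current event loop.
        lines = ["Running tasks:"]
        for task in asyncio.all_tasks():
            if not task.done():
                lines.append(task.get_name())
                if with_stacks:
                    buffer = io.StringIO()
                    task.print_stack(limit=5, file=buffer)
                    lines.append(buffer.getvalue())
        return "\n".join(lines)


    async def main() -> None:
        sleeper = asyncio.create_task(asyncio.sleep(1), name="Example sleeper")
        print(dump_running_tasks(with_stacks=True))
        sleeper.cancel()


    if __name__ == "__main__":
        asyncio.run(main())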
@@ -883,12 +906,14 @@ async def _async_main() -> None:
     # Create tables if they do not exist
     models.Base.metadata.create_all(engine)
     if args.only_one:
-        await _process_one_job(
+        await _get_process_one_job(
             config, Session, no_steal_long_pending=args.exit_when_empty, make_pending=args.make_pending
         )
         sys.exit(0)
     if args.make_pending:
-        await _process_one_job(config, Session, no_steal_long_pending=args.exit_when_empty, make_pending=True)
+        await _get_process_one_job(
+            config, Session, no_steal_long_pending=args.exit_when_empty, make_pending=True
+        )
         sys.exit(0)
 
     if not args.exit_when_empty and "C2C_PROMETHEUS_PORT" in os.environ:
@@ -905,14 +930,18 @@ def log_message(self, *args: Any) -> None:

     priority_groups = [int(e) for e in os.environ.get("GHCI_PRIORITY_GROUPS", "2147483647").split(",")]
 
-    threads_call = []
+    tasks = []
     if not args.exit_when_empty:
-        threads_call.append(_WatchDog()())
-        threads_call.append(_PrometheusWatch(Session)())
+        tasks.append(asyncio.create_task(_WatchDog()(), name="Watch Dog"))
+        tasks.append(asyncio.create_task(_PrometheusWatch(Session)(), name="Prometheus Watch"))
 
     for priority in priority_groups:
-        threads_call.append(_Run(config, Session, args.exit_when_empty, priority)())
-    await asyncio.gather(*threads_call)
+        tasks.append(
+            asyncio.create_task(
+                _Run(config, Session, args.exit_when_empty, priority)(), name=f"Run ({priority})"
+            )
+        )
+    await asyncio.gather(*tasks)
 
 
 def main() -> None:
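Because each worker is now named via asyncio.create_task(..., name=...) at creation, the _Run.__call__ body no longer has to rename the current task from the inside with asyncio.current_task().set_name(...). A small self-contained sketch of creating and gathering named worker tasks (hypothetical worker, not the project's class):

    import asyncio


    async def worker(label: str) -> None:
        # The body no longer needs asyncio.current_task().set_name(...):
        # the task is named by its creator.
        await asyncio.sleep(0)
        print(f"{label}: done")


    async def main() -> None:
        # Name each long-running worker when it is created, then gather them,
        # mirroring how the script now builds its task list.
        tasks = [
            asyncio.create_task(worker(str(priority)), name=f"Run ({priority})")
            for priority in (2147483647, 100)
        ]
        await asyncio.gather(*tasks)


    if __name__ == "__main__":
        asyncio.run(main())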
