Skip to content

Commit

Permalink
feat: import firework.bootstrap
Browse files Browse the repository at this point in the history
  • Loading branch information
RF-Tar-Railt committed Jan 3, 2025
1 parent ee698de commit 9a2b790
Show file tree
Hide file tree
Showing 10 changed files with 677 additions and 14 deletions.
File renamed without changes.
88 changes: 88 additions & 0 deletions _bootstrap/_resolve.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Iterable

if TYPE_CHECKING:
from .service import Service


class RequirementResolveFailed(Exception):
pass


class DependencyBrokenError(Exception):
pass


def _build_dependencies_map(services: Iterable[Service]) -> dict[str, set[str]]:
dependencies_map: dict[str, set[str]] = {}

for service in services:
dependencies_map[service.id] = set(service.dependencies) | set(service.after)

for before in service.before:
dependencies_map.setdefault(before, set()).add(service.id)

return dependencies_map


def resolve_dependencies(
services: Iterable[Service],
exclude: Iterable[Service] = (),
*,
reverse: bool = False,
) -> list[list[str]]:
services = list(services)

dependencies_map = _build_dependencies_map(services)

unresolved = {s.id: s for s in services}
resolved_id = {i.id for i in exclude}
result: list[list[str]] = []

while unresolved:
layer_candidates = [service for service in unresolved.values() if resolved_id.issuperset(dependencies_map[service.id])]

if not layer_candidates:
raise TypeError("Failed to resolve requirements due to cyclic dependencies or unmet constraints.")

# 根据是否有 before 约束进行分类
befores = []
no_befores = []

for service in layer_candidates:
if service.before:
befores.append(service)
else:
no_befores.append(service)

# 优先无 before 的服务,一旦无 before 的服务存在,就先放这一层
current_layer = no_befores or befores

# 从未解决中移除当前层的服务
for cid in current_layer:
del unresolved[cid]

resolved_id.update(current_layer)
result.append(current_layer)

if reverse:
result.reverse()

return result


def validate_services_removal(existed: Iterable[Service], services_to_remove: Iterable[Service]):
graph = {service.id: set() for service in existed}

for service, deps in _build_dependencies_map(existed).items():
for dep in deps:
if dep in graph:
graph[dep].add(service)

to_remove = {service.id for service in services_to_remove}

for node in to_remove:
for dependent in graph.get(node, ()):
if dependent not in to_remove:
raise DependencyBrokenError(f"Cannot remove node '{node}' because node '{dependent}' depends on it.")
89 changes: 89 additions & 0 deletions _bootstrap/context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from __future__ import annotations

import asyncio
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import TYPE_CHECKING

from .status import Phase, ServiceStatusValue, Stage

if TYPE_CHECKING:
from .core import Bootstrap


@dataclass
class ServiceContext:
bootstrap: Bootstrap

def __post_init__(self):
self._status: ServiceStatusValue = (Stage.EXIT, Phase.WAITING)
self._sigexit = asyncio.Event()
self._notify = asyncio.Event()

def _forward(self, stage: Stage, phase: Phase):
prev_stage, prev_phase = self._status

if stage < prev_stage and prev_stage != Stage.EXIT:
raise ValueError(f"Cannot update stage from {prev_stage} to {stage}")

if stage == prev_stage:
if phase <= prev_phase:
raise ValueError(f"Cannot update phase from {prev_phase} to {phase}")
else:
phase = Phase.WAITING

self._status = (stage, phase)

self._notify.set()
self._notify.clear()

@property
def should_exit(self):
return self._sigexit.is_set()

async def wait_for(self, stage: Stage, phase: Phase):
val = (stage, phase)

while val > self._status:
await self._notify.wait()

async def wait_for_sigexit(self):
await self._sigexit.wait()

@asynccontextmanager
async def prepare(self):
self._forward(Stage.PREPARE, Phase.WAITING)
await self.wait_for(Stage.PREPARE, Phase.PENDING)
yield
self._forward(Stage.PREPARE, Phase.COMPLETED)

@asynccontextmanager
async def online(self):
self._forward(Stage.ONLINE, Phase.WAITING)
await self.wait_for(Stage.ONLINE, Phase.PENDING)
yield
self._forward(Stage.ONLINE, Phase.COMPLETED)

@asynccontextmanager
async def cleanup(self):
self._forward(Stage.CLEANUP, Phase.WAITING)
await self.wait_for(Stage.CLEANUP, Phase.PENDING)
yield
self._forward(Stage.CLEANUP, Phase.COMPLETED)

def dispatch_prepare(self):
self._forward(Stage.PREPARE, Phase.PENDING)

def dispatch_online(self):
self._forward(Stage.ONLINE, Phase.PENDING)

def dispatch_cleanup(self):
self._forward(Stage.CLEANUP, Phase.PENDING)

def exit(self):
"""Call by the manager"""
self._sigexit.set()

def exit_complete(self):
"""Call by the manager"""
self._status = (Stage.EXIT, Phase.COMPLETED)
Loading

0 comments on commit 9a2b790

Please sign in to comment.