Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spalloc new only #1194

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@
from typing import ContextManager, Dict, Tuple, Optional, Union, cast

from spinn_utilities.config_holder import (
get_config_bool, get_config_str_or_none, get_config_str_list)
get_config_bool, get_config_str_or_none)
from spinn_utilities.log import FormatAdapter
from spinn_utilities.overrides import overrides
from spinn_utilities.typing.coords import XY
from spinn_utilities.config_holder import get_config_int, get_config_str

from spalloc_client import Job # type: ignore[import]
from spalloc_client.states import JobState # type: ignore[import]
from spinn_utilities.config_holder import (
get_config_str, set_config)

from spinnman.connections.udp_packet_connections import (
SCAMPConnection, EIEIOConnection)
Expand All @@ -39,7 +37,6 @@
MachineAllocationController)
from spinn_front_end_common.data import FecDataView
from spinn_front_end_common.interface.provenance import ProvenanceWriter
from spinn_front_end_common.utilities.utility_calls import parse_old_spalloc

logger = FormatAdapter(logging.getLogger(__name__))
_MACHINE_VERSION = 5 # Spalloc only ever works with v5 boards
Expand Down Expand Up @@ -198,73 +195,6 @@ def make_report(self, filename: str):
report.write(f"Job: {self._job}")


class _OldSpallocJobController(MachineAllocationController):
    """
    Allocation controller wrapping a job obtained from an old-style
    spalloc server.
    """

    __slots__ = (
        # the spalloc job object being managed
        "_job",
        # the last state that was observed for that job
        "_state",
    )

    def __init__(self, job: Job, host: str):
        """
        :param ~spalloc.job.Job job: The job to manage; must not be `None`
        :param str host: Passed on to the base controller
        :raises TypeError: If no job is given
        """
        if job is None:
            raise TypeError("must have a real job")
        # The fields are filled in before the base class is initialised,
        # exactly as in the original ordering.
        # NOTE(review): presumably the base class may call back into this
        # object during construction -- confirm before reordering.
        self._job = job
        self._state = job.state
        super().__init__("SpallocJobController", host)

    @overrides(MachineAllocationController.extend_allocation)
    def extend_allocation(self, new_total_run_time: float):
        # Nothing to do for this allocator: machines are held until exit
        pass

    @overrides(MachineAllocationController.close)
    def close(self) -> None:
        # Let the base class shut down first, then release the job itself
        super().close()
        self._job.destroy()

    @property
    def power(self) -> bool:
        """
        The power state reported by the job.

        :rtype: bool
        """
        return self._job.power

    def set_power(self, power: bool):
        """
        Change the power state of the job, waiting for the job to be ready
        again when switching it on.

        :param bool power: The power state to request
        """
        self._job.set_power(power)
        if not power:
            return
        self._job.wait_until_ready()

    @overrides(MachineAllocationController.where_is_machine)
    def where_is_machine(
            self, chip_x: int, chip_y: int) -> Tuple[int, int, int]:
        return self._job.where_is_machine(chip_x=chip_x, chip_y=chip_y)

    @overrides(MachineAllocationController._wait)
    def _wait(self) -> bool:
        try:
            if self._state != JobState.destroyed:
                # Blocks until the job leaves the last state we saw
                self._state = self._job.wait_for_state_change(self._state)
        except TypeError:
            # Deliberately ignored.
            # NOTE(review): presumably raised while the job is being torn
            # down underneath us -- confirm.
            pass
        except Exception as e:  # pylint: disable=broad-except
            # Errors are only reported while the controller is still live
            if not self._exited:
                raise e
        # True while the job has not been destroyed
        return self._state != JobState.destroyed

    @overrides(MachineAllocationController._teardown)
    def _teardown(self) -> None:
        # Only close the job if a normal exit has not already happened
        if not self._exited:
            self._job.close()
        super()._teardown()


# Spalloc only ever works with v5 boards
_MACHINE_VERSION = 5


Expand Down Expand Up @@ -313,13 +243,15 @@ def spalloc_allocator(
if n_boards - n_boards_float < 0.5:
n_boards += 1

if is_server_address(spalloc_server):
host, connections, mac = _allocate_job_new(
spalloc_server, n_boards, bearer_token, group, collab,
int(nmpi_job) if nmpi_job is not None else None,
nmpi_user)
else:
host, connections, mac = _allocate_job_old(spalloc_server, n_boards)
if not is_server_address(spalloc_server):
if spalloc_server.lower() in ["spinnaker.cs.man.ac.uk", "10.11.192.11"]:
set_config("Machine", "spalloc_use_proxy", "False")
spalloc_server = \
"https://hack:[email protected]/spalloc/"
host, connections, mac = _allocate_job_new(
spalloc_server, n_boards, bearer_token, group, collab,
int(nmpi_job) if nmpi_job is not None else None,
nmpi_user)
return (host, _MACHINE_VERSION, None, False, False, connections, mac)


Expand Down Expand Up @@ -375,83 +307,3 @@ def _allocate_job_new(
stack.pop_all()
assert root is not None, "no root of ready board"
return (root, connections, allocation_controller)


def _allocate_job_old(spalloc_server: str, n_boards: int) -> Tuple[
        str, Dict[XY, str], MachineAllocationController]:
    """
    Ask an old-style spalloc server for a machine large enough to hold the
    requested number of boards.

    The port and user configured in the ``Machine`` section are used as
    defaults when the server description does not supply them, and the
    optional ``spalloc_machine`` setting is forwarded when present.

    :param str spalloc_server:
        The server from which the machine should be requested
    :param int n_boards: The number of boards required
    :return: The hostname of the job, its connection data, and the
        controller that manages the job
    :rtype: tuple(str, dict(tuple(int,int),str), MachineAllocationController)
    """
    default_port = get_config_int("Machine", "spalloc_port")
    default_user = get_config_str("Machine", "spalloc_user")
    host, port, user = parse_old_spalloc(
        spalloc_server, default_port, default_user)

    job_args = {'hostname': host, 'port': port, 'owner': user}

    # Only name a specific machine when one has been configured
    machine_name = get_config_str_or_none("Machine", "spalloc_machine")
    if machine_name is not None:
        job_args['machine'] = machine_name

    job, hostname, connection_data = _launch_checked_job_old(
        n_boards, job_args)
    controller = _OldSpallocJobController(job, hostname)
    return (hostname, connection_data, controller)


def _launch_checked_job_old(n_boards: int, spalloc_kwargs: dict) -> Tuple[
        Job, str, Dict[XY, str]]:
    """
    Keep creating old-style spalloc jobs until one is ready whose hostname
    is not in the ``spalloc_avoid_boards`` configuration list.

    Jobs that were rejected because of the avoid list are destroyed before
    returning. A job that fails before it is accepted (not ready, too few
    connections, provenance could not be written) is destroyed and the
    error is re-raised.

    :param int n_boards: The number of boards to request
    :param dict spalloc_kwargs:
        Keyword arguments passed through to the job constructor
    :return: The accepted job, its hostname, and its connections with any
        entry whose value is in the avoid list removed
    :rtype: tuple(~.Job, str, dict(tuple(int,int),str))
    :raises ValueError:
        If a job reports fewer connections than boards requested
    """
    logger.info(f"Requesting job with {n_boards} boards")
    avoid_boards = get_config_str_list("Machine", "spalloc_avoid_boards")
    avoid_jobs = []
    # Bound before the loop so that the clean-up in ``finally`` can never
    # hit an unbound name (which would hide the real error) when the very
    # first job fails before its connections have been read.
    connections: Dict[XY, str] = {}
    try:
        while True:
            job = Job(n_boards, **spalloc_kwargs)
            try:
                job.wait_until_ready()
                # get param from jobs before starting, so that hanging
                # doesn't occur
                hostname = job.hostname
                connections = job.connections
                if len(connections) < n_boards:
                    logger.warning(
                        "boards: {}",
                        str(connections).replace("{", "[").replace("}", "]"))
                    raise ValueError("Not enough connections detected")
                if logger.isEnabledFor(logging.DEBUG):
                    logger.debug("boards: {}",
                                 str(connections).replace("{", "[").replace(
                                     "}", "]"))
                with ProvenanceWriter() as db:
                    db.insert_board_provenance(connections)
            except Exception as ex:
                # Nobody else holds a reference to this job, so release it
                # here rather than leaking the allocation.
                job.destroy(str(ex))
                raise
            if hostname not in avoid_boards:
                break
            # Hold on to the unwanted job until the end so the server does
            # not simply hand the same boards back on the next request.
            # NOTE(review): that is the presumed reason for deferring the
            # destroy -- confirm.
            avoid_jobs.append(job)
            logger.warning(
                f"Asking for new job as {hostname} "
                f"as in the spalloc_avoid_boards list")
    finally:
        if avoid_boards:
            # Iterate over a copy of the keys as entries are deleted
            for key in list(connections.keys()):
                if connections[key] in avoid_boards:
                    logger.warning(
                        f"Removing connection info for {connections[key]} "
                        f"as in the spalloc avoid_boards list")
                    del connections[key]
        for avoid_job in avoid_jobs:
            avoid_job.destroy("Asked to avoid by cfg")
    return job, hostname, connections
35 changes: 1 addition & 34 deletions spinn_front_end_common/utilities/utility_calls.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
import io
import os
import threading
from typing import (Optional, Union, TextIO, Tuple, TypeVar)
from urllib.parse import urlparse
from typing import (Optional, Union, TextIO, TypeVar)
from spinn_utilities.config_holder import get_config_bool
from spinn_machine import Chip
from spinnman.connections.udp_packet_connections import SCAMPConnection
Expand Down Expand Up @@ -106,38 +105,6 @@ def get_report_writer(
return io.TextIOWrapper(io.FileIO(name, "w"))


def parse_old_spalloc(
        spalloc_server: str, spalloc_port: int,
        spalloc_user: str) -> Tuple[str, int, str]:
    """
    Split the description of an old-style spalloc service into its parts.

    The description may be a URL of the form:

        spalloc://user@host.example.com:22244

    Only the ``spalloc://`` prefix and the host name are required in that
    form; a port or user left out of the URL is taken from the other
    arguments (which themselves fall back to ``22244`` and
    ``"unknown user"`` when `None` or empty).

    Alternatively a bare host name may be given. With no ``spalloc://``
    prefix the string is returned unchanged as the host, and the port and
    user always come from the other arguments.

    :param str spalloc_server: Hostname or URL
    :param int spalloc_port: Default port
    :param str spalloc_user: Default user
    :return: hostname, port, username
    :rtype: tuple(str,int,str)
    """
    fallback_port = 22244 if spalloc_port in (None, "") else spalloc_port
    fallback_user = (
        "unknown user" if spalloc_user in (None, "") else spalloc_user)

    url = urlparse(spalloc_server, "spalloc")
    if url.netloc != "" and url.hostname is not None:
        # A real URL: anything it specifies wins over the fallbacks
        port = url.port or fallback_port
        user = url.username or fallback_user
        return url.hostname, port, user

    # No network location was found, so treat the input as a bare host name
    return spalloc_server, fallback_port, fallback_user


def retarget_tag(
connection: Union[SpallocEIEIOListener, SpallocEIEIOConnection,
SCAMPConnection], x: int, y: int, tag: int,
Expand Down
Loading