diff --git a/hivemind/averaging/key_manager.py b/hivemind/averaging/key_manager.py index f613615c8..b4f4d654f 100644 --- a/hivemind/averaging/key_manager.py +++ b/hivemind/averaging/key_manager.py @@ -11,7 +11,6 @@ GroupKey = str GROUP_PATTERN = re.compile("^(([^.])+)[.]0b[01]*$") # e.g. bert_exp4_averaging.0b01001101 -DEFAULT_NUM_BUCKETS = 256 logger = get_logger(__name__) @@ -92,7 +91,7 @@ async def get_averagers(self, group_key: GroupKey, only_active: bool) -> List[Tu logger.warning(f"Could not parse group key {key} ({looking_for_group}, exc={e})") return averagers - async def update_key_on_group_assembled(self, group_info: GroupInfo, is_leader: bool = True): + async def update_key_on_group_assembled(self, group_info: GroupInfo): """this function is triggered every time an averager finds an allreduce group""" rng = random.Random(group_info.group_id) index = group_info.peer_ids.index(self.peer_id) diff --git a/hivemind/averaging/matchmaking.py b/hivemind/averaging/matchmaking.py index b5a5f1146..889eac013 100644 --- a/hivemind/averaging/matchmaking.py +++ b/hivemind/averaging/matchmaking.py @@ -21,7 +21,7 @@ class Matchmaking: - f""" + """ An internal class that is used to form groups of averages for running allreduce See DecentralizedAverager docstring for the detailed description of all parameters @@ -383,7 +383,7 @@ async def leader_assemble_group(self) -> GroupInfo: logger.debug(f"{self.peer_id} - assembled group of {len(ordered_peer_ids)} peers") group_info = GroupInfo(group_id, tuple(ordered_peer_ids), gathered) - await self.group_key_manager.update_key_on_group_assembled(group_info, is_leader=True) + await self.group_key_manager.update_key_on_group_assembled(group_info) self.assembled_group.set_result(group_info) return group_info diff --git a/hivemind/dht/routing.py b/hivemind/dht/routing.py index 1e2e0184a..a62e7066f 100644 --- a/hivemind/dht/routing.py +++ b/hivemind/dht/routing.py @@ -13,7 +13,7 @@ from hivemind.utils import MSGPackSerializer, get_dht_time DHTKey = Subkey = DHTValue = Any -BinaryDHTID = BinaryDHTValue = bytes +BinaryDHTValue = bytes class RoutingTable: @@ -251,7 +251,8 @@ def __repr__(self): class DHTID(int): HASH_FUNC = hashlib.sha1 HASH_NBYTES = 20 # SHA1 produces a 20-byte (aka 160bit) number - RANGE = MIN, MAX = 0, 2 ** (HASH_NBYTES * 8) # inclusive min, exclusive max + MIN = 0 + MAX = 2 ** (HASH_NBYTES * 8) def __new__(cls, value: int): assert cls.MIN <= value < cls.MAX, f"DHTID must be in [{cls.MIN}, {cls.MAX}) but got {value}" diff --git a/hivemind/dht/schema.py b/hivemind/dht/schema.py index de946721f..26d21cb31 100644 --- a/hivemind/dht/schema.py +++ b/hivemind/dht/schema.py @@ -91,7 +91,6 @@ def validate(self, record: DHTRecord) -> bool: return False [field_name] = list(record.keys()) - n_outside_schema = 0 validation_errors = [] for schema in self._schemas: try: diff --git a/hivemind/moe/server/task_pool.py b/hivemind/moe/server/task_pool.py index c763dc776..a3ba850bb 100644 --- a/hivemind/moe/server/task_pool.py +++ b/hivemind/moe/server/task_pool.py @@ -10,7 +10,7 @@ from collections import namedtuple from concurrent.futures import Future from queue import Empty -from typing import Any, Dict, Generator, List, Tuple +from typing import Any, Dict, List, Tuple import torch diff --git a/hivemind/optim/__init__.py b/hivemind/optim/__init__.py index b144ba7fa..58c049480 100644 --- a/hivemind/optim/__init__.py +++ b/hivemind/optim/__init__.py @@ -1,3 +1,3 @@ -from hivemind.optim.grad_scaler import GradScaler, HivemindGradScaler +from hivemind.optim.grad_scaler import GradScaler from hivemind.optim.optimizer import Optimizer from hivemind.optim.training_averager import TrainingAverager diff --git a/hivemind/optim/grad_scaler.py b/hivemind/optim/grad_scaler.py index 38bcbb740..e999be91d 100644 --- a/hivemind/optim/grad_scaler.py +++ b/hivemind/optim/grad_scaler.py @@ -126,9 +126,3 @@ def _unscale_grads_( def are_grads_finite(self, optimizer: TorchOptimizer, use_cached: bool = False) -> bool: opt_dict = self._found_inf_per_device(optimizer) if use_cached else self._check_inf_per_device(optimizer) return not sum(v.item() for v in opt_dict.values()) - - -class HivemindGradScaler(GradScaler): - def __init__(self, *args, **kwargs): - logger.warning("HivemindGradScaler was renamed to hivemind.GradScaler, this reference will be removed in v1.1") - super().__init__(*args, **kwargs) diff --git a/hivemind/p2p/p2p_daemon_bindings/datastructures.py b/hivemind/p2p/p2p_daemon_bindings/datastructures.py index 920aa920b..03b2c7b2b 100644 --- a/hivemind/p2p/p2p_daemon_bindings/datastructures.py +++ b/hivemind/p2p/p2p_daemon_bindings/datastructures.py @@ -10,7 +10,7 @@ import base58 import multihash from cryptography.hazmat.primitives import serialization -from multiaddr import Multiaddr, protocols +from multiaddr import Multiaddr from hivemind.proto import crypto_pb2, p2pd_pb2 @@ -18,13 +18,8 @@ class PeerID: def __init__(self, peer_id_bytes: bytes) -> None: self._bytes = peer_id_bytes - self._xor_id = int(sha256_digest(self._bytes).hex(), 16) self._b58_str = base58.b58encode(self._bytes).decode() - @property - def xor_id(self) -> int: - return self._xor_id - def to_bytes(self) -> bytes: return self._bytes @@ -137,31 +132,3 @@ def __str__(self): def __repr__(self): return f"PeerInfo(peer_id={repr(self.peer_id)}, addrs={repr(self.addrs)})" - - -class InvalidAddrError(ValueError): - pass - - -def info_from_p2p_addr(addr: Multiaddr) -> PeerInfo: - if addr is None: - raise InvalidAddrError("`addr` should not be `None`") - - parts = addr.split() - if not parts: - raise InvalidAddrError(f"`parts`={parts} should at least have a protocol `P_P2P`") - - p2p_part = parts[-1] - last_protocol_code = p2p_part.protocols()[0].code - if last_protocol_code != protocols.P_P2P: - raise InvalidAddrError(f"The last protocol should be `P_P2P` instead of `{last_protocol_code}`") - - # make sure the /p2p value parses as a peer.ID - peer_id_str: str = p2p_part.value_for_protocol(protocols.P_P2P) - peer_id = PeerID.from_base58(peer_id_str) - - # we might have received just an / p2p part, which means there's no addr. - if len(parts) > 1: - addr = Multiaddr.join(*parts[:-1]) - - return PeerInfo(peer_id, [addr]) diff --git a/hivemind/utils/asyncio.py b/hivemind/utils/asyncio.py index 36901a2d3..caa7f0830 100644 --- a/hivemind/utils/asyncio.py +++ b/hivemind/utils/asyncio.py @@ -1,5 +1,4 @@ import asyncio -import concurrent.futures import multiprocessing as mp import os from concurrent.futures import ThreadPoolExecutor