diff --git a/neurons/validators/src/clients/compute_client.py b/neurons/validators/src/clients/compute_client.py index 70bcf96..e59439a 100644 --- a/neurons/validators/src/clients/compute_client.py +++ b/neurons/validators/src/clients/compute_client.py @@ -25,6 +25,7 @@ DuplicateExecutorsRequest, ExecutorSpecRequest, LogStreamRequest, + ResetVerifiedJobRequest, RentedMachineRequest, ) from pydantic import BaseModel @@ -36,8 +37,9 @@ from services.redis_service import ( DUPLICATED_MACHINE_SET, MACHINE_SPEC_CHANNEL_NAME, - RENTED_MACHINE_SET, + RENTED_MACHINE_PREFIX, STREAMING_LOG_CHANNEL, + RESET_VERIFIED_JOB_CHANNEL, ) logger = logging.getLogger(__name__) @@ -118,10 +120,12 @@ async def run_forever(self) -> NoReturn: # subscribe to channel to get machine specs pubsub = await self.miner_service.redis_service.subscribe(MACHINE_SPEC_CHANNEL_NAME) log_channel = await self.miner_service.redis_service.subscribe(STREAMING_LOG_CHANNEL) + reset_verified_job_channel = await self.miner_service.redis_service.subscribe(RESET_VERIFIED_JOB_CHANNEL) # send machine specs to facilitator self.specs_task = asyncio.create_task(self.wait_for_specs(pubsub)) asyncio.create_task(self.wait_for_log_streams(log_channel)) + asyncio.create_task(self.wait_for_reset_verified_job(reset_verified_job_channel)) except Exception as exc: logger.error( _m("redis connection error", extra={**self.logging_extra, "error": str(exc)}) @@ -366,6 +370,78 @@ async def wait_for_log_streams(self, channel: aioredis.client.PubSub): except TimeoutError: pass + async def wait_for_reset_verified_job(self, channel: aioredis.client.PubSub): + logs_queue: list[ResetVerifiedJobRequest] = [] + while True: + validator_hotkey = self.my_hotkey() + logger.info( + _m( + f"Waiting for clear verified jobs: {validator_hotkey}", + extra=self.logging_extra, + ) + ) + try: + msg = await channel.get_message(ignore_subscribe_messages=True, timeout=100 * 60) + if msg is None: + logger.warning( + _m( + "No clear job request yet", + extra=self.logging_extra, + ) + ) + continue + + msg = json.loads(msg["data"]) + reset_request = None + + try: + reset_request = ResetVerifiedJobRequest( + miner_hotkey=msg["miner_hotkey"], + validator_hotkey=validator_hotkey, + executor_uuid=msg["executor_uuid"], + ) + + logger.info( + _m( + f'Successfully created ResetVerifiedJobRequest instance with {msg}', + extra=self.logging_extra, + ) + ) + except Exception as exc: + logger.error( + _m( + "Failed to get ResetVerifiedJobRequest instance", + extra={ + **self.logging_extra, + "error": str(exc), + "msg": str(msg), + }, + ), + exc_info=True, + ) + continue + + logs_queue.append(reset_request) + if self.ws is not None: + while len(logs_queue) > 0: + log_to_send = logs_queue.pop(0) + try: + await self.send_model(log_to_send) + except Exception as exc: + logs_queue.insert(0, log_to_send) + logger.error( + _m( + msg, + extra={ + **self.logging_extra, + "error": str(exc), + }, + ) + ) + break + except TimeoutError: + pass + def create_metagraph_refresh_task(self, period=None): return create_metagraph_refresh_task(period=period) @@ -445,7 +521,7 @@ async def handle_message(self, raw_msg: str | bytes): ) redis_service = self.miner_service.redis_service - await redis_service.delete(RENTED_MACHINE_SET) + await redis_service.delete(RENTED_MACHINE_PREFIX) for machine in response.machines: await redis_service.add_rented_machine(machine) diff --git a/neurons/validators/src/protocol/vc_protocol/compute_requests.py b/neurons/validators/src/protocol/vc_protocol/compute_requests.py index fe1b787..4b03e22 100644 --- a/neurons/validators/src/protocol/vc_protocol/compute_requests.py +++ b/neurons/validators/src/protocol/vc_protocol/compute_requests.py @@ -21,6 +21,7 @@ class RentedMachine(BaseModel): executor_id: str executor_ip_address: str executor_ip_port: str + container_name: str class RentedMachineResponse(BaseModel): diff --git a/neurons/validators/src/protocol/vc_protocol/validator_requests.py b/neurons/validators/src/protocol/vc_protocol/validator_requests.py index 88a4428..38e1ff3 100644 --- a/neurons/validators/src/protocol/vc_protocol/validator_requests.py +++ b/neurons/validators/src/protocol/vc_protocol/validator_requests.py @@ -13,6 +13,7 @@ class RequestType(enum.Enum): ExecutorSpecRequest = "ExecutorSpecRequest" RentedMachineRequest = "RentedMachineRequest" LogStreamRequest = "LogStreamRequest" + ResetVerifiedJobRequest = "ResetVerifiedJobRequest" DuplicateExecutorsRequest = "DuplicateExecutorsRequest" @@ -73,5 +74,12 @@ class LogStreamRequest(BaseValidatorRequest): logs: list[dict] +class ResetVerifiedJobRequest(BaseValidatorRequest): + message_type: RequestType = RequestType.ResetVerifiedJobRequest + validator_hotkey: str + miner_hotkey: str + executor_uuid: str + + class DuplicateExecutorsRequest(BaseValidatorRequest): message_type: RequestType = RequestType.DuplicateExecutorsRequest diff --git a/neurons/validators/src/services/docker_service.py b/neurons/validators/src/services/docker_service.py index 23b98bb..d6274f2 100644 --- a/neurons/validators/src/services/docker_service.py +++ b/neurons/validators/src/services/docker_service.py @@ -236,9 +236,9 @@ async def clean_exisiting_containers( command = f'docker volume prune -af' await ssh_client.run(command) - async def clear_verified_job_count(self, renting_machine: RentedMachine): - await self.redis_service.remove_pending_pod(renting_machine) - await self.redis_service.clear_verified_job_info(renting_machine.executor_id) + async def clear_verified_job_count(self, miner_hotkey: str, executor_id: str): + await self.redis_service.remove_pending_pod(miner_hotkey, executor_id) + await self.redis_service.clear_verified_job_info(miner_hotkey, executor_id) async def create_container( self, @@ -258,13 +258,6 @@ async def create_container( "debug": payload.debug, } - renting_machine = RentedMachine( - miner_hotkey=payload.miner_hotkey, - executor_id=payload.executor_id, - executor_ip_address=executor_info.address, - executor_ip_port=str(executor_info.port), - ) - logger.info( _m( "Create Docker Container", @@ -290,7 +283,7 @@ async def create_container( log_text = "No port mappings found" logger.error(log_text) - await self.clear_verified_job_count(renting_machine) + await self.clear_verified_job_count(payload.miner_hotkey, payload.executor_id) return FailedContainerRequest( miner_hotkey=payload.miner_hotkey, @@ -300,7 +293,7 @@ async def create_container( ) # add executor in pending status dict - await self.redis_service.add_pending_pod(renting_machine) + await self.redis_service.add_pending_pod(payload.miner_hotkey, payload.executor_id) private_key = self.ssh_service.decrypt_payload(keypair.ss58_address, private_key) pkey = asyncssh.import_private_key(private_key) @@ -356,7 +349,7 @@ async def create_container( logger.error(log_text) await self.finish_stream_logs() - await self.clear_verified_job_count(renting_machine) + await self.clear_verified_job_count(payload.miner_hotkey, payload.executor_id) return FailedContainerRequest( miner_hotkey=payload.miner_hotkey, @@ -439,7 +432,7 @@ async def create_container( logger.error(log_text) await self.finish_stream_logs() - await self.clear_verified_job_count(renting_machine) + await self.clear_verified_job_count(payload.miner_hotkey, payload.executor_id) return FailedContainerRequest( miner_hotkey=payload.miner_hotkey, @@ -497,7 +490,7 @@ async def create_container( logger.error(log_text) await self.finish_stream_logs() - await self.clear_verified_job_count(renting_machine) + await self.clear_verified_job_count(payload.miner_hotkey, payload.executor_id) return FailedContainerRequest( miner_hotkey=payload.miner_hotkey, @@ -518,7 +511,7 @@ async def create_container( logger.error(log_text) await self.finish_stream_logs() - await self.clear_verified_job_count(renting_machine) + await self.clear_verified_job_count(payload.miner_hotkey, payload.executor_id) return FailedContainerRequest( miner_hotkey=payload.miner_hotkey, @@ -536,8 +529,14 @@ async def create_container( await self.finish_stream_logs() - await self.redis_service.add_rented_machine(renting_machine) - await self.redis_service.remove_pending_pod(renting_machine) + await self.redis_service.add_rented_machine(RentedMachine( + miner_hotkey=payload.miner_hotkey, + executor_id=payload.executor_id, + executor_ip_address=executor_info.address, + executor_ip_port=str(executor_info.port), + container_name=container_name, + )) + await self.redis_service.remove_pending_pod(payload.miner_hotkey, payload.executor_id) return ContainerCreatedResult( container_name=container_name, @@ -554,7 +553,7 @@ async def create_container( logger.error(log_text, exc_info=True) await self.finish_stream_logs() - await self.clear_verified_job_count(renting_machine) + await self.clear_verified_job_count(payload.miner_hotkey, payload.executor_id) return FailedContainerRequest( miner_hotkey=payload.miner_hotkey, @@ -699,14 +698,7 @@ async def delete_container( ), ) - await self.redis_service.remove_rented_machine( - RentedMachine( - miner_hotkey=payload.miner_hotkey, - executor_id=payload.executor_id, - executor_ip_address=executor_info.address, - executor_ip_port=str(executor_info.port), - ) - ) + await self.redis_service.remove_rented_machine(payload.miner_hotkey, payload.executor_id) async def get_docker_hub_digests(self, repositories) -> dict[str, str]: """Retrieve all tags and their corresponding digests from Docker Hub.""" diff --git a/neurons/validators/src/services/miner_service.py b/neurons/validators/src/services/miner_service.py index 22cde2e..b5fd9da 100644 --- a/neurons/validators/src/services/miner_service.py +++ b/neurons/validators/src/services/miner_service.py @@ -387,14 +387,7 @@ async def handle_container(self, payload: ContainerBaseRequest): ) ) - await self.redis_service.remove_rented_machine( - RentedMachine( - miner_hotkey=payload.miner_hotkey, - executor_id=payload.executor_id, - executor_ip_address=executor.address if executor else "", - executor_ip_port=str(executor.port if executor else ""), - ) - ) + await self.redis_service.remove_rented_machine(payload.miner_hotkey, payload.executor_id) return FailedContainerRequest( miner_hotkey=payload.miner_hotkey, diff --git a/neurons/validators/src/services/redis_service.py b/neurons/validators/src/services/redis_service.py index 914c188..748bf22 100644 --- a/neurons/validators/src/services/redis_service.py +++ b/neurons/validators/src/services/redis_service.py @@ -6,7 +6,9 @@ MACHINE_SPEC_CHANNEL_NAME = "channel:1" STREAMING_LOG_CHANNEL = "channel:2" +RESET_VERIFIED_JOB_CHANNEL = "channel:3" RENTED_MACHINE_SET = "rented_machines" +RENTED_MACHINE_PREFIX = "rented_machines_prefix" PENDING_PODS_SET = "pending_pods" DUPLICATED_MACHINE_SET = "duplicated_machines" EXECUTOR_COUNT_PREFIX = "executor_counts" @@ -63,18 +65,6 @@ async def smembers(self, key: str): async with self.lock: return await self.redis.smembers(key) - async def add_rented_machine(self, machine: RentedMachine): - await self.sadd(RENTED_MACHINE_SET, f"{machine.miner_hotkey}:{machine.executor_id}") - - async def remove_rented_machine(self, machine: RentedMachine): - await self.srem(RENTED_MACHINE_SET, f"{machine.miner_hotkey}:{machine.executor_id}") - - async def add_pending_pod(self, machine: RentedMachine): - await self.sadd(PENDING_PODS_SET, f"{machine.miner_hotkey}:{machine.executor_id}") - - async def remove_pending_pod(self, machine: RentedMachine): - await self.srem(PENDING_PODS_SET, f"{machine.miner_hotkey}:{machine.executor_id}") - async def lpush(self, key: str, element: bytes): """Add an element to a list in Redis.""" async with self.lock: @@ -126,6 +116,25 @@ async def clear_by_pattern(self, pattern: str): async for key in self.redis.scan_iter(match=pattern): await self.redis.delete(key.decode()) + async def add_rented_machine(self, machine: RentedMachine): + await self.hset(RENTED_MACHINE_PREFIX, f"{machine.miner_hotkey}:{machine.executor_id}", machine.model_dump_json()) + + async def remove_rented_machine(self, miner_hotkey: str, executor_id: str): + await self.hdel(RENTED_MACHINE_PREFIX, f"{miner_hotkey}:{executor_id}") + + async def get_rented_machine(self, miner_hotkey: str, executor_id: str): + data = await self.hget(RENTED_MACHINE_PREFIX, f"{miner_hotkey}:{executor_id}") + if not data: + return None + + return json.loads(data) + + async def add_pending_pod(self, miner_hotkey: str, executor_id: str): + await self.sadd(PENDING_PODS_SET, f"{miner_hotkey}:{executor_id}") + + async def remove_pending_pod(self, miner_hotkey: str, executor_id: str): + await self.srem(PENDING_PODS_SET, f"{miner_hotkey}:{executor_id}") + async def clear_all_executor_counts(self): pattern = f"{EXECUTOR_COUNT_PREFIX}:*" cursor = 0 @@ -142,7 +151,14 @@ async def clear_all_ssh_ports(self): pattern = f"{AVAILABLE_PORT_MAPS_PREFIX}:*" await self.clear_by_pattern(pattern) - async def set_verified_job_info(self, executor_id: str, prev_info: dict, success: bool = True, spec: str = ''): + async def set_verified_job_info( + self, + miner_hotkey: str, + executor_id: str, + prev_info: dict = {}, + success: bool = True, + spec: str = '' + ): count = prev_info.get('count', 0) failed = prev_info.get('failed', 0) prev_spec = prev_info.get('spec', '') @@ -153,8 +169,11 @@ async def set_verified_job_info(self, executor_id: str, prev_info: dict, success failed += 1 if failed * 20 >= count: - count = 0 - failed = 0 + return await self.clear_verified_job_info( + miner_hotkey=miner_hotkey, + executor_id=executor_id, + prev_info=prev_info, + ) data = { "count": count, @@ -164,7 +183,12 @@ async def set_verified_job_info(self, executor_id: str, prev_info: dict, success await self.hset(VERIFIED_JOB_COUNT_KEY, executor_id, json.dumps(data)) - async def clear_verified_job_info(self, executor_id, prev_info: dict = {}): + async def clear_verified_job_info( + self, + miner_hotkey: str, + executor_id, + prev_info: dict = {} + ): spec = prev_info.get('spec', '') data = { "count": 0, @@ -172,6 +196,13 @@ async def clear_verified_job_info(self, executor_id, prev_info: dict = {}): "spec": spec, } await self.hset(VERIFIED_JOB_COUNT_KEY, executor_id, json.dumps(data)) + await self.publish( + RESET_VERIFIED_JOB_CHANNEL, + { + "miner_hotkey": miner_hotkey, + "executor_uuid": executor_id, + }, + ) async def get_verified_job_info(self, executor_id: str): data = await self.hget(VERIFIED_JOB_COUNT_KEY, executor_id) diff --git a/neurons/validators/src/services/task_service.py b/neurons/validators/src/services/task_service.py index befc30d..6478033 100644 --- a/neurons/validators/src/services/task_service.py +++ b/neurons/validators/src/services/task_service.py @@ -343,8 +343,34 @@ async def docker_connection_check( return False, log_text, log_status - async def clear_verified_job_count(self, executor_info: ExecutorSSHInfo, prev_info: dict = {}): - await self.redis_service.clear_verified_job_info(executor_info.uuid, prev_info) + async def clear_verified_job_count( + self, + miner_info: MinerJobRequestPayload, + executor_info: ExecutorSSHInfo, + prev_info: dict = {} + ): + await self.redis_service.clear_verified_job_info( + miner_hotkey=miner_info.miner_hotkey, + executor_id=executor_info.uuid, + prev_info=prev_info, + ) + + async def check_pod_running( + self, + ssh_client: asyncssh.SSHClientConnection, + miner_hotkey: str, + container_name: str, + executor_info: ExecutorSSHInfo, + ): + # check container running or not + result = await ssh_client.run(f"docker ps -q -f name={container_name}") + if result.stdout.strip(): + return True + + # remove pod in redis + await self.redis_service.remove_rented_machine(miner_hotkey, executor_info.uuid) + + return False async def create_task( self, @@ -440,7 +466,12 @@ async def create_task( logger.warning(log_text) await self.clear_remote_directory(ssh_client, remote_dir) - await self.redis_service.set_verified_job_info(executor_info.uuid, verified_job_info, False) + await self.redis_service.set_verified_job_info( + miner_hotkey=miner_info.miner_hotkey, + executor_id=executor_info.uuid, + prev_info=verified_job_info, + success=False, + ) return ( None, @@ -512,7 +543,12 @@ async def create_task( logger.warning(log_text) await self.clear_remote_directory(ssh_client, remote_dir) - await self.redis_service.set_verified_job_info(executor_info.uuid, verified_job_info, False) + await self.redis_service.set_verified_job_info( + miner_hotkey=miner_info.miner_hotkey, + executor_id=executor_info.uuid, + prev_info=verified_job_info, + success=False, + ) return ( machine_spec, @@ -558,7 +594,12 @@ async def create_task( logger.warning(log_text) await self.clear_remote_directory(ssh_client, remote_dir) - await self.redis_service.set_verified_job_info(executor_info.uuid, verified_job_info, False) + await self.redis_service.set_verified_job_info( + miner_hotkey=miner_info.miner_hotkey, + executor_id=executor_info.uuid, + prev_info=verified_job_info, + success=False, + ) return ( machine_spec, @@ -585,7 +626,12 @@ async def create_task( logger.warning(log_text) await self.clear_remote_directory(ssh_client, remote_dir) - await self.redis_service.set_verified_job_info(executor_info.uuid, verified_job_info, False) + await self.redis_service.set_verified_job_info( + miner_hotkey=miner_info.miner_hotkey, + executor_id=executor_info.uuid, + prev_info=verified_job_info, + success=False, + ) return ( machine_spec, @@ -614,7 +660,11 @@ async def create_task( logger.warning(log_text) await self.clear_remote_directory(ssh_client, remote_dir) - await self.clear_verified_job_count(executor_info, verified_job_info) + await self.clear_verified_job_count( + miner_info=miner_info, + executor_info=executor_info, + prev_info=verified_job_info + ) return ( machine_spec, @@ -641,7 +691,11 @@ async def create_task( logger.warning(log_text) await self.clear_remote_directory(ssh_client, remote_dir) - await self.clear_verified_job_count(executor_info, verified_job_info) + await self.clear_verified_job_count( + miner_info=miner_info, + executor_info=executor_info, + prev_info=verified_job_info + ) return ( machine_spec, @@ -671,7 +725,12 @@ async def create_task( logger.warning(log_text) await self.clear_remote_directory(ssh_client, remote_dir) - await self.redis_service.set_verified_job_info(executor_info.uuid, verified_job_info, False) + await self.redis_service.set_verified_job_info( + miner_hotkey=miner_info.miner_hotkey, + executor_id=executor_info.uuid, + prev_info=verified_job_info, + success=False, + ) return ( machine_spec, @@ -703,7 +762,12 @@ async def create_task( # logger.warning(log_text) # await self.clear_remote_directory(ssh_client, remote_dir) - # await self.redis_service.set_verified_job_info(executor_info.uuid, verified_job_info, False) + # await self.redis_service.set_verified_job_info( + # miner_hotkey=miner_info.miner_hotkey, + # executor_id=executor_info.uuid, + # prev_info=verified_job_info, + # success=False, + # ) # return ( # machine_spec, @@ -735,7 +799,11 @@ async def create_task( logger.warning(log_text) await self.clear_remote_directory(ssh_client, remote_dir) - await self.clear_verified_job_count(executor_info, verified_job_info) + await self.clear_verified_job_count( + miner_info=miner_info, + executor_info=executor_info, + prev_info=verified_job_info + ) return ( machine_spec, @@ -751,7 +819,46 @@ async def create_task( is_rented = await self.redis_service.is_elem_exists_in_set( RENTED_MACHINE_SET, f"{miner_info.miner_hotkey}:{executor_info.uuid}" ) - if is_rented: + rented_machine = await self.redis_service.get_rented_machine(miner_info.miner_hotkey, executor_info.uuid) + if is_rented or rented_machine: + if rented_machine: + container_name = rented_machine.get("container_name", "") + is_pod_running = await self.check_pod_running( + ssh_client=ssh_client, + miner_hotkey=miner_info.miner_hotkey, + container_name=container_name, + executor_info=executor_info, + ) + if not is_pod_running: + log_status = "warning" + log_text = _m( + "Pod is not running", + extra=get_extra_info( + { + **default_extra, + "container_name": container_name, + } + ), + ) + logger.warning(log_text) + + await self.clear_remote_directory(ssh_client, remote_dir) + await self.clear_verified_job_count( + miner_info=miner_info, + executor_info=executor_info, + prev_info=verified_job_info + ) + + return ( + machine_spec, + executor_info, + 0, + 0, + miner_info.job_batch_id, + log_status, + log_text, + ) + score = max_score * gpu_count log_text = _m( "Executor is already rented.", @@ -789,7 +896,12 @@ async def create_task( logger.warning(log_text) await self.clear_remote_directory(ssh_client, remote_dir) - await self.redis_service.set_verified_job_info(executor_info.uuid, verified_job_info, False) + await self.redis_service.set_verified_job_info( + miner_hotkey=miner_info.miner_hotkey, + executor_id=executor_info.uuid, + prev_info=verified_job_info, + success=False, + ) return ( machine_spec, @@ -815,7 +927,12 @@ async def create_task( ) if not success: await self.clear_remote_directory(ssh_client, remote_dir) - await self.redis_service.set_verified_job_info(executor_info.uuid, verified_job_info, False) + await self.redis_service.set_verified_job_info( + miner_hotkey=miner_info.miner_hotkey, + executor_id=executor_info.uuid, + prev_info=verified_job_info, + success=False, + ) return ( None, @@ -842,7 +959,12 @@ async def create_task( logger.warning(log_text) await self.clear_remote_directory(ssh_client, remote_dir) - await self.redis_service.set_verified_job_info(executor_info.uuid, verified_job_info, False) + await self.redis_service.set_verified_job_info( + miner_hotkey=miner_info.miner_hotkey, + executor_id=executor_info.uuid, + prev_info=verified_job_info, + success=False, + ) return ( None, @@ -866,7 +988,12 @@ async def create_task( logger.warning(log_text) await self.clear_remote_directory(ssh_client, remote_dir) - await self.redis_service.set_verified_job_info(executor_info.uuid, verified_job_info, False) + await self.redis_service.set_verified_job_info( + miner_hotkey=miner_info.miner_hotkey, + executor_id=executor_info.uuid, + prev_info=verified_job_info, + success=False, + ) return ( None, @@ -907,7 +1034,12 @@ async def create_task( logger.warning(log_text) await self.clear_remote_directory(ssh_client, remote_dir) - await self.redis_service.set_verified_job_info(executor_info.uuid, verified_job_info, False) + await self.redis_service.set_verified_job_info( + miner_hotkey=miner_info.miner_hotkey, + executor_id=executor_info.uuid, + prev_info=verified_job_info, + success=False, + ) return ( machine_spec, @@ -945,7 +1077,12 @@ async def create_task( logger.error(log_text) await self.clear_remote_directory(ssh_client, remote_dir) - await self.redis_service.set_verified_job_info(executor_info.uuid, verified_job_info, False) + await self.redis_service.set_verified_job_info( + miner_hotkey=miner_info.miner_hotkey, + executor_id=executor_info.uuid, + prev_info=verified_job_info, + success=False, + ) return ( machine_spec, @@ -966,7 +1103,12 @@ async def create_task( logger.error(log_text) await self.clear_remote_directory(ssh_client, remote_dir) - await self.redis_service.set_verified_job_info(executor_info.uuid, verified_job_info, False) + await self.redis_service.set_verified_job_info( + miner_hotkey=miner_info.miner_hotkey, + executor_id=executor_info.uuid, + prev_info=verified_job_info, + success=False, + ) return ( machine_spec, @@ -1055,7 +1197,13 @@ async def create_task( ) await self.clear_remote_directory(ssh_client, remote_dir) - await self.redis_service.set_verified_job_info(executor_info.uuid, verified_job_info, True, gpu_model_count) + await self.redis_service.set_verified_job_info( + miner_hotkey=miner_info.miner_hotkey, + executor_id=executor_info.uuid, + prev_info=verified_job_info, + success=True, + spec=gpu_model_count, + ) if verified_job_count >= VERIFY_JOB_REQUIRED_COUNT: return ( @@ -1085,7 +1233,12 @@ async def create_task( ) try: - await self.redis_service.set_verified_job_info(executor_info.uuid, verified_job_info, False) + await self.redis_service.set_verified_job_info( + miner_hotkey=miner_info.miner_hotkey, + executor_id=executor_info.uuid, + prev_info=verified_job_info, + success=False, + ) key = f"{AVAILABLE_PORT_MAPS_PREFIX}:{miner_info.miner_hotkey}:{executor_info.uuid}" await self.redis_service.delete(key) diff --git a/neurons/validators/src/test_validator.py b/neurons/validators/src/test_validator.py index 1614f0c..acfb9c7 100644 --- a/neurons/validators/src/test_validator.py +++ b/neurons/validators/src/test_validator.py @@ -1,57 +1,70 @@ -import asyncio -import bittensor +# import asyncio +# import bittensor -from core.config import settings -from fastapi.testclient import TestClient -from concurrent.futures import ThreadPoolExecutor, as_completed -from services.docker_service import DockerService -from services.ioc import ioc +# from core.config import settings +# from fastapi.testclient import TestClient +# from concurrent.futures import ThreadPoolExecutor, as_completed +# from services.docker_service import DockerService +# from services.ioc import ioc -from validator import app +# from validator import app +from protocol.vc_protocol.compute_requests import RentedMachine +import json -client = TestClient(app) +# client = TestClient(app) -def send_post_request(): - response = client.post( - "/miner_request", - json={ - "miner_hotkey": "5EHgHZBfx4ZwU7GzGCS8VCMBLBEKo5eaCvXKiu6SASwWT6UY", - "miner_address": "localhost", - "miner_port": 8000 - }, - ) - assert response.status_code == 200 +# def send_post_request(): +# response = client.post( +# "/miner_request", +# json={ +# "miner_hotkey": "5EHgHZBfx4ZwU7GzGCS8VCMBLBEKo5eaCvXKiu6SASwWT6UY", +# "miner_address": "localhost", +# "miner_port": 8000 +# }, +# ) +# assert response.status_code == 200 -def test_socket_connections(): - num_requests = 10 # Number of simultaneous requests - with ThreadPoolExecutor(max_workers=num_requests) as executor: - futures = [executor.submit(send_post_request) for _ in range(num_requests)] +# def test_socket_connections(): +# num_requests = 10 # Number of simultaneous requests +# with ThreadPoolExecutor(max_workers=num_requests) as executor: +# futures = [executor.submit(send_post_request) for _ in range(num_requests)] - for future in as_completed(futures): - response = future.result() - assert response.status_code == 200 +# for future in as_completed(futures): +# response = future.result() +# assert response.status_code == 200 -async def check_docker_port_mappings(): - docker_service: DockerService = ioc["DockerService"] - miner_hotkey = '5Df8qGLMd19BXByefGCZFN57fWv6jDm5hUbnQeUTu2iqNBhT' - executor_id = 'c272060f-8eae-4265-8e26-1d83ac96b498' - port_mappings = await docker_service.generate_portMappings(miner_hotkey, executor_id) - print('port_mappings ==>', port_mappings) +# async def check_docker_port_mappings(): +# docker_service: DockerService = ioc["DockerService"] +# miner_hotkey = '5Df8qGLMd19BXByefGCZFN57fWv6jDm5hUbnQeUTu2iqNBhT' +# executor_id = 'c272060f-8eae-4265-8e26-1d83ac96b498' +# port_mappings = await docker_service.generate_portMappings(miner_hotkey, executor_id) +# print('port_mappings ==>', port_mappings) if __name__ == "__main__": # test_socket_connections() - asyncio.run(check_docker_port_mappings()) - - config = settings.get_bittensor_config() - subtensor = bittensor.subtensor(config=config) - node = subtensor.substrate - - netuid = settings.BITTENSOR_NETUID - tempo = subtensor.tempo(netuid) - weights_rate_limit = node.query("SubtensorModule", "WeightsSetRateLimit", [netuid]).value - server_rate_limit = node.query("SubtensorModule", "WeightsSetRateLimit", [netuid]).value - serving_rate_limit = node.query("SubtensorModule", "ServingRateLimit", [netuid]).value - print('rate limit ===>', tempo, weights_rate_limit, serving_rate_limit) + # asyncio.run(check_docker_port_mappings()) + + # config = settings.get_bittensor_config() + # subtensor = bittensor.subtensor(config=config) + # node = subtensor.substrate + + # netuid = settings.BITTENSOR_NETUID + # tempo = subtensor.tempo(netuid) + # weights_rate_limit = node.query("SubtensorModule", "WeightsSetRateLimit", [netuid]).value + # server_rate_limit = node.query("SubtensorModule", "WeightsSetRateLimit", [netuid]).value + # serving_rate_limit = node.query("SubtensorModule", "ServingRateLimit", [netuid]).value + # print('rate limit ===>', tempo, weights_rate_limit, serving_rate_limit) + + rented_machine = RentedMachine( + miner_hotkey="miner_hotkey", + executor_id='executor_id', + executor_ip_address='executor_ip_address', + executor_ip_port='2000', + container_name='container_name', + ) + rented_machine_str =rented_machine.model_dump_json() + machine = RentedMachine.model_validate(json.loads(rented_machine_str)) + print(json.loads(rented_machine_str)) diff --git a/neurons/validators/version.txt b/neurons/validators/version.txt index 3ecfa57..ffbf6d8 100644 --- a/neurons/validators/version.txt +++ b/neurons/validators/version.txt @@ -1 +1 @@ -3.4.21 +3.4.22