From a6c2df1c380d6b9aee0f9f22b09b8460e2c5bd05 Mon Sep 17 00:00:00 2001 From: "radu.mutilica" Date: Fri, 10 Jan 2025 13:28:54 +0200 Subject: [PATCH 1/3] Validator misc changes - added logging instead of prints - added a proxy app for chat completions endpoint --- cortext/dendrite.py | 14 +++++--- cursor/app/core/dendrite.py | 1 + cursor/app/core/query_to_validator.py | 52 +++++++++++++++++++++------ cursor/app/endpoints/text.py | 3 +- cursor/start_cursor.sh | 2 +- validators/core/axon.py | 1 - validators/weight_setter.py | 33 ++++++++++++++--- 7 files changed, 82 insertions(+), 24 deletions(-) diff --git a/cortext/dendrite.py b/cortext/dendrite.py index de885103..f9de0733 100644 --- a/cortext/dendrite.py +++ b/cortext/dendrite.py @@ -8,7 +8,7 @@ import traceback import time from typing import Optional, List - +from loguru import logger from cortext import StreamPrompting @@ -30,6 +30,7 @@ async def call_stream( organic: bool = True ) -> AsyncGenerator[Any, Any]: start_time = time.time() + logger.info(f"Axon: {target_axon}") target_axon = ( target_axon.info() if isinstance(target_axon, bt.axon) @@ -38,13 +39,16 @@ async def call_stream( # Build request endpoint from the synapse class request_name = synapse.__class__.__name__ + # endpoint = ( + # f"0.0.0.0:{str(target_axon.port)}" + # if target_axon.ip == str(self.external_ip) + # else f"{target_axon.ip}:{str(target_axon.port)}" + # ) endpoint = ( - f"0.0.0.0:{str(target_axon.port)}" - if target_axon.ip == str(self.external_ip) - else f"{target_axon.ip}:{str(target_axon.port)}" + f"{target_axon.ip}:{str(target_axon.port)}" ) url = f"http://{endpoint}/{request_name}" - + logger.info(f"url: {url}") # Preprocess synapse for making a request synapse: StreamPrompting = self.preprocess_synapse_for_request(target_axon, synapse, timeout) # type: ignore max_try = 0 diff --git a/cursor/app/core/dendrite.py b/cursor/app/core/dendrite.py index 7b3b9878..7f567837 100644 --- a/cursor/app/core/dendrite.py +++ b/cursor/app/core/dendrite.py @@ -52,6 +52,7 @@ async def call_stream( connector = aiohttp.TCPConnector(limit=200) session = aiohttp.ClientSession(timeout=timeout, connector=connector) try: + bt.logging.info(f"Attempting to connect to {url}") while max_try < 2: async with session.post( url, diff --git a/cursor/app/core/query_to_validator.py b/cursor/app/core/query_to_validator.py index ab3cf516..3ec4d9a7 100644 --- a/cursor/app/core/query_to_validator.py +++ b/cursor/app/core/query_to_validator.py @@ -6,44 +6,74 @@ from cursor.app.core.config import config from cortext.dendrite import CortexDendrite import traceback +from loguru import logger +import numpy as np subtensor = bt.subtensor(network="finney") meta = subtensor.metagraph(netuid=18) -print("metagraph synched!") +logger.info("metagraph synched!") # This needs to be your validator wallet that is running your subnet 18 validator wallet = bt.wallet(name=config.wallet_name, hotkey=config.wallet_hotkey) -print(f"wallet_name is {config.wallet_name}, hot_key is {config.wallet_hotkey}") +logger.info(f"wallet_name is {config.wallet_name}, hot_key is {config.wallet_hotkey}") dendrite = CortexDendrite(wallet=wallet) vali_uid = meta.hotkeys.index(wallet.hotkey.ss58_address) -axon_to_use = meta.axons[vali_uid] +# axon_to_use = meta.axons[vali_uid] +import threading +import time +top_incentive_axons = [] +logger.info(f"top_incentive_axons: {top_incentive_axons}") + +def update_top_incentive_axons(): + global top_incentive_axons + while True: + axons = meta.axons + incentives = meta.I + # Convert incentives to numpy array and get indices of top 50 + incentive_indices = np.argsort(np.array(incentives, dtype=np.float32))[-50:] + logger.info(f"incentive_indices: {incentive_indices}") + top_incentive_axons = [axons[int(i)] for i in incentive_indices] + logger.info(f"top_incentive_axons: {top_incentive_axons}") + time.sleep(600) # Sleep for 10 minutes + +def get_top_incentive_axons(): + global top_incentive_axons + return top_incentive_axons + +# Start background thread +thread = threading.Thread(target=update_top_incentive_axons, daemon=True) +thread.start() +import random async def query_miner(chat_request: ChatRequest) -> AsyncGenerator[str, None]: try: synapse = StreamPrompting(**chat_request.dict()) - + axon_to_use = random.choice(get_top_incentive_axons()) + logger.info("query_miner.synapse", synapse) resp = dendrite.call_stream( target_axon=axon_to_use, synapse=synapse, timeout=60 ) + logger.info("query_miner.resp", resp) async for chunk in resp: + logger.info("query_miner.chunk", chunk) if isinstance(chunk, str): obj = {"id":"chatcmpl-abc123","object":"chat.completion.chunk","choices":[{"delta":{"content":chunk},"index":0,"finish_reason":None}]} yield "data: " + json.dumps(obj) + "\n\n" - print(chunk, end='', flush=True) + logger.info(chunk, end='', flush=True) else: - print(f"\n\nFinal synapse: {chunk}\n") + logger.info(f"\n\nFinal synapse: {chunk}\n") yield "[DONE]" except Exception as e: - print(f"Exception during query: {traceback.format_exc()}") + logger.info(f"Exception during query: {traceback.format_exc()}") yield "Exception ocurred." async def query_miner_no_stream(chat_request: ChatRequest): try: synapse = StreamPrompting(**chat_request.dict()) - + axon_to_use = random.choice(get_top_incentive_axons()) resp = dendrite.call_stream( target_axon=axon_to_use, synapse=synapse, @@ -53,11 +83,11 @@ async def query_miner_no_stream(chat_request: ChatRequest): async for chunk in resp: if isinstance(chunk, str): full_resp += chunk - print(chunk, end='', flush=True) + logger.info(chunk, end='', flush=True) else: - print(f"\n\nFinal synapse: {chunk}\n") + logger.info(f"\n\nFinal synapse: {chunk}\n") return full_resp except Exception as e: - print(f"Exception during query: {traceback.format_exc()}") + logger.info(f"Exception during query: {traceback.format_exc()}") return "" \ No newline at end of file diff --git a/cursor/app/endpoints/text.py b/cursor/app/endpoints/text.py index 03e4f854..9df5b13f 100644 --- a/cursor/app/endpoints/text.py +++ b/cursor/app/endpoints/text.py @@ -7,11 +7,12 @@ from cursor.app.core.query_to_validator import query_miner, query_miner_no_stream import bittensor as bt import traceback - +from loguru import logger async def chat( chat_request: ChatRequest ) -> StreamingResponse | JSONResponse: + logger.info(chat_request) try: if chat_request.stream: return StreamingResponse(query_miner(chat_request), media_type="text/event-stream") diff --git a/cursor/start_cursor.sh b/cursor/start_cursor.sh index 78f46207..559250f3 100644 --- a/cursor/start_cursor.sh +++ b/cursor/start_cursor.sh @@ -1 +1 @@ -uvicorn cursor.app.main:app --port 8001 --reload --host 0.0.0.0 \ No newline at end of file +uvicorn cursor.app.main:app --port 8000 --reload --host 0.0.0.0 diff --git a/validators/core/axon.py b/validators/core/axon.py index 4d0f8753..cea2b9a9 100644 --- a/validators/core/axon.py +++ b/validators/core/axon.py @@ -11,7 +11,6 @@ from inspect import Signature -from cursor.app.core.query_to_validator import axon_to_use from fastapi.responses import JSONResponse from fastapi.middleware.cors import CORSMiddleware from fastapi import FastAPI, APIRouter, Request, Response, Depends diff --git a/validators/weight_setter.py b/validators/weight_setter.py index 672eb114..1f77fce9 100644 --- a/validators/weight_setter.py +++ b/validators/weight_setter.py @@ -30,11 +30,19 @@ from cursor.app.endpoints.text import chat from cursor.app.endpoints.generic import models from cursor.app.core.middleware import APIKeyMiddleware +from loguru import logger +from concurrent.futures import ThreadPoolExecutor +logger.add("logs/weight_setter.log") + +logger.info("WeightSetter.......") scoring_organic_timeout = 60 NUM_INTERVALS_PER_CYCLE = 10 +# bt.logging.enable_default() +# bt.logging.info("WeightSetter.......") +# bt.logging.enable_trace() class WeightSetter: def __init__(self, config, cache: QueryResponseCache, loop=None): @@ -215,8 +223,9 @@ async def handle_response(resp): metagraph=self.metagraph) }) query_syn.time_taken = query_syn.dendrite.process_time - - axon = self.metagraph.axons[uid] + logger.info(f"query_miner.query_syn: {query_syn}") + print(self.metagraph.axons[:10]) + axon = self.metagraph.axons[131] response = self.dendrite.call_stream( target_axon=axon, synapse=query_syn, @@ -449,7 +458,7 @@ def base_blacklist(self, synapse, blacklist_amt=20000) -> Tuple[bool, str]: bt.logging.exception(err) async def images(self, synapse: ImageResponse) -> ImageResponse: - bt.logging.info(f"Received {synapse}") + logger.info(f"Received {synapse}") axon = self.metagraph.axons[synapse.uid] start_time = time.time() @@ -493,7 +502,7 @@ async def embeddings(self, synapse: Embeddings) -> Embeddings: return synapse_response async def prompt(self, synapse: StreamPrompting) -> StreamingSynapse.BTStreamingResponse: - bt.logging.info(f"Received {synapse}") + logger.info(f"Received {synapse}") contents = " ".join([message.get("content") for message in synapse.messages]) if len(contents.split()) > 10000: raise HTTPException(status_code=413, detail="Request entity too large") @@ -563,11 +572,25 @@ async def consume_organic_queries(self): ) self.cursor_setup() self.axon.serve(netuid=self.netuid, subtensor=self.subtensor) - print(f"axon: {self.axon}") + logger.info(f"axon: {self.axon}") self.axon.start() bt.logging.info(f"Running validator on uid: {self.my_uid}") def cursor_setup(self): + from fastapi import FastAPI + import uvicorn + self.proxy_app = FastAPI() + self.proxy_app.add_api_route( + "/v1/chat/completions", + chat, + methods=["POST", "OPTIONS"], + tags=["StreamPrompting"], + response_model=None + ) + self.proxy_app.add_api_route("/v1/models", models, methods=["GET"], tags=["Text"], response_model=None) + # Run this app in uvicorn threadpool + threadpool_executor = ThreadPoolExecutor(max_workers=1) + threadpool_executor.submit(uvicorn.run(self.proxy_app, host="0.0.0.0", port=8000)) self.axon.router.add_api_route( "/v1/chat/completions", chat, From 2ad4da8b2e6d0cd2b6fb789d95ef2c22987e6fb0 Mon Sep 17 00:00:00 2001 From: "radu.mutilica" Date: Sat, 11 Jan 2025 23:09:38 +0200 Subject: [PATCH 2/3] Removed debug lines lol --- validators/weight_setter.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/validators/weight_setter.py b/validators/weight_setter.py index 1f77fce9..19bf21be 100644 --- a/validators/weight_setter.py +++ b/validators/weight_setter.py @@ -224,8 +224,7 @@ async def handle_response(resp): }) query_syn.time_taken = query_syn.dendrite.process_time logger.info(f"query_miner.query_syn: {query_syn}") - print(self.metagraph.axons[:10]) - axon = self.metagraph.axons[131] + axon = random.choice(self.metagraph.axons[:15]) response = self.dendrite.call_stream( target_axon=axon, synapse=query_syn, From 073ff4cfda8b03758436e185103d1204f604bec7 Mon Sep 17 00:00:00 2001 From: "radu.mutilica" Date: Sat, 11 Jan 2025 23:10:26 +0200 Subject: [PATCH 3/3] Removed fluff --- validators/weight_setter.py | 37 +++++++++++++------------------------ 1 file changed, 13 insertions(+), 24 deletions(-) diff --git a/validators/weight_setter.py b/validators/weight_setter.py index 19bf21be..af23ffc9 100644 --- a/validators/weight_setter.py +++ b/validators/weight_setter.py @@ -1,49 +1,38 @@ import asyncio -import copy import random import threading -import json - -import torch import time - from collections import defaultdict -from substrateinterface import SubstrateInterface +from concurrent.futures import ThreadPoolExecutor from functools import partial from typing import Tuple -from fastapi import HTTPException import bittensor as bt +import torch from bittensor import StreamingSynapse - -import cortext +from fastapi import HTTPException +from loguru import logger from starlette.types import Send +from substrateinterface import SubstrateInterface -from cortext.protocol import IsAlive, StreamPrompting, ImageResponse, Embeddings +import cortext +from cortext.dendrite import CortexDendrite from cortext.metaclasses import ValidatorRegistryMeta -from validators.services import CapacityService, BaseValidator, TextValidator, ImageValidator +from cortext.protocol import IsAlive, StreamPrompting, ImageResponse, Embeddings +from cursor.app.endpoints.generic import models +from cursor.app.endpoints.text import chat +from validators.core.axon import CortexAxon +from validators.services import CapacityService from validators.services.cache import QueryResponseCache -from validators.utils import error_handler, setup_max_capacity, load_entire_questions from validators.task_manager import TaskMgr -from validators.core.axon import CortexAxon -from cortext.dendrite import CortexDendrite -from cursor.app.endpoints.text import chat -from cursor.app.endpoints.generic import models -from cursor.app.core.middleware import APIKeyMiddleware -from loguru import logger -from concurrent.futures import ThreadPoolExecutor +from validators.utils import setup_max_capacity, load_entire_questions logger.add("logs/weight_setter.log") - logger.info("WeightSetter.......") scoring_organic_timeout = 60 NUM_INTERVALS_PER_CYCLE = 10 -# bt.logging.enable_default() -# bt.logging.info("WeightSetter.......") -# bt.logging.enable_trace() - class WeightSetter: def __init__(self, config, cache: QueryResponseCache, loop=None):