Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validator misc changes #113

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions cortext/dendrite.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import traceback
import time
from typing import Optional, List

from loguru import logger
from cortext import StreamPrompting


Expand All @@ -30,6 +30,7 @@ async def call_stream(
organic: bool = True
) -> AsyncGenerator[Any, Any]:
start_time = time.time()
logger.info(f"Axon: {target_axon}")
target_axon = (
target_axon.info()
if isinstance(target_axon, bt.axon)
Expand All @@ -38,13 +39,16 @@ async def call_stream(

# Build request endpoint from the synapse class
request_name = synapse.__class__.__name__
# endpoint = (
# f"0.0.0.0:{str(target_axon.port)}"
# if target_axon.ip == str(self.external_ip)
# else f"{target_axon.ip}:{str(target_axon.port)}"
# )
endpoint = (
f"0.0.0.0:{str(target_axon.port)}"
if target_axon.ip == str(self.external_ip)
else f"{target_axon.ip}:{str(target_axon.port)}"
f"{target_axon.ip}:{str(target_axon.port)}"
)
url = f"http://{endpoint}/{request_name}"

logger.info(f"url: {url}")
# Preprocess synapse for making a request
synapse: StreamPrompting = self.preprocess_synapse_for_request(target_axon, synapse, timeout) # type: ignore
max_try = 0
Expand Down
1 change: 1 addition & 0 deletions cursor/app/core/dendrite.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ async def call_stream(
connector = aiohttp.TCPConnector(limit=200)
session = aiohttp.ClientSession(timeout=timeout, connector=connector)
try:
bt.logging.info(f"Attempting to connect to {url}")
while max_try < 2:
async with session.post(
url,
Expand Down
52 changes: 41 additions & 11 deletions cursor/app/core/query_to_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,44 +6,74 @@
from cursor.app.core.config import config
from cortext.dendrite import CortexDendrite
import traceback
from loguru import logger
import numpy as np

subtensor = bt.subtensor(network="finney")
meta = subtensor.metagraph(netuid=18)
print("metagraph synched!")
logger.info("metagraph synched!")

# This needs to be your validator wallet that is running your subnet 18 validator
wallet = bt.wallet(name=config.wallet_name, hotkey=config.wallet_hotkey)
print(f"wallet_name is {config.wallet_name}, hot_key is {config.wallet_hotkey}")
logger.info(f"wallet_name is {config.wallet_name}, hot_key is {config.wallet_hotkey}")
dendrite = CortexDendrite(wallet=wallet)
vali_uid = meta.hotkeys.index(wallet.hotkey.ss58_address)
axon_to_use = meta.axons[vali_uid]
# axon_to_use = meta.axons[vali_uid]
import threading
import time

top_incentive_axons = []
logger.info(f"top_incentive_axons: {top_incentive_axons}")

def update_top_incentive_axons():
    """Background worker: every 10 minutes, recompute the 50 axons with the
    highest incentive and publish them via the module-level
    ``top_incentive_axons`` list.

    Runs forever; intended to be started as a daemon thread.
    """
    global top_incentive_axons
    while True:
        try:
            # Re-sync the metagraph so axons/incentives are fresh. Without
            # this, every iteration after the first recomputed over the same
            # stale snapshot taken at import time, making the refresh loop
            # a no-op.
            meta.sync(subtensor=subtensor)
            axons = meta.axons
            incentives = meta.I
            # Indices of the top-50 incentives (ascending; highest last).
            incentive_indices = np.argsort(np.array(incentives, dtype=np.float32))[-50:]
            logger.info(f"incentive_indices: {incentive_indices}")
            top_incentive_axons = [axons[int(i)] for i in incentive_indices]
            logger.info(f"top_incentive_axons: {top_incentive_axons}")
        except Exception:
            # A transient network/chain error must not kill the daemon
            # thread silently; log and retry on the next cycle.
            logger.error(f"update_top_incentive_axons failed: {traceback.format_exc()}")
        time.sleep(600)  # Sleep for 10 minutes

def get_top_incentive_axons():
    """Return the most recently published list of top-incentive axons.

    May be empty until the background refresh thread has completed its
    first pass.
    """
    # Reading a module-level name needs no ``global`` declaration.
    return top_incentive_axons

# Start background thread that keeps ``top_incentive_axons`` fresh.
# daemon=True so the process can exit without joining this infinite loop.
thread = threading.Thread(target=update_top_incentive_axons, daemon=True)
thread.start()
# NOTE(review): mid-module import — conventionally belongs at top of file.
import random

async def query_miner(chat_request: ChatRequest) -> AsyncGenerator[str, None]:
    """Stream a chat completion from a randomly chosen top-incentive miner.

    Yields OpenAI-style SSE ``data: {...}`` chunks followed by ``[DONE]``;
    on any failure yields the literal string ``"Exception occurred."``.
    """
    try:
        synapse = StreamPrompting(**chat_request.dict())

        candidates = get_top_incentive_axons()
        if not candidates:
            # The background refresh thread may not have completed its first
            # pass yet; random.choice on [] would raise IndexError.
            logger.warning("no top-incentive axons available yet")
            yield "Exception occurred."
            return
        axon_to_use = random.choice(candidates)
        # loguru formats its message with positional args via str.format;
        # pass values through "{}" rather than as extra positional args
        # (which the original silently dropped).
        logger.info("query_miner.synapse: {}", synapse)
        resp = dendrite.call_stream(
            target_axon=axon_to_use,
            synapse=synapse,
            timeout=60
        )
        logger.info("query_miner.resp: {}", resp)
        async for chunk in resp:
            if isinstance(chunk, str):
                obj = {"id":"chatcmpl-abc123","object":"chat.completion.chunk","choices":[{"delta":{"content":chunk},"index":0,"finish_reason":None}]}
                yield "data: " + json.dumps(obj) + "\n\n"
                # Never pass the raw chunk as the loguru message template:
                # a chunk containing "{" would make str.format raise. The
                # original also passed print-style kwargs (end=, flush=)
                # that loguru treats as (unused) format kwargs.
                logger.debug("chunk: {!r}", chunk)
            else:
                logger.info("Final synapse: {}", chunk)
        yield "[DONE]"
    except Exception:
        logger.error("Exception during query: {}", traceback.format_exc())
        # Fixed typo: "ocurred" -> "occurred".
        yield "Exception occurred."

async def query_miner_no_stream(chat_request: ChatRequest):
try:
synapse = StreamPrompting(**chat_request.dict())

axon_to_use = random.choice(get_top_incentive_axons())
resp = dendrite.call_stream(
target_axon=axon_to_use,
synapse=synapse,
Expand All @@ -53,11 +83,11 @@ async def query_miner_no_stream(chat_request: ChatRequest):
async for chunk in resp:
if isinstance(chunk, str):
full_resp += chunk
print(chunk, end='', flush=True)
logger.info(chunk, end='', flush=True)
else:
print(f"\n\nFinal synapse: {chunk}\n")
logger.info(f"\n\nFinal synapse: {chunk}\n")
return full_resp

except Exception as e:
print(f"Exception during query: {traceback.format_exc()}")
logger.info(f"Exception during query: {traceback.format_exc()}")
return ""
3 changes: 2 additions & 1 deletion cursor/app/endpoints/text.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
from cursor.app.core.query_to_validator import query_miner, query_miner_no_stream
import bittensor as bt
import traceback

from loguru import logger

async def chat(
chat_request: ChatRequest
) -> StreamingResponse | JSONResponse:
logger.info(chat_request)
try:
if chat_request.stream:
return StreamingResponse(query_miner(chat_request), media_type="text/event-stream")
Expand Down
2 changes: 1 addition & 1 deletion cursor/start_cursor.sh
Original file line number Diff line number Diff line change
@@ -1 +1 @@
uvicorn cursor.app.main:app --port 8001 --reload --host 0.0.0.0
uvicorn cursor.app.main:app --port 8000 --reload --host 0.0.0.0
1 change: 0 additions & 1 deletion validators/core/axon.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

from inspect import Signature

from cursor.app.core.query_to_validator import axon_to_use
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi import FastAPI, APIRouter, Request, Response, Depends
Expand Down
57 changes: 34 additions & 23 deletions validators/weight_setter.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,38 @@
import asyncio
import copy
import random
import threading
import json

import torch
import time

from collections import defaultdict
from substrateinterface import SubstrateInterface
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import Tuple

from fastapi import HTTPException
import bittensor as bt
import torch
from bittensor import StreamingSynapse

import cortext
from fastapi import HTTPException
from loguru import logger
from starlette.types import Send
from substrateinterface import SubstrateInterface

from cortext.protocol import IsAlive, StreamPrompting, ImageResponse, Embeddings
import cortext
from cortext.dendrite import CortexDendrite
from cortext.metaclasses import ValidatorRegistryMeta
from validators.services import CapacityService, BaseValidator, TextValidator, ImageValidator
from cortext.protocol import IsAlive, StreamPrompting, ImageResponse, Embeddings
from cursor.app.endpoints.generic import models
from cursor.app.endpoints.text import chat
from validators.core.axon import CortexAxon
from validators.services import CapacityService
from validators.services.cache import QueryResponseCache
from validators.utils import error_handler, setup_max_capacity, load_entire_questions
from validators.task_manager import TaskMgr
from validators.core.axon import CortexAxon
from cortext.dendrite import CortexDendrite
from cursor.app.endpoints.text import chat
from cursor.app.endpoints.generic import models
from cursor.app.core.middleware import APIKeyMiddleware
from validators.utils import setup_max_capacity, load_entire_questions

logger.add("logs/weight_setter.log")
logger.info("WeightSetter.......")

scoring_organic_timeout = 60
NUM_INTERVALS_PER_CYCLE = 10


class WeightSetter:
def __init__(self, config, cache: QueryResponseCache, loop=None):

Expand Down Expand Up @@ -215,8 +212,8 @@ async def handle_response(resp):
metagraph=self.metagraph)
})
query_syn.time_taken = query_syn.dendrite.process_time

axon = self.metagraph.axons[uid]
logger.info(f"query_miner.query_syn: {query_syn}")
axon = random.choice(self.metagraph.axons[:15])
response = self.dendrite.call_stream(
target_axon=axon,
synapse=query_syn,
Expand Down Expand Up @@ -449,7 +446,7 @@ def base_blacklist(self, synapse, blacklist_amt=20000) -> Tuple[bool, str]:
bt.logging.exception(err)

async def images(self, synapse: ImageResponse) -> ImageResponse:
bt.logging.info(f"Received {synapse}")
logger.info(f"Received {synapse}")

axon = self.metagraph.axons[synapse.uid]
start_time = time.time()
Expand Down Expand Up @@ -493,7 +490,7 @@ async def embeddings(self, synapse: Embeddings) -> Embeddings:
return synapse_response

async def prompt(self, synapse: StreamPrompting) -> StreamingSynapse.BTStreamingResponse:
bt.logging.info(f"Received {synapse}")
logger.info(f"Received {synapse}")
contents = " ".join([message.get("content") for message in synapse.messages])
if len(contents.split()) > 10000:
raise HTTPException(status_code=413, detail="Request entity too large")
Expand Down Expand Up @@ -563,11 +560,25 @@ async def consume_organic_queries(self):
)
self.cursor_setup()
self.axon.serve(netuid=self.netuid, subtensor=self.subtensor)
print(f"axon: {self.axon}")
logger.info(f"axon: {self.axon}")
self.axon.start()
bt.logging.info(f"Running validator on uid: {self.my_uid}")

def cursor_setup(self):
    # Build a small FastAPI proxy app exposing OpenAI-compatible routes.
    from fastapi import FastAPI
    import uvicorn
    self.proxy_app = FastAPI()
    # Chat completions endpoint (streaming handled inside `chat`).
    self.proxy_app.add_api_route(
        "/v1/chat/completions",
        chat,
        methods=["POST", "OPTIONS"],
        tags=["StreamPrompting"],
        response_model=None
    )
    # Model-listing endpoint.
    self.proxy_app.add_api_route("/v1/models", models, methods=["GET"], tags=["Text"], response_model=None)
    # Run this app in uvicorn threadpool
    # NOTE(review): BUG — submit(uvicorn.run(...)) CALLS uvicorn.run right
    # here on the current thread (blocking forever) and submits its return
    # value; it should be submit(uvicorn.run, self.proxy_app,
    # host="0.0.0.0", port=8000) to run the server on the pool thread.
    # Also confirm port 8000 does not clash with start_cursor.sh's uvicorn.
    threadpool_executor = ThreadPoolExecutor(max_workers=1)
    threadpool_executor.submit(uvicorn.run(self.proxy_app, host="0.0.0.0", port=8000))
self.axon.router.add_api_route(
"/v1/chat/completions",
chat,
Expand Down