From 3d9fdd27aea1764af52ac0eba6eba59b710edd76 Mon Sep 17 00:00:00 2001 From: sal Date: Thu, 30 Jan 2025 09:07:19 -0500 Subject: [PATCH] network refactor --- commune/module.py | 3 +- commune/server/manager.py | 159 --------------------------- commune/server/network.py | 206 +++++++++++++++++++++++++++++++---- commune/server/server.py | 59 +++++----- commune/utils/os.py | 160 ++++++++++++++++----------- commune/utils/storage.py | 1 + modules/agent/agent.py | 6 +- modules/builder/builder.py | 10 +- modules/subspace/subspace.py | 18 +-- 9 files changed, 326 insertions(+), 296 deletions(-) delete mode 100644 commune/server/manager.py diff --git a/commune/module.py b/commune/module.py index 73b739d6..ff338895 100755 --- a/commune/module.py +++ b/commune/module.py @@ -48,7 +48,6 @@ class c: "client": 'server.client', 'network': 'server.network', 'local': 'server.network', - 'network.local': 'server.network', } splitters = [':', '/', '.'] route_cache = None @@ -1580,7 +1579,7 @@ def new_module( cls, add_module = new_module def build(self, *args, **kwargs): - return c.module('builder')().forward(*args, **kwargs) + return c.module('builder')().build(*args, **kwargs) @classmethod def filter(cls, text_list: List[str], filter_text: str) -> List[str]: diff --git a/commune/server/manager.py b/commune/server/manager.py deleted file mode 100644 index aa81adb6..00000000 --- a/commune/server/manager.py +++ /dev/null @@ -1,159 +0,0 @@ - -import os -import commune as c -import commune as c -from typing import * -from fastapi import FastAPI, Request -from fastapi.middleware.cors import CORSMiddleware -from sse_starlette.sse import EventSourceResponse -import uvicorn -import os -import json -import asyncio -from .network import Network - -class Manager: - description = 'Process manager manages processes using pm2' - pm2_dir = os.path.expanduser('~/.pm2') - - def __init__(self, network='local', **kwargs): - self.net = Network(network=network) - self.ensure_env() - attrs = ['add_server', 'rm_server', 'namespace', 'modules'] - self.add_server = self.net.add_server - self.rm_server = self.net.rm_server - self.namespace = self.net.namespace - - def kill(self, name:str, verbose:bool = True, **kwargs): - try: - if name == 'all': - return self.kill_all(verbose=verbose) - c.cmd(f"pm2 delete {name}", verbose=False) - self.rm_logs(name) - result = {'message':f'Killed {name}', 'success':True} - except Exception as e: - result = {'message':f'Error killing {name}', 'success':False, 'error':e} - c.rm_server(name) - return result - - def kill_all(self, verbose:bool = True, timeout=20): - servers = self.processes() - futures = [c.submit(self.kill, kwargs={'name':s, 'update': False}) for s in servers] - results = c.wait(futures, timeout=timeout) - return results - - def killall(self, **kwargs): - return self.kill_all(**kwargs) - - def logs_path_map(self, name=None): - logs_path_map = {} - for l in c.ls(f'{self.pm2_dir}/logs/'): - key = '-'.join(l.split('/')[-1].split('-')[:-1]).replace('-',':') - logs_path_map[key] = logs_path_map.get(key, []) + [l] - for k in logs_path_map.keys(): - logs_path_map[k] = {l.split('-')[-1].split('.')[0]: l for l in list(logs_path_map[k])} - if name != None: - return logs_path_map.get(name, {}) - return logs_path_map - - - def rm_logs( self, name): - logs_map = self.logs_path_map(name) - for k in logs_map.keys(): - c.rm(logs_map[k]) - - def logs(self, module:str, tail: int =100, mode: str ='cmd', **kwargs): - if mode == 'local': - text = '' - for m in ['out','error']: - # I know, this is fucked - path = f'{self.pm2_dir}/logs/{module.replace("/", "-")}-{m}.log'.replace(':', '-').replace('_', '-') - try: - text += c.get_text(path, tail=tail) - except Exception as e: - c.print('ERROR GETTING LOGS -->' , e) - continue - return text - elif mode == 'cmd': - return c.cmd(f"pm2 logs {module}") - else: - raise NotImplementedError(f'mode {mode} not implemented') - - - def kill_many(self, search=None, verbose:bool = True, timeout=10): - futures = [] - for name in c.servers(search=search): - f = c.submit(c.kill, dict(name=name, verbose=verbose), timeout=timeout) - futures.append(f) - return c.wait(futures) - - - def start(self, - fn: str = 'serve', - module:str = None, - name:Optional[str]=None, - args : list = None, - kwargs: dict = None, - interpreter:str='python3', - autorestart: bool = True, - verbose: bool = False , - force:bool = True, - run_fn: str = 'run_fn', - cwd : str = None, - env : Dict[str, str] = None, - refresh:bool=True , - **extra_kwargs): - env = env or {} - if '/' in fn: - module, fn = fn.split('/') - module = module or self.module_name() - name = name or module - if refresh: - self.kill(name) - cmd = f"pm2 start {c.filepath()} --name {name} --interpreter {interpreter}" - cmd = cmd if autorestart else ' --no-autorestart' - cmd = cmd + ' -f ' if force else cmd - kwargs = { - 'module': module , - 'fn': fn, - 'args': args or [], - 'kwargs': kwargs or {} - } - - kwargs_str = json.dumps(kwargs).replace('"', "'") - cmd = cmd + f' -- --fn {run_fn} --kwargs "{kwargs_str}"' - stdout = c.cmd(cmd, env=env, verbose=verbose, cwd=cwd) - return {'success':True, 'msg':f'Launched {module}', 'cmd': cmd, 'stdout':stdout} - - def restart(self, name:str): - assert name in self.processes() - c.print(f'Restarting {name}', color='cyan') - c.cmd(f"pm2 restart {name}", verbose=False) - self.rm_logs(name) - return {'success':True, 'message':f'Restarted {name}'} - - def processes(self, search=None, **kwargs) -> List[str]: - output_string = c.cmd('pm2 status', verbose=False) - module_list = [] - for line in output_string.split('\n')[3:]: - if line.count('│') > 2: - name = line.split('│')[2].strip() - module_list += [name] - if search != None: - module_list = [m for m in module_list if search in m] - module_list = sorted(list(set(module_list))) - return module_list - - def procs(self, **kwargs): - return self.processes(**kwargs) - - def exists(self, name:str, **kwargs) -> bool: - return name in self.processes(**kwargs) - - def ensure_env(self,**kwargs): - '''ensure that the environment variables are set for the process''' - is_pm2_installed = bool( '/bin/pm2' in c.cmd('which pm2', verbose=False)) - if not is_pm2_installed: - c.cmd('npm install -g pm2') - c.cmd('pm2 update') - return {'success':True, 'message':f'Ensured env '} diff --git a/commune/server/network.py b/commune/server/network.py index 907c7ca5..b591dbef 100644 --- a/commune/server/network.py +++ b/commune/server/network.py @@ -1,18 +1,183 @@ -from typing import * + import os import commune as c -class Network(c.Module): +import commune as c +from typing import * +from fastapi import FastAPI, Request +from fastapi.middleware.cors import CORSMiddleware +from sse_starlette.sse import EventSourceResponse +import uvicorn +import os +import json +import asyncio + +class Network: + description = 'Process manager manages processes using pm2' + pm2_dir = os.path.expanduser('~/.pm2') + + def __init__(self, network='local', process_prefix='server', **kwargs): + + self.set_network(network) + self.process_prefix = process_prefix + '/' + network + '/' + print(f'Process prefix: {self.process_prefix}') + self.ensure_env() + + def kill(self, name:str, verbose:bool = True, **kwargs): + try: + if name == 'all': + return self.kill_all(verbose=verbose) + c.cmd(f"pm2 delete {name}", verbose=False) + self.rm_logs(name) + result = {'message':f'Killed {name}', 'success':True} + except Exception as e: + result = {'message':f'Error killing {name}', 'success':False, 'error':e} + c.rm_server(name) + return result + + def kill_all(self, verbose:bool = True, timeout=20): + servers = self.processes() + futures = [c.submit(self.kill, kwargs={'name':s, 'update': False}) for s in servers] + results = c.wait(futures, timeout=timeout) + return results + + def killall(self, **kwargs): + return self.kill_all(**kwargs) + + def logs_path_map(self, name=None): + logs_path_map = {} + for l in c.ls(f'{self.pm2_dir}/logs/'): + key = '-'.join(l.split('/')[-1].split('-')[:-1]).replace('-',':') + logs_path_map[key] = logs_path_map.get(key, []) + [l] + for k in logs_path_map.keys(): + logs_path_map[k] = {l.split('-')[-1].split('.')[0]: l for l in list(logs_path_map[k])} + if name != None: + return logs_path_map.get(name, {}) + return logs_path_map + + + def rm_logs( self, name): + logs_map = self.logs_path_map(name) + for k in logs_map.keys(): + c.rm(logs_map[k]) + + def logs(self, module:str, tail: int =100, mode: str ='cmd', **kwargs): + + if mode == 'local': + text = '' + for m in ['out','error']: + # I know, this is fucked + path = f'{self.pm2_dir}/logs/{module.replace("/", "-")}-{m}.log'.replace(':', '-').replace('_', '-') + try: + text += c.get_text(path, tail=tail) + except Exception as e: + c.print('ERROR GETTING LOGS -->' , e) + continue + return text + elif mode == 'cmd': + return c.cmd(f"pm2 logs {module}") + else: + raise NotImplementedError(f'mode {mode} not implemented') + + + def kill_many(self, search=None, verbose:bool = True, timeout=10): + futures = [] + for name in c.servers(search=search): + f = c.submit(c.kill, dict(name=name, verbose=verbose), timeout=timeout) + futures.append(f) + return c.wait(futures) + + + def servers(self, search=None, **kwargs) -> List[str]: + return list(self.namespace(search=search, **kwargs).keys()) + + def start(self, + fn: str = 'serve', + module:str = None, + name:Optional[str]=None, + args : list = None, + kwargs: dict = None, + interpreter:str='python3', + autorestart: bool = True, + verbose: bool = False , + force:bool = True, + run_fn: str = 'run_fn', + cwd : str = None, + env : Dict[str, str] = None, + refresh:bool=True , + **extra_kwargs): + env = env or {} + if '/' in fn: + module, fn = fn.split('/') + module = module or self.module_name() + name = name or module + if refresh: + self.kill(name) + + name = self.process_prefix + name + cmd = f"pm2 start {c.filepath()} --name {name} --interpreter {interpreter}" + cmd = cmd if autorestart else ' --no-autorestart' + cmd = cmd + ' -f ' if force else cmd + kwargs = { + 'module': module , + 'fn': fn, + 'args': args or [], + 'kwargs': kwargs or {} + } + + kwargs_str = json.dumps(kwargs).replace('"', "'") + cmd = cmd + f' -- --fn {run_fn} --kwargs "{kwargs_str}"' + stdout = c.cmd(cmd, env=env, verbose=verbose, cwd=cwd) + return {'success':True, 'msg':f'Launched {module}', 'cmd': cmd, 'stdout':stdout} + + def restart(self, name:str): + assert name in self.processes() + c.print(f'Restarting {name}', color='cyan') + c.cmd(f"pm2 restart {name}", verbose=False) + self.rm_logs(name) + return {'success':True, 'message':f'Restarted {name}'} + + def processes(self, search=None, **kwargs) -> List[str]: + output_string = c.cmd('pm2 status') + processes = [] + tag = ' default ' + for line in output_string.split('\n'): + if tag in line: + name = line.split(tag)[0].strip() + name = name.split(' ')[-1] + processes += [name] + if search != None: + search = self.resolve_name(search) + processes = [m for m in processes if search in m] + processes = sorted(list(set(processes))) + return processes + + def procs(self, **kwargs): + return self.processes(**kwargs) + + def resolve_name(self, name:str, **kwargs) -> str: + if name == None: + return name + return self.process_prefix + name + + def exists(self, name:str, **kwargs) -> bool: + name = self.resolve_name(name) + return name in self.processes(**kwargs) + + def ensure_env(self,**kwargs): + '''ensure that the environment variables are set for the process''' + is_pm2_installed = bool( '/bin/pm2' in c.cmd('which pm2', verbose=False)) + if not is_pm2_installed: + c.cmd('npm install -g pm2') + c.cmd('pm2 update') + return {'success':True, 'message':f'Ensured env '} + + min_stake = 0 block_time = 8 endpoints = ['namespace'] - def __init__(self, - network:str='local', - tempo=60, - n = 100, - path=None, - **kwargs): - self.set_network(network=network, tempo=tempo, path=path, n=n) + def resolve_path(self, path:str) -> str: + return c.resolve_path('~/.commune/network/' + path) def set_network(self, network:str, tempo:int=60, @@ -21,7 +186,7 @@ def set_network(self, **kwargs): self.network = network self.tempo = tempo - self.n = self.n + self.n = n self.network_path = self.resolve_path(path or f'{self.network}') self.modules_path = f'{self.network_path}/modules' return {'network': self.network, @@ -57,16 +222,16 @@ def modules(self, modules = [m for m in modules if search in m['name']] return modules - def resolve_max_age(self, max_age=None): - return max_age or self.tempo - def namespace(self, search=None, max_age:int = None, update:bool = False, **kwargs) -> dict: - return {m['name']: '0.0.0.0' + ':' + m['url'].split(':')[-1] for m in self.modules(search=search, max_age=self.resolve_max_age(max_age), update=update)} + processes = self.processes(search=search, **kwargs) + modules = self.modules(search=search, max_age=max_age, update=update, **kwargs) + processes = [ p.replace(self.process_prefix, '') for p in processes if p.startswith(self.process_prefix)] + namespace = {m['name']: m['url'] for m in modules if m['name'] in processes} + return namespace def add_server(self, name:str, url:str, key:str) -> None: - data = {'name': name, 'url': url, 'key': key} modules = self.modules() - modules.append(data) + modules.append( {'name': name, 'url': url, 'key': key}) c.put(self.modules_path, modules) return {'success': True, 'msg': f'Block {name}.'} @@ -78,13 +243,6 @@ def rm_server(self, name:str, features=['name', 'key', 'url']) -> Dict: def resolve_network(self, network:str) -> str: return network or self.network - def servers(self, search=None, **kwargs) -> List[str]: - return list( self.namespace(search=search,**kwargs).keys()) - def server_exists(self, name:str, **kwargs) -> bool: servers = self.servers(**kwargs) - return bool(name in servers) - - def infos(self, *args, **kwargs) -> Dict: - return self.modules(*args, **kwargs) - + return bool(name in servers) \ No newline at end of file diff --git a/commune/server/server.py b/commune/server/server.py index d8fb5f54..fd950076 100644 --- a/commune/server/server.py +++ b/commune/server/server.py @@ -10,12 +10,11 @@ class Server: network = 'subspace' - max_user_data_age = 3600 # the lifetime of the user call data + max_user_history_age = 3600 # the lifetime of the user call data max_network_age: int = 60 # (in seconds) the time it takes for. the network to refresh helper_functions = ['info', 'schema', 'functions', 'forward'] # the helper functions function_attributes =['endpoints', 'functions', "exposed_functions",'server_functions', 'public_functions', 'pubfns'] # the attributes for the functions - manager = c.module("server.manager")() - net = {'local': c.module('server.network')()} + server_network = c.module('server.network')() def __init__( self, module: Union[c.Module, object] = None, @@ -92,7 +91,7 @@ def forward(fn: str, request: Request): # add the endpoints to the app self.app.post("/{fn}")(forward) c.print(f'Served(name={self.module.name}, url={self.module.url}, key={self.module.key.key_address})', color='purple') - self.manager.add_server(name=self.module.name, url=self.module.url, key=self.module.key.ss58_address) + self.server_network.add_server(name=self.module.name, url=self.module.url, key=self.module.key.ss58_address) print(f'Network: {self.network}') uvicorn.run(self.app, host='0.0.0.0', port=self.module.port, loop='asyncio') @@ -132,14 +131,13 @@ def set_functions(self, functions:Optional[List[str]] ): return {'success':True, 'message':f'Set functions to {functions}'} def set_port(self, port:Optional[int]=None, port_attributes = ['port', 'server_port']): - m = self.manager name = self.module.name for k in port_attributes: if hasattr(self.module, k): port = getattr(self.module, k) break if port in [None, 'None']: - namespace = self.manager.namespace() + namespace = self.server_network.namespace() if name in namespace: m.kill(name) try: @@ -225,6 +223,9 @@ def generator_wrapper(generator): def resolve_path(self, path): return c.storage_path + '/' + self.module_name() + '/' + path + @classmethod + def processes(cls): + return cls.server_network.processes() state = {} def sync_network(self, network=None): @@ -265,7 +266,7 @@ def wait_for_server(cls, future = c.submit(c.logs, [name]) while time_waiting < timeout: - namespace = cls.net['local'].namespace(network=network, max_age=max_age) + namespace = cls.server_network.namespace(network=network, max_age=max_age) if name in namespace: try: result = c.call(namespace[name]+'/info') @@ -300,7 +301,7 @@ def serve(cls, if remote and isinstance(module, str): rkwargs = {k : v for k, v in c.locals2kwargs(locals()).items() if k not in ['extra_params', 'response', 'namespace']} rkwargs['remote'] = False - cls.manager.start( module="server", fn='serve', name=name, kwargs=rkwargs, cwd=cwd) + cls.server_network.start( module="server", fn='serve', name=name, kwargs=rkwargs, cwd=cwd) return cls.wait_for_server(name) return Server(module=module, name=name, functions=functions, params=params, port=port, key=key, run_api=1) @@ -317,19 +318,23 @@ def test(cls, **kwargs): @classmethod def kill_all(cls): - return cls.manager.kill_all() + return cls.server_network.kill_all() @classmethod def kill(cls, name, **kwargs): - return cls.manager.kill(name, **kwargs) + return cls.server_network.kill(name, **kwargs) @classmethod def server_exists(cls, name): - return cls.net['local'].server_exists(name) + return cls.server_network.server_exists(name) + + @classmethod + def servers(cls, **kwargs): + return cls.server_network.servers(**kwargs) @classmethod def logs(cls, name, **kwargs): - return cls.manager.logs(name, **kwargs) + return cls.server_network.logs(name, **kwargs) def is_admin(self, key_address): return c.is_admin(key_address) @@ -379,11 +384,6 @@ def gate(self, 'cost': self.module.fn2cost.get(fn, 1) } - def users(self): - try: - return os.listdir(self.history_path) - except: - return [] def user_call_path2latency(self, key_address): user_paths = self.call_paths(key_address) @@ -397,13 +397,13 @@ def get_call_data_path(self, key_address): def call_rate(self, key_address, max_age = 60): path2latency = self.user_call_path2latency(key_address) for path, latency in path2latency.items(): - if latency > self.max_user_data_age: - c.print(f'RemovingUserPath(path={path} latency(s)=({latency}/{self.max_user_data_age})') + if latency > self.max_user_history_age: + c.print(f'RemovingUserPath(path={path} latency(s)=({latency}/{self.max_user_history_age})') if os.path.exists(path): os.remove(path) return len(self.call_paths(key_address)) - def user_data(self, key_address, stream=False): + def user_history(self, key_address, stream=False): call_paths = self.call_paths(key_address) if stream: def stream_fn(): @@ -417,8 +417,8 @@ def user2fn2calls(self): user2fn2calls = {} for user in self.users(): user2fn2calls[user] = {} - for user_data in self.user_data(user): - fn = user_data['fn'] + for user_history in self.user_history(user): + fn = user_history['fn'] user2fn2calls[user][fn] = user2fn2calls[user].get(fn, 0) + 1 return user2fn2calls @@ -429,16 +429,19 @@ def call_paths(self, key_address ): def users(self): return os.listdir(self.history_path) - - def history(self, user): - return self.user_data(user) - + def history(self, module=None , simple=True): + module = module or self.module.name + all_history = {} + users = self.users() + for user in users: + all_history[user] = self.user_history(user) + if simple: + all_history[user].pop('output') + return all_history @classmethod def all_history(cls, module=None): self = cls(module=module, run_api=False) all_history = {} - for user in self.users(): - all_history[user] = self.history(user) return all_history def path2time(self, path:str) -> float: diff --git a/commune/utils/os.py b/commune/utils/os.py index 6b229850..5a30628a 100644 --- a/commune/utils/os.py +++ b/commune/utils/os.py @@ -424,23 +424,8 @@ def get_env(key:str): def cwd(): return os.getcwd() -def stream_output(process, verbose=True): - try: - modes = ['stdout', 'stderr'] - for mode in modes: - pipe = getattr(process, mode) - if pipe == None: - continue - # print byte wise - for ch in iter(lambda: pipe.read(1), b''): - ch = ch.decode('utf-8') - if verbose: - print(ch, end='') - yield ch - except Exception as e: - print(e) - finally: - kill_process(process) + + def proc(command:str, *extra_commands, verbose:bool = False, **kwargs): process = subprocess.Popen(shlex.split(command, *extra_commands), @@ -450,56 +435,105 @@ def proc(command:str, *extra_commands, verbose:bool = False, **kwargs): streamer = stream_output(process, verbose=verbose) return streamer -def cmd(command:Union[str, list], - *args, - verbose:bool = False , - env:Dict[str, str] = {}, - sudo:bool = False, - password: bool = None, - bash : bool = False, - return_process: bool = False, - stream: bool = False, - color : str = 'white', - cwd : str = None, - **kwargs) -> 'subprocess.Popen': - - import commune as c - - if len(args) > 0: - command = ' '.join([command] + list(args)) - - sudo = bool(password != None) - if sudo: - command = f'sudo {command}' - - if bash: - command = f'bash -c "{command}"' - - cwd = c.resolve_path(c.pwd() if cwd == None else cwd) - - env = {**os.environ, **env} - process = subprocess.Popen(shlex.split(command), - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - cwd = cwd, - env=env, **kwargs) - if return_process: - return process - - streamer = stream_output(process) - - if stream: - return streamer - else: - text = '' - for ch in streamer: - text += ch - - return text +def cmd( + command: Union[str, list], + *args, + verbose: bool = False, + env: Dict[str, str] = None, + sudo: bool = False, + password: str = None, + bash: bool = False, + return_process: bool = False, + stream: bool = False, + color: str = 'white', + cwd: str = None, + **kwargs +) -> 'subprocess.Popen': + """ + Execute a shell command with various options and handle edge cases. + """ + import commune as c + def stream_output(process, verbose=True): + """Stream output from process pipes.""" + try: + modes = ['stdout', 'stderr'] + for mode in modes: + pipe = getattr(process, mode) + if pipe is None: + continue + + # Read bytewise + while True: + ch = pipe.read(1) + if not ch: + break + try: + ch_decoded = ch.decode('utf-8') + if verbose: + print(ch_decoded, end='', flush=True) + yield ch_decoded + except UnicodeDecodeError: + continue + finally: + kill_process(process) + try: + # Handle command construction + if isinstance(command, list): + command = ' '.join(command) + + if args: + command = ' '.join([command] + list(map(str, args))) + + # Handle sudo + if password is not None: + sudo = True + if sudo: + command = f'sudo {command}' + + # Handle bash execution + if bash: + command = f'bash -c "{command}"' + + # Handle working directory + cwd = c.resolve_path(c.pwd() if cwd is None else cwd) + + # Handle environment variables + if env is None: + env = {} + env = {**os.environ, **env} + + # Create process + process = subprocess.Popen( + shlex.split(command), + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + cwd=cwd, + env=env, + **kwargs + ) + + if return_process: + return process + + # Handle output streaming + streamer = stream_output(process) + + if stream: + return streamer + else: + # Collect all output + text = '' + for ch in streamer: + text += ch + return text + except Exception as e: + if verbose: + print(f"Error executing command: {str(e)}") + raise def determine_type( x): x_type = type(x) x_type_name = x_type.__name__.lower() diff --git a/commune/utils/storage.py b/commune/utils/storage.py index c7de1cf5..f2db5eb1 100644 --- a/commune/utils/storage.py +++ b/commune/utils/storage.py @@ -1,5 +1,6 @@ from typing import * +import os def str2python(input)-> dict: import json diff --git a/modules/agent/agent.py b/modules/agent/agent.py index c57137cf..22b2b011 100644 --- a/modules/agent/agent.py +++ b/modules/agent/agent.py @@ -16,11 +16,11 @@ def __init__(self, def generate(self, text = 'whats 2+2?' , - model= 'anthropic/claude-3.5-sonnet', - temperature= 0.5, + model= 'anthropic/claude-3.5-sonnet', + temperature= 0.5, max_tokens= 1000000, stream=True, ): - # text = self.process_text(text) + text = self.process_text(text) return self.model.generate(text, stream=stream, model=model, max_tokens=max_tokens,temperature=temperature ) forward = generate diff --git a/modules/builder/builder.py b/modules/builder/builder.py index 8873461b..d9599340 100644 --- a/modules/builder/builder.py +++ b/modules/builder/builder.py @@ -12,16 +12,9 @@ def __init__(self, key = None, **kwargs): - self.model = c.module('model.openrouter')(model=model) + self.model = c.module('agent')(model=model) self.models = self.model.models() self.key = c.get_key(key) - - def process_text(self, text): - for ch in text.split(' '): - - if len(ch) > 0 and ch[0] in ['.', '/', '~'] and os.path.exists(ch): - text = text.replace(ch, str(c.file2text(ch))) - return text def build(self, text, @@ -59,7 +52,6 @@ def build(self, """ if len(extra_text) > 0: text = ' '.join(list(map(str, [text] +list(extra_text)))) - text = self.process_text(text) prompt = prompt + text output = self.model.generate(prompt, stream=stream, model=model, max_tokens=max_tokens, temperature=temperature ) return self.process_output(output, path=path) diff --git a/modules/subspace/subspace.py b/modules/subspace/subspace.py index 80cf7f1e..bf0e0467 100644 --- a/modules/subspace/subspace.py +++ b/modules/subspace/subspace.py @@ -2035,7 +2035,6 @@ def get_balances( return balances def my_balance(self, batch_size=128, timeout=120, max_age=6000, update=False, num_connections=10): - self.set_connections(num_connections=num_connections) path = self.resolve_path(f'{self.network}/my_balance') balances = c.get(path, None, update=update, max_age=max_age) if balances == None: @@ -2055,13 +2054,16 @@ def my_balance(self, batch_size=128, timeout=120, max_age=6000, update=False, nu balances = {} progress_bar = c.tqdm(total=n, desc='Getting Balances') for f in c.as_completed(future2hash.keys(), timeout=timeout): - batch_hash = future2hash[f] - balances_batch = f.result() - addresses_batch = hash2batch[batch_hash] - for address, balance in zip(addresses_batch, balances_batch): - balances[address] = balance[1].value['data']['free'] - progress_bar.update(1) - # c.print(f'Balance for {address}: {balance} ({counter}/{len(addresses)})') + try: + batch_hash = future2hash[f] + balances_batch = f.result() + addresses_batch = hash2batch[batch_hash] + for address, balance in zip(addresses_batch, balances_batch): + balances[address] = balance[1].value['data']['free'] + progress_bar.update(1) + # c.print(f'Balance for {address}: {balance} ({counter}/{len(addresses)})') + except Exception as e: + print(e) address2key = c.address2key() balances = {address2key.get(k, k):v for k,v in balances.items()} c.put(path, balances)