From ce675720c27d3647da57fb253d06206876c77615 Mon Sep 17 00:00:00 2001 From: Veselin Penev Date: Sat, 14 Sep 2024 23:09:11 +0200 Subject: [PATCH] various changes and improvements in the files stats counting (#208) --- CHANGELOG.txt | 10 + bitdust/access/key_ring.py | 7 +- bitdust/access/shared_access_coordinator.py | 8 +- bitdust/crypt/encrypted.py | 2 +- bitdust/interface/api.py | 71 ++++--- bitdust/p2p/p2p_service.py | 92 +++++++++ bitdust/services/driver.py | 2 +- bitdust/services/service_p2p_hookups.py | 80 +------- bitdust/services/service_p2p_notifications.py | 1 - bitdust/storage/accounting.py | 30 ++- bitdust/storage/backup_control.py | 38 ++-- bitdust/storage/backup_fs.py | 175 +++++++++++------- bitdust/storage/backup_monitor.py | 2 +- bitdust/supplier/family_member.py | 2 +- bitdust/supplier/storage_contract.py | 16 +- bitdust/transport/packet_out.py | 3 +- bitdust/transport/proxy/proxy_router.py | 35 +++- default_network.json | 6 +- import | 2 +- 19 files changed, 341 insertions(+), 241 deletions(-) diff --git a/CHANGELOG.txt b/CHANGELOG.txt index 5cf3aea..03e57aa 100644 --- a/CHANGELOG.txt +++ b/CHANGELOG.txt @@ -1,6 +1,16 @@ Change Log ========== +2024-09-14 Veselin Penev penev.veselin@gmail.com + +* changed one of the seed nodes +* block uploading of a file when insufficient storage space is expected +* populate details about service_proxy_server() in the api.network_status() +* keep track of files/folders/backups statistics for individual customers in the catalog +* bug fix in storage_contract.py +* IDURL fix in shared_access_coordinator() + + 2024-09-04 Veselin Penev penev.veselin@gmail.com diff --git a/bitdust/access/key_ring.py b/bitdust/access/key_ring.py index 3dccb4f..d6d5eb8 100644 --- a/bitdust/access/key_ring.py +++ b/bitdust/access/key_ring.py @@ -462,8 +462,11 @@ def on_key_received(newpacket, info, status, error_message): # but we already have a key with that ID if my_keys.is_key_private(key_id): # we should not overwrite existing private key - # TODO: check other scenarios - raise Exception('private key already registered with %r' % key_id) + if my_keys.get_public_key_raw(key_id) != key_object.toPublicString(): + raise Exception('private key already registered : %r' % key_id) + lg.warn('received public key, but matching private key is already registered %r' % key_id) + p2p_service.SendAck(newpacket) + return True if my_keys.get_public_key_raw(key_id) != key_object.toPublicString(): my_keys.erase_key(key_id) if not my_keys.register_key(key_id, key_object, label=key_label): diff --git a/bitdust/access/shared_access_coordinator.py b/bitdust/access/shared_access_coordinator.py index 4c2af58..aaca6af 100644 --- a/bitdust/access/shared_access_coordinator.py +++ b/bitdust/access/shared_access_coordinator.py @@ -669,7 +669,7 @@ def doConnectCustomerSuppliers(self, *args, **kwargs): self.suppliers_in_progress.clear() self.suppliers_succeed.clear() for supplier_idurl in self.known_suppliers_list: - self.suppliers_in_progress.append(supplier_idurl) + self.suppliers_in_progress.append(id_url.field(supplier_idurl)) if id_url.is_cached(supplier_idurl): self._do_connect_with_supplier(supplier_idurl) else: @@ -689,7 +689,7 @@ def doSupplierRequestListFiles(self, event, *args, **kwargs): """ Action method.
""" - supplier_idurl = kwargs['supplier_idurl'] + supplier_idurl = id_url.field(kwargs['supplier_idurl']) pkt_out = None if event == 'supplier-file-modified': remote_path = kwargs['remote_path'] @@ -797,8 +797,8 @@ def doRemember(self, event, *args, **kwargs): """ Action method. """ - supplier_idurl = kwargs['supplier_idurl'] - if supplier_idurl in self.suppliers_in_progress: + supplier_idurl = id_url.field(kwargs['supplier_idurl']) + if id_url.is_in(supplier_idurl, self.suppliers_in_progress): self.suppliers_in_progress.remove(supplier_idurl) if event in ['index-sent', 'index-up-to-date']: if supplier_idurl not in self.suppliers_succeed: diff --git a/bitdust/crypt/encrypted.py b/bitdust/crypt/encrypted.py index cc2bc1d..a4364f1 100644 --- a/bitdust/crypt/encrypted.py +++ b/bitdust/crypt/encrypted.py @@ -294,7 +294,7 @@ def Unserialize(data, decrypt_key=None): exc, list(dct.keys()), (dct.get('c'), dct.get('b'), dct.get('i'), dct.get('r')), - traceback.format_tb(), + '\n'.join(traceback.format_stack()), )) if _Debug: lg.out(_DebugLevel, repr(dct)) diff --git a/bitdust/interface/api.py b/bitdust/interface/api.py index 225c089..69a0048 100644 --- a/bitdust/interface/api.py +++ b/bitdust/interface/api.py @@ -515,13 +515,14 @@ def process_info(): } if driver.is_on('service_backup_db'): from bitdust.storage import backup_fs + v = backup_fs.total_stats() result['file'] = { - 'items': backup_fs.counter(), - 'files': backup_fs.numberfiles(), - 'files_size': backup_fs.sizefiles(), - 'folders': backup_fs.numberfolders(), - 'folders_size': backup_fs.sizefolders(), - 'backups_size': backup_fs.sizebackups(), + 'items': v['items'], + 'files': v['files'], + 'folders': v['folders'], + 'files_size': v['size_files'], + 'folders_size': v['size_folders'], + 'backups_size': v['size_backups'], 'customers': len(backup_fs.known_customers()), } if driver.is_on('service_shared_data'): @@ -693,6 +694,7 @@ def network_select(name): def _on_network_disconnected(x): cur_base_dir = deploy.current_base_dir() + # TODO: must wait shutdown and init to complete with defered shutdowner.shutdown_services() shutdowner.shutdown_local() shutdowner.shutdown_automats() @@ -969,6 +971,20 @@ def network_status(suppliers=False, customers=False, cache=False, tcp=False, udp i['queue'] = len(s.pending_packets) sessions.append(i) r['proxy']['sessions'] = sessions + if driver.is_on('service_proxy_server'): + from bitdust.transport.proxy import proxy_router + if proxy_router.A(): + r['proxy']['routes'] = [] + for v in proxy_router.A().routes.values(): + _r = v['connection_info'].copy() + _r['contacts'] = ', '.join(['{}:{}'.format(c[0], c[1]) for c in v['contacts']]) + _r['address'] = ', '.join(['{}:{}'.format(a[0], a[1]) for a in v['address']]) + _r.pop('id', None) + _r.pop('index', None) + r['proxy']['routes'].append(_r) + r['proxy']['closed_routes'] = [(strng.to_text(k), strng.to_text(v)) for k, v in proxy_router.A().closed_routes.items()] + r['proxy']['acks'] = len(proxy_router.A().acks) + r['proxy']['hosts'] = ', '.join([('{}://{}:{}'.format(strng.to_text(k), strng.to_text(v[0]), strng.to_text(v[1]))) for k, v in proxy_router.A().my_hosts.items()]) if dht: from bitdust.dht import dht_service r['dht'] = {} @@ -2292,7 +2308,7 @@ def file_delete(remote_path): backup_fs.DeleteLocalDir(settings.getLocalBackupsDir(), pathIDfull) backup_fs.DeleteByID(pathID, iter=backup_fs.fs(customer_idurl, key_alias), iterID=backup_fs.fsID(customer_idurl, key_alias)) backup_fs.Scan(customer_idurl=customer_idurl, key_alias=key_alias) - 
backup_fs.Calculate(iterID=backup_fs.fsID(customer_idurl, key_alias)) + backup_fs.Calculate(customer_idurl=customer_idurl, key_alias=key_alias) if key_alias != 'master': if driver.is_on('service_shared_data'): from bitdust.access import shared_access_coordinator @@ -2428,24 +2444,10 @@ def file_upload_start(local_path, remote_path, wait_result=False, publish_events if not pathID: return ERROR('path %s was not registered yet' % remote_path) keyID = my_keys.make_key_id(alias=key_alias, creator_glob_id=parts['customer']) - # customerID = global_id.MakeGlobalID(customer=parts['customer'], key_alias=key_alias) pathIDfull = packetid.MakeBackupID(keyID, pathID) if key_alias != 'master': if not driver.is_on('service_shared_data'): return ERROR('service_shared_data() is not started') - - -# def _restart_active_share(result): -# if _Debug: -# lg.args(_DebugLevel, result=result, key_id=keyID, path=path, pathID=pathID) -# if key_alias != 'master': -# from bitdust.access import shared_access_coordinator -# active_share = shared_access_coordinator.get_active_share(keyID) -# if not active_share: -# active_share = shared_access_coordinator.SharedAccessCoordinator(key_id=keyID, publish_events=publish_events) -# active_share.automat('restart') -# return result - if wait_result: task_created_defer = Deferred() tsk = backup_control.StartSingle( @@ -2453,8 +2455,6 @@ def file_upload_start(local_path, remote_path, wait_result=False, publish_events localPath=local_path, keyID=keyID, ) - # if key_alias != 'master': - # tsk.result_defer.addCallback(_restart_active_share) tsk.result_defer.addCallback( lambda result: task_created_defer.callback( OK( @@ -2470,15 +2470,16 @@ def file_upload_start(local_path, remote_path, wait_result=False, publish_events ) ) ) - tsk.result_defer.addErrback(lambda result: task_created_defer.callback(ERROR( - 'uploading task %d for %s failed: %s' % ( - tsk.number, - tsk.pathID, - result[1], - ), - api_method='file_upload_start', - ), ), ) - backup_fs.Calculate(iterID=backup_fs.fsID(customer_idurl, key_alias)) + tsk.result_defer.addErrback(lambda result: task_created_defer.callback(ERROR(result, api_method='file_upload_start'))) + # tsk.result_defer.addErrback(lambda result: task_created_defer.callback(ERROR( + # 'uploading task %d for %s failed: %s' % ( + # tsk.number, + # tsk.pathID, + # result, + # ), + # api_method='file_upload_start', + # ), ), ) + backup_fs.Calculate(customer_idurl=customer_idurl, key_alias=key_alias) backup_control.SaveFSIndex(customer_idurl, key_alias) if _Debug: lg.out(_DebugLevel, 'api.file_upload_start %s with %s, wait_result=True' % (remote_path, pathIDfull)) @@ -2489,10 +2490,8 @@ def file_upload_start(local_path, remote_path, wait_result=False, publish_events localPath=local_path, keyID=keyID, ) - # if key_alias != 'master': - # tsk.result_defer.addCallback(_restart_active_share) - tsk.result_defer.addErrback(lambda result: lg.err('errback from api.file_upload_start.task(%s) failed with %s' % (result[0], result[1]))) - backup_fs.Calculate(iterID=backup_fs.fsID(customer_idurl, key_alias)) + tsk.result_defer.addErrback(lambda result: lg.err('errback from api.file_upload_start.task() failed with %r' % result)) + backup_fs.Calculate(customer_idurl=customer_idurl, key_alias=key_alias) backup_control.SaveFSIndex(customer_idurl, key_alias) if _Debug: lg.out(_DebugLevel, 'api.file_upload_start %s with %s' % (remote_path, pathIDfull)) diff --git a/bitdust/p2p/p2p_service.py b/bitdust/p2p/p2p_service.py index 91ac847..91a9254 100644 --- a/bitdust/p2p/p2p_service.py 
+++ b/bitdust/p2p/p2p_service.py @@ -51,6 +51,10 @@ #------------------------------------------------------------------------------ +from twisted.internet.defer import Deferred + +#------------------------------------------------------------------------------ + from bitdust.logs import lg from bitdust.contacts import contactsdb @@ -66,8 +70,11 @@ from bitdust.crypt import signed from bitdust.crypt import my_keys +from bitdust.services import driver + from bitdust.transport import gateway from bitdust.transport import callback +from bitdust.transport import packet_out from bitdust.userid import my_id @@ -195,6 +202,91 @@ def outbox(outpacket, wide, callbacks, target=None, route=None, response_timeout #------------------------------------------------------------------------------ +def on_request_service_received(newpacket, info): + if len(newpacket.Payload) > 1024*10: + lg.warn('too long payload') + SendFail(newpacket, 'too long payload') + return True + try: + json_payload = serialization.BytesToDict(newpacket.Payload, keys_to_text=True, values_to_text=True) + json_payload['name'] + json_payload['payload'] + except: + lg.warn('json payload invalid') + SendFail(newpacket, 'json payload invalid') + return True + service_name = str(json_payload['name']) + if _Debug: + lg.out(_Debug, 'service_p2p_hookups.RequestService {%s} from %s' % (service_name, newpacket.OwnerID)) + if not driver.is_exist(service_name): + lg.warn('got wrong payload in %s' % service_name) + SendFail(newpacket, 'service %s not exist' % service_name) + return True + if not driver.is_on(service_name): + SendFail(newpacket, 'service %s is off' % service_name) + return True + try: + result = driver.request(service_name, json_payload['payload'], newpacket, info) + except: + lg.exc() + SendFail(newpacket, 'request processing failed with exception') + return True + if not result: + if _Debug: + lg.out(_Debug, 'service_p2p_hookups._send_request_service SKIP request %s' % service_name) + return False + if isinstance(result, Deferred): + if _Debug: + lg.out(_Debug, 'service_p2p_hookups._send_request_service fired delayed execution') + elif isinstance(result, packet_out.PacketOut): + if _Debug: + lg.out(_Debug, 'service_p2p_hookups._send_request_service outbox packet sent') + return True + + +def on_cancel_service_received(newpacket, info): + if len(newpacket.Payload) > 1024*10: + SendFail(newpacket, 'too long payload') + return True + try: + json_payload = serialization.BytesToDict(newpacket.Payload, keys_to_text=True, values_to_text=True) + json_payload['name'] + json_payload['payload'] + except: + SendFail(newpacket, 'json payload invalid') + return True + service_name = json_payload['name'] + if _Debug: + lg.out(_Debug, 'service_p2p_hookups.CancelService {%s} from %s' % (service_name, newpacket.OwnerID)) + if not driver.is_exist(service_name): + lg.warn('got wrong payload in %s' % newpacket) + SendFail(newpacket, 'service %s not exist' % service_name) + return True + if not driver.is_on(service_name): + SendFail(newpacket, 'service %s is off' % service_name) + return True + try: + result = driver.cancel(service_name, json_payload['payload'], newpacket, info) + except: + lg.exc() + SendFail(newpacket, 'request processing failed with exception') + return True + if not result: + if _Debug: + lg.out(_Debug, 'service_p2p_hookups._send_cancel_service SKIP request %s' % service_name) + return False + if isinstance(result, Deferred): + if _Debug: + lg.out(_Debug, 'service_p2p_hookups._send_cancel_service fired delayed execution') + elif 
isinstance(result, packet_out.PacketOut): + if _Debug: + lg.out(_Debug, 'service_p2p_hookups._send_cancel_service outbox packet sent') + return True + + +#------------------------------------------------------------------------------ + + def Ack(newpacket, info): if _Debug: lg.out(_DebugLevel, 'p2p_service.Ack %s from [%s] at %s://%s with %d bytes payload' % (newpacket.PacketID, nameurl.GetName(newpacket.CreatorID), info.proto, info.host, len(newpacket.Payload))) diff --git a/bitdust/services/driver.py b/bitdust/services/driver.py index b83c128..a218ca4 100644 --- a/bitdust/services/driver.py +++ b/bitdust/services/driver.py @@ -474,7 +474,7 @@ def _on_started(start_result, stop_result, dependencies_results): def _on_failed(err): lg.warn('failed service %s in driver.restart() : %r' % (service_name, err)) - restart_result.errback(str(err)) + restart_result.errback(Exception(str(err))) return None def _do_start(stop_result=None, dependencies_results=None): diff --git a/bitdust/services/service_p2p_hookups.py b/bitdust/services/service_p2p_hookups.py index 8d8f567..a30f68b 100644 --- a/bitdust/services/service_p2p_hookups.py +++ b/bitdust/services/service_p2p_hookups.py @@ -110,88 +110,12 @@ def _on_inbox_packet_received(self, newpacket, info, status, error_message): return False def _on_request_service_received(self, newpacket, info): - from twisted.internet.defer import Deferred - from bitdust.logs import lg - from bitdust.lib import serialization - from bitdust.services import driver from bitdust.p2p import p2p_service - from bitdust.transport import packet_out - if len(newpacket.Payload) > 1024*10: - lg.warn('too long payload') - p2p_service.SendFail(newpacket, 'too long payload') - return True - try: - json_payload = serialization.BytesToDict(newpacket.Payload, keys_to_text=True, values_to_text=True) - json_payload['name'] - json_payload['payload'] - except: - lg.warn('json payload invalid') - p2p_service.SendFail(newpacket, 'json payload invalid') - return True - service_name = str(json_payload['name']) - lg.out(self.debug_level, 'service_p2p_hookups.RequestService {%s} from %s' % (service_name, newpacket.OwnerID)) - if not driver.is_exist(service_name): - lg.warn('got wrong payload in %s' % service_name) - p2p_service.SendFail(newpacket, 'service %s not exist' % service_name) - return True - if not driver.is_on(service_name): - p2p_service.SendFail(newpacket, 'service %s is off' % service_name) - return True - try: - result = driver.request(service_name, json_payload['payload'], newpacket, info) - except: - lg.exc() - p2p_service.SendFail(newpacket, 'request processing failed with exception') - return True - if not result: - lg.out(self.debug_level, 'service_p2p_hookups._send_request_service SKIP request %s' % service_name) - return False - if isinstance(result, Deferred): - lg.out(self.debug_level, 'service_p2p_hookups._send_request_service fired delayed execution') - elif isinstance(result, packet_out.PacketOut): - lg.out(self.debug_level, 'service_p2p_hookups._send_request_service outbox packet sent') - return True + return p2p_service.on_request_service_received(newpacket, info) def _on_cancel_service_received(self, newpacket, info): - from twisted.internet.defer import Deferred - from bitdust.logs import lg - from bitdust.lib import serialization - from bitdust.services import driver from bitdust.p2p import p2p_service - from bitdust.transport import packet_out - if len(newpacket.Payload) > 1024*10: - p2p_service.SendFail(newpacket, 'too long payload') - return True - try: - 
json_payload = serialization.BytesToDict(newpacket.Payload, keys_to_text=True, values_to_text=True) - json_payload['name'] - json_payload['payload'] - except: - p2p_service.SendFail(newpacket, 'json payload invalid') - return True - service_name = json_payload['name'] - lg.out(self.debug_level, 'service_p2p_hookups.CancelService {%s} from %s' % (service_name, newpacket.OwnerID)) - if not driver.is_exist(service_name): - lg.warn('got wrong payload in %s' % newpacket) - p2p_service.SendFail(newpacket, 'service %s not exist' % service_name) - return True - if not driver.is_on(service_name): - p2p_service.SendFail(newpacket, 'service %s is off' % service_name) - return True - try: - result = driver.cancel(service_name, json_payload['payload'], newpacket, info) - except: - lg.exc() - p2p_service.SendFail(newpacket, 'request processing failed with exception') - return True - if not result: - lg.out(self.debug_level, 'service_p2p_hookups._send_cancel_service SKIP request %s' % service_name) - return False - if isinstance(result, Deferred): - lg.out(self.debug_level, 'service_p2p_hookups._send_cancel_service fired delayed execution') - elif isinstance(result, packet_out.PacketOut): - lg.out(self.debug_level, 'service_p2p_hookups._send_cancel_service outbox packet sent') - return True + return p2p_service.on_cancel_service_received(newpacket, info) def _on_p2p_connector_switched(self, oldstate, newstate, evt, *args, **kwargs): if newstate == 'INCOMMING?': diff --git a/bitdust/services/service_p2p_notifications.py b/bitdust/services/service_p2p_notifications.py index f8caa57..f3e8da3 100644 --- a/bitdust/services/service_p2p_notifications.py +++ b/bitdust/services/service_p2p_notifications.py @@ -141,7 +141,6 @@ def request(self, json_payload, newpacket, info): resp['result'] = 'denied' resp['reason'] = str(exc) service_responses_list.append(resp) - lg.out(self.debug_level, 'service_p2p_notifications.request %s:%s is [%s] : %s' % (r_scope, r_action, resp['result'], resp.get('reason', 'OK'))) payload = serialization.DictToBytes({'items': service_responses_list}, values_to_text=True) return p2p_service.SendAck(newpacket, payload) diff --git a/bitdust/storage/accounting.py b/bitdust/storage/accounting.py index 23bf759..e6a1b50 100644 --- a/bitdust/storage/accounting.py +++ b/bitdust/storage/accounting.py @@ -67,6 +67,8 @@ from bitdust.storage import backup_fs +from bitdust.userid import my_id + #------------------------------------------------------------------------------ @@ -183,28 +185,38 @@ def calculate_customers_usage_ratio(space_dict=None, used_dict=None): def report_consumed_storage(): + my_own_stats = backup_fs.total_stats() + shared_stats = backup_fs.total_stats(customer_idurl=my_id.getIDURL(), exclude=True) result = {} result['suppliers_num'] = contactsdb.num_suppliers() result['needed'] = settings.getNeededBytes() - # result['needed_str'] = diskspace.MakeStringFromBytes(result['needed']) - result['used'] = int(backup_fs.sizebackups()) - # result['used_str'] = diskspace.MakeStringFromBytes(result['used']) + result['used'] = my_own_stats['size_backups'] result['available'] = result['needed'] - result['used'] - # result['available_str'] = diskspace.MakeStringFromBytes(result['available']) result['needed_per_supplier'] = 0 result['used_per_supplier'] = 0 result['available_per_supplier'] = 0 if result['suppliers_num'] > 0: - result['needed_per_supplier'] = int(math.ceil(2.0*result['needed']/result['suppliers_num'])) - result['used_per_supplier'] = 
int(math.ceil(2.0*result['used']/result['suppliers_num'])) + result['needed_per_supplier'] = int(math.ceil(result['needed']/result['suppliers_num'])) + result['used_per_supplier'] = int(math.ceil(result['used']/result['suppliers_num'])) result['available_per_supplier'] = result['needed_per_supplier'] - result['used_per_supplier'] - # result['needed_per_supplier_str'] = diskspace.MakeStringFromBytes(result['needed_per_supplier']) - # result['used_per_supplier_str'] = diskspace.MakeStringFromBytes(result['used_per_supplier']) - # result['available_per_supplier_str'] = diskspace.MakeStringFromBytes(result['available_per_supplier']) try: result['used_percent'] = misc.value2percent(float(result['used']), float(result['needed'])) except: result['used_percent'] = '0%' + result['my_catalog_items'] = my_own_stats['items'] + result['my_files'] = my_own_stats['files'] + result['my_folders'] = my_own_stats['folders'] + result['my_files_size'] = my_own_stats['size_files'] + result['my_folders_size'] = my_own_stats['size_folders'] + result['my_backups_size'] = my_own_stats['size_backups'] + result['my_keys'] = my_own_stats['keys'] + result['shared_catalog_items'] = shared_stats['items'] + result['shared_files'] = shared_stats['files'] + result['shared_folders'] = shared_stats['folders'] + result['shared_files_size'] = shared_stats['size_files'] + result['shared_folders_size'] = shared_stats['size_folders'] + result['shared_backups_size'] = shared_stats['size_backups'] + result['shared_keys'] = shared_stats['keys'] return result diff --git a/bitdust/storage/backup_control.py b/bitdust/storage/backup_control.py index ecb402a..4961fd2 100644 --- a/bitdust/storage/backup_control.py +++ b/bitdust/storage/backup_control.py @@ -396,7 +396,7 @@ def DeleteBackup(backupID, removeLocalFilesToo=True, saveDB=True, calculate=True # check and calculate used space if calculate or key_alias != 'master': backup_fs.Scan(customer_idurl=customer_idurl, key_alias=key_alias) - backup_fs.Calculate(iterID=backup_fs.fsID(customer_idurl, key_alias)) + backup_fs.Calculate(customer_idurl=customer_idurl, key_alias=key_alias) # in some cases we want to save the DB later if saveDB or key_alias != 'master': SaveFSIndex(customer_idurl, key_alias) @@ -448,7 +448,7 @@ def DeletePathBackups(pathID, removeLocalFilesToo=True, saveDB=True, calculate=T # check and calculate used space if calculate or key_alias != 'master': backup_fs.Scan(customer_idurl=customer_idurl, key_alias=key_alias) - backup_fs.Calculate(iterID=backup_fs.fsID(customer_idurl, key_alias)) + backup_fs.Calculate(customer_idurl=customer_idurl, key_alias=key_alias) # save the index if needed if saveDB or key_alias != 'master': SaveFSIndex(customer_idurl, key_alias) @@ -629,12 +629,12 @@ def on_folder_size_counted(self, pth, sz, itemInfo): # TODO: need to rethink that approach # here not taking in account compressing rate of the local files # but taking in account remote version size - it is always doubled - if int(backup_fs.sizebackups()) + self.totalSize*2 > settings.getNeededBytes(): - err = 'insufficient storage space expected' + if backup_fs.total_stats()['size_backups'] + self.totalSize*2 > settings.getNeededBytes(): + err_str = 'insufficient storage space expected' pth_id = self.pathID - self.result_defer.errback((pth_id, err)) - reactor.callLater(0, OnTaskFailed, pth_id, err) # @UndefinedVariable - self.destroy(err) + self.result_defer.errback(Exception(err_str)) + reactor.callLater(0, OnTaskFailed, pth_id, err_str) # @UndefinedVariable + self.destroy(err_str) return 
None try: backup_fs.MakeLocalDir(settings.getLocalBackupsDir(), self.backupID) @@ -642,11 +642,11 @@ def on_folder_size_counted(self, pth, sz, itemInfo): lg.exc() if _Debug: lg.out(_DebugLevel, 'backup_control.Task.on_folder_size_counted ERROR creating destination folder for %s' % self.pathID) - err = 'failed creating destination folder for "%s"' % self.backupID + err_str = 'failed creating destination folder for "%s"' % self.backupID pth_id = self.pathID - self.result_defer.errback((pth_id, err)) - reactor.callLater(0, OnTaskFailed, pth_id, err) # @UndefinedVariable - self.destroy(err) + self.result_defer.errback(Exception(err_str)) + reactor.callLater(0, OnTaskFailed, pth_id, err_str) # @UndefinedVariable + self.destroy(err_str) return None itemInfo.set_size(self.totalSize) @@ -677,7 +677,7 @@ def on_folder_size_counted(self, pth, sz, itemInfo): job.totalSize = self.totalSize jobs()[self.backupID] = job - backup_fs.Calculate(iterID=backup_fs.fsID(self.customerIDURL, self.keyAlias)) + backup_fs.Calculate(customer_idurl=self.customerIDURL, key_alias=self.keyAlias) SaveFSIndex(customer_idurl=self.customerIDURL, key_alias=self.keyAlias) jobs()[self.backupID].automat('start') @@ -748,7 +748,7 @@ def RunTask(): path_id=T.pathID, message=message, )) - T.result_defer.errback((T.pathID, message)) + T.result_defer.errback(Exception(message)) T.destroy(message) return True @@ -801,8 +801,8 @@ def OnFoundFolderSize(pth, sz, arg): item = backup_fs.GetByID(pathID, iterID=backup_fs.fsID(customerIDURL, keyAlias)) if item: item.set_size(sz) - backup_fs.Calculate(iterID=backup_fs.fsID(customerIDURL, keyAlias)) - SaveFSIndex(customerIDURL, keyAlias) + backup_fs.Calculate(customer_idurl=customerIDURL, key_alias=keyAlias) + SaveFSIndex(customer_idurl=customerIDURL, key_alias=keyAlias) if version: backupID = packetid.MakeBackupID(customerGlobID, pathID, version, key_alias=keyAlias) job = GetRunningBackupObject(backupID) @@ -839,9 +839,9 @@ def OnJobDone(backupID, result): backup_fs.DeleteLocalBackup(settings.getLocalBackupsDir(), backupID) backup_matrix.EraseBackupLocalInfo(backupID) backup_matrix.EraseBackupLocalInfo(backupID) - backup_fs.ScanID(remotePath) - backup_fs.Calculate(iterID=backup_fs.fsID(customer_idurl, keyAlias)) - SaveFSIndex(customer_idurl, keyAlias) + backup_fs.ScanID(remotePath, customer_idurl=customer_idurl, key_alias=keyAlias) + backup_fs.Calculate(customer_idurl=customer_idurl, key_alias=keyAlias) + SaveFSIndex(customer_idurl=customer_idurl, key_alias=keyAlias) # TODO: check used space, if we have over use - stop all tasks immediately elif result == 'abort': DeleteBackup(backupID) @@ -893,7 +893,7 @@ def OnTaskExecutedCallback(result): def OnTaskFailedCallback(result): - lg.err('pathID: %s, error: %s' % (result[0], result[1])) + lg.err(str(result)) return result diff --git a/bitdust/storage/backup_fs.py b/bitdust/storage/backup_fs.py index bdcef4c..dbae219 100644 --- a/bitdust/storage/backup_fs.py +++ b/bitdust/storage/backup_fs.py @@ -104,9 +104,9 @@ from bitdust.interface import api -from bitdust.userid import my_id from bitdust.userid import global_id from bitdust.userid import id_url +from bitdust.userid import my_id #------------------------------------------------------------------------------ @@ -125,12 +125,7 @@ _FileSystemIndexByName = {} _FileSystemIndexByID = {} _RevisionNumber = {} -_ItemsCount = 0 -_FilesCount = 0 -_DirsCount = 0 -_SizeFiles = 0 -_SizeFolders = 0 -_SizeBackups = 0 +_Stats = {} #------------------------------------------------------------------------------ @@ 
-204,7 +199,7 @@ def fsID(customer_idurl=None, key_alias='master'): def revision(customer_idurl=None, key_alias='master'): """ - Mutator method to access current software revision number. + Method to access current revision number of the corresponding catalogue. """ global _RevisionNumber if customer_idurl is None: @@ -292,55 +287,115 @@ def known_keys_aliases(customer_idurl): #------------------------------------------------------------------------------ -def counter(): +def stats(customer_idurl=None, key_alias='master'): + global _Stats + if customer_idurl is None: + customer_idurl = my_id.getIDURL() + customer_idurl = id_url.field(customer_idurl) + if customer_idurl not in _Stats: + return {} + if key_alias not in _Stats[customer_idurl]: + return {} + return _Stats[customer_idurl][key_alias] + + +def set_stat(dict_value, customer_idurl=None, key_alias='master'): + global _Stats + if customer_idurl is None: + customer_idurl = my_id.getIDURL() + customer_idurl = id_url.field(customer_idurl) + if customer_idurl not in _Stats: + _Stats[customer_idurl] = {} + if key_alias not in _Stats[customer_idurl]: + _Stats[customer_idurl][key_alias] = { + 'items': 0, + 'files': 0, + 'folders': 0, + 'size_files': 0, + 'size_folders': 0, + 'size_backups': 0, + } + v = _Stats[customer_idurl][key_alias] + v.update(dict_value) + _Stats[customer_idurl][key_alias] = v + + +def total_stats(customer_idurl=None, exclude=False): + global _Stats + if customer_idurl is None: + customer_idurl = my_id.getIDURL() + customer_idurl = id_url.field(customer_idurl) + ret = { + 'items': 0, + 'files': 0, + 'folders': 0, + 'size_files': 0, + 'size_folders': 0, + 'size_backups': 0, + 'keys': 0, + } + if exclude: + for another_customer_idurl in known_customers(): + if id_url.is_the_same(customer_idurl, another_customer_idurl): + continue + for val in _Stats.get(another_customer_idurl, {}).values(): + for k in val.keys(): + ret[k] += val[k] + ret['keys'] += 1 + else: + for val in _Stats.get(customer_idurl, {}).values(): + for k in val.keys(): + ret[k] += val[k] + ret['keys'] += 1 + return ret + + +#------------------------------------------------------------------------------ + + +def counter(customer_idurl=None, key_alias='master'): """ Software keeps track of the total number of indexed items; this returns that value. """ - global _ItemsCount - return _ItemsCount + return stats(customer_idurl=customer_idurl, key_alias=key_alias).get('items') -def numberfiles(): +def numberfiles(customer_idurl=None, key_alias='master'): """ Number of indexed files. """ - global _FilesCount - return _FilesCount + return stats(customer_idurl=customer_idurl, key_alias=key_alias).get('files') -def numberfolders(): +def numberfolders(customer_idurl=None, key_alias='master'): """ Number of indexed folders. """ - global _DirsCount - return _DirsCount + return stats(customer_idurl=customer_idurl, key_alias=key_alias).get('folders') -def sizefiles(): +def sizefiles(customer_idurl=None, key_alias='master'): """ Total size of all indexed files. """ - global _SizeFiles - return _SizeFiles + return stats(customer_idurl=customer_idurl, key_alias=key_alias).get('size_files') -def sizefolders(): +def sizefolders(customer_idurl=None, key_alias='master'): """ Total size of all indexed folders. May be incorrect, because folder size is not calculated regularly yet.
""" - global _SizeFolders - return _SizeFolders + return stats(customer_idurl=customer_idurl, key_alias=key_alias).get('size_folders') -def sizebackups(): +def sizebackups(customer_idurl=None, key_alias='master'): """ Total size of all indexed backups. """ - global _SizeBackups - return _SizeBackups + return stats(customer_idurl=customer_idurl, key_alias=key_alias).get('size_backups') #------------------------------------------------------------------------------ @@ -1700,10 +1755,7 @@ def Scan(basedir=None, customer_idurl=None, key_alias='master'): if basedir is None: basedir = settings.getLocalBackupsDir() iterID = fsID(customer_idurl, key_alias=key_alias) - summ = [ - 0, - 0, - ] + summ = [0, 0] def visitor(path_id, path, info): info.read_stats(path) @@ -1744,29 +1796,23 @@ def ScanID(pathID, basedir=None, customer_idurl=None, key_alias='master'): itr.read_versions(bpio.portablePath(os.path.join(basedir, customer_id))) -def Calculate(iterID=None): +def Calculate(customer_idurl=None, key_alias='master'): """ Scan all items in the index and calculate folder and backups sizes. """ - global _SizeFiles - global _SizeFolders - global _SizeBackups - global _ItemsCount - global _FilesCount - _ItemsCount = 0 - _FilesCount = 0 - _DirsCount = 0 - _SizeFiles = 0 - _SizeFolders = 0 - _SizeBackups = 0 + if customer_idurl is None: + customer_idurl = my_id.getIDURL() + customer_idurl = id_url.field(customer_idurl) + val = { + 'items': 0, + 'files': 0, + 'folders': 0, + 'size_files': 0, + 'size_folders': 0, + 'size_backups': 0, + } def recursive_calculate(i): - global _SizeFiles - global _SizeFolders - global _SizeBackups - global _ItemsCount - global _FilesCount - global _DirsCount folder_size = 0 for id in i.keys(): if id == INFO_KEY: @@ -1775,18 +1821,18 @@ def recursive_calculate(i): if i[id].exist(): folder_size += i[id].size if i[id].type == FILE: - _FilesCount += 1 + val['files'] += 1 if i[id].exist(): - _SizeFiles += i[id].size + val['size_files'] += i[id].size if i[id].type == DIR: - _DirsCount += 1 + val['folders'] += 1 if i[id].exist(): - _SizeFolders += i[id].size + val['size_folders'] += i[id].size for version in i[id].list_versions(): versionSize = i[id].get_version_info(version)[1] if versionSize > 0: - _SizeBackups += versionSize - _ItemsCount += 1 + val['size_backups'] += versionSize + val['items'] += 1 elif isinstance(i[id], dict): sub_folder_size = recursive_calculate(i[id]) if sub_folder_size != -1: @@ -1796,25 +1842,26 @@ def recursive_calculate(i): if INFO_KEY in i: i[INFO_KEY].size = folder_size if i[INFO_KEY].type == FILE: - _FilesCount += 1 + val['files'] += 1 if i[INFO_KEY].exist(): - _SizeFiles += i[INFO_KEY].size + val['size_files'] += i[INFO_KEY].size if i[INFO_KEY].type == DIR: - _DirsCount += 1 + val['folders'] += 1 if i[INFO_KEY].exist(): - _SizeFolders += i[INFO_KEY].size + val['size_folders'] += i[INFO_KEY].size for version in i[INFO_KEY].list_versions(): versionSize = i[INFO_KEY].get_version_info(version)[1] if versionSize > 0: - _SizeBackups += versionSize - _ItemsCount += 1 + val['size_backups'] += versionSize + val['items'] += 1 return folder_size - if iterID is None: - iterID = fsID() + iterID = fsID(customer_idurl=customer_idurl, key_alias=key_alias) ret = recursive_calculate(iterID) + set_stat(val, customer_idurl=customer_idurl, key_alias=key_alias) + if _Debug: - lg.out(_DebugLevel, 'backup_fs.Calculate %d %d %d %d' % (_ItemsCount, _FilesCount, _SizeFiles, _SizeBackups)) + lg.out(_DebugLevel, 'backup_fs.Calculate %r %r : %r' % (customer_idurl, key_alias, val)) 
return ret @@ -2066,7 +2113,7 @@ def ReadIndex(text_data, new_revision=None, deleted_path_ids=[], encoding='utf-8 total_modified_count += modified_count if updated_keys: for key_alias in updated_keys: - Calculate(iterID=fsID(customer_idurl, key_alias)) + Calculate(customer_idurl=customer_idurl, key_alias=key_alias) updated_customers_keys.append((customer_idurl, key_alias)) if _Debug: lg.out(_DebugLevel, 'backup_fs.ReadIndex %d items loaded for %d keys' % (total_count, len(updated_customers_keys))) diff --git a/bitdust/storage/backup_monitor.py b/bitdust/storage/backup_monitor.py index fe2ae81..2f716c9 100644 --- a/bitdust/storage/backup_monitor.py +++ b/bitdust/storage/backup_monitor.py @@ -331,7 +331,7 @@ def doCleanUpBackups(self, *args, **kwargs): if not contactsdb.num_suppliers(): bytesUsed = 0 else: - bytesUsed = backup_fs.sizebackups()/contactsdb.num_suppliers() + bytesUsed = backup_fs.total_stats()['size_backups']/contactsdb.num_suppliers() bytesNeeded = diskspace.GetBytesFromString(settings.getNeededString(), 0) if _Debug: lg.out(_DebugLevel, 'backup_monitor.doCleanUpBackups backupsToKeep=%d used=%d needed=%d' % (versionsToKeep, bytesUsed, bytesNeeded)) diff --git a/bitdust/supplier/family_member.py b/bitdust/supplier/family_member.py index 34959e4..539b7f2 100644 --- a/bitdust/supplier/family_member.py +++ b/bitdust/supplier/family_member.py @@ -629,7 +629,7 @@ def _do_detect_latest_revision(self, dht_info, my_info): try: my_revision = int(my_info['revision']) except: - lg.warn('my own info is unknown or invalid, assume my revision is 0') + lg.warn('my own info is unknown or invalid in %r, assume my revision is 0' % self) my_revision = 0 try: dht_revision = int(dht_info['revision']) diff --git a/bitdust/supplier/storage_contract.py b/bitdust/supplier/storage_contract.py index 2e8133c..1626ca9 100644 --- a/bitdust/supplier/storage_contract.py +++ b/bitdust/supplier/storage_contract.py @@ -475,11 +475,11 @@ def verify_all_current_customers_contracts(): if not os.path.isdir(settings.ServiceDir('service_supplier_contracts')): return [] for customer_unique_name in os.listdir(settings.ServiceDir('service_supplier_contracts')): - try: - customer_idurl = id_url.field(id_url.unique_names(customer_unique_name)[0]) - except: - lg.exc() + known_idurls = id_url.unique_names(customer_unique_name) + if not known_idurls: + lg.warn('not possible to verify customer contract, unknown customer name found: %r' % customer_unique_name) continue + customer_idurl = id_url.field(known_idurls[0]) contracts_list = list_customer_contracts(customer_idurl) latest_contract = contracts_list['latest'] if contracts_list['current']: @@ -514,11 +514,11 @@ def verify_accept_storage_payment(tx): except: return 0 recently_completed_contracts = [] - try: - customer_idurl = id_url.field(id_url.unique_names(customer_prefix)[0]) - except: - lg.exc() + known_idurls = id_url.unique_names(customer_prefix) + if not known_idurls: + lg.warn('not possible to accept storage payment, unknown customer name found: %r' % customer_prefix) return 0 + customer_idurl = id_url.field(known_idurls[0]) if _Debug: lg.args(_DebugLevel, c=customer_idurl, unique_name=customer_idurl.unique_name()) if customer_idurl.unique_name() != customer_prefix: diff --git a/bitdust/transport/packet_out.py b/bitdust/transport/packet_out.py index 2e47792..f6a0b22 100644 --- a/bitdust/transport/packet_out.py +++ b/bitdust/transport/packet_out.py @@ -271,8 +271,7 @@ def search_by_response_packet(newpacket=None, proto=None, host=None, outgoing_co if 
another_packet_id not in matching_packet_ids: matching_packet_ids.append(another_packet_id) if len(matching_packet_ids) > 1: - if _Debug: - lg.dbg(_DebugLevel, 'multiple packet IDs expecting to match for that packet: %r' % matching_packet_ids) + lg.warn('multiple packet IDs expecting to match for %r: %r' % (newpacket, matching_packet_ids)) matching_packet_ids_count = 0 matching_command_ack_count = 0 for p in queue(): diff --git a/bitdust/transport/proxy/proxy_router.py b/bitdust/transport/proxy/proxy_router.py index b795006..3391fbd 100644 --- a/bitdust/transport/proxy/proxy_router.py +++ b/bitdust/transport/proxy/proxy_router.py @@ -141,10 +141,12 @@ def A(event=None, *args, **kwargs): class ProxyRouter(automat.Automat): + """ This class implements all the functionality of the ``proxy_router()`` state machine. """ + def init(self): """ Method to initialize additional variables and flags at creation phase @@ -293,10 +295,7 @@ def doSaveRouteProtoHost(self, *args, **kwargs): """ idurl, _, item, _, _, _ = args[0] idurl = id_url.field(idurl).original() - new_address = ( - strng.to_text(item.proto), - strng.to_text(item.host), - ) + new_address = (strng.to_text(item.proto), strng.to_text(item.host)) if idurl not in self.routes: lg.exc(exc_value=Exception('route with %r is not registered yet' % idurl)) else: @@ -384,10 +383,7 @@ def _get_session_proto_host(self, sender_idurl, info=None): return None, None hosts = [] try: - hosts.append(( - active_user_session_machine.get_proto(), - active_user_session_machine.get_host(), - )) + hosts.append((active_user_session_machine.get_proto(), active_user_session_machine.get_host())) except: lg.exc() if not hosts: @@ -712,7 +708,17 @@ def _do_verify_routed_data(self, newpacket, info, sender_idurl, receiver_idurl, routes_keys = list(self.routes.keys()) closed_route_keys = list(self.closed_routes.keys()) if _Debug: - lg.args(_DebugLevel, newpacket=newpacket, info=info, sender_idurl=sender_idurl, receiver_idurl=receiver_idurl, route_contacts=route['contacts'], closed_routes=closed_route_keys, is_retry=is_retry, route_changed=route_changed) + lg.args( + _DebugLevel, + newpacket=newpacket, + info=info, + sender_idurl=sender_idurl, + receiver_idurl=receiver_idurl, + route_contacts=route['contacts'], + closed_routes=closed_route_keys, + is_retry=is_retry, + route_changed=route_changed, + ) routed_packet = signed.Unserialize(routed_data) #--- invalid packet if not routed_packet: @@ -980,7 +986,16 @@ def _on_routed_in_packet_failed(self, pkt_out, msg, newpacket, info, receiver_id def _on_routed_out_packet_sent(self, pkt_out, msg, newpacket, info, sender_idurl, routed_command, routed_packet_id, routed_remote_id, wide, response_timeout, keep_alive): if _Debug: - lg.args(_DebugLevel, pkt_out=pkt_out, msg=msg, newpacket=newpacket, sender_idurl=sender_idurl, routed_command=routed_command, routed_packet_id=routed_packet_id, routed_remote_id=routed_remote_id) + lg.args( + _DebugLevel, + pkt_out=pkt_out, + msg=msg, + newpacket=newpacket, + sender_idurl=sender_idurl, + routed_command=routed_command, + routed_packet_id=routed_packet_id, + routed_remote_id=routed_remote_id, + ) publickey = identitycache.GetPublicKey(newpacket.CreatorID) if not publickey: lg.err('routed packet sent but can not send RelayAck(), identity %r is not cached' % newpacket.CreatorID) diff --git a/default_network.json b/default_network.json index 22a47a2..7caa6c3 100644 --- a/default_network.json +++ b/default_network.json @@ -14,7 +14,7 @@ "explorer_http_port": 19080 }, { - "host": "bahamas.ai", + 
"host": "bitdust.app", "node_tcp_port": 15658, "mining_pool_tcp_port": 18525, "explorer_http_port": 19080 @@ -42,7 +42,7 @@ "udp_port": 14441 }, { - "host": "bahamas.ai", + "host": "bitdust.app", "udp_port": 14441 }, { @@ -81,7 +81,7 @@ "http_port": 80 }, { - "host": "bahamas.ai", + "host": "bitdust.app", "http_port": 80 }, { diff --git a/import b/import index 8f0801d..c50a1e7 100755 --- a/import +++ b/import @@ -17,7 +17,7 @@ curpath=`pwd` git pull cd "$1" sourcepath=`pwd` -mkdir -p "$tmppath" +mkdir "$tmppath" git checkout-index -a -f --prefix="$tmppath/" cd "$tmppath" cp -r -v * "$curpath"