diff --git a/CHANGELOG.txt b/CHANGELOG.txt index 03e57aa..8dfbb2c 100644 --- a/CHANGELOG.txt +++ b/CHANGELOG.txt @@ -1,11 +1,22 @@ Change Log ========== +2024-10-07 Veselin Penev [penev.veselin@gmail.com](mailto:penev.veselin@gmail.com) + +* Bump cryptography from 42.0.8 to 43.0.1 +* prepare to add uniqueness to Request()/Data() packets for index files +* blocking uploading of a file when there is insufficient storage space expected +* populate details about service_proxy_server() in the api.network_status() +* multiple improvements related to files management (#207) +* solved issue with keys auto-restore from suppliers data after identity recovery +* various improvements in shared_access_coordinator(), restore_worker() and service_message_history() +* bug fixing group message chats and shared files after identity restore + + + 2024-09-14 Veselin Penev [penev.veselin@gmail.com](mailto:penev.veselin@gmail.com) * changed one of the seed nodes -* block uploading of a file when there is insufficient storage space expected -* populate details about service_proxy_server() inf the api.network_status() * keep track of files/folders/backups statistics for individual customers in the catalog * bug fix in storage_contract.py * IDURL fix in shared_access_coordinator() diff --git a/bitdust/access/group_participant.py b/bitdust/access/group_participant.py index 35d7fd5..779e360 100644 --- a/bitdust/access/group_participant.py +++ b/bitdust/access/group_participant.py @@ -255,6 +255,8 @@ def _start(): existing_group_participant.automat('init') existing_group_participant.automat('connect') started += 1 + if _Debug: + lg.args(_DebugLevel, started=started) def _on_cached(result): if _Debug: diff --git a/bitdust/access/groups.py b/bitdust/access/groups.py index f6b2adf..381ec8b 100644 --- a/bitdust/access/groups.py +++ b/bitdust/access/groups.py @@ -92,6 +92,7 @@ from bitdust.system import bpio from bitdust.contacts import contactsdb +from bitdust.contacts import identitycache from 
bitdust.main import settings @@ -115,7 +116,7 @@ def init(): if _Debug: lg.out(_DebugLevel, 'groups.init') - load_groups() + open_known_groups() def shutdown(): @@ -201,7 +202,7 @@ def send_group_pub_key_to_suppliers(group_key_id): d = key_ring.transfer_key(group_key_id, supplier_idurl, include_private=False, include_signature=False) d.addCallback(lg.cb, debug=_Debug, debug_level=_DebugLevel, method='groups.write_group_key_to_suppliers') d.addErrback(lg.errback, debug=_Debug, debug_level=_DebugLevel, method='groups.write_group_key_to_suppliers') - # TODO: build some kind of retry mechanism - in case of a particular supplier did not receive the key + # TODO: build some kind of retry mechanism - in case of a particular supplier did not received the key # it must be some process with each supplier that first verifies a list of my public keys supplier currently possess # and then transfer the missing keys or send a note to erase "unused" keys to be able to cleanup old keys l.append(d) @@ -211,6 +212,43 @@ def send_group_pub_key_to_suppliers(group_key_id): #------------------------------------------------------------------------------ +def open_known_groups(): + to_be_opened = [] + to_be_cached = [] + for key_id in my_keys.known_keys(): + if not key_id.startswith('group_'): + continue + if not my_keys.is_key_private(key_id): + continue + if not my_keys.is_active(key_id): + continue + to_be_opened.append(key_id) + _, customer_idurl = my_keys.split_key_id(key_id) + if not id_url.is_cached(customer_idurl): + to_be_cached.append(customer_idurl) + if _Debug: + lg.args(_DebugLevel, to_be_opened=to_be_opened, to_be_cached=to_be_cached) + if to_be_cached: + d = identitycache.start_multiple(to_be_cached) + d.addErrback(lg.errback, debug=_Debug, debug_level=_DebugLevel, method='groups.open_known_groups') + d.addBoth(lambda _: prepare_known_groups(to_be_opened)) + return + prepare_known_groups(to_be_opened) + + +def prepare_known_groups(to_be_opened): + for group_key_id in 
to_be_opened: + _, customer_idurl = my_keys.split_key_id(group_key_id) + if not id_url.is_cached(customer_idurl): + lg.err('not able to open group %r, customer IDURL %r still was not cached' % (group_key_id, customer_idurl)) + continue + if not is_group_stored(group_key_id): + set_group_info(group_key_id) + set_group_active(group_key_id, bool(my_keys.is_active(group_key_id))) + save_group_info(group_key_id) + load_groups() + + def load_groups(): loaded_groups = 0 service_dir = settings.ServiceDir('service_private_groups') diff --git a/bitdust/access/shared_access_coordinator.py b/bitdust/access/shared_access_coordinator.py index aaca6af..c44383b 100644 --- a/bitdust/access/shared_access_coordinator.py +++ b/bitdust/access/shared_access_coordinator.py @@ -179,47 +179,48 @@ def populate_shares(): def open_known_shares(): - known_offline_shares = [] + to_be_opened = [] + to_be_cached = [] for key_id in my_keys.known_keys(): if not key_id.startswith('share_'): continue + if not my_keys.is_key_private(key_id): + continue if not my_keys.is_active(key_id): continue active_share = get_active_share(key_id) if active_share: continue - known_offline_shares.append(key_id) - to_be_opened = [] - for customer_idurl in backup_fs.known_customers(): - for key_alias in backup_fs.known_keys_aliases(customer_idurl): - if not key_alias.startswith('share_'): - continue - key_id = my_keys.make_key_id(alias=key_alias, creator_idurl=customer_idurl) - if not key_id: - continue - if my_keys.latest_key_id(key_id) != key_id: - continue - if key_id in to_be_opened: - continue - if key_id not in known_offline_shares: - continue - if not my_keys.is_key_private(key_id): - continue - if not my_keys.is_active(key_id): - continue - to_be_opened.append(key_id) + to_be_opened.append(key_id) + _, customer_idurl = my_keys.split_key_id(key_id) + if not id_url.is_cached(customer_idurl): + to_be_cached.append(customer_idurl) if _Debug: - lg.args(_DebugLevel, known_offline_shares=known_offline_shares, 
to_be_opened=to_be_opened) + lg.args(_DebugLevel, to_be_opened=to_be_opened, to_be_cached=to_be_cached) + if to_be_cached: + d = identitycache.start_multiple(to_be_cached) + d.addErrback(lg.errback, debug=_Debug, debug_level=_DebugLevel, method='shared_access_coordinator.open_known_shares') + d.addBoth(lambda _: start_known_shares(to_be_opened)) + return + start_known_shares(to_be_opened) + + +def start_known_shares(to_be_opened): populate_shared_files = listeners.is_populate_required('shared_file') for key_id in to_be_opened: - active_share = SharedAccessCoordinator(key_id, log_events=True, publish_events=False) + _, customer_idurl = my_keys.split_key_id(key_id) + if not id_url.is_cached(customer_idurl): + lg.err('not able to open share %r, customer IDURL %r still was not cached' % (key_id, customer_idurl)) + continue + try: + active_share = SharedAccessCoordinator(key_id, log_events=True, publish_events=False) + except: + lg.exc() + continue active_share.automat('restart') if populate_shared_files: backup_fs.populate_shared_files(key_id=key_id) - # if populate_shared_files: - # listeners.populate_later().remove('shared_file') if listeners.is_populate_required('shared_location'): - # listeners.populate_later().remove('shared_location') populate_shares() @@ -487,6 +488,7 @@ def __init__(self, key_id, debug_level=_DebugLevel, log_events=_Debug, log_trans self.customer_idurl = self.glob_id['idurl'] self.known_suppliers_list = [] self.known_ecc_map = None + self.critical_suppliers_number = 1 self.dht_lookup_use_cache = True self.received_index_file_revision = {} self.last_time_in_sync = -1 @@ -594,18 +596,12 @@ def A(self, event, *args, **kwargs): if event == 'shutdown': self.state = 'CLOSED' self.doDestroyMe(*args, **kwargs) - elif event == 'timer-30sec' or event == 'all-suppliers-disconnected': - self.state = 'DISCONNECTED' - self.doReportDisconnected(*args, **kwargs) elif event == 'list-files-received': self.doSupplierSendIndexFile(*args, **kwargs) elif event == 
'key-not-registered': self.doSupplierTransferPubKey(*args, **kwargs) elif event == 'supplier-connected' or event == 'key-sent': self.doSupplierRequestIndexFile(*args, **kwargs) - elif event == 'all-suppliers-connected': - self.state = 'CONNECTED' - self.doReportConnected(*args, **kwargs) elif event == 'index-sent' or event == 'index-up-to-date' or event == 'index-failed' or event == 'list-files-failed' or event == 'supplier-failed': self.doRemember(event, *args, **kwargs) self.doCheckAllConnected(*args, **kwargs) @@ -613,6 +609,12 @@ def A(self, event, *args, **kwargs): self.doSupplierProcessListFiles(*args, **kwargs) elif event == 'supplier-file-modified' or event == 'index-received' or event == 'index-missing': self.doSupplierRequestListFiles(event, *args, **kwargs) + elif (event == 'timer-30sec' and not self.isEnoughConnected(*args, **kwargs)) or event == 'all-suppliers-disconnected': + self.state = 'DISCONNECTED' + self.doReportDisconnected(*args, **kwargs) + elif (event == 'timer-30sec' and self.isEnoughConnected(*args, **kwargs)) or event == 'all-suppliers-connected': + self.state = 'CONNECTED' + self.doReportConnected(*args, **kwargs) #---DISCONNECTED--- elif self.state == 'DISCONNECTED': if event == 'shutdown': @@ -638,6 +640,14 @@ def A(self, event, *args, **kwargs): pass return None + def isEnoughConnected(self, *args, **kwargs): + """ + Action method. + """ + if _Debug: + lg.args(_DebugLevel, progress=len(self.suppliers_in_progress), succeed=self.suppliers_succeed, critical_suppliers_number=self.critical_suppliers_number) + return len(self.suppliers_succeed) >= self.critical_suppliers_number + def doInit(self, *args, **kwargs): """ Action method. 
@@ -666,6 +676,10 @@ def doConnectCustomerSuppliers(self, *args, **kwargs): self.automat('all-suppliers-disconnected') return self.known_ecc_map = args[0].get('ecc_map') + self.critical_suppliers_number = 1 + if self.known_ecc_map: + from bitdust.raid import eccmap + self.critical_suppliers_number = eccmap.GetCorrectableErrors(eccmap.GetEccMapSuppliersNumber(self.known_ecc_map)) self.suppliers_in_progress.clear() self.suppliers_succeed.clear() for supplier_idurl in self.known_suppliers_list: @@ -693,7 +707,7 @@ def doSupplierRequestListFiles(self, event, *args, **kwargs): pkt_out = None if event == 'supplier-file-modified': remote_path = kwargs['remote_path'] - if remote_path == settings.BackupIndexFileName(): + if remote_path == settings.BackupIndexFileName() or packetid.IsIndexFileName(remote_path): if self.state == 'CONNECTED': self.automat('restart') else: @@ -770,7 +784,7 @@ def doSupplierProcessListFiles(self, *args, **kwargs): is_in_sync = True if _Debug: lg.args(_DebugLevel, rev=supplier_index_file_revision, is_in_sync=is_in_sync) - remote_files_changed, backups2remove, paths2remove, missed_backups = backup_matrix.process_raw_list_files( + remote_files_changed, backups2remove, paths2remove, _ = backup_matrix.process_raw_list_files( supplier_num=kwargs['supplier_pos'], list_files_text_body=kwargs['payload'], customer_idurl=self.customer_idurl, @@ -810,14 +824,10 @@ def doCheckAllConnected(self, *args, **kwargs): """ Action method. 
""" - critical_suppliers_number = 1 - if self.known_ecc_map: - from bitdust.raid import eccmap - critical_suppliers_number = eccmap.GetCorrectableErrors(eccmap.GetEccMapSuppliersNumber(self.known_ecc_map)) if _Debug: - lg.args(_DebugLevel, progress=len(self.suppliers_in_progress), succeed=self.suppliers_succeed, critical_suppliers_number=critical_suppliers_number) + lg.args(_DebugLevel, progress=len(self.suppliers_in_progress), succeed=self.suppliers_succeed, critical_suppliers_number=self.critical_suppliers_number) if len(self.suppliers_in_progress) == 0: - if len(self.suppliers_succeed) >= critical_suppliers_number: + if len(self.suppliers_succeed) >= self.critical_suppliers_number: self.automat('all-suppliers-connected') else: self.automat('all-suppliers-disconnected') @@ -880,6 +890,7 @@ def _do_connect_with_supplier(self, supplier_idurl): def _do_retrieve_index_file(self, supplier_idurl): packetID = global_id.MakeGlobalID( key_id=self.key_id, + # path=packetid.MakeIndexFileNamePacketID(), path=settings.BackupIndexFileName(), ) sc = supplier_connector.by_idurl(supplier_idurl, customer_idurl=self.customer_idurl) @@ -920,6 +931,14 @@ def _do_process_index_file(self, wrapped_packet, supplier_idurl): lg.err('incoming Data() is not valid from supplier %r' % supplier_idurl) self.automat('supplier-failed', supplier_idurl=supplier_idurl) return + if id_url.is_cached(supplier_idurl): + self._do_read_index_file(wrapped_packet, supplier_idurl) + else: + d = identitycache.start_one(supplier_idurl) + d.addErrback(lg.errback, debug=_Debug, debug_level=_DebugLevel, method='shared_access_coordinator._do_process_index_file') + d.addBoth(lambda _: self._do_read_index_file(wrapped_packet, supplier_idurl)) + + def _do_read_index_file(self, wrapped_packet, supplier_idurl): supplier_revision = backup_control.IncomingSupplierBackupIndex( wrapped_packet, key_id=self.key_id, @@ -933,6 +952,7 @@ def _do_process_index_file(self, wrapped_packet, supplier_idurl): def 
_do_send_index_file(self, supplier_idurl): packetID = global_id.MakeGlobalID( key_id=self.key_id, + # path=packetid.MakeIndexFileNamePacketID(), path=settings.BackupIndexFileName(), ) data = bpio.ReadBinaryFile(settings.BackupIndexFilePath(self.customer_idurl, self.key_alias)) diff --git a/bitdust/automats/global_state.py b/bitdust/automats/global_state.py deleted file mode 100644 index ecd8ea9..0000000 --- a/bitdust/automats/global_state.py +++ /dev/null @@ -1,127 +0,0 @@ -#!/usr/bin/env python -# global_state.py -# -# Copyright (C) 2008 Veselin Penev, https://bitdust.io -# -# This file (global_state.py) is part of BitDust Software. -# -# BitDust is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# BitDust Software is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with BitDust Software. If not, see . -# -# Please contact us if you have any questions at bitdust.io@gmail.com -# -# -# -# -# -""" -.. module:: global_state. - -This module is to keep track of changing states of State Machines. -It also remember the current ``global`` state of the program - this a stats of a several most important automats. 
-""" - -from __future__ import absolute_import - -#------------------------------------------------------------------------------ - -from bitdust.logs import lg - -from bitdust.automats import automat - -#------------------------------------------------------------------------------ - -_StatesDict = { - 'init at startup': 'beginning', - 'init local': 'local settings initialization', - 'init contacts': 'contacts initialization', - 'init connection': 'initializing connections', - 'init modules': 'starting modules', - 'init install': 'preparing install section', - 'network at startup': 'starting connection', - 'network stun': 'detecting external IP address', - 'network upnp': 'configuring UPnP', - 'network connected': 'internet connection is fine', - 'network disconnected': 'internet connection is not working', - 'network network?': 'checking network interfaces', - 'network google?': 'is www.google.com available?', - 'p2p at startup': 'initial peer-to-peer state', - 'p2p transports': 'starting network transports', - 'p2p propagate': 'propagate my identity', - 'p2p incomming?': 'waiting response from others', - 'p2p connected': 'ready', - 'p2p disconnected': 'starting disconnected', -} - -_GlobalState = 'AT_STARTUP' -_GlobalStateNotifyFunc = None - -#------------------------------------------------------------------------------ - - -def set_global_state(st): - """ - This method is called from State Machines when ``state`` is changed: - global_state.set_global_state('P2P ' + newstate) So ``st`` is a string - like: 'P2P CONNECTED'. - - ``_GlobalStateNotifyFunc`` can be used to keep track of changing - program states. 
- """ - global _GlobalState - global _GlobalStateNotifyFunc - oldstate = _GlobalState - _GlobalState = st - # lg.out(6, (' ' * 40) + '{%s}->{%s}' % (oldstate, _GlobalState)) - if _GlobalStateNotifyFunc is not None and oldstate != _GlobalState: - try: - _GlobalStateNotifyFunc(_GlobalState) - except: - lg.exc() - - -def get_global_state(): - """ - Return the current ``global state``, for example: P2P CONNECTED. - """ - global _GlobalState - # lg.out(6, 'global_state.get_global_state return [%s]' % _GlobalState) - return _GlobalState - - -def get_global_state_label(): - """ - Return a label describing current global state, for example: 'checking - network interfaces'. - """ - global _GlobalState - global _StatesDict - return _StatesDict.get(_GlobalState.replace('_', ' ').lower(), '') - - -def SetGlobalStateNotifyFunc(f): - """ - Set callback to catch a global state changed event. - """ - global _GlobalStateNotifyFunc - _GlobalStateNotifyFunc = f - - -def SetSingleStateNotifyFunc(f): - """ - Set callback to catch state change of any automat. 
- """ - automat.SetStateChangedCallback(f) - - -#------------------------------------------------------------------------------ diff --git a/bitdust/chat/message_database.py b/bitdust/chat/message_database.py index 6720539..0f3ef78 100644 --- a/bitdust/chat/message_database.py +++ b/bitdust/chat/message_database.py @@ -158,6 +158,8 @@ def init(filepath=None): _HistoryDB.commit() _HistoryCursor = _HistoryDB.cursor() + check_create_keys() + def shutdown(): global _HistoryDB @@ -665,7 +667,43 @@ def rebuild_conversations(): #------------------------------------------------------------------------------ -def check_create_rename_key(new_public_key, new_key_id, new_local_key_id): +def check_create_keys(): + to_be_opened = [] + to_be_cached = [] + for key_id in my_keys.known_keys(): + if not key_id.startswith('group_'): + continue + if not my_keys.is_key_private(key_id): + continue + if not my_keys.is_active(key_id): + continue + _, customer_idurl = my_keys.split_key_id(key_id) + if not id_url.is_cached(customer_idurl): + to_be_cached.append(customer_idurl) + else: + to_be_opened.append(key_id) + if to_be_cached: + lg.warn('still see %d not cached identities, not able to process those customers: %r' (len(to_be_cached), to_be_cached)) + if _Debug: + lg.args(_DebugLevel, to_be_opened=to_be_opened, to_be_cached=to_be_cached) + for key_id in to_be_opened: + check_create_rename_key(new_key_id=key_id) + + +def check_create_rename_key(new_key_id): + try: + new_public_key = my_keys.get_public_key_raw(new_key_id) + except: + lg.exc() + return False + try: + new_local_key_id = my_keys.get_local_key_id(new_key_id) + except: + lg.exc() + return False + if new_local_key_id is None: + lg.err('did not found local_key_id for %r' % new_key_id) + return False conversation_type = None if new_key_id.startswith('group_'): conversation_type = 'group_message' @@ -680,6 +718,8 @@ def check_create_rename_key(new_public_key, new_key_id, new_local_key_id): new_public_key, ] found_public_keys = 
list(cur().execute(sql, params)) + if _Debug: + lg.args(_DebugLevel, found_public_keys=found_public_keys) if found_public_keys: if len(found_public_keys) > 1: raise Exception('found multiple records for same public key: %r' % found_public_keys) diff --git a/bitdust/contacts/contactsdb.py b/bitdust/contacts/contactsdb.py index 73e03fd..ba72fc1 100644 --- a/bitdust/contacts/contactsdb.py +++ b/bitdust/contacts/contactsdb.py @@ -847,7 +847,6 @@ def load_contacts(): _CorrespondentsChangedCallback([], correspondents()) AddContactsChangedCallback(on_contacts_changed) if listeners.is_populate_required('correspondent'): - # listeners.populate_later().remove('correspondent') populate_correspondents() diff --git a/bitdust/crypt/encrypted.py b/bitdust/crypt/encrypted.py index a4364f1..c3320a9 100644 --- a/bitdust/crypt/encrypted.py +++ b/bitdust/crypt/encrypted.py @@ -290,7 +290,8 @@ def Unserialize(data, decrypt_key=None): _s = dct['s'] _b = dct['b'] except Exception as exc: - lg.exc('data unserialize failed with %r: %r\n%r\n%s' % ( + lg.exc('data unserialize failed using key %r with %r: %r\n%r\n%s' % ( + decrypt_key, exc, list(dct.keys()), (dct.get('c'), dct.get('b'), dct.get('i'), dct.get('r')), diff --git a/bitdust/crypt/my_keys.py b/bitdust/crypt/my_keys.py index 9fb5670..a05a3a0 100644 --- a/bitdust/crypt/my_keys.py +++ b/bitdust/crypt/my_keys.py @@ -34,7 +34,7 @@ #------------------------------------------------------------------------------ _Debug = False -_DebugLevel = 12 +_DebugLevel = 10 #------------------------------------------------------------------------------ diff --git a/bitdust/customer/fire_hire.py b/bitdust/customer/fire_hire.py index 1a19d07..385ba1e 100644 --- a/bitdust/customer/fire_hire.py +++ b/bitdust/customer/fire_hire.py @@ -120,7 +120,6 @@ from bitdust.logs import lg -from bitdust.automats import global_state from bitdust.automats import automat from bitdust.lib import misc @@ -267,7 +266,6 @@ def state_changed(self, oldstate, newstate, event, 
*args, **kwargs): """ This method intended to catch the moment when automat's state was changed. """ - global_state.set_global_state('FIREHIRE ' + newstate) if newstate == 'READY' and event != 'instant': self.automat('instant') diff --git a/bitdust/lib/packetid.py b/bitdust/lib/packetid.py index 6fff413..0ceadf6 100644 --- a/bitdust/lib/packetid.py +++ b/bitdust/lib/packetid.py @@ -517,3 +517,17 @@ def SplitQueueMessagePacketID(packet_id): packet_id = packet_id[6:] queue_id, _, unique_id = packet_id.rpartition('_') return queue_id, unique_id + + +def MakeIndexFileNamePacketID(): + return '.index.{}'.format(UniqueID()) + + +def IsIndexFileName(path): + if len(path) < 19 or len(path) > 23: + return False + if not path.startswith('.index.'): + return False + if not path[7:].isdecimal(): + return False + return True diff --git a/bitdust/p2p/network_connector.py b/bitdust/p2p/network_connector.py index b2afaf0..7c7e7fe 100644 --- a/bitdust/p2p/network_connector.py +++ b/bitdust/p2p/network_connector.py @@ -90,7 +90,6 @@ from bitdust.logs import lg from bitdust.automats import automat -from bitdust.automats import global_state from bitdust.lib import net_misc from bitdust.lib import misc @@ -146,6 +145,7 @@ def Destroy(): class NetworkConnector(automat.Automat): + """ Class to monitor Internet connection and reconnect when needed. 
""" @@ -174,7 +174,6 @@ def init(self): net_misc.SetConnectionFailedCallbackFunc(ConnectionFailedCallback) def state_changed(self, oldstate, newstate, event, *args, **kwargs): - global_state.set_global_state('NETWORK ' + newstate) if driver.is_on('service_p2p_hookups'): from bitdust.p2p import p2p_connector from bitdust.system import tray_icon diff --git a/bitdust/services/service_backups.py b/bitdust/services/service_backups.py index 962d686..eb5634c 100644 --- a/bitdust/services/service_backups.py +++ b/bitdust/services/service_backups.py @@ -73,7 +73,6 @@ def start(self): events.add_subscriber(self._on_my_identity_rotated, 'my-identity-rotated') events.add_subscriber(self._on_key_erased, 'key-erased') if listeners.is_populate_required('remote_version'): - # listeners.populate_later().remove('remote_version') backup_matrix.populate_remote_versions() return True diff --git a/bitdust/services/service_keys_registry.py b/bitdust/services/service_keys_registry.py index 99d8afd..0df67f7 100644 --- a/bitdust/services/service_keys_registry.py +++ b/bitdust/services/service_keys_registry.py @@ -56,7 +56,6 @@ def start(self): callback.add_outbox_callback(self._on_outbox_packet_sent) callback.append_inbox_callback(self._on_inbox_packet_received) if listeners.is_populate_required('key'): - # listeners.populate_later().remove('key') my_keys.populate_keys() return True diff --git a/bitdust/services/service_message_history.py b/bitdust/services/service_message_history.py index 3bf2a54..fd2bd79 100644 --- a/bitdust/services/service_message_history.py +++ b/bitdust/services/service_message_history.py @@ -44,8 +44,7 @@ class MessageHistoryService(LocalService): def dependent_on(self): return [ - 'service_my_data', - 'service_private_messages', + 'service_private_groups', ] def start(self): @@ -60,10 +59,8 @@ def start(self): events.add_subscriber(self.on_key_generated, 'key-generated') events.add_subscriber(self.on_key_erased, 'key-erased') if 
listeners.is_populate_required('conversation'): - # listeners.populate_later().remove('conversation') message_database.populate_conversations() if listeners.is_populate_required('message'): - # listeners.populate_later().remove('message') message_database.populate_messages() return True @@ -83,13 +80,16 @@ def health_check(self): return True def on_key_generated(self, evt): - self.do_check_create_rename_key(evt.data['key_id']) + from bitdust.chat import message_database + message_database.check_create_rename_key(new_key_id=evt.data['key_id']) def on_key_registered(self, evt): - self.do_check_create_rename_key(evt.data['key_id']) + from bitdust.chat import message_database + message_database.check_create_rename_key(new_key_id=evt.data['key_id']) def on_key_renamed(self, evt): - self.do_check_create_rename_key(evt.data['new_key_id']) + from bitdust.chat import message_database + message_database.check_create_rename_key(new_key_id=evt.data['new_key_id']) def on_key_erased(self, evt): from bitdust.main import listeners @@ -97,26 +97,3 @@ def on_key_erased(self, evt): if evt.data['key_id'].startswith('group_'): conversation_id = message_database.get_conversation_id(evt.data['local_key_id'], evt.data['local_key_id'], 3) listeners.push_snapshot('conversation', snap_id=conversation_id, deleted=True) - - def do_check_create_rename_key(self, new_key_id): - from bitdust.logs import lg - from bitdust.crypt import my_keys - from bitdust.chat import message_database - try: - new_public_key = my_keys.get_public_key_raw(new_key_id) - except: - lg.exc() - return - try: - new_local_key_id = my_keys.get_local_key_id(new_key_id) - except: - lg.exc() - return - if new_local_key_id is None: - lg.err('did not found local_key_id for %r' % new_key_id) - return - message_database.check_create_rename_key( - new_public_key=new_public_key, - new_key_id=new_key_id, - new_local_key_id=new_local_key_id, - ) diff --git a/bitdust/services/service_my_data.py b/bitdust/services/service_my_data.py 
index de6d7bf..0b1c77d 100644 --- a/bitdust/services/service_my_data.py +++ b/bitdust/services/service_my_data.py @@ -70,7 +70,6 @@ def start(self): if keys_synchronizer.is_synchronized() and index_synchronizer.is_synchronized(): self.confirm_service_started(result=True) if listeners.is_populate_required('private_file'): - # listeners.populate_later().remove('private_file') backup_fs.populate_private_files() else: lg.warn('can not start service_my_data right now, keys_synchronizer.is_synchronized=%r index_synchronizer.is_synchronized=%r' % (keys_synchronizer.is_synchronized(), index_synchronizer.is_synchronized())) @@ -90,7 +89,6 @@ def _on_my_storage_ready(self, evt): if self.starting_deferred: self.confirm_service_started(result=True) if listeners.is_populate_required('private_file'): - # listeners.populate_later().remove('private_file') backup_fs.populate_private_files() if driver.is_enabled('service_my_data'): if not driver.is_started('service_my_data'): diff --git a/bitdust/services/service_p2p_hookups.py b/bitdust/services/service_p2p_hookups.py index a30f68b..0512718 100644 --- a/bitdust/services/service_p2p_hookups.py +++ b/bitdust/services/service_p2p_hookups.py @@ -76,7 +76,6 @@ def start(self): events.add_subscriber(self._on_identity_url_changed, 'identity-url-changed') events.add_subscriber(self._on_my_identity_url_changed, 'my-identity-url-changed') if listeners.is_populate_required('online_status'): - # listeners.populate_later().remove('online_status') online_status.populate_online_statuses() return True diff --git a/bitdust/storage/backup_fs.py b/bitdust/storage/backup_fs.py index dbae219..e7079b3 100644 --- a/bitdust/storage/backup_fs.py +++ b/bitdust/storage/backup_fs.py @@ -242,6 +242,8 @@ def commit(new_revision_number=None, customer_idurl=None, key_alias='master'): new_v = _RevisionNumber[customer_idurl][key_alias] if _Debug: lg.args(_DebugLevel, old=old_v, new=new_v, c=customer_idurl, k=key_alias) + if old_v == -1 and new_v > old_v: + 
lg.info('committed first revision %r for customer:%s key_alias:%s' % (new_v, customer_idurl, key_alias)) return old_v, new_v @@ -626,7 +628,7 @@ def MakeID(itr, randomized=True): for k in itr.keys(): if k == 0: continue - if k == settings.BackupIndexFileName(): + if k == settings.BackupIndexFileName() or packetid.IsIndexFileName(k): continue try: if isinstance(itr[k], int): @@ -1937,7 +1939,7 @@ def UnserializeIndex(json_data, customer_idurl=None, new_revision=None, deleted_ cur_revision = revision(customer_idurl, key_alias) if cur_revision >= new_revision: if _Debug: - lg.dbg(_DebugLevel, 'ignore items for %r with alias %r because current revision is up to date: %d >= %d' % (customer_idurl, key_alias, cur_revision, new_revision)) + lg.dbg(_DebugLevel, 'ignoring items for %r with alias %r because current revision is up to date: %d >= %d' % (customer_idurl, key_alias, cur_revision, new_revision)) continue count = 0 count_modified = 0 @@ -1979,7 +1981,9 @@ def UnserializeIndex(json_data, customer_idurl=None, new_revision=None, deleted_ def _one_item(path_id, path, info): if path_id not in known_items: - if path_id != settings.BackupIndexFileName(): + if path_id == settings.BackupIndexFileName() or packetid.IsIndexFileName(path_id): + pass + else: to_be_removed_items.add(path_id) TraverseByID(_one_item, iterID=fsID(customer_idurl, key_alias)) @@ -2027,7 +2031,7 @@ def _one_item(path_id, path, info): updated_keys.append(key_alias) if key_alias.startswith('share_'): for new_file_item in new_files: - if new_file_item.path_id == settings.BackupIndexFileName(): + if new_file_item.path_id == settings.BackupIndexFileName() or packetid.IsIndexFileName(new_file_item.path_id): continue new_file_path = ToPath(new_file_item.path_id, iterID=fsID(customer_idurl, key_alias)) if new_file_path: diff --git a/bitdust/storage/backup_matrix.py b/bitdust/storage/backup_matrix.py index 3411d01..d467f9d 100644 --- a/bitdust/storage/backup_matrix.py +++ b/bitdust/storage/backup_matrix.py @@ 
-300,7 +300,7 @@ def process_line_dir(line, current_key_alias=None, customer_idurl=None, is_in_sy pth = line path_id = pth.strip('/') if auto_create and is_in_sync: - if path_id != settings.BackupIndexFileName() and path_id not in ignored_path_ids: + if (path_id != settings.BackupIndexFileName() and not packetid.IsIndexFileName(path_id)) and path_id not in ignored_path_ids: if not backup_fs.ExistsID(pth, iterID=backup_fs.fsID(customer_idurl, current_key_alias)): if _Debug: lg.out(_DebugLevel, ' AUTO CREATE DIR "%s" in the index' % pth) @@ -360,7 +360,7 @@ def process_line_file(line, current_key_alias=None, customer_idurl=None, is_in_s filesz = -1 path_id = pth.strip('/') if auto_create and is_in_sync: - if path_id != settings.BackupIndexFileName() and path_id not in ignored_path_ids: + if (path_id != settings.BackupIndexFileName() and not packetid.IsIndexFileName(path_id)) and path_id not in ignored_path_ids: if not backup_fs.IsFileID(pth, iterID=backup_fs.fsID(customer_idurl, current_key_alias)): if _Debug: lg.out(_DebugLevel, ' AUTO CREATE FILE "%s" in the index' % pth) @@ -376,12 +376,12 @@ def process_line_file(line, current_key_alias=None, customer_idurl=None, is_in_s modified = True if not backup_fs.IsFileID(pth, iterID=backup_fs.fsID(customer_idurl, current_key_alias)): # remote supplier have some file - but we don't have it in the index - if path_id == settings.BackupIndexFileName(): + if path_id == settings.BackupIndexFileName() or packetid.IsIndexFileName(path_id): # this is the index file saved on remote supplier # must remember its size and put it in the backup_fs item = backup_fs.FSItemInfo( - name=path_id, - path_id=path_id, + name=settings.BackupIndexFileName(), + path_id=settings.BackupIndexFileName(), typ=backup_fs.FILE, key_id=global_id.MakeGlobalID(idurl=customer_idurl, key_alias=current_key_alias), ) @@ -495,7 +495,15 @@ def process_line_version(line, supplier_num, current_key_alias=None, customer_id lg.out(_DebugLevel, ' VERSION "%s" to be 
removed, path not found in the index' % backupID) else: if _Debug: - lg.out(_DebugLevel, ' VERSION "%s" skip removing, index not in sync' % backupID) + lg.out(_DebugLevel, ' VERSION "%s" skip removing, path not found, index not in sync' % backupID) + # not in sync yet, and backup FS item do not exist, but there is a data stored on supplier host + # must remember that in the backup matrix + stored_files, is_complete = process_version_data(words, customer_idurl, backupID, supplier_num, maxBlockNum) + if stored_files is None and is_complete is None: + return modified, backups2remove, paths2remove, found_backups, newfiles + newfiles += stored_files + if is_complete: + found_backups.add(backupID) return modified, backups2remove, paths2remove, found_backups, newfiles if auto_create and is_in_sync: if not item.has_version(versionName): @@ -529,25 +537,74 @@ def process_line_version(line, supplier_num, current_key_alias=None, customer_id lg.out(_DebugLevel, ' VERSION "%s" to be removed, version is not found in the index' % backupID) else: if _Debug: - lg.out(_DebugLevel, ' VERSION "%s" skip removing, index not in sync' % backupID) + lg.out(_DebugLevel, ' VERSION "%s" skip removing, version is not found, index not in sync' % backupID) + # not in sync yet, and backup FS version do not exist, but there is a data stored on supplier host + # must remember that in the backup matrix + stored_files, is_complete = process_version_data(words, customer_idurl, backupID, supplier_num, maxBlockNum) + if stored_files is None and is_complete is None: + return modified, backups2remove, paths2remove, found_backups, newfiles + newfiles += stored_files + if is_complete: + found_backups.add(backupID) return modified, backups2remove, paths2remove, found_backups, newfiles - else: - if _Debug: - lg.out(_DebugLevel, ' VERSION "%s" is found in the index' % backupID) + if _Debug: + lg.out(_DebugLevel, ' VERSION "%s" is found in the index' % backupID) item_version_info = 
item.get_version_info(versionName) + # process single line related to a certain backupID and update backup matrix with actual stats + stored_files, is_complete = process_version_data(words, customer_idurl, backupID, supplier_num, maxBlockNum) + if stored_files is None and is_complete is None: + return modified, backups2remove, paths2remove, found_backups, newfiles + newfiles += stored_files + if is_complete: + found_backups.add(backupID) + # updated backup FS item version + if item_version_info[0] != maxBlockNum or (item_version_info[1] in [None, -1, 0] and versionSize > 0): + if _Debug: + lg.out(_DebugLevel, ' updating version %s info, maxBlockNum %r->%r, size %r->%r' % ( + backupID, + item_version_info[0], + maxBlockNum, + item_version_info[1], + versionSize, + )) + item.set_version_info(versionName, maxBlockNum, versionSize) + modified = True + if file_auto_created or newfiles: + full_remote_path = global_id.MakeGlobalID(path=item.name(), key_id=item.key_id) + full_remote_path_id = global_id.MakeGlobalID(path=item.path_id, key_id=item.key_id) + _, percent, _, weakPercent = GetBackupRemoteStats(backupID) + snapshot = dict( + backup_id=backupID, + max_block=maxBlockNum, + remote_path=full_remote_path, + global_id=full_remote_path_id, + type=item.type, + filesize=item.size, + size=versionSize, + key_id=item.key_id, + delivered=misc.percent2string(percent), + reliable=misc.percent2string(weakPercent), + ) + listeners.push_snapshot('remote_version', snap_id=backupID, data=snapshot) + return modified, backups2remove, paths2remove, found_backups, newfiles + + +def process_version_data(words, customer_idurl, backupID, supplier_num, maxBlockNum): + stored_files = 0 + is_complete = False missingBlocksSet = {'Data': set(), 'Parity': set()} if len(words) > 4: # "0/0/123/4567/F20090709034221PM/0-Data" "3" "0-5" "434353" "missing" "Data:1,3" "Parity:0,1,2" if words[4].strip() != 'missing': - lg.err('incorrect line:[%s]' % line) - return modified, backups2remove, paths2remove, 
found_backups, newfiles + lg.err('incorrect line: %r' % words) + return None, None for missingBlocksString in words[5:]: try: dp, blocks = missingBlocksString.split(':') missingBlocksSet[dp] = set(blocks.split(',')) except: lg.exc() - return modified, backups2remove, paths2remove, found_backups, newfiles + return None, None if backupID not in remote_files(): remote_files()[backupID] = {} if _Debug: @@ -563,43 +620,17 @@ def process_line_version(line, supplier_num, current_key_alias=None, customer_id # we set -1 if the file is missing and 1 if exist, so 0 mean "no info yet" ... smart! bit = -1 if str(blockNum) in missingBlocksSet[dataORparity] else 1 remote_files()[backupID][blockNum][dataORparity[0]][supplier_num] = bit - newfiles += int((bit + 1)/2) # this should switch -1 or 1 to 0 or 1 + stored_files += int((bit + 1)/2) # this should switch -1 or 1 to 0 or 1 # save max block number for this backup if backupID not in remote_max_block_numbers(): remote_max_block_numbers()[backupID] = -1 if maxBlockNum > remote_max_block_numbers()[backupID]: remote_max_block_numbers()[backupID] = maxBlockNum if len(missingBlocksSet['Data']) == 0 and len(missingBlocksSet['Parity']) == 0: - found_backups.add(backupID) - if item_version_info[0] != maxBlockNum or (item_version_info[1] in [None, -1, 0] and versionSize > 0): - if _Debug: - lg.out(_DebugLevel, ' updating version %s info, maxBlockNum %r->%r, size %r->%r' % ( - backupID, - item_version_info[0], - maxBlockNum, - item_version_info[1], - versionSize, - )) - item.set_version_info(versionName, maxBlockNum, versionSize) - modified = True - if file_auto_created or newfiles: - full_remote_path = global_id.MakeGlobalID(path=item.name(), key_id=item.key_id) - full_remote_path_id = global_id.MakeGlobalID(path=item.path_id, key_id=item.key_id) - _, percent, _, weakPercent = GetBackupRemoteStats(backupID) - snapshot = dict( - backup_id=backupID, - max_block=maxBlockNum, - remote_path=full_remote_path, - global_id=full_remote_path_id, - 
type=item.type, - filesize=item.size, - size=versionSize, - key_id=item.key_id, - delivered=misc.percent2string(percent), - reliable=misc.percent2string(weakPercent), - ) - listeners.push_snapshot('remote_version', snap_id=backupID, data=snapshot) - return modified, backups2remove, paths2remove, found_backups, newfiles + is_complete = True + if _Debug: + lg.args(_DebugLevel, b=backupID, stored_files=stored_files, is_complete=is_complete) + return stored_files, is_complete def process_raw_list_files(supplier_num, list_files_text_body, customer_idurl=None, is_in_sync=None): @@ -847,7 +878,7 @@ def visit(key_id, realpath, subpath, name): return True if realpath.startswith('newblock-'): return False - if subpath == settings.BackupIndexFileName(): + if subpath == settings.BackupIndexFileName() or packetid.IsIndexFileName(subpath): return False try: version = subpath.split('/')[-2] diff --git a/bitdust/storage/backup_rebuilder.py b/bitdust/storage/backup_rebuilder.py index 79e2a64..0206aac 100644 --- a/bitdust/storage/backup_rebuilder.py +++ b/bitdust/storage/backup_rebuilder.py @@ -93,6 +93,8 @@ from bitdust.main import settings +from bitdust.main import listeners + from bitdust.contacts import contactsdb from bitdust.userid import my_id @@ -182,7 +184,13 @@ def state_changed(self, oldstate, newstate, event, *args, **kwargs): Need to notify backup_monitor() machine about my new state. 
""" - # global_state.set_global_state('REBUILD ' + newstate) + data = self.to_json() + if newstate in ['STOPPED', 'DONE']: + data['rebuilding'] = False + listeners.push_snapshot('backup_rebuilder', snap_id='1', data=data) + else: + data['rebuilding'] = True + listeners.push_snapshot('backup_rebuilder', snap_id='1', data=data) if newstate in [ 'NEXT_BACKUP', 'REQUEST', diff --git a/bitdust/storage/index_synchronizer.py b/bitdust/storage/index_synchronizer.py index 602be3f..c58ab75 100644 --- a/bitdust/storage/index_synchronizer.py +++ b/bitdust/storage/index_synchronizer.py @@ -225,10 +225,7 @@ def state_changed(self, oldstate, newstate, event, *args, **kwargs): self.last_time_in_sync = time.time() if self.PushAgain: reactor.callLater(0, self.automat, 'instant') # @UndefinedVariable - if newstate == 'NO_INFO' and oldstate in [ - 'REQUEST?', - 'SENDING', - ]: + if newstate == 'NO_INFO' and oldstate in ['REQUEST?', 'SENDING']: events.send('my-backup-index-out-of-sync', data={}) if newstate == 'NO_INFO': self.last_time_in_sync = -1 @@ -380,6 +377,7 @@ def doSuppliersSendIndexFile(self, *args, **kwargs): """ packetID = global_id.MakeGlobalID( customer=my_id.getGlobalID(key_alias='master'), + # path=packetid.MakeIndexFileNamePacketID(), path=settings.BackupIndexFileName(), ) self.sending_suppliers.clear() @@ -516,6 +514,7 @@ def _on_supplier_acked(self, newpacket, info): def _do_retrieve(self, x=None): packetID = global_id.MakeGlobalID( customer=my_id.getGlobalID(key_alias='master'), + # path=packetid.MakeIndexFileNamePacketID(), path=settings.BackupIndexFileName(), ) localID = my_id.getIDURL() @@ -543,4 +542,4 @@ def _do_retrieve(self, x=None): self.requested_suppliers_number += 1 self.requests_packets_sent.append((packetID, supplier_idurl)) if _Debug: - lg.out(_DebugLevel, ' %s sending to %s' % (pkt_out, nameurl.GetName(supplier_idurl))) + lg.dbg(_DebugLevel, '%s sending to %s' % (pkt_out, nameurl.GetName(supplier_idurl))) diff --git 
a/bitdust/storage/keys_synchronizer.py b/bitdust/storage/keys_synchronizer.py index ec47ab5..66be1ef 100644 --- a/bitdust/storage/keys_synchronizer.py +++ b/bitdust/storage/keys_synchronizer.py @@ -269,8 +269,10 @@ def doPrepare(self, *args, **kwargs): self.keys_to_erase = {} self.keys_to_rename = {} lookup = backup_fs.ListChildsByPath(path='.keys', recursive=False, backup_info_callback=restore_monitor.GetBackupStatusInfo) + minimum_reliable_percent = eccmap.GetCorrectablePercent(eccmap.Current().suppliers_number) + if _Debug: + lg.args(_DebugLevel, minimum_reliable_percent=minimum_reliable_percent, lookup=lookup) if isinstance(lookup, list): - minimum_reliable_percent = eccmap.GetCorrectablePercent(eccmap.Current().suppliers_number) for i in lookup: if i['path'].endswith('.public'): stored_key_id = i['path'].replace('.public', '').replace('.keys/', '') @@ -292,6 +294,8 @@ def doPrepare(self, *args, **kwargs): if reliable >= minimum_reliable_percent: is_reliable = True break + if _Debug: + lg.args(_DebugLevel, i=i, stored_key_id=stored_key_id, is_reliable=is_reliable) if is_reliable: self.stored_keys[stored_key_id] = is_private else: diff --git a/bitdust/storage/restore_worker.py b/bitdust/storage/restore_worker.py index 7a1ad14..ab59e16 100644 --- a/bitdust/storage/restore_worker.py +++ b/bitdust/storage/restore_worker.py @@ -176,7 +176,7 @@ def __init__(self, BackupID, OutputFile, KeyID=None, ecc_map=None, debug_level=_ self.blockRestoredCallback = None self.Attempts = 0 - super(RestoreWorker, self).__init__(name='restore_worker_%s' % self.version, state='AT_STARTUP', debug_level=debug_level, log_events=log_events, log_transitions=log_transitions, publish_events=publish_events, **kwargs) + super(RestoreWorker, self).__init__(name='restore_%s' % self.version, state='AT_STARTUP', debug_level=debug_level, log_events=log_events, log_transitions=log_transitions, publish_events=publish_events, **kwargs) events.send('restore-started', 
data=dict(backup_id=self.backup_id)) def set_packet_in_callback(self, cb): @@ -377,7 +377,11 @@ def doInit(self, *args, **kwargs): num_suppliers = settings.DefaultDesiredSuppliers() self.EccMap = eccmap.eccmap(eccmap.GetEccMapName(num_suppliers)) lg.warn('no meta info found, guessed ECC map %r from %d known suppliers' % (self.EccMap, len(self.known_suppliers))) - self.max_errors = eccmap.GetCorrectableErrors(self.EccMap.NumSuppliers()) + # TODO: here we multiply by two because we have always two packets for each fragment: Data and Parity + # so number of possible errors can be two times larger + # however we may also add another check here to identify dead suppliers as well + # and dead suppliers number must be lower than "max_errors" in order restore to continue + self.max_errors = eccmap.GetCorrectableErrors(self.EccMap.NumSuppliers())*2 if data_receiver.A(): data_receiver.A().addStateChangedCallback(self._on_data_receiver_state_changed) diff --git a/bitdust/stream/io_throttle.py b/bitdust/stream/io_throttle.py index f4843b6..87fb98c 100644 --- a/bitdust/stream/io_throttle.py +++ b/bitdust/stream/io_throttle.py @@ -258,6 +258,7 @@ def GetRequestQueueLength(supplierIDURL): class SupplierQueue: + def __init__(self, supplierIdentity, creatorID, customerIDURL=None): self.customerIDURL = customerIDURL if self.customerIDURL is None: @@ -733,10 +734,12 @@ def GetRequestQueueLength(self): class IOThrottle: + """ All of the backup rebuilds will run their data requests through this So it gets throttled, also to reduce duplicate requests. 
""" + def __init__(self): self.creatorID = my_id.getIDURL() self.supplierQueues = {} @@ -796,7 +799,7 @@ def QueueRequestFile(self, callOnReceived, creatorID, packetID, ownerID, remoteI remoteID = id_url.field(remoteID) ownerID = id_url.field(ownerID) creatorID = id_url.field(creatorID) - if packetID != settings.BackupIndexFileName(): + if packetID != settings.BackupIndexFileName() and not packetid.IsIndexFileName(packetID): customer, pathID = packetid.SplitPacketID(packetID) filename = os.path.join(settings.getLocalBackupsDir(), customer, pathID) if os.path.exists(filename): diff --git a/bitdust/supplier/customer_space.py b/bitdust/supplier/customer_space.py index 2b2f21f..9635015 100644 --- a/bitdust/supplier/customer_space.py +++ b/bitdust/supplier/customer_space.py @@ -274,6 +274,8 @@ def make_filename(customerGlobID, filePath, keyAlias=None): if _Debug: lg.dbg(_DebugLevel, 'making a new folder: %s' % keyAliasDir) bpio._dir_make(keyAliasDir) + if packetid.IsIndexFileName(filePath): + filePath = settings.BackupIndexFileName() filename = os.path.join(keyAliasDir, filePath) return filename @@ -289,7 +291,7 @@ def make_valid_filename(customerIDURL, glob_path): if not customerGlobID: lg.warn('customer id is empty: %r' % glob_path) return '' - if filePath != settings.BackupIndexFileName(): + if filePath != settings.BackupIndexFileName() and not packetid.IsIndexFileName(filePath): if not packetid.Valid(filePath): # SECURITY lg.warn('invalid file path') return '' @@ -415,7 +417,7 @@ def on_data(newpacket): data_exists = not os.path.exists(filename) data_changed = True if not data_exists: - if remote_path == settings.BackupIndexFileName(): + if remote_path == settings.BackupIndexFileName() or packetid.IsIndexFileName(remote_path): current_data = bpio.ReadBinaryFile(filename) if current_data == new_data: lg.warn('skip rewriting existing file %s' % filename) @@ -431,8 +433,8 @@ def on_data(newpacket): p2p_service.SendAck(newpacket, response=strng.to_text(sz), 
remote_idurl=authorized_idurl) reactor.callLater(0, local_tester.TestSpaceTime) # @UndefinedVariable if key_alias != 'master' and data_changed: - if remote_path == settings.BackupIndexFileName(): - do_notify_supplier_file_modified(key_alias, remote_path, 'write', customer_idurl, authorized_idurl) + if remote_path == settings.BackupIndexFileName() or packetid.IsIndexFileName(remote_path): + do_notify_supplier_file_modified(key_alias, settings.BackupIndexFileName(), 'write', customer_idurl, authorized_idurl) else: if packetid.BlockNumber(newpacket.PacketID) == 0: do_notify_supplier_file_modified(key_alias, remote_path, 'write', customer_idurl, authorized_idurl) @@ -531,12 +533,15 @@ def on_retrieve(newpacket): # it can be not a new Data(), but the old data returning back as a response to Retreive() packet # to solve the issue we will create a new Data() packet # which will be addressed directly to recipient and "wrap" stored data inside it + return_packet_id = stored_packet.PacketID + if packetid.IsIndexFileName(glob_path['path']): + return_packet_id = newpacket.PacketID payload = stored_packet.Serialize() - routed_packet = signed.Packet( + return_packet = signed.Packet( Command=commands.Data(), OwnerID=stored_packet.OwnerID, CreatorID=my_id.getIDURL(), - PacketID=stored_packet.PacketID, + PacketID=return_packet_id, Payload=payload, RemoteID=recipient_idurl, ) @@ -544,12 +549,12 @@ def on_retrieve(newpacket): lg.args(_DebugLevel, file_size=sz, payload_size=len(payload), fn=filename, recipient=recipient_idurl) if recipient_idurl == stored_packet.OwnerID: if _Debug: - lg.dbg(_DebugLevel, 'from request %r : sending %r back to owner: %s' % (newpacket, stored_packet, recipient_idurl)) - gateway.outbox(routed_packet) + lg.dbg(_DebugLevel, 'from request %r : sending back %r in %r to owner: %s' % (newpacket, stored_packet, return_packet, recipient_idurl)) + gateway.outbox(return_packet) return True if _Debug: - lg.dbg(_DebugLevel, 'from request %r : returning data owned by 
%s to %s' % (newpacket, stored_packet.OwnerID, recipient_idurl)) - gateway.outbox(routed_packet) + lg.dbg(_DebugLevel, 'from request %r : returning data %r in %r owned by %s to %s' % (newpacket, stored_packet, return_packet, stored_packet.OwnerID, recipient_idurl)) + gateway.outbox(return_packet) return True diff --git a/bitdust/transport/packet_out.py b/bitdust/transport/packet_out.py index f6a0b22..8110b94 100644 --- a/bitdust/transport/packet_out.py +++ b/bitdust/transport/packet_out.py @@ -260,10 +260,7 @@ def search_by_response_packet(newpacket=None, proto=None, host=None, outgoing_co ) matching_packet_ids = [] matching_packet_ids.append(incoming_packet_id.lower()) - if incoming_command and incoming_command in [ - commands.Data(), - commands.Retrieve(), - ] and id_url.is_cached(incoming_owner_idurl) and incoming_owner_idurl == my_id.getIDURL(): + if incoming_command and incoming_command in [commands.Data(), commands.Retrieve()] and id_url.is_cached(incoming_owner_idurl) and incoming_owner_idurl == my_id.getIDURL(): my_rotated_idurls = id_url.list_known_idurls(my_id.getIDURL(), num_revisions=10, include_revisions=False) # TODO: my_rotated_idurls can be cached for optimization for another_idurl in my_rotated_idurls: diff --git a/bitdust/transport/proxy/proxy_receiver.py b/bitdust/transport/proxy/proxy_receiver.py index f3ac412..60305a6 100644 --- a/bitdust/transport/proxy/proxy_receiver.py +++ b/bitdust/transport/proxy/proxy_receiver.py @@ -756,7 +756,7 @@ def _do_send_identity_to_router(self, identity_source, failed_event): Command=commands.Identity(), OwnerID=my_id.getIDURL(), CreatorID=my_id.getIDURL(), - PacketID=('proxy_receiver:%s' % packetid.UniqueID()), + PacketID='proxy_receiver:%s' % packetid.UniqueID(), Payload=identity_obj.serialize(), RemoteID=self.router_idurl, ) diff --git a/bitdust_forks/Bismuth/mining_heavy3.py b/bitdust_forks/Bismuth/mining_heavy3.py index 9a3c574..0318a21 100644 --- a/bitdust_forks/Bismuth/mining_heavy3.py +++ 
b/bitdust_forks/Bismuth/mining_heavy3.py @@ -11,6 +11,7 @@ import struct import sys import threading +import time from hashlib import sha224 from hmac_drbg import DRBG @@ -190,6 +191,7 @@ def mining_open(file_name='heavy3a.bin'): print(f'Junction memory-map file already loaded in {threading.current_thread().name}') return print(f'Loading Junction memory-map file from {file_name} in {threading.current_thread().name}') + _t = time.time() if os.path.isfile(file_name): size = os.path.getsize(file_name) if size != 1073741824: @@ -216,6 +218,7 @@ def mining_open(file_name='heavy3a.bin'): # print('error while loading Junction file: {}'.format(e)) # sys.exit() raise e + print(f'Loaded memory-map in {time.time() - _t} seconds, RND_LEN={RND_LEN}') # print('mining_open', file_name, threading.current_thread()) @@ -225,6 +228,7 @@ def mining_close(): """ if (not heavy) and is_regnet: # print('Regnet, no heavy file to close') + print(f'Junction memory-map file already closed in {threading.current_thread().name}') return global F global MMAP @@ -241,3 +245,4 @@ def mining_close(): except: pass F = None + print(f'Junction memory-map file closed in {threading.current_thread().name}') diff --git a/regress/scenarios.py b/regress/scenarios.py index 997539a..3af56a8 100644 --- a/regress/scenarios.py +++ b/regress/scenarios.py @@ -102,6 +102,11 @@ 'supplier-1', 'supplier-2', ] +SUPPLIERS_IDS_123 = [ + 'supplier-1', + 'supplier-2', + 'supplier-3', +] CUSTOMERS_IDS = [ 'customer-1', 'customer-2', @@ -2676,10 +2681,20 @@ def scenario27(): assert customer_1_groupB_messages_before > 0 assert customer_2_groupB_messages_before > 0 + # prepare customers before supplier-1 goes offline, replace supplier-1 with supplier-3 + kw.config_set_v1('customer-1', 'services/employer/candidates', 'http://id-a:8084/supplier-3.xml,http://id-a:8084/supplier-2.xml') + kw.config_set_v1('customer-2', 'services/employer/candidates', 'http://id-a:8084/supplier-3.xml,http://id-a:8084/supplier-2.xml') + 
kw.config_set_v1('customer-3', 'services/employer/candidates', 'http://id-a:8084/supplier-3.xml,http://id-a:8084/supplier-2.xml') + # stop supplier-1 kw.wait_packets_finished(CUSTOMERS_IDS_123 + SUPPLIERS_IDS_12) kw.config_set_v1('supplier-1', 'services/network/enabled', 'false') - kw.wait_packets_finished(CUSTOMERS_IDS_123 + ['supplier-2', ]) + kw.wait_packets_finished(CUSTOMERS_IDS_123 + ['supplier-2', 'supplier-3', ]) + + # make sure customers are all switched to the new supplier + kw.supplier_list_v1('customer-1', expected_min_suppliers=2, expected_max_suppliers=2) + kw.supplier_list_v1('customer-2', expected_min_suppliers=2, expected_max_suppliers=2) + kw.supplier_list_v1('customer-3', expected_min_suppliers=2, expected_max_suppliers=2) # send again a message to the second group from customer-1 # this should rotate active queue supplier for customer-1 in the second group only diff --git a/regress/tests/stream/conf.json b/regress/tests/stream/conf.json index f4e0cd1..ec48699 100644 --- a/regress/tests/stream/conf.json +++ b/regress/tests/stream/conf.json @@ -90,6 +90,23 @@ "preferred_routers": "" } }, + "supplier-3": { + "links": [ + "dht-2", + "dht-3", + "stun-1", + "id-a" + ], + "ports": "10043:22", + "node": { + "role": "supplier", + "name": "supplier-3", + "join_network": true, + "known_id_servers": "id-a:8084", + "known_dht_seeds": "dht-2:14441,dht-3:14441", + "preferred_routers": "" + } + }, "customer-1": { "links": [ "dht-2", @@ -174,6 +191,7 @@ "stun-1", "supplier-1", "supplier-2", + "supplier-3", "customer-1", "customer-2", "customer-3" diff --git a/regress/tests/stream/docker-compose.yml b/regress/tests/stream/docker-compose.yml index f0ca039..6a3b8c1 100644 --- a/regress/tests/stream/docker-compose.yml +++ b/regress/tests/stream/docker-compose.yml @@ -79,6 +79,16 @@ services: - stun-1 - id-a + supplier-3: + image: bitdust/app + ports: + - "10843:22" + links: + - dht-2 + - dht-3 + - stun-1 + - id-a + customer-1: image: bitdust/app ports: @@ -132,6 
+142,7 @@ services: - stun-1 - supplier-1 - supplier-2 + - supplier-3 - customer-1 - customer-2 - customer-3 diff --git a/regress/tests/stream/test_stream.py b/regress/tests/stream/test_stream.py index 5883ba2..3d2182b 100644 --- a/regress/tests/stream/test_stream.py +++ b/regress/tests/stream/test_stream.py @@ -55,10 +55,10 @@ def test_stream(): def prepare(): set_active_scenario('PREPARE') kw.wait_suppliers_connected(scenarios.CUSTOMERS_IDS_123, expected_min_suppliers=2, expected_max_suppliers=2) - kw.wait_service_state(scenarios.SUPPLIERS_IDS_12, 'service_supplier', 'ON') + kw.wait_service_state(scenarios.SUPPLIERS_IDS_123, 'service_supplier', 'ON') kw.wait_service_state(scenarios.CUSTOMERS_IDS_123, 'service_customer', 'ON') kw.wait_service_state(scenarios.CUSTOMERS_IDS_123, 'service_shared_data', 'ON') kw.wait_service_state(scenarios.CUSTOMERS_IDS_123, 'service_private_groups', 'ON') kw.wait_service_state(scenarios.CUSTOMERS_IDS_123, 'service_message_history', 'ON') - kw.wait_service_state(scenarios.SUPPLIERS_IDS_12, 'service_joint_postman', 'ON') - kw.wait_packets_finished(scenarios.CUSTOMERS_IDS_123 + scenarios.SUPPLIERS_IDS_12) + kw.wait_service_state(scenarios.SUPPLIERS_IDS_123, 'service_joint_postman', 'ON') + kw.wait_packets_finished(scenarios.CUSTOMERS_IDS_123 + scenarios.SUPPLIERS_IDS_123) diff --git a/regress/testsupport.py b/regress/testsupport.py index c37cc13..92af97a 100644 --- a/regress/testsupport.py +++ b/regress/testsupport.py @@ -430,8 +430,8 @@ def start_daemon(node, skip_initialize=False, verbose=False): if verbose: dbg('\n' + bitdust_daemon[0].strip()) assert ( - bitdust_daemon[0].strip().startswith('main BitDust process already started') or - bitdust_daemon[0].strip().startswith('new BitDust process will be started in daemon mode') + bitdust_daemon[0].strip().count('main BitDust process already started') or + bitdust_daemon[0].strip().count('new BitDust process will be started in daemon mode') ), bitdust_daemon[0].strip() if verbose: 
dbg(f'\nstart_daemon [{node}] OK\n') @@ -447,8 +447,8 @@ async def start_daemon_async(node, loop, verbose=False): if verbose: dbg('\n' + bitdust_daemon[0].strip()) assert ( - bitdust_daemon[0].strip().startswith('main BitDust process already started') or - bitdust_daemon[0].strip().startswith('new BitDust process will be started in daemon mode') + bitdust_daemon[0].strip().count('main BitDust process already started') or + bitdust_daemon[0].strip().count('new BitDust process will be started in daemon mode') ), bitdust_daemon[0].strip() if verbose: dbg(f'\nstart_daemon_async [{node}] OK\n') @@ -718,8 +718,8 @@ def stop_daemon(node, skip_checks=False, verbose=False): bitdust_stop = run_ssh_command_and_wait(node, 'bitdust stop', verbose=verbose) if not skip_checks: resp = bitdust_stop[0].strip() - assert ((resp.startswith('BitDust child processes found') and resp.endswith('BitDust stopped')) or - (resp.startswith('found main BitDust process:') and resp.count('finished')) or (resp == 'BitDust is not running at the moment') or (resp == '')) + assert ((resp.count('BitDust child processes found') and resp.count('BitDust stopped')) or + (resp.count('found main BitDust process:') and resp.count('finished')) or resp.count('BitDust is not running at the moment') or (resp == '')) async def stop_daemon_async(node, loop, skip_checks=False, verbose=False): @@ -731,8 +731,8 @@ async def stop_daemon_async(node, loop, skip_checks=False, verbose=False): if verbose: dbg(f'stop_daemon_async [{node}] DONE\n') return - if not ((resp.startswith('BitDust child processes found') and resp.endswith('BitDust stopped')) or - (resp.startswith('found main BitDust process:') and resp.count('finished')) or (resp == 'BitDust is not running at the moment') or (resp == '')): + if not ((resp.count('BitDust child processes found') and resp.count('BitDust stopped')) or + (resp.count('found main BitDust process:') and resp.count('finished')) or resp.count('BitDust is not running at the moment') or (resp 
== '')): if verbose: warn('process finished with unexpected response: %r' % resp) assert False, resp