
Commit

various improvements and bug fixes
vesellov committed Oct 7, 2024
1 parent ce67572 commit aa687c9
Showing 33 changed files with 371 additions and 300 deletions.
15 changes: 13 additions & 2 deletions CHANGELOG.txt
@@ -1,11 +1,22 @@
Change Log
==========

2024-10-07 Veselin Penev [email protected]

* Bump cryptography from 42.0.8 to 43.0.1
* prepare to add uniqueness to Request()/Data() packets for index files
* block uploading of a file when insufficient storage space is expected
* populate details about service_proxy_server() in the api.network_status()
* multiple improvements related to files management (#207)
* solved an issue with keys auto-restore from suppliers' data after identity recovery
* various improvements in shared_access_coordinator(), restore_worker() and service_message_history()
* bug fixes for group message chats and shared files after identity restore



2024-09-14 Veselin Penev [email protected]

* changed one of the seed nodes
* block uploading of a file when insufficient storage space is expected
* populate details about service_proxy_server() in the api.network_status()
* keep track of files/folders/backups statistics for individual customers in the catalog
* bug fix in storage_contract.py
* IDURL fix in shared_access_coordinator()
2 changes: 2 additions & 0 deletions bitdust/access/group_participant.py
@@ -255,6 +255,8 @@ def _start():
            existing_group_participant.automat('init')
            existing_group_participant.automat('connect')
            started += 1
        if _Debug:
            lg.args(_DebugLevel, started=started)

    def _on_cached(result):
        if _Debug:
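Note on the logging idiom: the `_Debug` guard plus `lg.args()` pair added here is used throughout this commit for structured debug output. A minimal sketch of the idea, with a simplified stand-in for the real helper in `bitdust.logs.lg` (the body below is an assumption, not the actual implementation):

```python
import inspect

_Debug = True
_DebugLevel = 10

def args(level, **kw):
    # simplified stand-in for lg.args(): prints the calling function's
    # name together with the given key/value pairs, e.g. "_start() started=3"
    caller = inspect.stack()[1].function
    pairs = ' '.join('%s=%r' % (k, v) for k, v in kw.items())
    print('[%d] %s() %s' % (level, caller, pairs))
```

Guarding the call with `if _Debug:` keeps hot paths cheap: the keyword arguments are not even evaluated when debugging is disabled.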
42 changes: 40 additions & 2 deletions bitdust/access/groups.py
@@ -92,6 +92,7 @@
from bitdust.system import bpio

from bitdust.contacts import contactsdb
from bitdust.contacts import identitycache

from bitdust.main import settings

@@ -115,7 +116,7 @@
def init():
    if _Debug:
        lg.out(_DebugLevel, 'groups.init')
    load_groups()
    open_known_groups()


def shutdown():
@@ -201,7 +202,7 @@ def send_group_pub_key_to_suppliers(group_key_id):
        d = key_ring.transfer_key(group_key_id, supplier_idurl, include_private=False, include_signature=False)
        d.addCallback(lg.cb, debug=_Debug, debug_level=_DebugLevel, method='groups.write_group_key_to_suppliers')
        d.addErrback(lg.errback, debug=_Debug, debug_level=_DebugLevel, method='groups.write_group_key_to_suppliers')
        # TODO: build some kind of retry mechanism - in case a particular supplier did not receive the key
        # it must be some process with each supplier that first verifies the list of my public keys the supplier currently possesses
        # and then transfers the missing keys or sends a note to erase "unused" keys to be able to clean up old keys
        l.append(d)
@@ -211,6 +212,43 @@ def send_group_pub_key_to_suppliers(group_key_id):
#------------------------------------------------------------------------------


def open_known_groups():
    to_be_opened = []
    to_be_cached = []
    for key_id in my_keys.known_keys():
        if not key_id.startswith('group_'):
            continue
        if not my_keys.is_key_private(key_id):
            continue
        if not my_keys.is_active(key_id):
            continue
        to_be_opened.append(key_id)
        _, customer_idurl = my_keys.split_key_id(key_id)
        if not id_url.is_cached(customer_idurl):
            to_be_cached.append(customer_idurl)
    if _Debug:
        lg.args(_DebugLevel, to_be_opened=to_be_opened, to_be_cached=to_be_cached)
    if to_be_cached:
        d = identitycache.start_multiple(to_be_cached)
        d.addErrback(lg.errback, debug=_Debug, debug_level=_DebugLevel, method='groups.open_known_groups')
        d.addBoth(lambda _: prepare_known_groups(to_be_opened))
        return
    prepare_known_groups(to_be_opened)


def prepare_known_groups(to_be_opened):
    for group_key_id in to_be_opened:
        _, customer_idurl = my_keys.split_key_id(group_key_id)
        if not id_url.is_cached(customer_idurl):
            lg.err('not able to open group %r, customer IDURL %r still was not cached' % (group_key_id, customer_idurl))
            continue
        if not is_group_stored(group_key_id):
            set_group_info(group_key_id)
            set_group_active(group_key_id, bool(my_keys.is_active(group_key_id)))
            save_group_info(group_key_id)
    load_groups()


def load_groups():
    loaded_groups = 0
    service_dir = settings.ServiceDir('service_private_groups')
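Note on the pattern introduced in `open_known_groups()`: collect the customer IDURLs that are not yet present in the identity cache, fetch them all at once, and only then open the groups. The `addBoth()` matters here: opening proceeds even if some identity fetches fail, and `prepare_known_groups()` then skips, with an error log, any group whose customer is still uncached. A standalone sketch of the same cache-then-continue flow, with `fetch_identities()` as a stand-in for `identitycache.start_multiple()`:

```python
from twisted.internet import defer

def fetch_identities(idurls):
    # stand-in for identitycache.start_multiple(): fires when all
    # requested identity files were fetched (or the attempt failed)
    return defer.succeed(idurls)

def open_items(to_be_opened, to_be_cached):
    if to_be_cached:
        d = fetch_identities(to_be_cached)
        d.addErrback(lambda failure: print('caching failed:', failure))
        # addBoth() fires on success and on failure alike, so the items
        # are opened either way; unresolved ones get skipped individually
        d.addBoth(lambda _: prepare_items(to_be_opened))
        return
    prepare_items(to_be_opened)

def prepare_items(to_be_opened):
    for item in to_be_opened:
        print('opening', item)
```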
100 changes: 60 additions & 40 deletions bitdust/access/shared_access_coordinator.py
@@ -179,47 +179,48 @@ def populate_shares():


def open_known_shares():
    known_offline_shares = []
    to_be_opened = []
    to_be_cached = []
    for key_id in my_keys.known_keys():
        if not key_id.startswith('share_'):
            continue
        if not my_keys.is_key_private(key_id):
            continue
        if not my_keys.is_active(key_id):
            continue
        active_share = get_active_share(key_id)
        if active_share:
            continue
        known_offline_shares.append(key_id)
    to_be_opened = []
    for customer_idurl in backup_fs.known_customers():
        for key_alias in backup_fs.known_keys_aliases(customer_idurl):
            if not key_alias.startswith('share_'):
                continue
            key_id = my_keys.make_key_id(alias=key_alias, creator_idurl=customer_idurl)
            if not key_id:
                continue
            if my_keys.latest_key_id(key_id) != key_id:
                continue
            if key_id in to_be_opened:
                continue
            if key_id not in known_offline_shares:
                continue
            if not my_keys.is_key_private(key_id):
                continue
            if not my_keys.is_active(key_id):
                continue
            to_be_opened.append(key_id)
        to_be_opened.append(key_id)
        _, customer_idurl = my_keys.split_key_id(key_id)
        if not id_url.is_cached(customer_idurl):
            to_be_cached.append(customer_idurl)
    if _Debug:
        lg.args(_DebugLevel, known_offline_shares=known_offline_shares, to_be_opened=to_be_opened)
        lg.args(_DebugLevel, to_be_opened=to_be_opened, to_be_cached=to_be_cached)
    if to_be_cached:
        d = identitycache.start_multiple(to_be_cached)
        d.addErrback(lg.errback, debug=_Debug, debug_level=_DebugLevel, method='shared_access_coordinator.open_known_shares')
        d.addBoth(lambda _: start_known_shares(to_be_opened))
        return
    start_known_shares(to_be_opened)


def start_known_shares(to_be_opened):
    populate_shared_files = listeners.is_populate_required('shared_file')
    for key_id in to_be_opened:
        active_share = SharedAccessCoordinator(key_id, log_events=True, publish_events=False)
        _, customer_idurl = my_keys.split_key_id(key_id)
        if not id_url.is_cached(customer_idurl):
            lg.err('not able to open share %r, customer IDURL %r still was not cached' % (key_id, customer_idurl))
            continue
        try:
            active_share = SharedAccessCoordinator(key_id, log_events=True, publish_events=False)
        except:
            lg.exc()
            continue
        active_share.automat('restart')
        if populate_shared_files:
            backup_fs.populate_shared_files(key_id=key_id)
    # if populate_shared_files:
    #     listeners.populate_later().remove('shared_file')
    if listeners.is_populate_required('shared_location'):
        # listeners.populate_later().remove('shared_location')
        populate_shares()
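Two behavioural changes in `start_known_shares()` are worth noting: a share whose customer identity is still not cached is now skipped with an error instead of being constructed against missing data, and an exception inside the `SharedAccessCoordinator` constructor no longer aborts the remaining shares. A minimal sketch of this defensive start-up loop, with generic names standing in for the real classes:

```python
def start_all(key_ids, make_machine, is_ready):
    # make_machine stands in for the SharedAccessCoordinator constructor,
    # is_ready for the id_url.is_cached() prerequisite check
    started = []
    for key_id in key_ids:
        if not is_ready(key_id):
            continue  # prerequisites missing: skip, do not crash
        try:
            machine = make_machine(key_id)  # may raise on corrupted key data
        except Exception:
            continue  # one broken share must not block the others
        machine.automat('restart')
        started.append(machine)
    return started
```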


@@ -487,6 +488,7 @@ def __init__(self, key_id, debug_level=_DebugLevel, log_events=_Debug, log_trans
        self.customer_idurl = self.glob_id['idurl']
        self.known_suppliers_list = []
        self.known_ecc_map = None
        self.critical_suppliers_number = 1
        self.dht_lookup_use_cache = True
        self.received_index_file_revision = {}
        self.last_time_in_sync = -1
@@ -594,25 +596,25 @@ def A(self, event, *args, **kwargs):
            if event == 'shutdown':
                self.state = 'CLOSED'
                self.doDestroyMe(*args, **kwargs)
            elif event == 'timer-30sec' or event == 'all-suppliers-disconnected':
                self.state = 'DISCONNECTED'
                self.doReportDisconnected(*args, **kwargs)
            elif event == 'list-files-received':
                self.doSupplierSendIndexFile(*args, **kwargs)
            elif event == 'key-not-registered':
                self.doSupplierTransferPubKey(*args, **kwargs)
            elif event == 'supplier-connected' or event == 'key-sent':
                self.doSupplierRequestIndexFile(*args, **kwargs)
            elif event == 'all-suppliers-connected':
                self.state = 'CONNECTED'
                self.doReportConnected(*args, **kwargs)
            elif event == 'index-sent' or event == 'index-up-to-date' or event == 'index-failed' or event == 'list-files-failed' or event == 'supplier-failed':
                self.doRemember(event, *args, **kwargs)
                self.doCheckAllConnected(*args, **kwargs)
            elif event == 'list-files-verified':
                self.doSupplierProcessListFiles(*args, **kwargs)
            elif event == 'supplier-file-modified' or event == 'index-received' or event == 'index-missing':
                self.doSupplierRequestListFiles(event, *args, **kwargs)
            elif (event == 'timer-30sec' and not self.isEnoughConnected(*args, **kwargs)) or event == 'all-suppliers-disconnected':
                self.state = 'DISCONNECTED'
                self.doReportDisconnected(*args, **kwargs)
            elif (event == 'timer-30sec' and self.isEnoughConnected(*args, **kwargs)) or event == 'all-suppliers-connected':
                self.state = 'CONNECTED'
                self.doReportConnected(*args, **kwargs)
        #---DISCONNECTED---
        elif self.state == 'DISCONNECTED':
            if event == 'shutdown':
@@ -638,6 +640,14 @@ def A(self, event, *args, **kwargs):
            pass
        return None

    def isEnoughConnected(self, *args, **kwargs):
        """
        Condition method.
        """
        if _Debug:
            lg.args(_DebugLevel, progress=len(self.suppliers_in_progress), succeed=self.suppliers_succeed, critical_suppliers_number=self.critical_suppliers_number)
        return len(self.suppliers_succeed) >= self.critical_suppliers_number

    def doInit(self, *args, **kwargs):
        """
        Action method.
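The reworked CONNECTED-state handlers change what the periodic `timer-30sec` event means: previously it always dropped the machine to DISCONNECTED, now the new `isEnoughConnected()` condition decides whether the machine stays CONNECTED. A simplified sketch of that dispatch, outside BitDust's automat framework:

```python
def on_timer_30sec(machine):
    # the condition method gates the transition instead of a blind fallback
    if machine.isEnoughConnected():
        machine.state = 'CONNECTED'
        machine.doReportConnected()
    else:
        machine.state = 'DISCONNECTED'
        machine.doReportDisconnected()
```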
@@ -666,6 +676,10 @@ def doConnectCustomerSuppliers(self, *args, **kwargs):
            self.automat('all-suppliers-disconnected')
            return
        self.known_ecc_map = args[0].get('ecc_map')
        self.critical_suppliers_number = 1
        if self.known_ecc_map:
            from bitdust.raid import eccmap
            self.critical_suppliers_number = eccmap.GetCorrectableErrors(eccmap.GetEccMapSuppliersNumber(self.known_ecc_map))
        self.suppliers_in_progress.clear()
        self.suppliers_succeed.clear()
        for supplier_idurl in self.known_suppliers_list:
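The connection threshold is now computed once when suppliers are (re)connected and stored on the instance, instead of being recomputed inside `doCheckAllConnected()`. For context: `GetEccMapSuppliersNumber()` maps an ecc map name to its supplier count and `GetCorrectableErrors()` returns how many supplier failures the erasure coding can tolerate; the coordinator requires at least that many suppliers to succeed. A small sketch using the same two `bitdust.raid.eccmap` calls shown above (the map name below is only illustrative):

```python
from bitdust.raid import eccmap

def critical_suppliers(ecc_map_name):
    total = eccmap.GetEccMapSuppliersNumber(ecc_map_name)
    # the number of tolerable failures doubles as the minimum number of
    # suppliers that must respond before the share counts as connected
    return eccmap.GetCorrectableErrors(total)

# e.g. critical_suppliers('ecc/4x4'); the exact value depends on the map
```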
@@ -693,7 +707,7 @@ def doSupplierRequestListFiles(self, event, *args, **kwargs):
        pkt_out = None
        if event == 'supplier-file-modified':
            remote_path = kwargs['remote_path']
            if remote_path == settings.BackupIndexFileName():
            if remote_path == settings.BackupIndexFileName() or packetid.IsIndexFileName(remote_path):
                if self.state == 'CONNECTED':
                    self.automat('restart')
                else:
@@ -770,7 +784,7 @@ def doSupplierProcessListFiles(self, *args, **kwargs):
        is_in_sync = True
        if _Debug:
            lg.args(_DebugLevel, rev=supplier_index_file_revision, is_in_sync=is_in_sync)
        remote_files_changed, backups2remove, paths2remove, missed_backups = backup_matrix.process_raw_list_files(
        remote_files_changed, backups2remove, paths2remove, _ = backup_matrix.process_raw_list_files(
            supplier_num=kwargs['supplier_pos'],
            list_files_text_body=kwargs['payload'],
            customer_idurl=self.customer_idurl,
@@ -810,14 +824,10 @@ def doCheckAllConnected(self, *args, **kwargs):
"""
Action method.
"""
critical_suppliers_number = 1
if self.known_ecc_map:
from bitdust.raid import eccmap
critical_suppliers_number = eccmap.GetCorrectableErrors(eccmap.GetEccMapSuppliersNumber(self.known_ecc_map))
if _Debug:
lg.args(_DebugLevel, progress=len(self.suppliers_in_progress), succeed=self.suppliers_succeed, critical_suppliers_number=critical_suppliers_number)
lg.args(_DebugLevel, progress=len(self.suppliers_in_progress), succeed=self.suppliers_succeed, critical_suppliers_number=self.critical_suppliers_number)
if len(self.suppliers_in_progress) == 0:
if len(self.suppliers_succeed) >= critical_suppliers_number:
if len(self.suppliers_succeed) >= self.critical_suppliers_number:
self.automat('all-suppliers-connected')
else:
self.automat('all-suppliers-disconnected')
@@ -880,6 +890,7 @@ def _do_connect_with_supplier(self, supplier_idurl):
    def _do_retrieve_index_file(self, supplier_idurl):
        packetID = global_id.MakeGlobalID(
            key_id=self.key_id,
            # path=packetid.MakeIndexFileNamePacketID(),
            path=settings.BackupIndexFileName(),
        )
        sc = supplier_connector.by_idurl(supplier_idurl, customer_idurl=self.customer_idurl)
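The commented-out `packetid.MakeIndexFileNamePacketID()` call here (and the identical one in `_do_send_index_file()` below) matches the changelog item about preparing to add uniqueness to Request()/Data() packets for index files: today every round-trip uses the fixed `settings.BackupIndexFileName()` path. A purely hypothetical sketch of the idea; the function name and the ID format are assumptions, not BitDust's actual API:

```python
import time

def make_index_file_packet_id(base_name='.index'):
    # hypothetical: suffix the fixed index file name with a millisecond
    # timestamp so concurrent or retried index requests stay distinguishable
    return '%s:%d' % (base_name, int(time.time() * 1000))
```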
@@ -920,6 +931,14 @@ def _do_process_index_file(self, wrapped_packet, supplier_idurl):
            lg.err('incoming Data() is not valid from supplier %r' % supplier_idurl)
            self.automat('supplier-failed', supplier_idurl=supplier_idurl)
            return
        if id_url.is_cached(supplier_idurl):
            self._do_read_index_file(wrapped_packet, supplier_idurl)
        else:
            d = identitycache.start_one(supplier_idurl)
            d.addErrback(lg.errback, debug=_Debug, debug_level=_DebugLevel, method='shared_access_coordinator._do_process_index_file')
            d.addBoth(lambda _: self._do_read_index_file(wrapped_packet, supplier_idurl))

    def _do_read_index_file(self, wrapped_packet, supplier_idurl):
        supplier_revision = backup_control.IncomingSupplierBackupIndex(
            wrapped_packet,
            key_id=self.key_id,
@@ -933,6 +952,7 @@ def _do_send_index_file(self, supplier_idurl):
    def _do_send_index_file(self, supplier_idurl):
        packetID = global_id.MakeGlobalID(
            key_id=self.key_id,
            # path=packetid.MakeIndexFileNamePacketID(),
            path=settings.BackupIndexFileName(),
        )
        data = bpio.ReadBinaryFile(settings.BackupIndexFilePath(self.customer_idurl, self.key_alias))
