Skip to content

Commit

Permalink
various changes and improvements in the files stats counting (#208)
Browse files Browse the repository at this point in the history
  • Loading branch information
vesellov authored Sep 14, 2024
1 parent b00ab87 commit ce67572
Show file tree
Hide file tree
Showing 19 changed files with 341 additions and 241 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
Change Log
==========

2024-09-14 Veselin Penev [[email protected]](mailto:[email protected])

* changed one of the seed nodes
* block uploading of a file when insufficient storage space is expected
* populate details about service_proxy_server() in the api.network_status()
* keep track of files/folders/backups statistics for individual customers in the catalog
* bug fix in storage_contract.py
* IDURL fix in shared_access_coordinator()



2024-09-04 Veselin Penev [[email protected]](mailto:[email protected])

Expand Down
7 changes: 5 additions & 2 deletions bitdust/access/key_ring.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,8 +462,11 @@ def on_key_received(newpacket, info, status, error_message):
# but we already have a key with that ID
if my_keys.is_key_private(key_id):
# we should not overwrite existing private key
# TODO: check other scenarios
raise Exception('private key already registered with %r' % key_id)
if my_keys.get_public_key_raw(key_id) != key_object.toPublicString():
raise Exception('private key already registered : %r' % key_id)
lg.warn('received public key, but matching private key is already registered %r' % key_id)
p2p_service.SendAck(newpacket)
return True
if my_keys.get_public_key_raw(key_id) != key_object.toPublicString():
my_keys.erase_key(key_id)
if not my_keys.register_key(key_id, key_object, label=key_label):
Expand Down
8 changes: 4 additions & 4 deletions bitdust/access/shared_access_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ def doConnectCustomerSuppliers(self, *args, **kwargs):
self.suppliers_in_progress.clear()
self.suppliers_succeed.clear()
for supplier_idurl in self.known_suppliers_list:
self.suppliers_in_progress.append(supplier_idurl)
self.suppliers_in_progress.append(id_url.field(supplier_idurl))
if id_url.is_cached(supplier_idurl):
self._do_connect_with_supplier(supplier_idurl)
else:
Expand All @@ -689,7 +689,7 @@ def doSupplierRequestListFiles(self, event, *args, **kwargs):
"""
Action method.
"""
supplier_idurl = kwargs['supplier_idurl']
supplier_idurl = id_url.field(kwargs['supplier_idurl'])
pkt_out = None
if event == 'supplier-file-modified':
remote_path = kwargs['remote_path']
Expand Down Expand Up @@ -797,8 +797,8 @@ def doRemember(self, event, *args, **kwargs):
"""
Action method.
"""
supplier_idurl = kwargs['supplier_idurl']
if supplier_idurl in self.suppliers_in_progress:
supplier_idurl = id_url.field(kwargs['supplier_idurl'])
if id_url.is_in(supplier_idurl, self.suppliers_in_progress):
self.suppliers_in_progress.remove(supplier_idurl)
if event in ['index-sent', 'index-up-to-date']:
if supplier_idurl not in self.suppliers_succeed:
Expand Down
2 changes: 1 addition & 1 deletion bitdust/crypt/encrypted.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ def Unserialize(data, decrypt_key=None):
exc,
list(dct.keys()),
(dct.get('c'), dct.get('b'), dct.get('i'), dct.get('r')),
traceback.format_tb(),
'\n'.join(traceback.format_stack()),
))
if _Debug:
lg.out(_DebugLevel, repr(dct))
Expand Down
71 changes: 35 additions & 36 deletions bitdust/interface/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,13 +515,14 @@ def process_info():
}
if driver.is_on('service_backup_db'):
from bitdust.storage import backup_fs
v = backup_fs.total_stats()
result['file'] = {
'items': backup_fs.counter(),
'files': backup_fs.numberfiles(),
'files_size': backup_fs.sizefiles(),
'folders': backup_fs.numberfolders(),
'folders_size': backup_fs.sizefolders(),
'backups_size': backup_fs.sizebackups(),
'items': v['items'],
'files': v['files'],
'folders': v['folders'],
'files_size': v['size_files'],
'folders_size': v['size_folders'],
'backups_size': v['size_backups'],
'customers': len(backup_fs.known_customers()),
}
if driver.is_on('service_shared_data'):
Expand Down Expand Up @@ -693,6 +694,7 @@ def network_select(name):

def _on_network_disconnected(x):
cur_base_dir = deploy.current_base_dir()
# TODO: must wait shutdown and init to complete with defered
shutdowner.shutdown_services()
shutdowner.shutdown_local()
shutdowner.shutdown_automats()
Expand Down Expand Up @@ -969,6 +971,20 @@ def network_status(suppliers=False, customers=False, cache=False, tcp=False, udp
i['queue'] = len(s.pending_packets)
sessions.append(i)
r['proxy']['sessions'] = sessions
if driver.is_on('service_proxy_server'):
from bitdust.transport.proxy import proxy_router
if proxy_router.A():
r['proxy']['routes'] = []
for v in proxy_router.A().routes.values():
_r = v['connection_info'].copy()
_r['contacts'] = ', '.join(['{}:{}'.format(c[0], c[1]) for c in v['contacts']])
_r['address'] = ', '.join(['{}:{}'.format(a[0], a[1]) for a in v['address']])
_r.pop('id', None)
_r.pop('index', None)
r['proxy']['routes'].append(_r)
r['proxy']['closed_routes'] = [(strng.to_text(k), strng.to_text(v)) for k, v in proxy_router.A().closed_routes.items()]
r['proxy']['acks'] = len(proxy_router.A().acks)
r['proxy']['hosts'] = ', '.join([('{}://{}:{}'.format(strng.to_text(k), strng.to_text(v[0]), strng.to_text(v[1]))) for k, v in proxy_router.A().my_hosts.items()])
if dht:
from bitdust.dht import dht_service
r['dht'] = {}
Expand Down Expand Up @@ -2292,7 +2308,7 @@ def file_delete(remote_path):
backup_fs.DeleteLocalDir(settings.getLocalBackupsDir(), pathIDfull)
backup_fs.DeleteByID(pathID, iter=backup_fs.fs(customer_idurl, key_alias), iterID=backup_fs.fsID(customer_idurl, key_alias))
backup_fs.Scan(customer_idurl=customer_idurl, key_alias=key_alias)
backup_fs.Calculate(iterID=backup_fs.fsID(customer_idurl, key_alias))
backup_fs.Calculate(customer_idurl=customer_idurl, key_alias=key_alias)
if key_alias != 'master':
if driver.is_on('service_shared_data'):
from bitdust.access import shared_access_coordinator
Expand Down Expand Up @@ -2428,33 +2444,17 @@ def file_upload_start(local_path, remote_path, wait_result=False, publish_events
if not pathID:
return ERROR('path %s was not registered yet' % remote_path)
keyID = my_keys.make_key_id(alias=key_alias, creator_glob_id=parts['customer'])
# customerID = global_id.MakeGlobalID(customer=parts['customer'], key_alias=key_alias)
pathIDfull = packetid.MakeBackupID(keyID, pathID)
if key_alias != 'master':
if not driver.is_on('service_shared_data'):
return ERROR('service_shared_data() is not started')


# def _restart_active_share(result):
# if _Debug:
# lg.args(_DebugLevel, result=result, key_id=keyID, path=path, pathID=pathID)
# if key_alias != 'master':
# from bitdust.access import shared_access_coordinator
# active_share = shared_access_coordinator.get_active_share(keyID)
# if not active_share:
# active_share = shared_access_coordinator.SharedAccessCoordinator(key_id=keyID, publish_events=publish_events)
# active_share.automat('restart')
# return result

if wait_result:
task_created_defer = Deferred()
tsk = backup_control.StartSingle(
pathID=pathIDfull,
localPath=local_path,
keyID=keyID,
)
# if key_alias != 'master':
# tsk.result_defer.addCallback(_restart_active_share)
tsk.result_defer.addCallback(
lambda result: task_created_defer.callback(
OK(
Expand All @@ -2470,15 +2470,16 @@ def file_upload_start(local_path, remote_path, wait_result=False, publish_events
)
)
)
tsk.result_defer.addErrback(lambda result: task_created_defer.callback(ERROR(
'uploading task %d for %s failed: %s' % (
tsk.number,
tsk.pathID,
result[1],
),
api_method='file_upload_start',
), ), )
backup_fs.Calculate(iterID=backup_fs.fsID(customer_idurl, key_alias))
tsk.result_defer.addErrback(lambda result: task_created_defer.callback(ERROR(result, api_method='file_upload_start')))
# tsk.result_defer.addErrback(lambda result: task_created_defer.callback(ERROR(
# 'uploading task %d for %s failed: %s' % (
# tsk.number,
# tsk.pathID,
# result,
# ),
# api_method='file_upload_start',
# ), ), )
backup_fs.Calculate(customer_idurl=customer_idurl, key_alias=key_alias)
backup_control.SaveFSIndex(customer_idurl, key_alias)
if _Debug:
lg.out(_DebugLevel, 'api.file_upload_start %s with %s, wait_result=True' % (remote_path, pathIDfull))
Expand All @@ -2489,10 +2490,8 @@ def file_upload_start(local_path, remote_path, wait_result=False, publish_events
localPath=local_path,
keyID=keyID,
)
# if key_alias != 'master':
# tsk.result_defer.addCallback(_restart_active_share)
tsk.result_defer.addErrback(lambda result: lg.err('errback from api.file_upload_start.task(%s) failed with %s' % (result[0], result[1])))
backup_fs.Calculate(iterID=backup_fs.fsID(customer_idurl, key_alias))
tsk.result_defer.addErrback(lambda result: lg.err('errback from api.file_upload_start.task() failed with %r' % result))
backup_fs.Calculate(customer_idurl=customer_idurl, key_alias=key_alias)
backup_control.SaveFSIndex(customer_idurl, key_alias)
if _Debug:
lg.out(_DebugLevel, 'api.file_upload_start %s with %s' % (remote_path, pathIDfull))
Expand Down
92 changes: 92 additions & 0 deletions bitdust/p2p/p2p_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@

#------------------------------------------------------------------------------

from twisted.internet.defer import Deferred

#------------------------------------------------------------------------------

from bitdust.logs import lg

from bitdust.contacts import contactsdb
Expand All @@ -66,8 +70,11 @@
from bitdust.crypt import signed
from bitdust.crypt import my_keys

from bitdust.services import driver

from bitdust.transport import gateway
from bitdust.transport import callback
from bitdust.transport import packet_out

from bitdust.userid import my_id

Expand Down Expand Up @@ -195,6 +202,91 @@ def outbox(outpacket, wide, callbacks, target=None, route=None, response_timeout
#------------------------------------------------------------------------------


def on_request_service_received(newpacket, info):
    """
    Handle an incoming RequestService packet.

    Deserializes the JSON payload, validates it carries the mandatory
    'name' and 'payload' keys, and dispatches the request to the named
    service via driver.request().

    Returns True when the packet was fully handled (including all failure
    paths, which reply with a Fail packet), False when the service driver
    produced no result and the packet should be processed further.
    """
    # refuse oversized payloads before attempting to deserialize them
    if len(newpacket.Payload) > 1024*10:
        lg.warn('too long payload')
        SendFail(newpacket, 'too long payload')
        return True
    try:
        json_payload = serialization.BytesToDict(newpacket.Payload, keys_to_text=True, values_to_text=True)
        # touch both mandatory keys up-front so a malformed payload fails here
        json_payload['name']
        json_payload['payload']
    except:
        lg.warn('json payload invalid')
        SendFail(newpacket, 'json payload invalid')
        return True
    service_name = str(json_payload['name'])
    if _Debug:
        # BUGFIX: was lg.out(_Debug, ...) - the boolean flag was passed as the
        # debug level; use _DebugLevel like the rest of this module
        lg.out(_DebugLevel, 'service_p2p_hookups.RequestService {%s} from %s' % (service_name, newpacket.OwnerID))
    if not driver.is_exist(service_name):
        lg.warn('got wrong payload in %s' % service_name)
        SendFail(newpacket, 'service %s not exist' % service_name)
        return True
    if not driver.is_on(service_name):
        SendFail(newpacket, 'service %s is off' % service_name)
        return True
    try:
        result = driver.request(service_name, json_payload['payload'], newpacket, info)
    except:
        lg.exc()
        SendFail(newpacket, 'request processing failed with exception')
        return True
    if not result:
        if _Debug:
            lg.out(_DebugLevel, 'service_p2p_hookups._send_request_service SKIP request %s' % service_name)
        return False
    if isinstance(result, Deferred):
        # service will reply asynchronously later
        if _Debug:
            lg.out(_DebugLevel, 'service_p2p_hookups._send_request_service fired delayed execution')
    elif isinstance(result, packet_out.PacketOut):
        # service already sent its response packet
        if _Debug:
            lg.out(_DebugLevel, 'service_p2p_hookups._send_request_service outbox packet sent')
    return True


def on_cancel_service_received(newpacket, info):
    """
    Handle an incoming CancelService packet.

    Deserializes the JSON payload, validates it carries the mandatory
    'name' and 'payload' keys, and dispatches the cancellation to the
    named service via driver.cancel().

    Returns True when the packet was fully handled (including all failure
    paths, which reply with a Fail packet), False when the service driver
    produced no result and the packet should be processed further.
    """
    # refuse oversized payloads before attempting to deserialize them
    if len(newpacket.Payload) > 1024*10:
        # log the rejection, consistent with on_request_service_received()
        lg.warn('too long payload')
        SendFail(newpacket, 'too long payload')
        return True
    try:
        json_payload = serialization.BytesToDict(newpacket.Payload, keys_to_text=True, values_to_text=True)
        # touch both mandatory keys up-front so a malformed payload fails here
        json_payload['name']
        json_payload['payload']
    except:
        SendFail(newpacket, 'json payload invalid')
        return True
    # normalize to str, consistent with on_request_service_received()
    service_name = str(json_payload['name'])
    if _Debug:
        # BUGFIX: was lg.out(_Debug, ...) - the boolean flag was passed as the
        # debug level; use _DebugLevel like the rest of this module
        lg.out(_DebugLevel, 'service_p2p_hookups.CancelService {%s} from %s' % (service_name, newpacket.OwnerID))
    if not driver.is_exist(service_name):
        lg.warn('got wrong payload in %s' % newpacket)
        SendFail(newpacket, 'service %s not exist' % service_name)
        return True
    if not driver.is_on(service_name):
        SendFail(newpacket, 'service %s is off' % service_name)
        return True
    try:
        result = driver.cancel(service_name, json_payload['payload'], newpacket, info)
    except:
        lg.exc()
        SendFail(newpacket, 'request processing failed with exception')
        return True
    if not result:
        if _Debug:
            lg.out(_DebugLevel, 'service_p2p_hookups._send_cancel_service SKIP request %s' % service_name)
        return False
    if isinstance(result, Deferred):
        # service will reply asynchronously later
        if _Debug:
            lg.out(_DebugLevel, 'service_p2p_hookups._send_cancel_service fired delayed execution')
    elif isinstance(result, packet_out.PacketOut):
        # service already sent its response packet
        if _Debug:
            lg.out(_DebugLevel, 'service_p2p_hookups._send_cancel_service outbox packet sent')
    return True


#------------------------------------------------------------------------------


def Ack(newpacket, info):
if _Debug:
lg.out(_DebugLevel, 'p2p_service.Ack %s from [%s] at %s://%s with %d bytes payload' % (newpacket.PacketID, nameurl.GetName(newpacket.CreatorID), info.proto, info.host, len(newpacket.Payload)))
Expand Down
2 changes: 1 addition & 1 deletion bitdust/services/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ def _on_started(start_result, stop_result, dependencies_results):

def _on_failed(err):
lg.warn('failed service %s in driver.restart() : %r' % (service_name, err))
restart_result.errback(str(err))
restart_result.errback(Exception(str(err)))
return None

def _do_start(stop_result=None, dependencies_results=None):
Expand Down
Loading

0 comments on commit ce67572

Please sign in to comment.