diff --git a/docker-stack/golem-reputation-backend/reputation-backend/api/api.py b/docker-stack/golem-reputation-backend/reputation-backend/api/api.py
index 6fc178c..01e5ed7 100644
--- a/docker-stack/golem-reputation-backend/reputation-backend/api/api.py
+++ b/docker-stack/golem-reputation-backend/reputation-backend/api/api.py
@@ -1,5 +1,5 @@
 from ninja import NinjaAPI, Path
-from .models import DiskBenchmark, CpuBenchmark, MemoryBenchmark, Provider, TaskCompletion, PingResult, NodeStatus, Task, Offer
+from .models import DiskBenchmark, CpuBenchmark, MemoryBenchmark, Provider, TaskCompletion, PingResult, NodeStatusHistory, Task, Offer
 from .schemas import DiskBenchmarkSchema, CpuBenchmarkSchema, MemoryBenchmarkSchema, TaskCompletionSchema, ProviderSuccessRate, TaskCreateSchema, TaskUpdateSchema, ProposalSchema, TaskCostUpdateSchema, BulkTaskCostUpdateSchema, BulkBenchmarkSchema
 from django.shortcuts import get_object_or_404
 from django.http import JsonResponse
@@ -11,9 +11,32 @@ from .bulkutils import process_disk_benchmark, process_cpu_benchmark, process_memory_benchmark, process_network_benchmark
 api = NinjaAPI()
 import redis
+from datetime import timedelta
 from .scoring import get_provider_benchmark_scores
+
+
+def calculate_uptime(provider_id):
+    provider = Provider.objects.get(node_id=provider_id)
+    statuses = NodeStatusHistory.objects.filter(provider=provider).order_by('timestamp')
+
+    online_duration = timedelta(0)
+    last_online_time = None
+
+    # Pair each online edge with the next offline edge and sum the intervals
+    for status in statuses:
+        if status.is_online:
+            last_online_time = status.timestamp
+        elif last_online_time:
+            online_duration += status.timestamp - last_online_time
+            last_online_time = None
+
+    # If the node is currently online, count the open interval up to now
+    if last_online_time is not None:
+        online_duration += timezone.now() - last_online_time
+
+    # created_at is nullable for providers that predate the new field, so fall
+    # back to the first recorded status to keep the division below safe
+    first_status = statuses.first()
+    started_at = provider.created_at or (first_status.timestamp if first_status else timezone.now())
+    total_duration = timezone.now() - started_at
+    if total_duration.total_seconds() <= 0:
+        return 0.0
+    return (online_duration.total_seconds() / total_duration.total_seconds()) * 100
 
 
 @api.get("/providers/scores")
@@ -22,28 +45,33 @@ def list_provider_scores(request):
     providers = Provider.objects.annotate(
         success_count=Count('taskcompletion', filter=Q(taskcompletion__is_successful=True, taskcompletion__timestamp__gte=ten_days_ago)),
         total_count=Count('taskcompletion', filter=Q(taskcompletion__timestamp__gte=ten_days_ago)),
-    ).prefetch_related('nodestatus_set').all()
+    ).all()
     response = {"providers": [], "rejected": []}
     for provider in providers:
         if provider.total_count > 0:
             success_ratio = provider.success_count / provider.total_count
             scores = {}
-            # scores = get_provider_benchmark_scores(provider, recent_n=3)
-            scores.update({"successRate": success_ratio})
-            uptime = provider.nodestatus_set.first().uptime_percentage if provider.nodestatus_set.exists() else 0
-            normalized_uptime = uptime / 100  # Normalize uptime to 0-1
-            scores.update({"uptime": normalized_uptime})
+            uptime_percentage = calculate_uptime(provider.node_id)
+
+            scores.update({
+                "successRate": success_ratio,
+                "uptime": uptime_percentage / 100  # Normalize uptime to a 0-1 scale
+            })
             response["providers"].append({
                 "providerId": provider.node_id,
                 "scores": scores
             })
-        # else:
-        #     response["rejected"].append({
-        #         "providerId": provider.node_id,
-        #         "reason": "No activities within the last 10 days"
-        #     })
-
+    # Providers without any completed task are reported separately, with their
+    # uptime still computed from the status history
+    providers_with_no_tasks = Provider.objects.filter(taskcompletion__isnull=True)
+    for provider in providers_with_no_tasks:
+        uptime_percentage = calculate_uptime(provider.node_id)
+        response["rejected"].append({
+            "providerId": provider.node_id,
+            "scores": {
+                "successRate": 0,
+                "uptime": uptime_percentage / 100
+            }
+        })
     return response
@@ -210,84 +238,84 @@ def provider_success_rate(request):
-@api.get("/provider/{node_id}/scores", response=dict)
-def get_provider_scores(request, node_id: str):
-    try:
-        provider = get_object_or_404(Provider, node_id=node_id)
-        disk_benchmarks = (
-            DiskBenchmark.objects.filter(provider=provider)
-            .values("benchmark_name")
-            .annotate(
-                avg_read_throughput=Avg("read_throughput_mb_ps"),
-                avg_write_throughput=Avg("write_throughput_mb_ps"),
-                latency_95th_percentile=Avg("latency_95th_percentile_ms"),
-            )[:5]
-        )
-
-        memory_benchmarks = (
-            MemoryBenchmark.objects.filter(provider=provider)
-            .values("benchmark_name")
-            .annotate(
-                avg_total_data_transferred=Avg("total_data_transferred_mi_b"),
-                avg_throughput=Avg("throughput_mi_b_sec"),
-                avg_latency_95th_percentile=Avg("latency_95th_percentile_ms"),
-            )[:5]
-        )
-
-        cpu_benchmarks = (
-            CpuBenchmark.objects.filter(provider=provider)
-            .values("benchmark_name")
-            .annotate(
-                avg_events_per_second=Avg("events_per_second"),
-                total_events=Avg("total_events"),
-                threads=Avg("threads"),
-            )[:5]
-        )
-
-        avg_ping = (
-            PingResult.objects.filter(provider=provider)
-            .aggregate(
-                avg_ping_tcp=Avg("ping_tcp"),
-                avg_ping_udp=Avg("ping_udp")
-            )
-        )
-
-        uptime_percentage = (
-            NodeStatus.objects.filter(provider=provider)
-            .aggregate(uptime=Avg("uptime_percentage"))
-            .get("uptime", 0)
-        )
-        task_counts = TaskCompletion.objects.filter(provider=provider).aggregate(
-            total=Count('id'),
-            successful=Count('id', filter=Q(is_successful=True)))
-
-        success_rate = (task_counts['successful'] / task_counts['total'] * 100) if task_counts['total'] > 0 else 0
-
-        disk_scores = [{"benchmark_name": db["benchmark_name"], "avg_read_throughput": db["avg_read_throughput"],
-                        "avg_write_throughput": db["avg_write_throughput"], "latency_95th_percentile": db["latency_95th_percentile"]}
-                       for db in disk_benchmarks]
-
-        memory_scores = [{"benchmark_name": mb["benchmark_name"], "avg_total_data_transferred": mb["avg_total_data_transferred"],
-                          "avg_throughput": mb["avg_throughput"], "avg_latency_95th_percentile": mb["avg_latency_95th_percentile"]}
-                         for mb in memory_benchmarks]
-
-        cpu_scores = [{"benchmark_name": cb["benchmark_name"], "avg_events_per_second": cb["avg_events_per_second"],
-                       "total_events": cb["total_events"], "threads": cb["threads"]}
-                      for cb in cpu_benchmarks]
-
-        scores = {
-            "disk": disk_scores,
-            "memory": memory_scores,
-            "cpu": cpu_scores,
-            "ping": avg_ping,
-            "uptime_percentage": uptime_percentage,
-            "task_success_rate": success_rate
-        }
-
-        return scores
-    except Exception as e:
-        print(e)
-        return {"status": "error", "message": "Error retrieving provider scores",}
+# @api.get("/provider/{node_id}/scores", response=dict)
+# def get_provider_scores(request, node_id: str):
+#     try:
+#         provider = get_object_or_404(Provider, node_id=node_id)
+#         disk_benchmarks = (
+#             DiskBenchmark.objects.filter(provider=provider)
+#             .values("benchmark_name")
+#             .annotate(
+#                 avg_read_throughput=Avg("read_throughput_mb_ps"),
+#                 avg_write_throughput=Avg("write_throughput_mb_ps"),
+#                 latency_95th_percentile=Avg("latency_95th_percentile_ms"),
+#             )[:5]
+#         )
+
+#         memory_benchmarks = (
+#             MemoryBenchmark.objects.filter(provider=provider)
+#             .values("benchmark_name")
+#             .annotate(
+#                 avg_total_data_transferred=Avg("total_data_transferred_mi_b"),
+#                 avg_throughput=Avg("throughput_mi_b_sec"),
+#                 avg_latency_95th_percentile=Avg("latency_95th_percentile_ms"),
+#             )[:5]
+#         )
+
+#         cpu_benchmarks = (
+#             CpuBenchmark.objects.filter(provider=provider)
+#             .values("benchmark_name")
+#             .annotate(
+#                 avg_events_per_second=Avg("events_per_second"),
+#                 total_events=Avg("total_events"),
+#                 threads=Avg("threads"),
+#             )[:5]
+#         )
+
+#         avg_ping = (
+#             PingResult.objects.filter(provider=provider)
+#             .aggregate(
+#                 avg_ping_tcp=Avg("ping_tcp"),
+#                 avg_ping_udp=Avg("ping_udp")
+#             )
+#         )
+
+#         uptime_percentage = (
+#             NodeStatus.objects.filter(provider=provider)
+#             .aggregate(uptime=Avg("uptime_percentage"))
+#             .get("uptime", 0)
+#         )
+#         task_counts = TaskCompletion.objects.filter(provider=provider).aggregate(
+#             total=Count('id'),
+#             successful=Count('id', filter=Q(is_successful=True)))
+
+#         success_rate = (task_counts['successful'] / task_counts['total'] * 100) if task_counts['total'] > 0 else 0
+
+#         disk_scores = [{"benchmark_name": db["benchmark_name"], "avg_read_throughput": db["avg_read_throughput"],
+#                         "avg_write_throughput": db["avg_write_throughput"], "latency_95th_percentile": db["latency_95th_percentile"]}
+#                        for db in disk_benchmarks]
+
+#         memory_scores = [{"benchmark_name": mb["benchmark_name"], "avg_total_data_transferred": mb["avg_total_data_transferred"],
+#                           "avg_throughput": mb["avg_throughput"], "avg_latency_95th_percentile": mb["avg_latency_95th_percentile"]}
+#                          for mb in memory_benchmarks]
+
+#         cpu_scores = [{"benchmark_name": cb["benchmark_name"], "avg_events_per_second": cb["avg_events_per_second"],
+#                        "total_events": cb["total_events"], "threads": cb["threads"]}
+#                       for cb in cpu_benchmarks]
+
+#         scores = {
+#             "disk": disk_scores,
+#             "memory": memory_scores,
+#             "cpu": cpu_scores,
+#             "ping": avg_ping,
+#             "uptime_percentage": uptime_percentage,
+#             "task_success_rate": success_rate
+#         }
+
+#         return scores
+#     except Exception as e:
+#         print(e)
+#         return {"status": "error", "message": "Error retrieving provider scores",}
 
 from django.utils import timezone
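Reviewer note on `calculate_uptime`: it walks the status history as alternating online/offline edges and sums the online intervals. A minimal, dependency-free sketch of the same pairing logic, handy for sanity-checking it outside Django (the timestamps are invented sample data):

    from datetime import datetime, timedelta

    def online_seconds(history, now):
        online = timedelta(0)
        last_online = None
        for timestamp, is_online in history:
            if is_online:
                last_online = timestamp
            elif last_online:
                online += timestamp - last_online
                last_online = None
        if last_online is not None:
            online += now - last_online  # still online at "now"
        return online.total_seconds()

    now = datetime(2024, 2, 20, 12, 0)
    history = [
        (datetime(2024, 2, 20, 0, 0), True),   # online at midnight
        (datetime(2024, 2, 20, 6, 0), False),  # offline after six hours
        (datetime(2024, 2, 20, 9, 0), True),   # back online until "now"
    ]
    assert online_seconds(history, now) == 9 * 3600  # 6h + 3h online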
avg_throughput=Avg("throughput_mi_b_sec"), +# avg_latency_95th_percentile=Avg("latency_95th_percentile_ms"), +# )[:5] +# ) + +# cpu_benchmarks = ( +# CpuBenchmark.objects.filter(provider=provider) +# .values("benchmark_name") +# .annotate( +# avg_events_per_second=Avg("events_per_second"), +# total_events=Avg("total_events"), +# threads=Avg("threads"), +# )[:5] +# ) + +# avg_ping = ( +# PingResult.objects.filter(provider=provider) +# .aggregate( +# avg_ping_tcp=Avg("ping_tcp"), +# avg_ping_udp=Avg("ping_udp") +# ) +# ) + +# uptime_percentage = ( +# NodeStatus.objects.filter(provider=provider) +# .aggregate(uptime=Avg("uptime_percentage")) +# .get("uptime", 0) +# ) +# task_counts = TaskCompletion.objects.filter(provider=provider).aggregate( +# total=Count('id'), +# successful=Count('id', filter=Q(is_successful=True))) + +# success_rate = (task_counts['successful'] / task_counts['total'] * 100) if task_counts['total'] > 0 else 0 + +# disk_scores = [{"benchmark_name": db["benchmark_name"], "avg_read_throughput": db["avg_read_throughput"], +# "avg_write_throughput": db["avg_write_throughput"], "latency_95th_percentile": db["latency_95th_percentile"]} +# for db in disk_benchmarks] + +# memory_scores = [{"benchmark_name": mb["benchmark_name"], "avg_total_data_transferred": mb["avg_total_data_transferred"], +# "avg_throughput": mb["avg_throughput"], "avg_latency_95th_percentile": mb["avg_latency_95th_percentile"]} +# for mb in memory_benchmarks] + +# cpu_scores = [{"benchmark_name": cb["benchmark_name"], "avg_events_per_second": cb["avg_events_per_second"], +# "total_events": cb["total_events"], "threads": cb["threads"]} +# for cb in cpu_benchmarks] + +# scores = { +# "disk": disk_scores, +# "memory": memory_scores, +# "cpu": cpu_scores, +# "ping": avg_ping, +# "uptime_percentage": uptime_percentage, +# "task_success_rate": success_rate +# } + +# return scores +# except Exception as e: +# print(e) +# return {"status": "error", "message": "Error retrieving provider scores",} from django.utils import timezone diff --git a/docker-stack/golem-reputation-backend/reputation-backend/api/migrations/0030_nodestatushistory_provider_created_at_and_more.py b/docker-stack/golem-reputation-backend/reputation-backend/api/migrations/0030_nodestatushistory_provider_created_at_and_more.py new file mode 100644 index 0000000..4ed513a --- /dev/null +++ b/docker-stack/golem-reputation-backend/reputation-backend/api/migrations/0030_nodestatushistory_provider_created_at_and_more.py @@ -0,0 +1,35 @@ +# Generated by Django 4.1.7 on 2024-02-20 14:22 + +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ('api', '0029_cpubenchmark_created_at_diskbenchmark_created_at_and_more'), + ] + + operations = [ + migrations.CreateModel( + name='NodeStatusHistory', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('is_online', models.BooleanField()), + ('timestamp', models.DateTimeField(auto_now_add=True)), + ], + ), + migrations.AddField( + model_name='provider', + name='created_at', + field=models.DateTimeField(auto_now_add=True, null=True), + ), + migrations.DeleteModel( + name='NodeStatus', + ), + migrations.AddField( + model_name='nodestatushistory', + name='provider', + field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='api.provider'), + ), + ] diff --git a/docker-stack/golem-reputation-backend/reputation-backend/api/models.py 
b/docker-stack/golem-reputation-backend/reputation-backend/api/models.py index 91b7e18..83607e6 100644 --- a/docker-stack/golem-reputation-backend/reputation-backend/api/models.py +++ b/docker-stack/golem-reputation-backend/reputation-backend/api/models.py @@ -17,6 +17,7 @@ class Provider(models.Model): storage = models.FloatField(blank=True, null=True) payment_addresses = models.JSONField(default=dict, blank=True, null=True) # JSON object with payment addresses network = models.CharField(max_length=50, default='mainnet') # 'mainnet' or 'testnet' + created_at = models.DateTimeField(auto_now_add=True, null=True) class Offer(models.Model): provider = models.ForeignKey('Provider', on_delete=models.CASCADE) # Link to a Provider @@ -144,52 +145,14 @@ async def async_save(obj): -class NodeStatus(models.Model): - provider = models.ForeignKey(Provider, on_delete=models.CASCADE) # Link to a Provider - is_online = models.BooleanField(default=False) # Whether the node is currently online - last_seen = models.DateTimeField(default=timezone.now) - total_online_scans = models.IntegerField(default=0) - uptime_percentage = models.FloatField(default=0.0) # Default uptime percentage as 0% - first_seen_scan_count = models.IntegerField(null=True) # New field to track the scan count at first seen - - - async def update_status(self, is_online_now, total_scanned_times_overall): - if is_online_now: - self.total_online_scans += 1 - - self.is_online = is_online_now - self.last_seen = timezone.now() - - # Set first_seen_scan_count on first successful scan update if it's None - - # Instead of min() use directly the first_seen_scan_count - # since it should have been set correctly at the node's first appearance. - self.first_seen_scan_count = self.first_seen_scan_count or total_scanned_times_overall - - # Ensure that the total scanned times is at least the same as the first time seen - total_scanned_times_overall = max(self.first_seen_scan_count, total_scanned_times_overall) - - # Calculate effective scans - effective_scans = total_scanned_times_overall - self.first_seen_scan_count + 1 - - # Ensure effective_scans is never zero or negative - effective_scans = max(effective_scans, 1) - - # Calculate the uptime percentage - self.uptime_percentage = (self.total_online_scans / effective_scans) * 100 - - # Ensure the uptime percentage never goes beyond 100% - self.uptime_percentage = min(self.uptime_percentage, 100.0) - - await async_save(self) - - +class NodeStatusHistory(models.Model): + provider = models.ForeignKey(Provider, on_delete=models.CASCADE) + is_online = models.BooleanField() + timestamp = models.DateTimeField(auto_now_add=True) def __str__(self): - status = "Online" if self.is_online else "Offline" - return f"Node {self.provider.node_id} is {status} - Last seen: {self.last_seen}" - + return f"{self.provider.node_id} - {'Online' if self.is_online else 'Offline'} at {self.timestamp}" class ScanCount(models.Model): scan_count = models.IntegerField(default=0) diff --git a/docker-stack/golem-reputation-backend/reputation-backend/api/ping.py b/docker-stack/golem-reputation-backend/reputation-backend/api/ping.py index faec478..bc278fc 100644 --- a/docker-stack/golem-reputation-backend/reputation-backend/api/ping.py +++ b/docker-stack/golem-reputation-backend/reputation-backend/api/ping.py @@ -1,7 +1,7 @@ import asyncio import json import subprocess -from .models import NodeStatus, PingResult, PingResultP2P +from .models import NodeStatusHistory, PingResult, PingResultP2P from asgiref.sync import sync_to_async @@ 
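Reviewer note on the model swap: `NodeStatus` kept one mutable row per provider with running counters, while `NodeStatusHistory` is append-only, so "current status" becomes "newest row per provider". A sketch of that lookup, assuming the models exactly as declared in this diff:

    from api.models import NodeStatusHistory

    def is_provider_online(node_id: str) -> bool:
        # Newest history row wins; no row means the provider was never scanned
        last = (NodeStatusHistory.objects
                .filter(provider__node_id=node_id)
                .order_by('-timestamp')
                .first())
        return bool(last and last.is_online)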
diff --git a/docker-stack/golem-reputation-backend/reputation-backend/api/ping.py b/docker-stack/golem-reputation-backend/reputation-backend/api/ping.py
index faec478..bc278fc 100644
--- a/docker-stack/golem-reputation-backend/reputation-backend/api/ping.py
+++ b/docker-stack/golem-reputation-backend/reputation-backend/api/ping.py
@@ -1,7 +1,7 @@
 import asyncio
 import json
 import subprocess
-from .models import NodeStatus, PingResult, PingResultP2P
+from .models import NodeStatusHistory, PingResult, PingResultP2P
 from asgiref.sync import sync_to_async
 
 
@@ -9,13 +9,20 @@ async def async_fetch_node_ids():
     # Define the synchronous part as an inner function
     def get_node_ids():
-        return [provider.provider.node_id for provider in NodeStatus.objects.filter(is_online=True).select_related('provider').only('provider__node_id')]
+        # Take the newest status row per provider (PostgreSQL DISTINCT ON)
+        latest_statuses = (NodeStatusHistory.objects
+                           .order_by('provider', '-timestamp')
+                           .distinct('provider')
+                           .select_related('provider'))
+
+        # Return provider IDs where the latest status is online
+        return [status.provider.node_id for status in latest_statuses if status.is_online]
 
     # Use sync_to_async to convert it and immediately invoke
     node_ids = await sync_to_async(get_node_ids, thread_sensitive=True)()
     return node_ids
 
+
 async def async_bulk_create_ping_results(all_data, p2p):
     # Define the synchronous part as an inner function
     def bulk_create(p2p):
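Reviewer note on `get_node_ids`: `.distinct('provider')` is PostgreSQL's `DISTINCT ON` and raises an error on other database backends. Should portability ever matter, a correlated-subquery equivalent (same models, illustrative only) looks like:

    from django.db.models import OuterRef, Subquery
    from api.models import NodeStatusHistory, Provider

    latest = (NodeStatusHistory.objects
              .filter(provider=OuterRef('pk'))
              .order_by('-timestamp'))
    online_node_ids = (Provider.objects
                       .annotate(last_is_online=Subquery(latest.values('is_online')[:1]))
                       .filter(last_is_online=True)
                       .values_list('node_id', flat=True))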
diff --git a/docker-stack/golem-reputation-backend/reputation-backend/api/scanner.py b/docker-stack/golem-reputation-backend/reputation-backend/api/scanner.py
index 3d9fd0f..5eff64b 100644
--- a/docker-stack/golem-reputation-backend/reputation-backend/api/scanner.py
+++ b/docker-stack/golem-reputation-backend/reputation-backend/api/scanner.py
@@ -7,15 +7,52 @@ import subprocess
 from datetime import datetime, timedelta
 from django.utils import timezone
-from .models import Provider, NodeStatus, ScanCount, async_get_or_create, async_save
+from .models import Provider, NodeStatusHistory, ScanCount, async_get_or_create, async_save
 from asgiref.sync import sync_to_async
 from yapapi import props as yp
 from yapapi.config import ApiConfig
 from yapapi.log import enable_default_logger
 from yapapi.props.builder import DemandBuilder
 from yapapi.rest import Configuration, Market
-
+from core.celery import app
 from django.db.models import Q
+
+
+@app.task()
+def update_providers_info(node_props):
+    node_ids = [prop['node_id'] for prop in node_props]
+    existing_providers = Provider.objects.filter(node_id__in=node_ids)
+    existing_providers_dict = {provider.node_id: provider for provider in existing_providers}
+
+    create_batch = []
+
+    for props in node_props:
+        payment_addresses = {key: value for key, value in props.items() if key.startswith("golem.com.payment.platform.") and key.endswith(".address")}
+        provider_data = {
+            "payment_addresses": payment_addresses,
+            "network": 'testnet' if any(key in TESTNET_KEYS for key in props.keys()) else 'mainnet',
+            "cores": props.get("golem.inf.cpu.cores"),
+            "memory": props.get("golem.inf.mem.gib"),
+            "cpu": props.get("golem.inf.cpu.brand"),
+            "runtime": props.get("golem.runtime.name"),
+            "runtime_version": props.get("golem.runtime.version"),
+            "threads": props.get("golem.inf.cpu.threads"),
+            "storage": props.get("golem.inf.storage.gib"),
+            "name": props.get("golem.node.id.name"),
+        }
+
+        issuer_id = props['node_id']
+        if issuer_id in existing_providers_dict:
+            provider_instance = existing_providers_dict[issuer_id]
+            for key, value in provider_data.items():
+                setattr(provider_instance, key, value)
+            provider_instance.save(update_fields=list(provider_data.keys()))
+        else:
+            create_batch.append(Provider(node_id=issuer_id, **provider_data))
+
+    Provider.objects.bulk_create(create_batch)
 
 
 TESTNET_KEYS = [
@@ -31,97 +68,72 @@
 sys.path.append(str(examples_dir))
 from .utils import build_parser, print_env_info, format_usage  # noqa: E402
 
-async def update_db(issuer_id, is_online, scanned_times, props=None):
-    if props is not None and props.get("golem.runtime.name") == "vm":
-        # Extracting information from props
-        payment_addresses = {
-            key: value for key, value in props.items()
-            if key.startswith("golem.com.payment.platform.") and key.endswith(".address")
-        }
-        network = 'testnet' if any(key in TESTNET_KEYS for key in payment_addresses.keys()) else 'mainnet'
-        cores = props.get("golem.inf.cpu.cores")
-        memory = props.get("golem.inf.mem.gib")
-        cpu = props.get("golem.inf.cpu.brand")
-        runtime = props.get("golem.runtime.name")
-        runtime_version = props.get("golem.runtime.version")
-        threads = props.get("golem.inf.cpu.threads")
-        storage = props.get("golem.inf.storage.gib")
-        name = props.get("golem.node.id.name")
-        # Database operations
-        provider, created = await async_get_or_create(Provider, node_id=issuer_id)
-
-
-        # Update provider properties if it already exists and has changes
-        updated = False
-        if not created or provider.payment_addresses != payment_addresses:  # If new or there are changes
-            provider.payment_addresses = payment_addresses
-            provider.network = network
-            updated = True
-
-        # Check each field for changes and update if necessary
-
-        if provider.cores != cores:
-            provider.cores = cores
-            updated = True
-        if provider.memory != memory:
-            provider.memory = memory
-            updated = True
-        if provider.cpu != cpu:
-            provider.cpu = cpu
-            updated = True
-        if provider.runtime != runtime:
-            provider.runtime = runtime
-            updated = True
-        if provider.runtime_version != runtime_version:
-            provider.runtime_version = runtime_version
-            updated = True
-        if provider.threads != threads:
-            provider.threads = threads
-            updated = True
-        if provider.storage != storage:
-            provider.storage = storage
-            updated = True
-        if provider.name != name:
-            provider.name = name
-            updated = True
-
-        if updated:  # If any field was updated, save the changes
-            await async_save(provider)
-
-        # Now handle the NodeStatus
-        node_status, _ = await async_get_or_create(NodeStatus, provider=provider, defaults={'first_seen_scan_count': scanned_times})
-        await node_status.update_status(is_online_now=is_online, total_scanned_times_overall=scanned_times)
-        await async_save(node_status)
-    else:
-        # When props are not available, only update the online status
-        provider, _ = await async_get_or_create(Provider, node_id=issuer_id)
-        node_status, _ = await async_get_or_create(NodeStatus, provider=provider)
-
-        await node_status.update_status(is_online_now=is_online, total_scanned_times_overall=scanned_times)
-        await async_save(node_status)
-
-
-
-
-async def check_node_status(issuer_id):
+import redis
+
+
+def update_nodes_status(provider_id, is_online_now):
+    provider, created = Provider.objects.get_or_create(node_id=provider_id)
+
+    # Check the last status in the NodeStatusHistory
+    last_status = NodeStatusHistory.objects.filter(provider=provider).last()
+
+    if not last_status or last_status.is_online != is_online_now:
+        # Create a new status entry only when the status actually changed
+        NodeStatusHistory.objects.create(provider=provider, is_online=is_online_now)
+
+
+@app.task(queue='uptime', options={'queue': 'uptime', 'routing_key': 'uptime'})
+def update_nodes_data(nodes_data, scanned_times):
+    r = redis.Redis(host='redis', port=6379, db=0)
+
+    # Update nodes seen in the current scan
+    for issuer_id, details in nodes_data.items():
+        is_online_now = check_node_status(issuer_id)
+        print(f"Updating status for {issuer_id} with is_online_now={is_online_now}")
+        try:
+            update_nodes_status(issuer_id, is_online_now)
+            r.set(f"provider:{issuer_id}:status", str(is_online_now))
+        except Exception as e:
+            print(f"Error updating status for {issuer_id}: {e}")
+
+    # Identify providers with an online record that are missing from this scan
+    previously_online_providers_ids = Provider.objects.filter(
+        nodestatushistory__is_online=True
+    ).distinct().values_list('node_id', flat=True)
+
+    provider_ids_not_in_scan = set(previously_online_providers_ids) - set(nodes_data.keys())
+
+    # Re-verify and update status for those previously online but absent now
+    for issuer_id in provider_ids_not_in_scan:
+        is_online_now = check_node_status(issuer_id)
+        print(f"Verifying status for {issuer_id} with is_online_now={is_online_now}")
+        try:
+            update_nodes_status(issuer_id, is_online_now)
+            r.set(f"provider:{issuer_id}:status", str(is_online_now))
+        except Exception as e:
+            print(f"Error verifying/updating status for {issuer_id}: {e}")
+
+
+def check_node_status(issuer_id):
     try:
-        # Use asyncio.create_subprocess_exec to run the command asynchronously
-        process = await asyncio.create_subprocess_exec(
-            "yagna", "net", "find", issuer_id,
-            stdout=asyncio.subprocess.PIPE,
-            stderr=asyncio.subprocess.PIPE
+        process = subprocess.run(
+            ["yagna", "net", "find", issuer_id],
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            timeout=5  # 5-second timeout for the subprocess
        )
-        # Wait for the command to complete and capture the output
-        stdout, stderr = await process.communicate()
-        # Process finished, return True if it was successful and "seen:" is in the output
-        return process.returncode == 0 and "seen:" in stdout.decode()
-    except asyncio.TimeoutError as e:
+        return process.returncode == 0 and "seen:" in process.stdout.decode()
+    except subprocess.TimeoutExpired as e:
         print("Timeout reached while checking node status", e)
         return False
+    except Exception as e:
+        print(f"Unexpected error checking node status: {e}")
+        return False
 
-async def list_offers(conf: Configuration, subnet_tag: str, nodes_data, scanned_times, current_scan_providers):
+async def list_offers(conf: Configuration, subnet_tag: str, nodes_data, scanned_times, current_scan_providers, node_props):
     async with conf.market() as client:
         market_api = Market(client)
         dbuild = DemandBuilder()
@@ -134,30 +146,33 @@
             current_scan_providers.add(event.issuer)
 
             if event.issuer not in nodes_data:
-                nodes_data[event.issuer] = {
-                    "last_seen": datetime.now(),
-                    "is_online": False
-                }
-                await update_db(event.issuer, True, scanned_times, event.props)
+                event.props['node_id'] = event.issuer
+                node_props.append(event.props)
             else:
-                continue
+                # Check if there is an existing 'wasmtime' entry for the same issuer
+                existing_entry = next((item for item in node_props if item['node_id'] == event.issuer and item.get("golem.runtime.name") == "wasmtime"), None)
+                if existing_entry and event.props.get("golem.runtime.name") == "vm":
+                    # Replace the existing 'wasmtime' entry with the 'vm' entry
+                    node_props[node_props.index(existing_entry)] = event.props
+
 
 async def monitor_nodes_status(subnet_tag: str = "public"):
     nodes_data = {}
-    ScanCount.increment()  # Increment the scan count
-    scanned_times = ScanCount.get_current_count()  # Load the current scan count
-    current_scan_providers = set()  # Initialize an empty set for the current scan
-
+    node_props = []
+    current_scan_providers = set()
+    scanned_times = ScanCount.get_current_count()
+
     # Call list_offers with a timeout
     try:
         await asyncio.wait_for(
             list_offers(
                 Configuration(api_config=ApiConfig()),
                 subnet_tag=subnet_tag,
                 nodes_data=nodes_data,
+                node_props=node_props,
                 scanned_times=scanned_times,
                 current_scan_providers=current_scan_providers
             ),
@@ -166,27 +181,8 @@
     except asyncio.TimeoutError:
         print("Scan timeout reached")
-    RECENT_TIMEFRAME = timedelta(seconds=30)
-
-    # Get the current time
-    current_time = timezone.now()
-
-    # Query NodeStatus objects that were last seen within the RECENT_TIMEFRAME and are marked as online
-    recent_nodes = await sync_to_async(NodeStatus.objects.filter)(
-        last_seen__lt=current_time - RECENT_TIMEFRAME,
-        is_online=True
-    )
-
-    # Iterate over these nodes and check their current status
-    for node_status in recent_nodes:
-        # Ensure the node is not in the current scan data
-        if node_status.provider.node_id not in nodes_data:
-            # Perform the online check
-            is_online = await check_node_status(node_status.provider.node_id)
-
-            if not is_online:
-                # If the node is not online, update the database
-                await update_db(node_status.provider.node_id, False, scanned_times)
-
-
+    # Defer the database writes to Celery so the scan coroutine returns quickly
+    update_nodes_data.delay(nodes_data, scanned_times)
+    update_providers_info.delay(node_props)
\ No newline at end of file
diff --git a/docker-stack/golem-reputation-backend/reputation-backend/api/tasks.py b/docker-stack/golem-reputation-backend/reputation-backend/api/tasks.py
index 1769dc6..78bd463 100644
--- a/docker-stack/golem-reputation-backend/reputation-backend/api/tasks.py
+++ b/docker-stack/golem-reputation-backend/reputation-backend/api/tasks.py
@@ -11,6 +11,7 @@ from .models import Task, Provider, Offer
 
 redis_client = redis.Redis(host='redis', port=6379, db=0)  # Update with your Redis configuration
 
+
 @app.task
 def monitor_nodes_task(subnet_tag='public'):
     # Run the asyncio function using asyncio.run()
@@ -22,12 +23,8 @@ def ping_providers_task(p2p):
     asyncio.run(ping_providers(p2p))
 
 
-
-
 @app.task(queue='benchmarker', options={'queue': 'benchmarker', 'routing_key': 'benchmarker'})
 def benchmark_providers_task():
-    if os.environ.get('BENCHMARK', 'false') != 'true':
-        return 0
     budget_per_provider = os.environ.get('BUDGET_PER_PROVIDER', 0.1)
     mainnet_provider_count = Provider.objects.filter(network='mainnet').count()
     print(f"Found {mainnet_provider_count} online providers on the mainnet")
diff --git a/docker-stack/golem-reputation-backend/reputation-backend/core/celery.py b/docker-stack/golem-reputation-backend/reputation-backend/core/celery.py
index ce74bbc..2869e39 100644
--- a/docker-stack/golem-reputation-backend/reputation-backend/core/celery.py
+++ b/docker-stack/golem-reputation-backend/reputation-backend/core/celery.py
@@ -18,7 +18,7 @@ def setup_periodic_tasks(sender, **kwargs):
     from api.tasks import monitor_nodes_task, ping_providers_task, benchmark_providers_task, process_offers_from_redis
 
     sender.add_periodic_task(
-        40.0,
+        60.0,
         monitor_nodes_task.s(),
         queue="uptime",
         options={"queue": "uptime", "routing_key": "uptime"},
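Reviewer note on the task fan-out: the scan coroutine now only collects data and defers persistence to Celery. `update_nodes_data` is pinned to the `uptime` queue by its decorator, `update_providers_info` uses the default queue, and `benchmark_providers_task` stays on `benchmarker`, so each queue needs a worker consuming it. A sketch of the flow with invented sample payloads (worker commands shown for orientation, not taken from this repo):

    # celery -A core worker -Q celery       # default queue: update_providers_info
    # celery -A core worker -Q uptime       # update_nodes_data, monitor_nodes_task
    # celery -A core worker -Q benchmarker  # benchmark_providers_task
    from api.scanner import update_nodes_data, update_providers_info

    nodes_data = {"0xissuer": {}}  # invented scan payload
    node_props = [{"node_id": "0xissuer", "golem.runtime.name": "vm"}]
    update_nodes_data.delay(nodes_data, 1)   # records status transitions
    update_providers_info.delay(node_props)  # upserts provider hardware info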