Reworked uptime scanning
cryptobench committed Feb 20, 2024
1 parent 1d56944 commit 40f9ee9
Showing 7 changed files with 286 additions and 260 deletions.
210 changes: 119 additions & 91 deletions docker-stack/golem-reputation-backend/reputation-backend/api/api.py
@@ -1,5 +1,5 @@
from ninja import NinjaAPI, Path
from .models import DiskBenchmark, CpuBenchmark, MemoryBenchmark, Provider, TaskCompletion, PingResult, NodeStatus, Task, Offer
from .models import DiskBenchmark, CpuBenchmark, MemoryBenchmark, Provider, TaskCompletion, PingResult, NodeStatusHistory, Task, Offer
from .schemas import DiskBenchmarkSchema, CpuBenchmarkSchema, MemoryBenchmarkSchema, TaskCompletionSchema, ProviderSuccessRate, TaskCreateSchema, TaskUpdateSchema, ProposalSchema, TaskCostUpdateSchema, BulkTaskCostUpdateSchema, BulkBenchmarkSchema
from django.shortcuts import get_object_or_404
from django.http import JsonResponse
@@ -11,9 +11,32 @@
from .bulkutils import process_disk_benchmark, process_cpu_benchmark, process_memory_benchmark, process_network_benchmark
api = NinjaAPI()
import redis
from datetime import datetime, timedelta


from .scoring import get_provider_benchmark_scores
from django.db.models.functions import Coalesce
def calculate_uptime(provider_id):
provider = Provider.objects.get(node_id=provider_id)
statuses = NodeStatusHistory.objects.filter(provider=provider).order_by('timestamp')

online_duration = timedelta(0)
last_online_time = None

for status in statuses:
if status.is_online:
last_online_time = status.timestamp
elif last_online_time:
online_duration += status.timestamp - last_online_time
last_online_time = None

# Check if the node is currently online and add the duration
if last_online_time is not None:
online_duration += timezone.now() - last_online_time

total_duration = timezone.now() - provider.created_at
uptime_percentage = (online_duration.total_seconds() / total_duration.total_seconds()) * 100
return uptime_percentage
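For reference, calculate_uptime walks the history oldest-first, pairs each online transition with the following offline transition, and credits the still-open interval up to now when the node is currently online. A minimal, framework-free sketch of the same arithmetic (the timestamps are invented; note the function also presumes provider.created_at is populated, a field this commit adds with null=True):

from datetime import datetime, timedelta

# Invented status history: (timestamp, is_online), oldest first.
events = [
    (datetime(2024, 2, 20, 10, 0), True),   # came online
    (datetime(2024, 2, 20, 12, 0), False),  # went offline -> 2 h online
    (datetime(2024, 2, 20, 13, 0), True),   # came back online
]
now = datetime(2024, 2, 20, 14, 0)
created_at = datetime(2024, 2, 20, 9, 0)    # provider first seen 5 h ago

online = timedelta(0)
last_online = None
for ts, is_online in events:
    if is_online:
        last_online = ts
    elif last_online:
        online += ts - last_online
        last_online = None
if last_online is not None:
    online += now - last_online             # still online: count up to "now"

uptime_pct = online.total_seconds() / (now - created_at).total_seconds() * 100
print(round(uptime_pct, 1))                 # 60.0 -> 3 h online out of 5 h tracked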


@api.get("/providers/scores")
@@ -22,28 +45,33 @@ def list_provider_scores(request):
providers = Provider.objects.annotate(
success_count=Count('taskcompletion', filter=Q(taskcompletion__is_successful=True, taskcompletion__timestamp__gte=ten_days_ago)),
total_count=Count('taskcompletion', filter=Q(taskcompletion__timestamp__gte=ten_days_ago)),
).prefetch_related('nodestatus_set').all()
).all()

response = {"providers": [], "rejected": []}
for provider in providers:
if provider.total_count > 0:
success_ratio = provider.success_count / provider.total_count
scores = {}
# scores = get_provider_benchmark_scores(provider, recent_n=3)
scores.update({"successRate": success_ratio})
uptime = provider.nodestatus_set.first().uptime_percentage if provider.nodestatus_set.exists() else 0
normalized_uptime = uptime / 100 # Normalize uptime to 0-1
scores.update({"uptime": normalized_uptime})
uptime_percentage = calculate_uptime(provider.node_id)

scores.update({
"successRate": success_ratio,
"uptime": uptime_percentage / 100 # Normalize uptime to a 0-1 scale, if desired
})
response["providers"].append({
"providerId": provider.node_id,
"scores": scores
})
# else:
# response["rejected"].append({
# "providerId": provider.node_id,
# "reason": "No activities within the last 10 days"
# })

providers_with_no_tasks = Provider.objects.filter(taskcompletion__isnull=True)
for provider in providers_with_no_tasks:
uptime_percentage = calculate_uptime(provider.node_id)
response["rejected"].append({
"providerId": provider.node_id,
"scores": {
"successRate": 0,
"uptime": uptime_percentage / 100
}
})
return response
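With this change, providers that have never completed a task are no longer dropped silently: they are reported under "rejected" with a zero success rate but a real uptime figure. The payload is shaped roughly like this (node IDs and numbers are purely illustrative):

{
    "providers": [
        {"providerId": "0xillustrative-node-a", "scores": {"successRate": 0.95, "uptime": 0.87}},
    ],
    "rejected": [
        {"providerId": "0xillustrative-node-b", "scores": {"successRate": 0, "uptime": 0.42}},
    ],
}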


@@ -210,84 +238,84 @@ def provider_success_rate(request):



@api.get("/provider/{node_id}/scores", response=dict)
def get_provider_scores(request, node_id: str):
try:
provider = get_object_or_404(Provider, node_id=node_id)
disk_benchmarks = (
DiskBenchmark.objects.filter(provider=provider)
.values("benchmark_name")
.annotate(
avg_read_throughput=Avg("read_throughput_mb_ps"),
avg_write_throughput=Avg("write_throughput_mb_ps"),
latency_95th_percentile=Avg("latency_95th_percentile_ms"),
)[:5]
)

memory_benchmarks = (
MemoryBenchmark.objects.filter(provider=provider)
.values("benchmark_name")
.annotate(
avg_total_data_transferred=Avg("total_data_transferred_mi_b"),
avg_throughput=Avg("throughput_mi_b_sec"),
avg_latency_95th_percentile=Avg("latency_95th_percentile_ms"),
)[:5]
)

cpu_benchmarks = (
CpuBenchmark.objects.filter(provider=provider)
.values("benchmark_name")
.annotate(
avg_events_per_second=Avg("events_per_second"),
total_events=Avg("total_events"),
threads=Avg("threads"),
)[:5]
)

avg_ping = (
PingResult.objects.filter(provider=provider)
.aggregate(
avg_ping_tcp=Avg("ping_tcp"),
avg_ping_udp=Avg("ping_udp")
)
)

uptime_percentage = (
NodeStatus.objects.filter(provider=provider)
.aggregate(uptime=Avg("uptime_percentage"))
.get("uptime", 0)
)
task_counts = TaskCompletion.objects.filter(provider=provider).aggregate(
total=Count('id'),
successful=Count('id', filter=Q(is_successful=True)))

success_rate = (task_counts['successful'] / task_counts['total'] * 100) if task_counts['total'] > 0 else 0

disk_scores = [{"benchmark_name": db["benchmark_name"], "avg_read_throughput": db["avg_read_throughput"],
"avg_write_throughput": db["avg_write_throughput"], "latency_95th_percentile": db["latency_95th_percentile"]}
for db in disk_benchmarks]

memory_scores = [{"benchmark_name": mb["benchmark_name"], "avg_total_data_transferred": mb["avg_total_data_transferred"],
"avg_throughput": mb["avg_throughput"], "avg_latency_95th_percentile": mb["avg_latency_95th_percentile"]}
for mb in memory_benchmarks]

cpu_scores = [{"benchmark_name": cb["benchmark_name"], "avg_events_per_second": cb["avg_events_per_second"],
"total_events": cb["total_events"], "threads": cb["threads"]}
for cb in cpu_benchmarks]

scores = {
"disk": disk_scores,
"memory": memory_scores,
"cpu": cpu_scores,
"ping": avg_ping,
"uptime_percentage": uptime_percentage,
"task_success_rate": success_rate
}

return scores
except Exception as e:
print(e)
return {"status": "error", "message": "Error retrieving provider scores",}
# @api.get("/provider/{node_id}/scores", response=dict)
# def get_provider_scores(request, node_id: str):
# try:
# provider = get_object_or_404(Provider, node_id=node_id)
# disk_benchmarks = (
# DiskBenchmark.objects.filter(provider=provider)
# .values("benchmark_name")
# .annotate(
# avg_read_throughput=Avg("read_throughput_mb_ps"),
# avg_write_throughput=Avg("write_throughput_mb_ps"),
# latency_95th_percentile=Avg("latency_95th_percentile_ms"),
# )[:5]
# )

# memory_benchmarks = (
# MemoryBenchmark.objects.filter(provider=provider)
# .values("benchmark_name")
# .annotate(
# avg_total_data_transferred=Avg("total_data_transferred_mi_b"),
# avg_throughput=Avg("throughput_mi_b_sec"),
# avg_latency_95th_percentile=Avg("latency_95th_percentile_ms"),
# )[:5]
# )

# cpu_benchmarks = (
# CpuBenchmark.objects.filter(provider=provider)
# .values("benchmark_name")
# .annotate(
# avg_events_per_second=Avg("events_per_second"),
# total_events=Avg("total_events"),
# threads=Avg("threads"),
# )[:5]
# )

# avg_ping = (
# PingResult.objects.filter(provider=provider)
# .aggregate(
# avg_ping_tcp=Avg("ping_tcp"),
# avg_ping_udp=Avg("ping_udp")
# )
# )

# uptime_percentage = (
# NodeStatus.objects.filter(provider=provider)
# .aggregate(uptime=Avg("uptime_percentage"))
# .get("uptime", 0)
# )
# task_counts = TaskCompletion.objects.filter(provider=provider).aggregate(
# total=Count('id'),
# successful=Count('id', filter=Q(is_successful=True)))

# success_rate = (task_counts['successful'] / task_counts['total'] * 100) if task_counts['total'] > 0 else 0

# disk_scores = [{"benchmark_name": db["benchmark_name"], "avg_read_throughput": db["avg_read_throughput"],
# "avg_write_throughput": db["avg_write_throughput"], "latency_95th_percentile": db["latency_95th_percentile"]}
# for db in disk_benchmarks]

# memory_scores = [{"benchmark_name": mb["benchmark_name"], "avg_total_data_transferred": mb["avg_total_data_transferred"],
# "avg_throughput": mb["avg_throughput"], "avg_latency_95th_percentile": mb["avg_latency_95th_percentile"]}
# for mb in memory_benchmarks]

# cpu_scores = [{"benchmark_name": cb["benchmark_name"], "avg_events_per_second": cb["avg_events_per_second"],
# "total_events": cb["total_events"], "threads": cb["threads"]}
# for cb in cpu_benchmarks]

# scores = {
# "disk": disk_scores,
# "memory": memory_scores,
# "cpu": cpu_scores,
# "ping": avg_ping,
# "uptime_percentage": uptime_percentage,
# "task_success_rate": success_rate
# }

# return scores
# except Exception as e:
# print(e)
# return {"status": "error", "message": "Error retrieving provider scores",}


from django.utils import timezone
@@ -0,0 +1,35 @@
# Generated by Django 4.1.7 on 2024-02-20 14:22

from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):

dependencies = [
('api', '0029_cpubenchmark_created_at_diskbenchmark_created_at_and_more'),
]

operations = [
migrations.CreateModel(
name='NodeStatusHistory',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('is_online', models.BooleanField()),
('timestamp', models.DateTimeField(auto_now_add=True)),
],
),
migrations.AddField(
model_name='provider',
name='created_at',
field=models.DateTimeField(auto_now_add=True, null=True),
),
migrations.DeleteModel(
name='NodeStatus',
),
migrations.AddField(
model_name='nodestatushistory',
name='provider',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='api.provider'),
),
]
@@ -17,6 +17,7 @@ class Provider(models.Model):
storage = models.FloatField(blank=True, null=True)
payment_addresses = models.JSONField(default=dict, blank=True, null=True) # JSON object with payment addresses
network = models.CharField(max_length=50, default='mainnet') # 'mainnet' or 'testnet'
created_at = models.DateTimeField(auto_now_add=True, null=True)

class Offer(models.Model):
provider = models.ForeignKey('Provider', on_delete=models.CASCADE) # Link to a Provider
@@ -144,52 +145,14 @@ async def async_save(obj):



class NodeStatus(models.Model):
provider = models.ForeignKey(Provider, on_delete=models.CASCADE) # Link to a Provider
is_online = models.BooleanField(default=False) # Whether the node is currently online
last_seen = models.DateTimeField(default=timezone.now)
total_online_scans = models.IntegerField(default=0)
uptime_percentage = models.FloatField(default=0.0) # Default uptime percentage as 0%
first_seen_scan_count = models.IntegerField(null=True) # New field to track the scan count at first seen


async def update_status(self, is_online_now, total_scanned_times_overall):
if is_online_now:
self.total_online_scans += 1

self.is_online = is_online_now
self.last_seen = timezone.now()

# Set first_seen_scan_count on first successful scan update if it's None

# Instead of min() use directly the first_seen_scan_count
# since it should have been set correctly at the node's first appearance.
self.first_seen_scan_count = self.first_seen_scan_count or total_scanned_times_overall

# Ensure that the total scanned times is at least the same as the first time seen
total_scanned_times_overall = max(self.first_seen_scan_count, total_scanned_times_overall)

# Calculate effective scans
effective_scans = total_scanned_times_overall - self.first_seen_scan_count + 1

# Ensure effective_scans is never zero or negative
effective_scans = max(effective_scans, 1)

# Calculate the uptime percentage
self.uptime_percentage = (self.total_online_scans / effective_scans) * 100

# Ensure the uptime percentage never goes beyond 100%
self.uptime_percentage = min(self.uptime_percentage, 100.0)

await async_save(self)



class NodeStatusHistory(models.Model):
provider = models.ForeignKey(Provider, on_delete=models.CASCADE)
is_online = models.BooleanField()
timestamp = models.DateTimeField(auto_now_add=True)

def __str__(self):
status = "Online" if self.is_online else "Offline"
return f"Node {self.provider.node_id} is {status} - Last seen: {self.last_seen}"

return f"{self.provider.node_id} - {'Online' if self.is_online else 'Offline'} at {self.timestamp}"

class ScanCount(models.Model):
scan_count = models.IntegerField(default=0)
@@ -1,21 +1,28 @@
import asyncio
import json
import subprocess
from .models import NodeStatus, PingResult, PingResultP2P
from .models import NodeStatusHistory, PingResult, PingResultP2P
from asgiref.sync import sync_to_async



async def async_fetch_node_ids():
# Define the synchronous part as an inner function
def get_node_ids():
return [provider.provider.node_id for provider in NodeStatus.objects.filter(is_online=True).select_related('provider').only('provider__node_id')]
# Fetch the latest status for each provider and filter those that are online
latest_statuses = NodeStatusHistory.objects.filter(
provider_id__in=NodeStatusHistory.objects.order_by('provider', '-timestamp').distinct('provider').values_list('provider_id', flat=True)
).order_by('provider', '-timestamp').distinct('provider')

# Return provider IDs where the latest status is online
return [status.provider.node_id for status in latest_statuses if status.is_online]

# Use sync_to_async to convert it and immediately invoke
node_ids = await sync_to_async(get_node_ids, thread_sensitive=True)()
return node_ids
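The rewritten query relies on .distinct('provider'), which Django translates to PostgreSQL's DISTINCT ON, to pick each provider's most recent status row; the outer filter's provider_id__in subquery appears redundant, since it only restricts the set to providers that have any history at all. A leaner sketch of the same idea (PostgreSQL-only, and a sketch rather than a drop-in replacement for the committed code):

# Newest NodeStatusHistory row per provider, keeping nodes whose latest status is online.
latest_per_provider = (
    NodeStatusHistory.objects
    .order_by('provider', '-timestamp')
    .distinct('provider')
    .select_related('provider')
)
online_node_ids = [s.provider.node_id for s in latest_per_provider if s.is_online]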



async def async_bulk_create_ping_results(all_data, p2p):
# Define the synchronous part as an inner function
def bulk_create(p2p):