Skip to content

Commit

Permalink
Send timestamp with sync data
Browse files Browse the repository at this point in the history
  • Loading branch information
itssimon committed Jan 8, 2025
1 parent fdb9379 commit dea11fc
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 14 deletions.
13 changes: 6 additions & 7 deletions apitally/client/client_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import time
from contextlib import suppress
from functools import partial
from typing import Any, AsyncIterator, Dict, Optional, Tuple, Union
from typing import Any, AsyncIterator, Dict, Optional, Union
from uuid import UUID

import backoff
Expand Down Expand Up @@ -40,7 +40,7 @@ def __init__(
self.proxy = proxy
self._stop_sync_loop = False
self._sync_loop_task: Optional[asyncio.Task] = None
self._sync_data_queue: asyncio.Queue[Tuple[float, Dict[str, Any]]] = asyncio.Queue()
self._sync_data_queue: asyncio.Queue[Dict[str, Any]] = asyncio.Queue()
self._set_startup_data_task: Optional[asyncio.Task] = None

def get_http_client(self) -> httpx.AsyncClient:
Expand Down Expand Up @@ -103,20 +103,19 @@ async def send_startup_data(self, client: httpx.AsyncClient) -> None:

async def send_sync_data(self, client: httpx.AsyncClient) -> None:
data = self.get_sync_data()
self._sync_data_queue.put_nowait((time.time(), data))
self._sync_data_queue.put_nowait(data)

i = 0
while not self._sync_data_queue.empty():
timestamp, data = self._sync_data_queue.get_nowait()
data = self._sync_data_queue.get_nowait()
try:
if (time_offset := time.time() - timestamp) <= MAX_QUEUE_TIME:
if time.time() - data["timestamp"] <= MAX_QUEUE_TIME:
if i > 0:
await asyncio.sleep(random.uniform(0.1, 0.3))
data["time_offset"] = time_offset
await self._send_sync_data(client, data)
i += 1
except httpx.HTTPError:
self._sync_data_queue.put_nowait((timestamp, data))
self._sync_data_queue.put_nowait(data)
break
finally:
self._sync_data_queue.task_done()
Expand Down
1 change: 1 addition & 0 deletions apitally/client/client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ def add_uuids_to_data(self, data: Dict[str, Any]) -> Dict[str, Any]:

def get_sync_data(self) -> Dict[str, Any]:
data = {
"timestamp": time.time(),
"requests": self.request_counter.get_and_reset_requests(),
"validation_errors": self.validation_error_counter.get_and_reset_validation_errors(),
"server_errors": self.server_error_counter.get_and_reset_server_errors(),
Expand Down
13 changes: 6 additions & 7 deletions apitally/client/client_threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from io import BufferedReader
from queue import Queue
from threading import Event, Thread
from typing import Any, Callable, Dict, Optional, Tuple
from typing import Any, Callable, Dict, Optional
from uuid import UUID

import backoff
Expand Down Expand Up @@ -58,7 +58,7 @@ def __init__(
self.proxies = {"https": proxy} if proxy else None
self._thread: Optional[Thread] = None
self._stop_sync_loop = Event()
self._sync_data_queue: Queue[Tuple[float, Dict[str, Any]]] = Queue()
self._sync_data_queue: Queue[Dict[str, Any]] = Queue()

def start_sync_loop(self) -> None:
self._stop_sync_loop.clear()
Expand Down Expand Up @@ -114,20 +114,19 @@ def send_startup_data(self, session: requests.Session) -> None:

def send_sync_data(self, session: requests.Session) -> None:
data = self.get_sync_data()
self._sync_data_queue.put_nowait((time.time(), data))
self._sync_data_queue.put_nowait(data)

i = 0
while not self._sync_data_queue.empty():
timestamp, data = self._sync_data_queue.get_nowait()
data = self._sync_data_queue.get_nowait()
try:
if (time_offset := time.time() - timestamp) <= MAX_QUEUE_TIME:
if time.time() - data["timestamp"] <= MAX_QUEUE_TIME:
if i > 0:
time.sleep(random.uniform(0.1, 0.3))
data["time_offset"] = time_offset
self._send_sync_data(session, data)
i += 1
except requests.RequestException:
self._sync_data_queue.put_nowait((timestamp, data))
self._sync_data_queue.put_nowait(data)
break
finally:
self._sync_data_queue.task_done()
Expand Down

0 comments on commit dea11fc

Please sign in to comment.