Skip to content

Commit

Permalink
Send timestamp with sync data (#83)
Browse files Browse the repository at this point in the history
* Send timestamp with sync data

* Increase random delay
  • Loading branch information
itssimon authored Jan 8, 2025
1 parent fdb9379 commit 60b971a
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 16 deletions.
15 changes: 7 additions & 8 deletions apitally/client/client_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import time
from contextlib import suppress
from functools import partial
from typing import Any, AsyncIterator, Dict, Optional, Tuple, Union
from typing import Any, AsyncIterator, Dict, Optional, Union
from uuid import UUID

import backoff
Expand Down Expand Up @@ -40,7 +40,7 @@ def __init__(
self.proxy = proxy
self._stop_sync_loop = False
self._sync_loop_task: Optional[asyncio.Task] = None
self._sync_data_queue: asyncio.Queue[Tuple[float, Dict[str, Any]]] = asyncio.Queue()
self._sync_data_queue: asyncio.Queue[Dict[str, Any]] = asyncio.Queue()
self._set_startup_data_task: Optional[asyncio.Task] = None

def get_http_client(self) -> httpx.AsyncClient:
Expand Down Expand Up @@ -103,20 +103,19 @@ async def send_startup_data(self, client: httpx.AsyncClient) -> None:

async def send_sync_data(self, client: httpx.AsyncClient) -> None:
data = self.get_sync_data()
self._sync_data_queue.put_nowait((time.time(), data))
self._sync_data_queue.put_nowait(data)

i = 0
while not self._sync_data_queue.empty():
timestamp, data = self._sync_data_queue.get_nowait()
data = self._sync_data_queue.get_nowait()
try:
if (time_offset := time.time() - timestamp) <= MAX_QUEUE_TIME:
if time.time() - data["timestamp"] <= MAX_QUEUE_TIME:
if i > 0:
await asyncio.sleep(random.uniform(0.1, 0.3))
data["time_offset"] = time_offset
await asyncio.sleep(random.uniform(0.1, 0.5))
await self._send_sync_data(client, data)
i += 1
except httpx.HTTPError:
self._sync_data_queue.put_nowait((timestamp, data))
self._sync_data_queue.put_nowait(data)
break
finally:
self._sync_data_queue.task_done()
Expand Down
1 change: 1 addition & 0 deletions apitally/client/client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ def add_uuids_to_data(self, data: Dict[str, Any]) -> Dict[str, Any]:

def get_sync_data(self) -> Dict[str, Any]:
data = {
"timestamp": time.time(),
"requests": self.request_counter.get_and_reset_requests(),
"validation_errors": self.validation_error_counter.get_and_reset_validation_errors(),
"server_errors": self.server_error_counter.get_and_reset_server_errors(),
Expand Down
15 changes: 7 additions & 8 deletions apitally/client/client_threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from io import BufferedReader
from queue import Queue
from threading import Event, Thread
from typing import Any, Callable, Dict, Optional, Tuple
from typing import Any, Callable, Dict, Optional
from uuid import UUID

import backoff
Expand Down Expand Up @@ -58,7 +58,7 @@ def __init__(
self.proxies = {"https": proxy} if proxy else None
self._thread: Optional[Thread] = None
self._stop_sync_loop = Event()
self._sync_data_queue: Queue[Tuple[float, Dict[str, Any]]] = Queue()
self._sync_data_queue: Queue[Dict[str, Any]] = Queue()

def start_sync_loop(self) -> None:
self._stop_sync_loop.clear()
Expand Down Expand Up @@ -114,20 +114,19 @@ def send_startup_data(self, session: requests.Session) -> None:

def send_sync_data(self, session: requests.Session) -> None:
data = self.get_sync_data()
self._sync_data_queue.put_nowait((time.time(), data))
self._sync_data_queue.put_nowait(data)

i = 0
while not self._sync_data_queue.empty():
timestamp, data = self._sync_data_queue.get_nowait()
data = self._sync_data_queue.get_nowait()
try:
if (time_offset := time.time() - timestamp) <= MAX_QUEUE_TIME:
if time.time() - data["timestamp"] <= MAX_QUEUE_TIME:
if i > 0:
time.sleep(random.uniform(0.1, 0.3))
data["time_offset"] = time_offset
time.sleep(random.uniform(0.1, 0.5))
self._send_sync_data(session, data)
i += 1
except requests.RequestException:
self._sync_data_queue.put_nowait((timestamp, data))
self._sync_data_queue.put_nowait(data)
break
finally:
self._sync_data_queue.task_done()
Expand Down

0 comments on commit 60b971a

Please sign in to comment.