Skip to content

Commit

Permalink
Merge branch 'master' into parallelize_validate
Browse files Browse the repository at this point in the history
  • Loading branch information
ssssarah authored Aug 16, 2024
2 parents b68c35b + a1ae462 commit 1334e32
Showing 1 changed file with 24 additions and 15 deletions.
39 changes: 24 additions & 15 deletions kgforge/specializations/stores/nexus/batch_request_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import asyncio

from typing import Callable, Dict, List, Optional, Tuple, Type, Any

from kgforge.core.commons.constants import DEFAULT_REQUEST_TIMEOUT
from typing_extensions import Unpack

from aiohttp import ClientSession
from aiohttp import ClientSession, ClientTimeout

from kgforge.core.resource import Resource
from kgforge.core.commons.exceptions import RunException
Expand All @@ -15,6 +17,8 @@
BatchResult = namedtuple("BatchResult", ["resource", "response"])
BatchResults = List[BatchResult]

BATCH_REQUEST_TIMEOUT_PER_REQUEST = DEFAULT_REQUEST_TIMEOUT


class BatchRequestHandler:

Expand All @@ -33,7 +37,7 @@ async def dispatch_action():
semaphore = asyncio.Semaphore(service.max_connection)
loop = asyncio.get_event_loop()

async with ClientSession() as session:
async with ClientSession(timeout=ClientTimeout(total=BATCH_REQUEST_TIMEOUT_PER_REQUEST)) as session:
tasks = task_creator(
semaphore, session, loop, data, service, **kwargs
)
Expand Down Expand Up @@ -82,19 +86,24 @@ async def request(resource: Optional[Resource]) -> BatchResult:
)

async with semaphore:
async with session.request(
method=method,
url=url,
headers=headers,
data=json.dumps(payload, ensure_ascii=True),
params=params
) as response:
content = await response.json()
if response.status < 400:
return BatchResult(resource, content)

error = exception(_error_message(content))
return BatchResult(resource, error)
try:
async with session.request(
method=method,
url=url,
headers=headers,
data=json.dumps(payload, ensure_ascii=True),
params=params,
) as response:
content = await response.json()
if response.status < 400:
return BatchResult(resource, content)

error = exception(_error_message(content))
return BatchResult(resource, error)

except asyncio.exceptions.TimeoutError as timeout_error:

return BatchResult(resource, exception(str(timeout_error)))

tasks = []

Expand Down

0 comments on commit 1334e32

Please sign in to comment.