Skip to content

Commit

Permalink
feat: BatchWorkExecutor batch mode
Browse files Browse the repository at this point in the history
  • Loading branch information
AlanViast committed Jun 5, 2024
1 parent c66b184 commit 872695f
Showing 1 changed file with 15 additions and 0 deletions.
15 changes: 15 additions & 0 deletions ethereumetl/executors/batch_work_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
# SOFTWARE.

import logging
import os
import time

from requests.exceptions import Timeout as RequestsTimeout, HTTPError, TooManyRedirects
Expand All @@ -31,13 +32,21 @@
from ethereumetl.misc.retriable_value_error import RetriableValueError
from ethereumetl.progress_logger import ProgressLogger
from ethereumetl.utils import dynamic_batch_iterator
from enum import Enum, auto

RETRY_EXCEPTIONS = (ConnectionError, HTTPError, RequestsTimeout, TooManyRedirects, Web3Timeout, OSError,
RetriableValueError)

BATCH_CHANGE_COOLDOWN_PERIOD_SECONDS = 2 * 60



class BatchMode(Enum):
Concurrency = auto()
Batch = auto()

mode = os.getenv("BATCH_MODE", "Batch")

# Executes the given work in batches, reducing the batch size exponentially in case of errors.
class BatchWorkExecutor:
def __init__(self, starting_batch_size, max_workers, retry_exceptions=RETRY_EXCEPTIONS, max_retries=5):
Expand All @@ -55,6 +64,12 @@ def __init__(self, starting_batch_size, max_workers, retry_exceptions=RETRY_EXCE

def execute(self, work_iterable, work_handler, total_items=None):
self.progress_logger.start(total_items=total_items)

if BatchMode[mode.capitalize()] == BatchMode.Concurrency:
for item in work_iterable:
self.executor.submit(self._fail_safe_execute, work_handler, [item])
return

for batch in dynamic_batch_iterator(work_iterable, lambda: self.batch_size):
self.executor.submit(self._fail_safe_execute, work_handler, batch)

Expand Down

0 comments on commit 872695f

Please sign in to comment.