Skip to content

Commit

Permalink
Move to semaphores, fix async iterator stop, add pypi badge
Browse files Browse the repository at this point in the history
  • Loading branch information
MichalKarol committed Nov 10, 2024
1 parent e1a35a2 commit f12d401
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 33 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# FuturePool
FuturePool is a package that introduce known concept of multiprocessing Pool to the async/await world. It allows for easy translation from multiprocessing to async/await, while keeping the core principle - specified number of workers. FuturePool allows for more flexible usage by providing starimap/starimap_unordered.
# FuturePool [![PyPI - Version](https://img.shields.io/pypi/v/futurepool?style=for-the-badge)](https://pypi.org/project/futurepool/)

FuturePool is a package that introduce known concept of multiprocessing Pool to the async/await world, resulting in async workers pool library. It allows for easy translation from multiprocessing to async/await, while keeping the core principle - specified number of workers. FuturePool allows for more flexible usage by providing starimap/starimap_unordered.

FuturePool was created to handle web scrapping, where in order to not overwhelm website with connections and comply with website requirements, specified number of workers was used. FuturePool was extended to handle generic scenarios and published.

Expand Down
74 changes: 45 additions & 29 deletions futurepool/futurepool.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,34 +30,38 @@ def __aiter__(self):
return self

async def __anext__(self) -> U:
future = next(self.iterator)
future = next(self.iterator, None)
if not future:
raise StopAsyncIteration()
return await future

def __init__(self, number_of_workers: int = (os.cpu_count() or 1)):
""" """
assert number_of_workers > 0, "Number of workers must be a positive number"
self.number_of_workers = number_of_workers
self.loop = get_running_loop()
self.workers_locks = [asyncio.Lock() for _ in range(self.number_of_workers)]
self.workers_lock = asyncio.Semaphore(self.number_of_workers)
self.tasks = set[Task]()
self.tasks_lock = asyncio.Lock()

async def __aenter__(self):
return self

async def __aexit__(self, type, value, traceback):
if self.loop.is_running():
for lock in self.workers_locks:
await lock.acquire()
for task in self.tasks:
task.cancel()
try:
await task
except asyncio.InvalidStateError:
pass
except asyncio.CancelledError:
pass
for lock in self.workers_locks:
lock.release()
for _ in range(self.number_of_workers):
await self.workers_lock.acquire()
async with self.tasks_lock:
for task in self.tasks:
task.cancel()
try:
await task
except asyncio.InvalidStateError:
pass
except asyncio.CancelledError:
pass
for _ in range(self.number_of_workers):
self.workers_lock.release()
return False

def _get_iterator_(
Expand All @@ -70,6 +74,8 @@ def _get_iterator_(
futures = list[Future[U]]()
args = deque[tuple[int, T]]()
not_finished_futures = deque[Future[U]]()
args_lock = asyncio.Lock()
not_finished_futures_lock = asyncio.Lock()

def add_task():
arg_tuple = next(iterator, None)
Expand All @@ -82,34 +88,44 @@ def add_task():
not_finished_futures.append(future)
return True

async def worker(w_id: int):
while len(args) > 0:
async with self.workers_locks[w_id]:
i, arg = args.popleft()
async def worker():
async with self.tasks_lock:
self.tasks.add(asyncio.current_task())
while True:
async with self.workers_lock:
async with args_lock:
if args:
i, arg = args.popleft()
else:
break
try:
result = await fn(*arg)
future = (
futures[i] if ordered else not_finished_futures.popleft()
)
if ordered:
future = futures[i]
else:
async with not_finished_futures_lock:
future = not_finished_futures.popleft()
future.set_result(result)
except asyncio.InvalidStateError:
return
except asyncio.CancelledError:
return
except Exception as e:
future = (
futures[i] if ordered else not_finished_futures.popleft()
)
if ordered:
future = futures[i]
else:
async with not_finished_futures_lock:
future = not_finished_futures.popleft()
future.set_exception(e)
add_task()
self.tasks.remove(asyncio.current_task())
async with self.tasks_lock:
self.tasks.remove(asyncio.current_task())

def create_workers():
for w_id in range(self.number_of_workers):
for _ in range(self.number_of_workers):
if not add_task():
return
task = self.loop.create_task(worker(w_id))
self.tasks.add(task)
self.loop.create_task(worker())

class FutureIterator:
def __init__(self):
Expand All @@ -124,7 +140,7 @@ def __next__(self) -> Awaitable[U]:

if len(futures) == self.current:
if not add_task():
raise StopIteration
raise StopIteration()
future = futures[self.current]
self.current += 1
return future
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool.poetry]
name = "futurepool"
version = "1.0.0"
description = "FuturePool is a package that introduce known concept of multiprocessing Pool to the async/await world. It allows for easy translation from multiprocessing to async/await, while keeping the core principle - specified number of workers. FuturePool allows for more flexible usage by providing starimap/starimap_unordered."
version = "1.0.1"
description = "FuturePool is a package that introduce known concept of multiprocessing Pool to the async/await world, resulting in async workers pool library. It allows for easy translation from multiprocessing to async/await, while keeping the core principle - specified number of workers. FuturePool allows for more flexible usage by providing starimap/starimap_unordered."
authors = ["Michal Karol <[email protected]>"]
license = "MIT License"
readme = "README.md"
Expand Down

0 comments on commit f12d401

Please sign in to comment.