Skip to content

Commit

Permalink
add progress bar into executor implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
bhlieberman committed May 14, 2024
1 parent 3ea5afc commit 98adaef
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions biobricks/dvc_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ class DownloadManager:
progress_bar : tqdm = None
active_threads : int = 0
interrupt_event : threading.Event = threading.Event()
total_progress_bar: tqdm = tqdm(desc="total", total=100.0, disable=False)

def exec_task(self, url, path):
def exec_task(self, url, total_progress_bar, path):
path.parent.mkdir(parents=True, exist_ok=True)
response = requests.get(url, stream=True, headers=self.headers)
response.raise_for_status()
Expand All @@ -36,19 +37,22 @@ def exec_task(self, url, path):
if data:
file.write(data)
progress.update(len(data))
# total_progress_bar.update(len(data))
total_progress_bar.update(len(data))

def download_exec(self, urls, paths, max_threads=4):
def download_exec(self, urls, paths, total_size, max_threads=4):
self.progress_bar = tqdm(total=total_size, unit='iB', unit_scale=True, position=0, desc="Overall Progress")
signal.signal(signal.SIGINT, lambda signum, frame: signal_handler(signum, frame, self.interrupt_event))
with ThreadPoolExecutor(max_threads) as exec:
exec_args = dict(zip(urls, paths))
futures = {exec.submit(self.exec_task, url, path) for url, path in exec_args.items()}
futures = {exec.submit(self.exec_task, url, self.progress_bar, path) for url, path in exec_args.items()}
for future in as_completed(futures):
try:
data = future.result()
except Exception as e:
logger.error(e)
logger.warning("Exception occurred while downloading brick.")
self.progress_bar.close()
print(f"\n{len(paths)} files downloaded successfully!")


class DVCFetcher:
Expand Down Expand Up @@ -129,7 +133,7 @@ def fetch_outs(self, brick, prefixes=['brick/', 'data/']) -> tuple[list[dict], i
# download files
cache_paths = [self._remote_url_to_cache_path(url) for url in urls]
downloader = DownloadManager(headers = {'BBToken': biobricks.config.token()})
downloader.download_exec(urls, cache_paths, 4)
downloader.download_exec(urls, cache_paths, total_size=total_size, max_threads=4)

# build a symlink between each cache_path and its corresponding path
brick_paths = [brick.path() / path for path in paths]
Expand Down

0 comments on commit 98adaef

Please sign in to comment.