[SYNPY-1548] Swap to a FIFO queue #1147
Conversation
This pull request sets up GitHub code scanning for this repository. Once the scans have completed and the checks have passed, the analysis results for this pull request branch will appear on this overview. Once you merge this pull request, the 'Security' tab will show more code scanning analysis results (for example, for the default branch). Depending on your configuration and choice of analysis tool, future pull requests will be annotated with code scanning analysis results. For more information about GitHub code scanning, check out the documentation.
…ong with TQDM progress bar formatting
Hello @BryanFauble! Thanks for updating this PR. We checked the lines you've touched for PEP 8 issues, and found:
Comment last updated at 2024-12-17 21:34:38 UTC
```python
transfer_count: int = getattr(_thread_local, "transfer_count", 0)
transfer_count -= 1
if transfer_count < 0:
    transfer_count = 0

_thread_local.transfer_count = transfer_count
if progress_bar is not None and not transfer_count:
```
See https://sagebionetworks.jira.com/browse/SYNPY-1507 - There was an issue where using `asyncio.gather` to execute a bunch of download tasks in parallel would cause the progress bar to get closed and no longer show. By tracking the files being transferred, incrementing when we open a new bar and decrementing when we close one, we can close the bar only when the last transfer completes. This allows the bar to remain open for the duration of the transfer and maintain its progress/context throughout.
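For illustration, here is a minimal, self-contained sketch of that counting pattern. The helper names `increment_transfer_count`/`decrement_transfer_count` and stashing the bar on the thread-local are assumptions made for the sketch, not the client's actual API; only the decrement logic mirrors the diff above.

```python
import threading

from tqdm import tqdm

_thread_local = threading.local()


def increment_transfer_count(progress_bar: tqdm) -> None:
    """Record that another transfer is sharing the progress bar."""
    _thread_local.transfer_count = getattr(_thread_local, "transfer_count", 0) + 1
    _thread_local.progress_bar = progress_bar


def decrement_transfer_count() -> None:
    """Record that a transfer finished; close the bar only when it is the last one."""
    transfer_count: int = getattr(_thread_local, "transfer_count", 0)
    transfer_count -= 1
    if transfer_count < 0:
        transfer_count = 0

    _thread_local.transfer_count = transfer_count
    progress_bar = getattr(_thread_local, "progress_bar", None)
    if progress_bar is not None and not transfer_count:
        # Only the final transfer is allowed to close the shared bar.
        progress_bar.close()
        _thread_local.progress_bar = None
```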
```python
async def worker(
    self,
    queue: asyncio.Queue,
    failure_strategy: FailureStrategy,
    synapse_client: Synapse,
) -> NoReturn:
    """
    Coroutine that will process the queue of work items. This will process the
    work items until the queue is empty. This will be used to download files in
    parallel.

    Arguments:
        queue: The queue of work items to process.
        failure_strategy: Determines how to handle failures when retrieving items
            out of the queue and an exception occurs.
        synapse_client: The Synapse client to use to download the files.
    """
    while True:
        # Get a "work item" out of the queue.
        work_item = await queue.get()

        try:
            result = await work_item
        except asyncio.CancelledError as ex:
            raise ex
        except Exception as ex:
            result = ex

        self._resolve_sync_from_synapse_result(
            result=result,
            failure_strategy=failure_strategy,
            synapse_client=synapse_client,
        )

        queue.task_done()
```
By taking this worker approach we limit the number of concurrent file transfers. Asyncio also provides a semaphore (https://docs.python.org/3/library/asyncio-sync.html#asyncio.Semaphore), but this first-in, first-out queue seemed like the more intuitive approach to solving the problem.
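As a rough sketch of why a fixed pool of workers bounds concurrency, consider the standalone example below. The `process_queue` wrapper and the `max_concurrent_file_transfers` name are illustrative assumptions, not the client's actual API; only the worker loop mirrors the method shown above.

```python
import asyncio


async def worker(queue: asyncio.Queue) -> None:
    """Pull one queued coroutine at a time, so only N items run concurrently."""
    while True:
        work_item = await queue.get()
        try:
            await work_item  # e.g. a download coroutine placed on the queue
        except Exception:
            pass  # The real code resolves failures according to FailureStrategy
        finally:
            queue.task_done()


async def process_queue(work_items, max_concurrent_file_transfers: int = 5) -> None:
    """Run at most `max_concurrent_file_transfers` work items at a time, in FIFO order."""
    queue: asyncio.Queue = asyncio.Queue()
    for item in work_items:
        queue.put_nowait(item)

    workers = [
        asyncio.create_task(worker(queue))
        for _ in range(max_concurrent_file_transfers)
    ]

    await queue.join()  # Wait until every queued item has been marked task_done()
    for task in workers:
        task.cancel()  # Workers loop forever, so cancel them once the queue drains
```

A semaphore would instead wrap each download in `async with semaphore:`; the queue achieves the same cap on concurrency while also making the FIFO ordering of tasks explicit.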
```python
side_effect=[
    mocked_project_rest_api_dict(),
    mocked_folder_rest_api_dict(),
    mocked_file_rest_api_dict(),
    mocked_folder_rest_api_dict(),
],
```
Due to swapping over to an AsyncIO queue, files are downloaded as soon as they're added to the queue, instead of waiting for the code to execute all of the tasks after the Folder tasks have been added to the list. This changes the order of operations slightly and, as a result, broke these unit tests.
```python
with syn._requests_session_storage.stream(
    method="GET", url=presigned_url_provider.get_info().url
) as response:
```
Moving the logic here aligns with how the logic works when streaming each individual part of a file. This executes in a different thread, so the presigned URL should be used immediately after it is issued.
I also wrapped this in a retry block to ensure it remains functional.
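The retry wiring looks roughly like the sketch below. The `stream_with_retry` helper, `MAX_RETRIES`, and the backoff are illustrative assumptions rather than the client's actual retry utilities; the key point is that `get_info()` is called inside the loop, so the presigned URL is fetched immediately before it is used, even on a retry.

```python
import time

import httpx

MAX_RETRIES = 3  # Illustrative; the real client configures its own retry policy


def stream_with_retry(session: httpx.Client, presigned_url_provider) -> bytes:
    """Open the streamed GET inside the retry loop so a fresh presigned URL
    is obtained right before each attempt."""
    for attempt in range(MAX_RETRIES):
        try:
            # get_info() is called inside the loop so an expired URL is re-issued.
            with session.stream(
                method="GET", url=presigned_url_provider.get_info().url
            ) as response:
                response.raise_for_status()
                return response.read()  # Simplified; the real code streams to disk
        except (httpx.TransportError, httpx.HTTPStatusError):
            if attempt == MAX_RETRIES - 1:
                raise
            time.sleep(2**attempt)  # Simple backoff stand-in for the client's retry helper
```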
```python
else:
    client.logger.info(
        f"[{file.id}:{file_name}]: Found existing file at {download_path}, skipping download."
    )
```
In lieu of a structured logging setup, this is at least a start toward clearer logging within the client. I updated a bunch of the messages around the download process so that it's clear which Synapse ID/file/entity produced each message.
LGTM! Awesome update
🔥 Nice use of a FIFO queue so that it's not just a completely random ordering of tasks being executed. Does this slow down download speeds a bit?
@thomasyu888 I redid a few of the benchmark tests and they were all within 5% of the previous results.
Problem:
Solution:
Testing: