EventQueue: enqueue items in child queues without blocking#860
EventQueue: enqueue items in child queues without blocking#860
Conversation
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request modifies the Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces a non-blocking mechanism for propagating events to child queues in EventQueue, which is a solid improvement to prevent deadlocks with slow consumers. The use of background tasks is well-implemented, and the addition of flush() and updates to close() are necessary and correctly placed. I've identified a potential race condition in the close method and a few areas for minor improvements in both the implementation and tests to enhance robustness and readability. Overall, this is a great change that improves the resilience of the event system.
|
|
||
| if immediate: | ||
| # Cancel all pending background propagation tasks | ||
| for task in self._bg_tasks: |
There was a problem hiding this comment.
|
|
||
| # Clean up to prevent background tasks from leaking or complaining | ||
| await child_queue.dequeue_event() | ||
| await child_queue.dequeue_event() |
There was a problem hiding this comment.
The current cleanup logic might be racy. After the first dequeue_event, the blocked background task for the second event can proceed, but it's not guaranteed to have completed before the second dequeue_event is called. This could lead to a flaky test. Using event_queue.flush() would make this more robust.
| await child_queue.dequeue_event() | |
| await event_queue.flush() # Wait for the background task for event2 to complete | |
| await child_queue.dequeue_event() # Dequeue event2 |
| if tasks: | ||
| await asyncio.gather(*tasks, return_exceptions=True) |
There was a problem hiding this comment.
| child_queue.queue = asyncio.Queue(maxsize=1) | ||
|
|
||
| # Enqueue first event. It should fit in the child queue. | ||
| event1 = create_sample_message('1') |
| event2 = create_sample_message('2') | ||
| try: | ||
| # Give it a short timeout. If it hangs, it means the parent is blocked. | ||
| await asyncio.wait_for(event_queue.enqueue_event(event2), timeout=0.1) |
There was a problem hiding this comment.
The event2 variable is assigned but never used. You can remove it and create the message directly inside the enqueue_event call to make the code cleaner.
| event2 = create_sample_message('2') | |
| try: | |
| # Give it a short timeout. If it hangs, it means the parent is blocked. | |
| await asyncio.wait_for(event_queue.enqueue_event(event2), timeout=0.1) | |
| try: | |
| # Give it a short timeout. If it hangs, it means the parent is blocked. | |
| await asyncio.wait_for(event_queue.enqueue_event(create_sample_message('2')), timeout=0.1) |
This change allows EventQueue.enqueue_event(...) to return to caller immediately without blocking.
The issue: when one of the child queues is blocked it doesn't block enqueue to all the other child. Additionally it doesn't block the caller at all (that in some scenarios and current architectures could lead to deadlock).
Additional cost: We store all the enqueued tasks in _bg_tasks.