Skip to content

Fix silent consumer death after broker events and connection recovery#82

Merged
blckmn merged 1 commit intoblckmn:masterfrom
jkamleh:master
Apr 1, 2026
Merged

Fix silent consumer death after broker events and connection recovery#82
blckmn merged 1 commit intoblckmn:masterfrom
jkamleh:master

Conversation

@jkamleh
Copy link
Copy Markdown
Contributor

@jkamleh jkamleh commented Apr 1, 2026

Summary

Fixes an issue where consumers silently stop receiving messages from RabbitMQ after a random period (hours to days), requiring a service restart to recover.

Root cause appears to be a combination of bugs in the consumer lifecycle:

  • No ConsumerCancelled/Shutdown handlers - when the broker cancels a consumer (HA failover, queue policy change, memory alarm), the consumer goes deaf with zero errors logged
  • Stale channel after connection recovery - AutomaticRecoveryEnabled recovers the connection, but the cached _channel field still points to the old dead channel. No RecoverySucceeded handler existed to invalidate it
  • Race condition in RestartIn() - the _timer.Enabled guard was not atomic, allowing concurrent threads to either both enter or both skip the restart path
  • Unhandled exception in error path - BasicNack after a channel close threw an exception that was caught by the outer handler, which called RestartIn() again, but the timer was already running so the second call was a no-op. The consumer never restarted
  • _stopping guard on event handlers - ConsumerCancelled/Shutdown events also fire during intentional Stop()/Dispose(), which would schedule an unwanted restart. A volatile flag distinguishes intentional teardown from broker-initiated disconnects

All fixes are internal to BasicRabbitService and QueueService. No changes to public interfaces, method signatures, configuration models, or enum values.

Test plan

  • Verify full solution builds with no new errors
  • Run existing test suite - 41/46 pass, same 5 pre-existing test isolation failures as original code (All tests pass individually; failures are caused by shared queue state between tests in the same run)
  • Integration test: kill the RabbitMQ connection mid-consume and verify the consumer recovers automatically
    • Tested, killed the RabbitMQ connection via management API mid-consume and confirmed the consumer detected the drop, recovered, and resumed processing messages automatically, see screenshot:
    image
  • Integration test: delete and recreate the queue while a consumer is active and verify it restarts
    • Tested using example Subscriber.Service locally, deleted MyQueue locally got: Consumer cancelled by broker on queue MyQueue, tags: Subscriber. Scheduling restart.
    • Recreated queue, got: Consumer started on queue MyQueue
  • Verify PublishService still works (uses original single-arg base constructor)
    • The PublishWithExchange, PublishWithProperties, PublishDirect, PublishEmptyMessage, PublishWithException and ToExchange tests all use PublishService and they all passed in our test run.

- Add ConsumerCancelled and Shutdown event handlers on AsyncEventingBasicConsumer so broker-initiated cancellations (HA failover, queue policy, memory alarm) are detected instead of silently dropping the consumer
- Subscribe to IAutorecoveringConnection.RecoverySucceeded to invalidate the stale channel reference after automatic connection recovery, and re-register the consumer via a new OnRecovered() virtual method
- Replace non-thread-safe _timer.Enabled check in RestartIn() with Interlocked.CompareExchange to prevent race conditions where concurrent error threads either both enter or both skip the restart path
- Guard BasicNack in the RestartConnection error path with a try/catch and channel.IsClosed check to prevent an exception cascade that silently kills the restart timer
- Add error handling in TimerActivation() for the NackOnException path so a closed channel during batch nack falls back to a full restart instead of throwing
- Clean up connection event handler subscriptions in ClearConnection() and dispose the timer in Cleanup() to prevent resource leaks
- Add BasicRabbitService(config, logger) constructor overload for connection-level logging without breaking existing subclasses using the single-arg constructor
@blckmn blckmn merged commit fcce097 into blckmn:master Apr 1, 2026
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants