Fix memory leak due to lingering messages in the message store even after removal of message from queues#109
Open
ashah4 wants to merge 115 commits intoNetflix:masterfrom
Open
Conversation
revert the oss plugin version
[debug] revert gradle version
Refactor queues to use java.time.Clock
gradle updates Upgrade plugins Updating Redis Pipeline Queue v2 with proper names
Cde updates
Remove some unused lines (solves Netflix#29)
unsafePopWithMsgId() checks all 3 shards for a msgId, but only one of them can have it. So make sure that the checks against the other 2 shards do not spam the logs with "Cannot find message with ID ...".
This API looks for a value with a matching predicate in the hashmap and pops the first message ID that matches if it does find one. Also added a atomicPopWithMsgIdHelper() helper function to do a pop in one round trip. This should only be used if the ring size of the underlying Dynomite cluster is 1. Testing: Tested with DynoQueueDemo
Using queues with DC_EACH_SAFE_QUORUM is quite expensive. The bulk pop operation is meant to pop more within a single round trip.
unsafeBulkPop() allows bulk popping from all shards. loaclGet() does a get() with a non quorum connection. TODO: unsafeBulkPop() will return nil if messageCount > size(). Fix this. TODO 2: Do code cleanup.
Previously popWithMsgPredicate() would pop the first item it found in the hashmap that matches the given predicate. With this patch, it will obey the queueing priority.
Tests fail without this fix.
In some weird cases, we find the message in the queue but without a payload in the hashmap. Although this is never expected, it seems to happen, and this patch should stop the bleeding until we find out the root cause.
Attempts to return the items present in the local queue shard but not in the hashmap, if any. (Ideally, we would not require this function, however, in some configurations, especially with multi-region write traffic sharing the same queue, we may find ourselves with stale items in the queue shards)
Note: All items returned MUST be checked at the app level if they've already been processed before acting on them (eg: removing them)
Upgrade nebula.netflixoss to replace bintray publication and update TravisCi secrets
Replace JCenter with Maven Central
Remove TravisCI and use Github Actions
Use this link to re-run the recipe: https://app.moderne.io/recipes/org.openrewrite.github.ChangeActionVersion?organizationId=TmV0ZmxpeA%3D%3D#defaults=W3sidmFsdWUiOiJhY3Rpb25zL2NhY2hlIiwibmFtZSI6ImFjdGlvbiJ9LHsidmFsdWUiOiJ2NCIsIm5hbWUiOiJ2ZXJzaW9uIn1d Co-authored-by: Moderne <team@moderne.io>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem
When the remove() is invoked for a particular message in queue, it attempts to remove the message from all the shards of the queue, as well as from the unack queue. However, it removes the message from the message store ONLY when if an entry existed before removal in the message queue.
This might not always be true because a message might have been popped from the queue, and when that happens, it is removed from the queue shard, and added to the unack queue. Hence in such situations, if the remove() is invoked, the entry in the message store hash never gets cleaned up, resulting in memory leak.
This is particularly identified when the conductor's reconciler (Sweeper) may have popped the workflow from decider queue, and at the same time, another thread tried to remove the workflow from the queue.
Changes in the PR
Updated the remove() function to remove the entry from the message store as long as the message was found either in message queue or unack queue. This ensures that there is no memory leak due to lingering entries in the hash.