feat(servicebus): add listSessions to session receiver clients#48956
Open
EldertGrootenboer wants to merge 32 commits intoAzure:mainfrom
Open
feat(servicebus): add listSessions to session receiver clients#48956EldertGrootenboer wants to merge 32 commits intoAzure:mainfrom
EldertGrootenboer wants to merge 32 commits intoAzure:mainfrom
Conversation
- Add OPERATION_GET_MESSAGE_SESSIONS and related constants - Add getMessageSessions to ServiceBusManagementNode interface - Implement AMQP request/response in ManagementChannel with year-cap and 404 handling - Add listSessions() overloads on ServiceBusSessionReceiverAsyncClient with Flux pagination - Add sync wrappers on ServiceBusSessionReceiverClient - Add unit tests for request, response, 204, 404, year-cap, and active-sessions mode
Contributor
There was a problem hiding this comment.
Pull request overview
This PR adds a Track 2 public API for listing Service Bus session IDs via the AMQP management operation com.microsoft:get-message-sessions, supporting both “active messages” mode and “updated-since” mode.
Changes:
- Added management constants and a new management-node operation (
getMessageSessions) with AMQP request/response handling. - Introduced new public
listSessions()overloads on session receiver async + sync clients, including pagination logic. - Added unit tests for
ManagementChannel.getMessageSessionsrequest construction and response handling.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ManagementConstants.java | Adds new operation/body keys for get-message-sessions. |
| sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusManagementNode.java | Exposes a new internal management-node method for listing session IDs. |
| sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ManagementChannel.java | Implements AMQP request + response parsing for get-message-sessions, including year capping and 204/404 handling. |
| sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java | Adds public async listSessions APIs with repeated paging calls. |
| sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverClient.java | Adds sync wrappers for listSessions by blocking on the async implementation. |
| sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ManagementChannelTests.java | Adds unit tests covering get-message-sessions request/response scenarios. |
- ManagementChannel: validate lastUpdatedTime is non-null and cap year-9999 sentinel to UTC so the wire timestamp matches DateTime.MaxValue regardless of source offset. - ServiceBusManagementNode: drop fully-qualified types in signature; document the UTC sentinel in JavaDoc. - ServiceBusSessionReceiverAsyncClient: use fluxError(LOGGER,...) for null updatedAfter, drop unused imports. - ManagementChannelTests: set the SESSION_NOT_FOUND error-condition so the 404 test exercises the actual sendWithVerify pass-through path; remove unused List import. - Apply spotless formatting.
…dFlux Round 2 review feedback. Aligns with Track 1's SessionBrowser/MiscRequestResponseOperationHandler so the implementation tracks proven service behaviour, and adopts PagedFlux/PagedIterable per Azure SDK guidelines for paginated collection APIs. Service-side alignment: - Active-messages sentinel now matches Track 1's SessionBrowser.MAXDATE (new Date(253402300800000L), 10000-01-01T00:00:00Z UTC). The wire value is the one the broker has been validated against for years; using 9999-12-31T23:59:59.999Z (1 ms earlier) risked the broker not switching into active-messages mode. - Pagination cursor now uses the server-returned skip plus the last session ID of the previous page, matching Track 1. Track 2 previously incremented skip locally and always passed lastSessionId=null, which can skip or duplicate sessions when the broker filters expired entries between pages. Surface change: - ServiceBusManagementNode.getMessageSessions now returns Mono<MessageSessionsResult> (sessionIds + nextSkip) so callers can thread the cursor. - listSessions / listSessions(OffsetDateTime) now return PagedFlux<String> (async) and PagedIterable<String> (sync), matching the Azure SDK conventions enforced by the ServiceClient checkstyle rule. - Renamed ManagementConstants.SESSIONS_IDS to SESSION_IDS (wire value 'sessions-ids' unchanged) so the constant reads consistently with SESSION_ID and SEQUENCE_NUMBERS. Tests: - Updated assertions to MessageSessionsResult and the Track 1 sentinel value (253402300800000L). - Added getMessageSessionsHonoursServerReturnedSkip covering the case where the server returns a skip greater than requestSkip + page.size().
Round 4 review feedback. - listSessionsInternal now wraps continuation-token parsing so a caller passing an invalid or opaque token via PagedFlux.byPage(String) sees a clear monoError(LOGGER, IllegalArgumentException) instead of NumberFormatException / StringIndexOutOfBoundsException. - Renamed getMessageSessionsHonoursServerReturnedSkip to getMessageSessionsHonorsServerReturnedSkip to match the American-English spelling used in the rest of the module.
Round 5 review feedback. - ManagementChannel.getMessageSessions now defensively guards the AmqpValue body so a malformed broker/proxy response (null body, non-AmqpValue, non-Map value) returns an empty page instead of NPE/CCE breaking pagination. - readResponseSkip now takes the parsed page size and falls back to requestSkip + pageSize when the server omits the skip field, so a malformed response advances the cursor instead of stalling on the same value. - listSessionsInternal cursor encoding switched to <decimal-skip>|<base64url-utf8(lastSessionId)>. The URL-safe Base64 alphabet does not contain '|', so any byte sequence in the session ID round-trips without escaping. Decode failures now also surface through monoError(LOGGER, IllegalArgumentException) with descriptive messages (no embedded control characters). - ACTIVE_MESSAGES_SENTINEL moved to ManagementConstants so the implementation and the public client reference a single source of truth for the broker-contract sentinel.
…sult immutability Round 28 review feedback: - ManagementChannel.getMessageSessions: a 200 OK with an unexpected body type (null body, non-AmqpValue body, or AmqpValue whose payload isn't a Map) now fails the operation with a clear IllegalStateException instead of returning an empty page. Silently turning a protocol/ compatibility issue into an empty page would terminate pagination and drop any remaining results. The "no sessions-ids key" branch still returns an empty page, which is the legitimate "no more pages" signal. Added test getMessageSessionsRejectsUnexpectedBodyType. - MessageSessionsResult: the ctor now defensively copies the incoming sessionIds list and exposes it via Collections.unmodifiableList(...), so external mutation of the source list cannot alter the result after construction. A null sessionIds argument is now rejected eagerly with NullPointerException instead of NPE'ing at iteration time.
Round 29 review feedback. ManagementChannel.getMessageSessions now fails the operation with IllegalStateException (logged via logExceptionAsWarning, matching the null-entry path) when the sessions-ids value is non-null but neither Object[] nor Iterable<?>, e.g. a String. Previously this silently fell back to an empty list, which would have terminated pagination and dropped any remaining results on a broker/library payload-shape change. Mirrors the round-28 fix for the body-shape branch. Added test getMessageSessionsRejectsUnexpectedSessionIdsPayloadType.
…e skip Round 30 review feedback. ManagementChannel.readResponseSkip now requires the broker-returned skip to be strictly greater than requestSkip; equal would re-fetch the same page, smaller would cursor backwards, and either case risks infinite loops or duplicate results. Both cases now route through computeFallbackSkip(requestSkip, pageSize) just like the missing/non-numeric/negative/out-of-range cases. (The previous check was responseSkip >= 0; the new check responseSkip > requestSkip subsumes that guard since requestSkip is validated as non-negative at the call site.) Added test getMessageSessionsFallsBackWhenServerSkipIsNotMonotonic covering the equal-skip stall case.
Round 31 review feedback. Replaced the hand-rolled "year 99999" value with OffsetDateTime.MAX so the test would actually fail (with ArithmeticException from Date.from) if the clamp were moved after the java.util.Date conversion or removed altogether. Year 99999 doesn't overflow Date.from, so the previous setup didn't actually exercise the overflow protection that the test was meant to lock in. The clamping assertion (cap to 253_402_300_800_000 ms) is unchanged.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Expose GetMessageSessions operation
Adds public API to list session IDs from session-enabled queues and subscriptions. This implements the
com.microsoft:get-message-sessionsAMQP management operation, which was available in Track 1 SDKs (IMessageSessionEntity.getMessageSessions()) but not yet exposed in Track 2.Two modes
The operation has two distinct modes, controlled by the
lastUpdatedTimeparameter on the wire:listSessions()no-arg): Returns sessions that have active messages in the entity. Sessions on the dead-letter queue or sessions with only a session state (but no messages) are NOT returned. The implementation passes the Track 1 active-messages sentinelDate(253402300800000L)(10000-01-01T00:00:00ZUTC), matchingSessionBrowser.MAXDATE.listSessions(OffsetDateTime updatedAfter)): Returns sessions whose session state was updated after the given timestamp.API
PagedFlux/PagedIterableare the Azure SDK conventions for paginated collection APIs (enforced by theServiceClientcheckstyle rule). Pages are fetched lazily as the iterator advances.Pagination
Cursor semantics match Track 1's
SessionBrowser/MiscRequestResponseOperationHandler:skip(server-returned from the previous page) andlastSessionId(the last entry of the previous page).skipvalue to use for the next request — propagated verbatim instead of computingpreviousSkip + page.size()locally so the client does not skip or duplicate sessions when the broker filters expired entries between pages.What changed
ManagementConstants.java: AddedOPERATION_GET_MESSAGE_SESSIONS,SESSION_IDS(wire value"sessions-ids"),LAST_UPDATED_TIME,LAST_SESSION_IDconstants.ServiceBusManagementNode.java: AddedgetMessageSessions()returningMono<MessageSessionsResult>(sessionIds + nextSkip cursor).MessageSessionsResult.java: New internal result type pairing the page with the server-returned skip.ManagementChannel.java: Full AMQP implementation with null-arg validation, Track 1 active-messages sentinel cap (clampsOffsetDateTime.MAXand similar overflows to the sentinel),last-session-idcursor in the request, server-returned skip in the response, and 404+SessionNotFound handling.ServiceBusSessionReceiverAsyncClient.java: PubliclistSessions()overloads asPagedFlux<String>, fetching pages viaPagedResponseBaseand encoding the cursor as<decimal-skip>|<base64url-utf8(lastSessionId)>. The URL-safe Base64 alphabet (A-Z a-z 0-9 - _) does not contain|, so any byte sequence in the session ID round-trips through the cursor without escaping.ServiceBusSessionReceiverClient.java: Sync wrappers asPagedIterable<String>over the asyncPagedFlux.ManagementChannelTests.java: Unit tests for request construction, response parsing, 204, 404+SessionNotFound, sentinel cap, server-returned skip, andlastSessionIdcursor.Related