From e0edddd6b1a34e7b525dfb1574fcdd8ff1a0884c Mon Sep 17 00:00:00 2001 From: crazywen Date: Fri, 27 Mar 2026 11:58:25 +0800 Subject: [PATCH] [Java] feat: add empty-queue skip scheduling for SimpleConsumer long-polling When using SimpleConsumer with long-polling, if message distribution across queues is uneven (some queues empty, others with messages), the consumer gets blocked by long-polling on empty queues, causing significant delay in receiving messages from queues that actually have data. Enhanced SubscriptionLoadBalancer with empty-queue skip scheduling: - After each receiveMessage call, the result is fed back to the load balancer - Queues that consecutively return empty are temporarily skipped in takeMessageQueue() - Skip duration is proportional to consecutive empty count (capped at 1000 rounds) - When a queue returns messages again, its empty counter resets immediately - Fully backward-compatible: without feedback, behavior is identical to round-robin Co-Authored-By: crazywen --- .../java/impl/consumer/ConsumerImpl.java | 24 +++ .../impl/consumer/SimpleConsumerImpl.java | 2 +- .../consumer/SubscriptionLoadBalancer.java | 163 ++++++++++++++- .../SubscriptionLoadBalancerTest.java | 193 ++++++++++++++++++ 4 files changed, 376 insertions(+), 6 deletions(-) create mode 100644 java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancerTest.java diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java index 38c017012..c061dad5f 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java @@ -78,6 +78,23 @@ public abstract class ConsumerImpl extends ClientImpl { @SuppressWarnings("SameParameterValue") protected ListenableFuture receiveMessage(ReceiveMessageRequest request, MessageQueueImpl mq, Duration awaitDuration) { + return receiveMessage(request, mq, awaitDuration, null); + } + + /** + * Receive messages from the specified queue with optional load balancer feedback. + * + *

When a {@link SubscriptionLoadBalancer} is provided, the receive result will be fed back + * to the load balancer to track empty queues, enabling the skip scheduling optimization. + * + * @param request the receive message request + * @param mq the message queue to receive from + * @param awaitDuration the long-polling await duration + * @param loadBalancer optional load balancer for empty-queue tracking, may be null + * @return future of receive message result + */ + protected ListenableFuture receiveMessage(ReceiveMessageRequest request, + MessageQueueImpl mq, Duration awaitDuration, SubscriptionLoadBalancer loadBalancer) { List messages = new ArrayList<>(); try { final Endpoints endpoints = mq.getBroker().getEndpoints(); @@ -113,6 +130,13 @@ protected ListenableFuture receiveMessage(ReceiveMessageRe final MessageViewImpl view = MessageViewImpl.fromProtobuf(message, mq, transportDeliveryTimestamp); messages.add(view); } + if (loadBalancer != null) { + if (messageList.isEmpty()) { + loadBalancer.markEmptyResult(mq); + } else { + loadBalancer.markNonEmptyResult(mq); + } + } StatusChecker.check(status, future); final ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult(endpoints, messages); return Futures.immediateFuture(receiveMessageResult); diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java index a67955062..8b837476a 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java @@ -170,7 +170,7 @@ public ListenableFuture> receive0(int maxMessageNum, Duration final MessageQueueImpl mq = result.takeMessageQueue(); final ReceiveMessageRequest request = wrapReceiveMessageRequest(maxMessageNum, mq, filterExpression, invisibleDuration, awaitDuration); - return receiveMessage(request, mq, awaitDuration); + return receiveMessage(request, mq, awaitDuration, result); }, MoreExecutors.directExecutor()); return Futures.transformAsync(future0, result -> Futures.immediateFuture(result.getMessageViews()), clientCallbackExecutor); diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java index e9b3c5193..81ba79441 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java @@ -20,7 +20,10 @@ import com.google.common.collect.ImmutableList; import com.google.common.math.IntMath; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.stream.Collectors; import javax.annotation.concurrent.Immutable; import org.apache.commons.lang3.RandomUtils; @@ -28,23 +31,54 @@ import org.apache.rocketmq.client.java.route.MessageQueueImpl; import org.apache.rocketmq.client.java.route.TopicRouteData; +/** + * Subscription load balancer with empty-queue skip scheduling. + * + *

When using long-polling to receive messages, if some queues are empty, the consumer will be + * blocked by long-polling on those empty queues, causing delayed message reception on queues that + * actually have messages. This enhanced load balancer tracks empty queue results and temporarily + * skips queues that have been returning empty results consecutively, so that queues with messages + * can be polled more frequently. + */ @Immutable public class SubscriptionLoadBalancer { + /** + * Maximum number of consecutive skip rounds for an empty queue. + */ + private static final int MAX_SKIP_ROUNDS = 1000; + + /** + * Base multiplier: each consecutive empty result increases skip rounds by this amount. + */ + private static final int SKIP_ROUNDS_PER_EMPTY = 50; + + /** + * Maximum consecutive empty results tracked before resetting the counter. + */ + private static final int MAX_EMPTY_RESULTS = 200; + /** * Message queues to receive message. */ protected final ImmutableList messageQueues; + /** * Index for round-robin. */ private final AtomicInteger index; + /** + * Tracks empty-result state per queue for skip scheduling. + */ + private volatile Map emptyStateMap; + public SubscriptionLoadBalancer(TopicRouteData topicRouteData) { - this(new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE)), topicRouteData); + this(new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE)), topicRouteData, null); } - private SubscriptionLoadBalancer(AtomicInteger index, TopicRouteData topicRouteData) { + private SubscriptionLoadBalancer(AtomicInteger index, TopicRouteData topicRouteData, + Map emptyStateMap) { this.index = index; final List mqs = topicRouteData.getMessageQueues().stream() .filter(SubscriptionLoadBalancer::isReadableMasterQueue) @@ -53,6 +87,7 @@ private SubscriptionLoadBalancer(AtomicInteger index, TopicRouteData topicRouteD throw new IllegalArgumentException("No readable message queue found, topiRouteData=" + topicRouteData); } this.messageQueues = ImmutableList.builder().addAll(mqs).build(); + this.emptyStateMap = emptyStateMap != null ? emptyStateMap : new ConcurrentHashMap<>(); } /** @@ -63,11 +98,129 @@ public static boolean isReadableMasterQueue(MessageQueueImpl mq) { } SubscriptionLoadBalancer update(TopicRouteData topicRouteData) { - return new SubscriptionLoadBalancer(index, topicRouteData); + return new SubscriptionLoadBalancer(index, topicRouteData, emptyStateMap); } + /** + * Select the next message queue, skipping queues that have been returning empty results. + * + *

When a queue has been marked as empty, it will be temporarily skipped for a number of + * rounds proportional to its consecutive empty results count. This prevents long-polling from + * blocking on empty queues while other queues have messages waiting. + * + * @return the next message queue to poll + */ public MessageQueueImpl takeMessageQueue() { + return selectNext(0); + } + + private MessageQueueImpl selectNext(int attempt) { final int next = index.getAndIncrement(); - return messageQueues.get(IntMath.mod(next, messageQueues.size())); + int idx = IntMath.mod(next, messageQueues.size()); + MessageQueueImpl mq = messageQueues.get(idx); + if (attempt < messageQueues.size() && shouldSkip(mq)) { + return selectNext(attempt + 1); + } + resetSkipRounds(mq); + return mq; + } + + private boolean shouldSkip(MessageQueueImpl mq) { + QueueEmptyState state = emptyStateMap.get(queueKey(mq)); + if (state != null && state.shouldSkip()) { + state.incrementSkipRounds(); + return true; + } + return false; + } + + /** + * Mark a queue as having returned an empty result. This increases the skip weight for the queue. + * + * @param mq the message queue that returned no messages + */ + public void markEmptyResult(MessageQueueImpl mq) { + QueueEmptyState state = emptyStateMap.computeIfAbsent(queueKey(mq), k -> new QueueEmptyState()); + state.incrementEmptyResults(); + } + + /** + * Mark a queue as having returned messages. This resets the empty counter for the queue. + * + * @param mq the message queue that returned messages + */ + public void markNonEmptyResult(MessageQueueImpl mq) { + QueueEmptyState state = emptyStateMap.get(queueKey(mq)); + if (state != null) { + state.resetEmptyResults(); + } + } + + private void resetSkipRounds(MessageQueueImpl mq) { + QueueEmptyState state = emptyStateMap.get(queueKey(mq)); + if (state != null) { + state.resetSkipRounds(); + } + } + + private String queueKey(MessageQueueImpl mq) { + return mq.toString(); + } + + /** + * Tracks the empty-result state for a single message queue. + */ + static class QueueEmptyState { + private static final AtomicIntegerFieldUpdater SKIP_ROUNDS_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(QueueEmptyState.class, "skipRounds"); + private static final AtomicIntegerFieldUpdater EMPTY_RESULTS_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(QueueEmptyState.class, "consecutiveEmptyResults"); + + /** + * Number of rounds this queue has been skipped in the current skip cycle. + */ + private volatile int skipRounds; + /** + * Number of consecutive empty results received from this queue. + */ + private volatile int consecutiveEmptyResults; + + void incrementSkipRounds() { + SKIP_ROUNDS_UPDATER.incrementAndGet(this); + } + + void resetSkipRounds() { + SKIP_ROUNDS_UPDATER.set(this, 0); + } + + void incrementEmptyResults() { + EMPTY_RESULTS_UPDATER.updateAndGet(this, v -> (v + 1) % MAX_EMPTY_RESULTS); + } + + void resetEmptyResults() { + EMPTY_RESULTS_UPDATER.set(this, 0); + } + + /** + * Determine whether this queue should be skipped in the current scheduling round. + * + *

A queue is skipped when it has consecutive empty results and the current skip rounds + * have not exceeded the calculated threshold. The threshold grows linearly with consecutive + * empty results (capped at {@link #MAX_SKIP_ROUNDS}). + */ + boolean shouldSkip() { + int emptyCount = consecutiveEmptyResults; + return emptyCount > 0 + && skipRounds <= Math.min((long) emptyCount * SKIP_ROUNDS_PER_EMPTY, MAX_SKIP_ROUNDS); + } + + // Visible for testing + int getSkipRounds() { + return skipRounds; + } + + int getConsecutiveEmptyResults() { + return consecutiveEmptyResults; + } } -} \ No newline at end of file +} diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancerTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancerTest.java new file mode 100644 index 000000000..5c29b1f64 --- /dev/null +++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancerTest.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.java.impl.consumer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import apache.rocketmq.v2.Broker; +import apache.rocketmq.v2.MessageQueue; +import apache.rocketmq.v2.Permission; +import com.google.common.collect.ImmutableList; +import java.util.HashSet; +import java.util.Set; +import org.apache.rocketmq.client.java.route.MessageQueueImpl; +import org.apache.rocketmq.client.java.route.TopicRouteData; +import org.junit.Before; +import org.junit.Test; + +public class SubscriptionLoadBalancerTest { + + private TopicRouteData topicRouteData; + private SubscriptionLoadBalancer loadBalancer; + + @Before + public void setUp() { + final Broker broker = Broker.newBuilder() + .setName("broker-0") + .setId(0) + .setEndpoints(apache.rocketmq.v2.Endpoints.newBuilder() + .addAddresses(apache.rocketmq.v2.Address.newBuilder() + .setHost("127.0.0.1") + .setPort(8080) + .build()) + .build()) + .build(); + + final MessageQueue mq0 = MessageQueue.newBuilder() + .setTopic(apache.rocketmq.v2.Resource.newBuilder().setName("test-topic").build()) + .setId(0) + .setPermission(Permission.READ_WRITE) + .setBroker(broker) + .build(); + final MessageQueue mq1 = MessageQueue.newBuilder() + .setTopic(apache.rocketmq.v2.Resource.newBuilder().setName("test-topic").build()) + .setId(1) + .setPermission(Permission.READ_WRITE) + .setBroker(broker) + .build(); + final MessageQueue mq2 = MessageQueue.newBuilder() + .setTopic(apache.rocketmq.v2.Resource.newBuilder().setName("test-topic").build()) + .setId(2) + .setPermission(Permission.READ_WRITE) + .setBroker(broker) + .build(); + + topicRouteData = new TopicRouteData(ImmutableList.of( + mq0, + mq1, + mq2 + )); + + loadBalancer = new SubscriptionLoadBalancer(topicRouteData); + } + + @Test + public void testBasicRoundRobin() { + // Without any empty marking, should distribute across all queues + Set queueIds = new HashSet<>(); + for (int i = 0; i < 6; i++) { + MessageQueueImpl mq = loadBalancer.takeMessageQueue(); + queueIds.add(mq.getQueueId()); + } + assertEquals("Should use all 3 queues", 3, queueIds.size()); + } + + @Test + public void testEmptyQueueSkipping() { + // Take a queue and mark it as empty multiple times + MessageQueueImpl firstMq = loadBalancer.takeMessageQueue(); + + // Mark first queue as empty multiple times to build up skip weight + for (int i = 0; i < 5; i++) { + loadBalancer.markEmptyResult(firstMq); + } + + // Now the empty queue should be skipped for a while + // Take several queues - the empty one should be skipped + Set queueIds = new HashSet<>(); + for (int i = 0; i < 6; i++) { + MessageQueueImpl mq = loadBalancer.takeMessageQueue(); + queueIds.add(mq.getQueueId()); + } + // With 3 queues and 1 marked empty, we should primarily see the other 2 + assertNotNull("Should return valid queues", queueIds); + } + + @Test + public void testNonEmptyResultResetsSkipping() { + MessageQueueImpl firstMq = loadBalancer.takeMessageQueue(); + + // Mark as empty + for (int i = 0; i < 10; i++) { + loadBalancer.markEmptyResult(firstMq); + } + + // Now mark as non-empty - should reset the skip state + loadBalancer.markNonEmptyResult(firstMq); + + // The queue should no longer be skipped + boolean foundFirst = false; + for (int i = 0; i < 10; i++) { + MessageQueueImpl mq = loadBalancer.takeMessageQueue(); + if (mq.getQueueId() == firstMq.getQueueId()) { + foundFirst = true; + break; + } + } + assertTrue("Queue should be selectable again after marking non-empty", foundFirst); + } + + @Test + public void testQueueEmptyStateShouldSkip() { + SubscriptionLoadBalancer.QueueEmptyState state = new SubscriptionLoadBalancer.QueueEmptyState(); + + // Initially should not skip + assertFalse("Fresh state should not skip", state.shouldSkip()); + + // After marking empty once, should skip + state.incrementEmptyResults(); + assertTrue("Should skip after empty result", state.shouldSkip()); + + // After many skip rounds, should eventually stop skipping + for (int i = 0; i < 100; i++) { + state.incrementSkipRounds(); + } + assertFalse("Should stop skipping after enough rounds", state.shouldSkip()); + } + + @Test + public void testQueueEmptyStateReset() { + SubscriptionLoadBalancer.QueueEmptyState state = new SubscriptionLoadBalancer.QueueEmptyState(); + state.incrementEmptyResults(); + state.incrementEmptyResults(); + assertTrue("Should skip with consecutive empties", state.shouldSkip()); + + state.resetEmptyResults(); + assertFalse("Should not skip after reset", state.shouldSkip()); + } + + @Test + public void testUpdatePreservesEmptyState() { + MessageQueueImpl mq = loadBalancer.takeMessageQueue(); + loadBalancer.markEmptyResult(mq); + loadBalancer.markEmptyResult(mq); + + // Update should preserve the empty state map + SubscriptionLoadBalancer updated = loadBalancer.update(topicRouteData); + assertNotNull("Updated balancer should not be null", updated); + } + + @Test + public void testAllQueuesEmptyStillReturnsQueue() { + // Even if all queues are empty, takeMessageQueue should still return a queue + // (it won't skip all queues - attempt limit equals queue count) + for (int round = 0; round < 3; round++) { + MessageQueueImpl mq = loadBalancer.takeMessageQueue(); + for (int i = 0; i < 5; i++) { + loadBalancer.markEmptyResult(mq); + } + } + + // Should still return a queue even when all are marked empty + MessageQueueImpl mq = loadBalancer.takeMessageQueue(); + assertNotNull("Should always return a queue", mq); + } +}