Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,23 @@ public abstract class ConsumerImpl extends ClientImpl {
@SuppressWarnings("SameParameterValue")
protected ListenableFuture<ReceiveMessageResult> receiveMessage(ReceiveMessageRequest request,
MessageQueueImpl mq, Duration awaitDuration) {
return receiveMessage(request, mq, awaitDuration, null);
}

/**
* Receive messages from the specified queue with optional load balancer feedback.
*
* <p>When a {@link SubscriptionLoadBalancer} is provided, the receive result will be fed back
* to the load balancer to track empty queues, enabling the skip scheduling optimization.
*
* @param request the receive message request
* @param mq the message queue to receive from
* @param awaitDuration the long-polling await duration
* @param loadBalancer optional load balancer for empty-queue tracking, may be null
* @return future of receive message result
*/
protected ListenableFuture<ReceiveMessageResult> receiveMessage(ReceiveMessageRequest request,
MessageQueueImpl mq, Duration awaitDuration, SubscriptionLoadBalancer loadBalancer) {
List<MessageViewImpl> messages = new ArrayList<>();
try {
final Endpoints endpoints = mq.getBroker().getEndpoints();
Expand Down Expand Up @@ -113,6 +130,13 @@ protected ListenableFuture<ReceiveMessageResult> receiveMessage(ReceiveMessageRe
final MessageViewImpl view = MessageViewImpl.fromProtobuf(message, mq, transportDeliveryTimestamp);
messages.add(view);
}
if (loadBalancer != null) {
if (messageList.isEmpty()) {
loadBalancer.markEmptyResult(mq);
} else {
loadBalancer.markNonEmptyResult(mq);
}
}
StatusChecker.check(status, future);
final ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult(endpoints, messages);
return Futures.immediateFuture(receiveMessageResult);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public ListenableFuture<List<MessageView>> receive0(int maxMessageNum, Duration
final MessageQueueImpl mq = result.takeMessageQueue();
final ReceiveMessageRequest request = wrapReceiveMessageRequest(maxMessageNum, mq, filterExpression,
invisibleDuration, awaitDuration);
return receiveMessage(request, mq, awaitDuration);
return receiveMessage(request, mq, awaitDuration, result);
}, MoreExecutors.directExecutor());
return Futures.transformAsync(future0, result -> Futures.immediateFuture(result.getMessageViews()),
clientCallbackExecutor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,65 @@
import com.google.common.collect.ImmutableList;
import com.google.common.math.IntMath;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.stream.Collectors;
import javax.annotation.concurrent.Immutable;
import org.apache.commons.lang3.RandomUtils;
import org.apache.rocketmq.client.java.misc.Utilities;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
import org.apache.rocketmq.client.java.route.TopicRouteData;

/**
* Subscription load balancer with empty-queue skip scheduling.
*
* <p>When using long-polling to receive messages, if some queues are empty, the consumer will be
* blocked by long-polling on those empty queues, causing delayed message reception on queues that
* actually have messages. This enhanced load balancer tracks empty queue results and temporarily
* skips queues that have been returning empty results consecutively, so that queues with messages
* can be polled more frequently.
*/
@Immutable
public class SubscriptionLoadBalancer {

/**
* Maximum number of consecutive skip rounds for an empty queue.
*/
private static final int MAX_SKIP_ROUNDS = 1000;

/**
* Base multiplier: each consecutive empty result increases skip rounds by this amount.
*/
private static final int SKIP_ROUNDS_PER_EMPTY = 50;

/**
* Maximum consecutive empty results tracked before resetting the counter.
*/
private static final int MAX_EMPTY_RESULTS = 200;

/**
* Message queues to receive message.
*/
protected final ImmutableList<MessageQueueImpl> messageQueues;

/**
* Index for round-robin.
*/
private final AtomicInteger index;

/**
* Tracks empty-result state per queue for skip scheduling.
*/
private volatile Map<String, QueueEmptyState> emptyStateMap;

public SubscriptionLoadBalancer(TopicRouteData topicRouteData) {
this(new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE)), topicRouteData);
this(new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE)), topicRouteData, null);
}

private SubscriptionLoadBalancer(AtomicInteger index, TopicRouteData topicRouteData) {
private SubscriptionLoadBalancer(AtomicInteger index, TopicRouteData topicRouteData,
Map<String, QueueEmptyState> emptyStateMap) {
this.index = index;
final List<MessageQueueImpl> mqs = topicRouteData.getMessageQueues().stream()
.filter(SubscriptionLoadBalancer::isReadableMasterQueue)
Expand All @@ -53,6 +87,7 @@ private SubscriptionLoadBalancer(AtomicInteger index, TopicRouteData topicRouteD
throw new IllegalArgumentException("No readable message queue found, topiRouteData=" + topicRouteData);
}
this.messageQueues = ImmutableList.<MessageQueueImpl>builder().addAll(mqs).build();
this.emptyStateMap = emptyStateMap != null ? emptyStateMap : new ConcurrentHashMap<>();
}

/**
Expand All @@ -63,11 +98,129 @@ public static boolean isReadableMasterQueue(MessageQueueImpl mq) {
}

SubscriptionLoadBalancer update(TopicRouteData topicRouteData) {
return new SubscriptionLoadBalancer(index, topicRouteData);
return new SubscriptionLoadBalancer(index, topicRouteData, emptyStateMap);
}

/**
* Select the next message queue, skipping queues that have been returning empty results.
*
* <p>When a queue has been marked as empty, it will be temporarily skipped for a number of
* rounds proportional to its consecutive empty results count. This prevents long-polling from
* blocking on empty queues while other queues have messages waiting.
*
* @return the next message queue to poll
*/
public MessageQueueImpl takeMessageQueue() {
return selectNext(0);
}

private MessageQueueImpl selectNext(int attempt) {
final int next = index.getAndIncrement();
return messageQueues.get(IntMath.mod(next, messageQueues.size()));
int idx = IntMath.mod(next, messageQueues.size());
MessageQueueImpl mq = messageQueues.get(idx);
if (attempt < messageQueues.size() && shouldSkip(mq)) {
return selectNext(attempt + 1);
}
resetSkipRounds(mq);
return mq;
}

private boolean shouldSkip(MessageQueueImpl mq) {
QueueEmptyState state = emptyStateMap.get(queueKey(mq));
if (state != null && state.shouldSkip()) {
state.incrementSkipRounds();
return true;
}
return false;
}

/**
* Mark a queue as having returned an empty result. This increases the skip weight for the queue.
*
* @param mq the message queue that returned no messages
*/
public void markEmptyResult(MessageQueueImpl mq) {
QueueEmptyState state = emptyStateMap.computeIfAbsent(queueKey(mq), k -> new QueueEmptyState());
state.incrementEmptyResults();
}

/**
* Mark a queue as having returned messages. This resets the empty counter for the queue.
*
* @param mq the message queue that returned messages
*/
public void markNonEmptyResult(MessageQueueImpl mq) {
QueueEmptyState state = emptyStateMap.get(queueKey(mq));
if (state != null) {
state.resetEmptyResults();
}
}

private void resetSkipRounds(MessageQueueImpl mq) {
QueueEmptyState state = emptyStateMap.get(queueKey(mq));
if (state != null) {
state.resetSkipRounds();
}
}

private String queueKey(MessageQueueImpl mq) {
return mq.toString();
}

/**
* Tracks the empty-result state for a single message queue.
*/
static class QueueEmptyState {
private static final AtomicIntegerFieldUpdater<QueueEmptyState> SKIP_ROUNDS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(QueueEmptyState.class, "skipRounds");
private static final AtomicIntegerFieldUpdater<QueueEmptyState> EMPTY_RESULTS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(QueueEmptyState.class, "consecutiveEmptyResults");

/**
* Number of rounds this queue has been skipped in the current skip cycle.
*/
private volatile int skipRounds;
/**
* Number of consecutive empty results received from this queue.
*/
private volatile int consecutiveEmptyResults;

void incrementSkipRounds() {
SKIP_ROUNDS_UPDATER.incrementAndGet(this);
}

void resetSkipRounds() {
SKIP_ROUNDS_UPDATER.set(this, 0);
}

void incrementEmptyResults() {
EMPTY_RESULTS_UPDATER.updateAndGet(this, v -> (v + 1) % MAX_EMPTY_RESULTS);
}

void resetEmptyResults() {
EMPTY_RESULTS_UPDATER.set(this, 0);
}

/**
* Determine whether this queue should be skipped in the current scheduling round.
*
* <p>A queue is skipped when it has consecutive empty results and the current skip rounds
* have not exceeded the calculated threshold. The threshold grows linearly with consecutive
* empty results (capped at {@link #MAX_SKIP_ROUNDS}).
*/
boolean shouldSkip() {
int emptyCount = consecutiveEmptyResults;
return emptyCount > 0
&& skipRounds <= Math.min((long) emptyCount * SKIP_ROUNDS_PER_EMPTY, MAX_SKIP_ROUNDS);
}

// Visible for testing
int getSkipRounds() {
return skipRounds;
}

int getConsecutiveEmptyResults() {
return consecutiveEmptyResults;
}
}
}
}
Loading