From 60284646ebb9f7470cf432033be681b34b211044 Mon Sep 17 00:00:00 2001 From: zhaohaihzb Date: Thu, 2 Apr 2026 10:33:15 +0800 Subject: [PATCH 1/2] feat: Add LITE and PRIORITY message types - Add LITE = 5 for lite topic support - Add PRIORITY = 6 for priority-based message delivery - Align protocol definition with latest messaging features --- apache/rocketmq/v2/definition.proto | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/apache/rocketmq/v2/definition.proto b/apache/rocketmq/v2/definition.proto index 468c410..5835261 100644 --- a/apache/rocketmq/v2/definition.proto +++ b/apache/rocketmq/v2/definition.proto @@ -146,6 +146,12 @@ enum MessageType { // Messages that are transactional. Only committed messages are delivered to // subscribers. TRANSACTION = 4; + + // lite topic + LITE = 5; + + // Messages that lower prioritised ones may need to wait for higher priority messages to be processed first + PRIORITY = 6; } enum DigestType { From 5e81e4586c13ea6abeab234b4f42ff3600b5a1dd Mon Sep 17 00:00:00 2001 From: zhaohaihzb Date: Thu, 2 Apr 2026 11:05:41 +0800 Subject: [PATCH 2/2] update proto --- apache/rocketmq/v2/definition.proto | 44 +++++++++++++++++++++++++++++ apache/rocketmq/v2/service.proto | 33 ++++++++++++++++++++++ 2 files changed, 77 insertions(+) diff --git a/apache/rocketmq/v2/definition.proto b/apache/rocketmq/v2/definition.proto index 5835261..516474d 100644 --- a/apache/rocketmq/v2/definition.proto +++ b/apache/rocketmq/v2/definition.proto @@ -192,6 +192,8 @@ enum ClientType { PUSH_CONSUMER = 2; SIMPLE_CONSUMER = 3; PULL_CONSUMER = 4; + LITE_PUSH_CONSUMER = 5; + LITE_SIMPLE_CONSUMER = 6; } enum Encoding { @@ -276,6 +278,12 @@ message SystemProperties { // Information to identify whether this message is from dead letter queue. optional DeadLetterQueue dead_letter_queue = 20; + + // lite topic + optional string lite_topic = 21; + + // Priority of message, which is optional + optional int32 priority = 22; } message DeadLetterQueue { @@ -354,6 +362,8 @@ enum Code { ILLEGAL_POLLING_TIME = 40018; // Offset is illegal. ILLEGAL_OFFSET = 40019; + // Format of lite topic is illegal. + ILLEGAL_LITE_TOPIC = 40020; // Generic code indicates that the client request lacks valid authentication // credentials for the requested resource. @@ -395,6 +405,10 @@ enum Code { // Requests are throttled. TOO_MANY_REQUESTS = 42900; + // LiteTopic related quota exceeded + LITE_TOPIC_QUOTA_EXCEEDED = 42901; + LITE_SUBSCRIPTION_QUOTA_EXCEEDED = 42902; + // Generic code for the case that the server is unwilling to process the request because its header fields are too large. // The request may be resubmitted after reducing the size of the request header fields. REQUEST_HEADER_FIELDS_TOO_LARGE = 43100; @@ -554,6 +568,21 @@ message Subscription { // Long-polling timeout for `ReceiveMessageRequest`, which is essential for // push consumer. optional google.protobuf.Duration long_polling_timeout = 5; + + // Only lite push consumer + // client-side lite subscription quota limit + optional int32 lite_subscription_quota = 6; + + // Only lite push consumer + // Maximum length limit for lite topic + optional int32 max_lite_topic_size = 7; +} + +enum LiteSubscriptionAction { + PARTIAL_ADD = 0; + PARTIAL_REMOVE = 1; + COMPLETE_ADD = 2; + COMPLETE_REMOVE = 3; } message Metric { @@ -573,4 +602,19 @@ enum QueryOffsetPolicy { // Use this option if time-based seek is targeted. TIMESTAMP = 2; +} + +message OffsetOption { + oneof offset_type { + Policy policy = 1; + int64 offset = 2; + int64 tail_n = 3; + int64 timestamp = 4; + } + + enum Policy { + LAST = 0; + MIN = 1; + MAX = 2; + } } \ No newline at end of file diff --git a/apache/rocketmq/v2/service.proto b/apache/rocketmq/v2/service.proto index 18db185..b58ac41 100644 --- a/apache/rocketmq/v2/service.proto +++ b/apache/rocketmq/v2/service.proto @@ -114,6 +114,7 @@ message ReceiveMessageResponse { message AckMessageEntry { string message_id = 1; string receipt_handle = 2; + optional string lite_topic = 3; } message AckMessageRequest { @@ -148,6 +149,7 @@ message ForwardMessageToDeadLetterQueueRequest { string message_id = 4; int32 delivery_attempt = 5; int32 max_delivery_attempts = 6; + optional string lite_topic = 7; } message ForwardMessageToDeadLetterQueueResponse { Status status = 1; } @@ -193,6 +195,10 @@ message RecoverOrphanedTransactionCommand { string transaction_id = 2; } +message NotifyUnsubscribeLiteCommand { + string lite_topic = 1; +} + message TelemetryCommand { optional Status status = 1; @@ -221,6 +227,9 @@ message TelemetryCommand { // Request client to reconnect server use the latest endpoints. ReconnectEndpointsCommand reconnect_endpoints_command = 8; + + // Request client to unsubscribe lite topic. + NotifyUnsubscribeLiteCommand notify_unsubscribe_lite_command = 9; } } @@ -243,6 +252,10 @@ message ChangeInvisibleDurationRequest { // For message tracing string message_id = 5; + + optional string lite_topic = 6; + // If true, server will not increment the retry times for this message + optional bool suspend = 7; } message ChangeInvisibleDurationResponse { @@ -311,6 +324,22 @@ message RecallMessageResponse { string message_id = 2; } +message SyncLiteSubscriptionRequest { + LiteSubscriptionAction action = 1; + // bindTopic for lite push consumer + Resource topic = 2; + // consumer group + Resource group = 3; + // lite subscription set of lite topics + repeated string lite_topic_set = 4; + optional int64 version = 5; + optional OffsetOption offset_option = 6; +} + +message SyncLiteSubscriptionResponse { + Status status = 1; +} + // For all the RPCs in MessagingService, the following error handling policies // apply: // @@ -440,4 +469,8 @@ service MessagingService { // for normal message, not supported for now. rpc RecallMessage(RecallMessageRequest) returns (RecallMessageResponse) { } + + // Sync lite subscription info, lite push consumer only + rpc SyncLiteSubscription(SyncLiteSubscriptionRequest) returns (SyncLiteSubscriptionResponse) {} + } \ No newline at end of file