Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
bc13713
fix: Add LITE and PRIORITY message types to match protocol definition
zhaohai666 Apr 2, 2026
a69b3f5
fix: Update proto submodule and nodejs proto definition
zhaohai666 Apr 2, 2026
ac0aeaf
fix: Sync nodejs proto definition with updated protocol
zhaohai666 Apr 2, 2026
15403e8
feat: Add priority message support for Node.js client
zhaohai666 Apr 2, 2026
8a77923
fix: Use optional chaining for setPriority to avoid TS error
zhaohai666 Apr 2, 2026
3220592
update proto
zhaohai666 Apr 2, 2026
09b19bc
update proto
zhaohai666 Apr 2, 2026
b76ec8e
add lite message
zhaohai666 Apr 2, 2026
b88e1d2
Change protos submodule URL to HTTPS for CI compatibility
zhaohai666 Apr 2, 2026
147d28e
rollback
zhaohai666 Apr 2, 2026
73ae07f
Complete mutual exclusion verification
zhaohai666 Apr 2, 2026
3106ea1
Support Lite Push
zhaohai666 Apr 3, 2026
c280674
update protos version
zhaohai666 Apr 3, 2026
1e5d7ce
add lite simple consumer
zhaohai666 Apr 7, 2026
01dc0c6
fix getClientType
zhaohai666 Apr 7, 2026
ddf4801
fix getClientType
zhaohai666 Apr 7, 2026
9362a98
update startup
zhaohai666 Apr 7, 2026
d396b22
update name
zhaohai666 Apr 7, 2026
21fcd90
fix producer getClientType
zhaohai666 Apr 7, 2026
20e0c91
fix producer getClientType
zhaohai666 Apr 7, 2026
19ce4f1
update proto
zhaohai666 Apr 8, 2026
7026f6f
All 12 review comments from reviewer zk-drizzle have been completed.
zhaohai666 Apr 10, 2026
973539f
remove log
zhaohai666 Apr 10, 2026
80ddc2c
optimize simpleConsumer
zhaohai666 Apr 10, 2026
e747e22
remove delay
zhaohai666 Apr 10, 2026
4184879
update example
zhaohai666 Apr 13, 2026
bca2334
update example
zhaohai666 Apr 13, 2026
05c2f53
Securely handle concurrent refresh requests and exceptional scenarios
zhaohai666 Apr 13, 2026
a8b971c
update producer delay
zhaohai666 Apr 13, 2026
4e70cdc
fix lite push
zhaohai666 Apr 13, 2026
d12e850
fix equals hashcode
zhaohai666 Apr 14, 2026
885595f
Optimize hashCodeOfString
zhaohai666 Apr 14, 2026
213cc5c
Optimize log
zhaohai666 Apr 14, 2026
a0d17c5
Optimization of timer cleaning sequence
zhaohai666 Apr 14, 2026
e4f4e60
TelemeterySession releases logic optimization
zhaohai666 Apr 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions nodejs/examples/LiteProducerExample.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { Producer, LiteTopicQuotaExceededException } from '../src';
import { endpoints, namespace, sessionCredentials, liteTopicConfig } from './ProducerSingleton';

async function main() {
console.log('=== Lite Producer Example ===\n');

// Create producer
const producer = new Producer({
endpoints,
namespace,
sessionCredentials,
maxAttempts: 3,
});

try {
// Start producer
await producer.startup();
console.log('Producer started successfully\n');

// Define message body
const body = Buffer.from('This is a lite message for Apache RocketMQ');

// Send Lite Topic messages
for (let i = 1; i <= 5; i++) {
const liteTopicName = `lite-topic-${i}`;
const message = {
topic: liteTopicConfig.parentTopic,
keys: [ `key-${i}` ],
body,
liteTopic: liteTopicName, // Set lite topic
};

try {
const sendReceipt = await producer.send(message);
console.log(`✓ Message ${i} sent successfully`);
console.log(` - Topic: ${liteTopicConfig.parentTopic}`);
console.log(` - Lite Topic: ${liteTopicName}`);
console.log(` - Message ID: ${sendReceipt.messageId}\n`);
} catch (error) {
if (error instanceof LiteTopicQuotaExceededException) {
// Lite topic quota exceeded.
// Evaluate and increase the lite topic resource limit.
console.error(`✗ Lite topic quota exceeded for ${liteTopicName}:`, error.message);
} else {
console.error(`✗ Failed to send message ${i}:`, error.message);
}
}

// Wait 1 second between sends
await new Promise(resolve => setTimeout(resolve, 1000));
}

console.log('All messages sent!\n');
} catch (error) {
console.error('Error:', error);
} finally {
// Shutdown producer
if (producer) {
await producer.shutdown();
console.log('Producer shutdown successfully');
}
}
}

main().catch(err => {
console.error('Program error:', err);
process.exit(1);
});
102 changes: 102 additions & 0 deletions nodejs/examples/LitePushConsumerExample.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* LitePushConsumer Example
*
* This example demonstrates how to use LitePushConsumer to consume messages
* from lite topics with reduced overhead.
*
* Prerequisites:
* - RocketMQ server with lite topic support enabled
* - A parent topic created on the server
* - Consumer group registered on the server
*/

import {
LitePushConsumerBuilder,
ConsumeResult,
OffsetOption,
type MessageView,
} from '../src';
import { endpoints, namespace, consumerGroup, sessionCredentials } from './ProducerSingleton';

async function main() {
console.log('========== LitePushConsumer Example ==========\n');

// Create and start LitePushConsumer using builder pattern
const consumer = await new LitePushConsumerBuilder()
.setClientConfiguration({
endpoints,
namespace,
sessionCredentials,
})
.setConsumerGroup(consumerGroup)
.bindTopic('your-parent-topic') // Replace with your actual parent topic
.setMessageListener({
async consume(messageView: MessageView) {
console.log('Received message:', {
messageId: messageView.messageId,
topic: messageView.topic,
liteTopic: messageView.liteTopic,
tag: messageView.tag,
keys: messageView.keys,
body: messageView.body.toString('utf-8'),
});
return ConsumeResult.SUCCESS;
},
})
.setMaxCacheMessageCount(1024)
.setMaxCacheMessageSizeInBytes(64 * 1024 * 1024) // 64MB
.setConsumptionThreadCount(20)
.build();

console.log('✓ Consumer started successfully\n');

try {
// Subscribe to lite topics with different offset strategies
console.log('Subscribing to lite topics...\n');

await consumer.subscribeLite('lite-topic-1');
console.log('✓ Subscribed to lite-topic-1 (from last offset)');

await consumer.subscribeLite('lite-topic-2', OffsetOption.MIN_OFFSET);
console.log('✓ Subscribed to lite-topic-2 (from minimum offset)');

await consumer.subscribeLite('lite-topic-3', OffsetOption.ofTailN(100));
console.log('✓ Subscribed to lite-topic-3 (last 100 messages)\n');

// Keep running to receive messages
console.log('Consumer is running. Press Ctrl+C to exit...\n');
// eslint-disable-next-line @typescript-eslint/no-empty-function
await new Promise(() => {});
} catch (error) {
console.error('Error during consumption:', error);
throw error;
} finally {
// Close consumer to release resources
console.log('\nShutting down consumer...');
await consumer.close();
console.log('✓ Consumer closed successfully');
}
}

// Run example
main().catch(error => {
console.error('Fatal error:', error);
process.exit(1);
});
42 changes: 5 additions & 37 deletions nodejs/examples/ProducerDelayMessageExample.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@ import { topics, endpoints, sessionCredentials, namespace } from './ProducerSing
endpoints,
namespace,
sessionCredentials,
maxAttempts: 2,
maxAttempts: 3,
});
await producer.startup();

try {
// 发送延迟消息
// Send delay message
const deliveryTimestamp = new Date(Date.now() + 5000); // Deliver after 5 seconds
const receipt = await producer.send({
topic: topics.delay,
tag: 'rocketmq-delay',
delay: 5000,
deliveryTimestamp,
body: Buffer.from(JSON.stringify({
hello: 'rocketmq-client-nodejs world ',
now: Date(),
Expand All @@ -41,41 +42,8 @@ import { topics, endpoints, sessionCredentials, namespace } from './ProducerSing

console.log('✅ Message sent successfully');
console.log(' - Message ID:', receipt.messageId);
console.log(' - Recall Handle:', receipt.recallHandle);
console.log(' - Delivery Timestamp:', deliveryTimestamp.toISOString());
console.log(' - Offset:', receipt.offset);

// 检查 recallHandle 是否存在
if (!receipt.recallHandle || receipt.recallHandle.trim() === '') {
console.warn('\n⚠️ Warning: Recall handle is empty');
console.log('This might be because:');
console.log('1. The topic is not configured as DELAY type');
console.log('2. Broker does not support mixed message type');
console.log('3. The message was not recognized as a delay message');

// 尝试检查 Topic 配置
console.log('\n💡 Suggestion: Check if the topic "time-topic" has message.type=DELAY attribute');
} else {
console.log('\n🔄 Attempting to recall message...');

try {
const recallReceipt = await producer.recallMessage(topics.delay, receipt.recallHandle);
console.log('✅ Message recalled successfully!');
console.log(' - Recalled Message ID:', recallReceipt.messageId);
} catch (recallError) {
console.error('❌ Failed to recall message:');
console.error(' Error:', (recallError as Error).message);
console.error(' Status Code:', (recallError as any).code);

// 提供更多调试信息
if ((recallError as Error).message.includes('recall handle is invalid')) {
console.log('\n📝 Possible reasons:');
console.log(' 1. The recall handle format is incorrect');
console.log(' 2. The message has already been delivered/consumed');
console.log(' 3. The recall time window has expired');
console.log(' 4. Broker configuration issue (enableMixedMessageType=false)');
}
}
}
} catch (error) {
console.error('❌ Error sending message:', error);
} finally {
Expand Down
34 changes: 9 additions & 25 deletions nodejs/examples/ProducerFifoMessageExample.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

/**
* Producer with FIFO Message Example
*
*
* This example demonstrates how to send FIFO (First-In-First-Out) messages
* that will be consumed in order by PushConsumer with FIFO support.
*
*
* Key points for FIFO messages:
* - Messages with the same messageGroup are stored and delivered in order
* - Different messageGroups can be consumed concurrently
Expand All @@ -46,7 +46,7 @@ async function main() {

// Send FIFO messages with different message groups
// Messages within the same group will be consumed in strict order

// Group 1: Order processing sequence
console.log('Sending FIFO messages for Group 1 (Order-001)...');
await sendFifoMessages(producer, 'Order-001', [
Expand All @@ -72,22 +72,6 @@ async function main() {
{ type: 'TRANSFER', accountId: 'Alice', amount: 100, balance: 700 },
]);

// Send some messages without group (will be consumed concurrently)
console.log('\nSending non-FIFO messages (no messageGroup)...');
for (let i = 0; i < 3; i++) {
const receipt = await producer.send({
topic: topics.fifo,
tag: 'non-fifo',
body: Buffer.from(JSON.stringify({
type: 'NON_FIFO',
index: i,
timestamp: Date.now(),
note: 'This message has no messageGroup, will be consumed concurrently',
})),
});
console.log(`Non-FIFO message ${i} sent:`, receipt.messageId);
}

console.log('\n✓ All FIFO messages sent successfully!');
console.log('\nNote: Start PushConsumerFifoMessageExample to consume these messages.');
console.log('Messages with the same messageGroup will be consumed in strict order.\n');
Expand All @@ -104,13 +88,13 @@ async function main() {
async function sendFifoMessages(
producer: Producer,
messageGroup: string,
messages: Array<Record<string, any>>
messages: Array<Record<string, any>>,
) {
console.log(`\n Sending ${messages.length} messages to group "${messageGroup}"...`);

for (let i = 0; i < messages.length; i++) {
const messageData = messages[i];

const receipt = await producer.send({
topic: topics.fifo,
tag: 'fifo-message',
Expand All @@ -122,7 +106,7 @@ async function sendFifoMessages(
})),
messageGroup, // Required for FIFO ordering
});

console.log(` [${i + 1}/${messages.length}] Sent:`, {
messageId: receipt.messageId,
sequence: i + 1,
Expand All @@ -132,9 +116,9 @@ async function sendFifoMessages(
// Small delay between messages to ensure clear sequencing
await new Promise(resolve => setTimeout(resolve, 10));
}

console.log(` ✓ Completed sending ${messages.length} messages for group "${messageGroup}"`);
}

// Run example
main().catch(console.error);
main().catch(console.error);
2 changes: 1 addition & 1 deletion nodejs/examples/ProducerNormalMessageExample.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import { topics, endpoints, sessionCredentials, namespace } from './ProducerSing
endpoints,
namespace,
sessionCredentials,
maxAttempts: 2,
maxAttempts: 3,
});
await producer.startup();

Expand Down
Loading
Loading