From 2915c8fa5913e5dc446308378c780125d5c3b6af Mon Sep 17 00:00:00 2001 From: shin Date: Sun, 29 Mar 2026 17:42:18 +0900 Subject: [PATCH 01/21] feat(cpp): add get_streams FFI function and Stream shared struct Signed-off-by: shin --- foreign/cpp/Cargo.toml | 1 + foreign/cpp/src/client.rs | 11 +++++++++++ foreign/cpp/src/lib.rs | 10 ++++++++++ foreign/cpp/src/stream.rs | 14 ++++++++++++++ 4 files changed, 36 insertions(+) diff --git a/foreign/cpp/Cargo.toml b/foreign/cpp/Cargo.toml index 477cb0b74d..0b34273974 100644 --- a/foreign/cpp/Cargo.toml +++ b/foreign/cpp/Cargo.toml @@ -27,6 +27,7 @@ ignored = ["cxx-build"] crate-type = ["staticlib"] [dependencies] +bytes = "1.11.1" cxx = "1.0.194" iggy = { path = "../../core/sdk" } iggy_common = { path = "../../core/common" } diff --git a/foreign/cpp/src/client.rs b/foreign/cpp/src/client.rs index 341e0c1a05..446603fd7c 100644 --- a/foreign/cpp/src/client.rs +++ b/foreign/cpp/src/client.rs @@ -73,6 +73,17 @@ impl Client { }) } + pub fn get_streams(&self) -> Result, String> { + RUNTIME.block_on(async { + let streams = self + .inner + .get_streams() + .await + .map_err(|error| format!("Could not get streams: {error}"))?; + Ok(streams.into_iter().map(ffi::Stream::from).collect()) + }) + } + pub fn create_stream(&self, stream_name: String) -> Result<(), String> { RUNTIME.block_on(async { self.inner diff --git a/foreign/cpp/src/lib.rs b/foreign/cpp/src/lib.rs index 4d47a5b946..34ee9eb7e4 100644 --- a/foreign/cpp/src/lib.rs +++ b/foreign/cpp/src/lib.rs @@ -51,6 +51,15 @@ mod ffi { partitions_count: u32, } + struct Stream { + id: u32, + created_at: u64, + name: String, + size_bytes: u64, + messages_count: u64, + topics_count: u32, + } + struct StreamDetails { id: u32, created_at: u64, @@ -83,6 +92,7 @@ mod ffi { fn login_user(self: &Client, username: String, password: String) -> Result<()>; fn connect(self: &Client) -> Result<()>; fn create_stream(self: &Client, stream_name: String) -> Result<()>; + fn get_streams(self: &Client) -> Result>; fn get_stream(self: &Client, stream_id: Identifier) -> Result; fn delete_stream(self: &Client, stream_id: Identifier) -> Result<()>; // fn purge_stream(&self, stream_id: Identifier) -> Result<()>; diff --git a/foreign/cpp/src/stream.rs b/foreign/cpp/src/stream.rs index 5c1f6e9d7d..b6e0ae22bf 100644 --- a/foreign/cpp/src/stream.rs +++ b/foreign/cpp/src/stream.rs @@ -16,8 +16,22 @@ // under the License. use crate::ffi; +use iggy::prelude::Stream as RustStream; use iggy::prelude::StreamDetails as RustStreamDetails; +impl From for ffi::Stream { + fn from(s: RustStream) -> Self { + ffi::Stream { + id: s.id, + created_at: s.created_at.as_micros(), + name: s.name, + size_bytes: s.size.as_bytes_u64(), + messages_count: s.messages_count, + topics_count: s.topics_count, + } + } +} + impl From for ffi::StreamDetails { fn from(stream: RustStreamDetails) -> Self { ffi::StreamDetails { From 761e41bb52b4ea2722836e2f88522a2d67c3522a Mon Sep 17 00:00:00 2001 From: shin Date: Sun, 29 Mar 2026 17:44:43 +0900 Subject: [PATCH 02/21] feat(cpp): add send_messages FFI function and IggyMessageToSend shared struct Signed-off-by: shin --- foreign/cpp/src/client.rs | 64 ++++++++++++++++++++++++++++++++++++++- foreign/cpp/src/lib.rs | 16 ++++++++++ 2 files changed, 79 insertions(+), 1 deletion(-) diff --git a/foreign/cpp/src/client.rs b/foreign/cpp/src/client.rs index 446603fd7c..a8a4103451 100644 --- a/foreign/cpp/src/client.rs +++ b/foreign/cpp/src/client.rs @@ -16,11 +16,13 @@ // under the License. use crate::{RUNTIME, ffi}; +use bytes::Bytes; use iggy::prelude::{ Client as IggyConnectionClient, CompressionAlgorithm as RustCompressionAlgorithm, ConsumerGroupClient, Identifier as RustIdentifier, IggyClient as RustIggyClient, IggyClientBuilder as RustIggyClientBuilder, IggyError, IggyExpiry as RustIggyExpiry, - MaxTopicSize as RustMaxTopicSize, PartitionClient, StreamClient, TopicClient, UserClient, + IggyMessage, MaxTopicSize as RustMaxTopicSize, MessageClient, Partitioning, PartitionClient, + StreamClient, TopicClient, UserClient, }; use std::str::FromStr; use std::sync::Arc; @@ -138,6 +140,66 @@ impl Client { // }) // } + #[allow(clippy::too_many_arguments)] + pub fn send_messages( + &self, + stream_id: ffi::Identifier, + topic_id: ffi::Identifier, + partitioning_kind: String, + partitioning_value: Vec, + messages: Vec, + ) -> Result<(), String> { + let rust_stream_id = RustIdentifier::try_from(stream_id) + .map_err(|error| format!("Could not send messages: {error}"))?; + let rust_topic_id = RustIdentifier::try_from(topic_id) + .map_err(|error| format!("Could not send messages: {error}"))?; + + let partitioning = match partitioning_kind.as_str() { + "balanced" => Partitioning::balanced(), + "partition_id" => { + if partitioning_value.len() < 4 { + return Err("partition_id requires 4 bytes".to_string()); + } + let id = u32::from_le_bytes( + partitioning_value[..4] + .try_into() + .map_err(|_| "Invalid partition_id value".to_string())?, + ); + Partitioning::partition_id(id) + } + "messages_key" => Partitioning::messages_key(&partitioning_value) + .map_err(|error| format!("Invalid messages key: {error}"))?, + _ => return Err(format!("Invalid partitioning kind: {partitioning_kind}")), + }; + + let mut iggy_messages: Vec = messages + .into_iter() + .map(|m| { + let id = ((m.id_hi as u128) << 64) | (m.id_lo as u128); + let payload = Bytes::from(m.payload); + let msg = if id > 0 { + IggyMessage::builder().id(id).payload(payload).build() + } else { + IggyMessage::builder().payload(payload).build() + }; + msg.map_err(|error| format!("Could not build message: {error}")) + }) + .collect::, _>>()?; + + RUNTIME.block_on(async { + self.inner + .send_messages( + &rust_stream_id, + &rust_topic_id, + &partitioning, + &mut iggy_messages, + ) + .await + .map_err(|error| format!("Could not send messages: {error}"))?; + Ok(()) + }) + } + #[allow(clippy::too_many_arguments)] pub fn create_topic( &self, diff --git a/foreign/cpp/src/lib.rs b/foreign/cpp/src/lib.rs index 34ee9eb7e4..0a70539c99 100644 --- a/foreign/cpp/src/lib.rs +++ b/foreign/cpp/src/lib.rs @@ -60,6 +60,12 @@ mod ffi { topics_count: u32, } + struct IggyMessageToSend { + id_lo: u64, + id_hi: u64, + payload: Vec, + } + struct StreamDetails { id: u32, created_at: u64, @@ -140,6 +146,16 @@ mod ffi { group_id: Identifier, ) -> Result<()>; + #[allow(clippy::too_many_arguments)] + fn send_messages( + self: &Client, + stream_id: Identifier, + topic_id: Identifier, + partitioning_kind: String, + partitioning_value: Vec, + messages: Vec, + ) -> Result<()>; + unsafe fn delete_connection(client: *mut Client) -> Result<()>; // Identifier functions From 1a31f58c59a0539bd0d9de7fc59093765635ce44 Mon Sep 17 00:00:00 2001 From: shin Date: Sun, 29 Mar 2026 17:46:21 +0900 Subject: [PATCH 03/21] feat(cpp): add poll_messages FFI function with IggyMessagePolled and PolledMessages Signed-off-by: shin --- foreign/cpp/build.rs | 1 + foreign/cpp/src/client.rs | 71 ++++++++++++++++++++++++++++++++++--- foreign/cpp/src/lib.rs | 32 +++++++++++++++++ foreign/cpp/src/messages.rs | 45 +++++++++++++++++++++++ 4 files changed, 145 insertions(+), 4 deletions(-) create mode 100644 foreign/cpp/src/messages.rs diff --git a/foreign/cpp/build.rs b/foreign/cpp/build.rs index 476a9d796c..d2cbf0094c 100644 --- a/foreign/cpp/build.rs +++ b/foreign/cpp/build.rs @@ -24,6 +24,7 @@ fn main() { println!("cargo:rerun-if-changed=src/consumer_group.rs"); println!("cargo:rerun-if-changed=src/identifier.rs"); println!("cargo:rerun-if-changed=src/lib.rs"); + println!("cargo:rerun-if-changed=src/messages.rs"); println!("cargo:rerun-if-changed=src/stream.rs"); println!("cargo:rerun-if-changed=src/topic.rs"); } diff --git a/foreign/cpp/src/client.rs b/foreign/cpp/src/client.rs index a8a4103451..bc2822ed7c 100644 --- a/foreign/cpp/src/client.rs +++ b/foreign/cpp/src/client.rs @@ -19,10 +19,11 @@ use crate::{RUNTIME, ffi}; use bytes::Bytes; use iggy::prelude::{ Client as IggyConnectionClient, CompressionAlgorithm as RustCompressionAlgorithm, - ConsumerGroupClient, Identifier as RustIdentifier, IggyClient as RustIggyClient, - IggyClientBuilder as RustIggyClientBuilder, IggyError, IggyExpiry as RustIggyExpiry, - IggyMessage, MaxTopicSize as RustMaxTopicSize, MessageClient, Partitioning, PartitionClient, - StreamClient, TopicClient, UserClient, + Consumer, ConsumerGroupClient, ConsumerKind, Identifier as RustIdentifier, + IggyClient as RustIggyClient, IggyClientBuilder as RustIggyClientBuilder, IggyError, + IggyExpiry as RustIggyExpiry, IggyMessage, IggyTimestamp, MaxTopicSize as RustMaxTopicSize, + MessageClient, Partitioning, PartitionClient, PollingStrategy, StreamClient, TopicClient, + UserClient, }; use std::str::FromStr; use std::sync::Arc; @@ -200,6 +201,68 @@ impl Client { }) } + #[allow(clippy::too_many_arguments)] + pub fn poll_messages( + &self, + stream_id: ffi::Identifier, + topic_id: ffi::Identifier, + partition_id: u32, + consumer_kind: String, + consumer_id: ffi::Identifier, + strategy_kind: String, + strategy_value: u64, + count: u32, + auto_commit: bool, + ) -> Result { + let rust_stream_id = RustIdentifier::try_from(stream_id) + .map_err(|error| format!("Could not poll messages: {error}"))?; + let rust_topic_id = RustIdentifier::try_from(topic_id) + .map_err(|error| format!("Could not poll messages: {error}"))?; + let rust_consumer_id = RustIdentifier::try_from(consumer_id) + .map_err(|error| format!("Could not poll messages: {error}"))?; + + let consumer = Consumer { + kind: match consumer_kind.as_str() { + "consumer" => ConsumerKind::Consumer, + "consumer_group" => ConsumerKind::ConsumerGroup, + _ => return Err(format!("Invalid consumer kind: {consumer_kind}")), + }, + id: rust_consumer_id, + }; + + let strategy = match strategy_kind.as_str() { + "offset" => PollingStrategy::offset(strategy_value), + "timestamp" => PollingStrategy::timestamp(IggyTimestamp::from(strategy_value)), + "first" => PollingStrategy::first(), + "last" => PollingStrategy::last(), + "next" => PollingStrategy::next(), + _ => return Err(format!("Invalid polling strategy: {strategy_kind}")), + }; + + let opt_partition = if partition_id == u32::MAX { + None + } else { + Some(partition_id) + }; + + RUNTIME.block_on(async { + let polled = self + .inner + .poll_messages( + &rust_stream_id, + &rust_topic_id, + opt_partition, + &consumer, + &strategy, + count, + auto_commit, + ) + .await + .map_err(|error| format!("Could not poll messages: {error}"))?; + Ok(ffi::PolledMessages::from(polled)) + }) + } + #[allow(clippy::too_many_arguments)] pub fn create_topic( &self, diff --git a/foreign/cpp/src/lib.rs b/foreign/cpp/src/lib.rs index 0a70539c99..90a22b40f1 100644 --- a/foreign/cpp/src/lib.rs +++ b/foreign/cpp/src/lib.rs @@ -17,6 +17,7 @@ mod client; mod consumer_group; mod identifier; +mod messages; mod stream; mod topic; @@ -66,6 +67,23 @@ mod ffi { payload: Vec, } + struct IggyMessagePolled { + id_lo: u64, + id_hi: u64, + offset: u64, + timestamp: u64, + origin_timestamp: u64, + payload: Vec, + user_headers: Vec, + } + + struct PolledMessages { + partition_id: u32, + current_offset: u64, + count: u32, + messages: Vec, + } + struct StreamDetails { id: u32, created_at: u64, @@ -146,6 +164,20 @@ mod ffi { group_id: Identifier, ) -> Result<()>; + #[allow(clippy::too_many_arguments)] + fn poll_messages( + self: &Client, + stream_id: Identifier, + topic_id: Identifier, + partition_id: u32, + consumer_kind: String, + consumer_id: Identifier, + strategy_kind: String, + strategy_value: u64, + count: u32, + auto_commit: bool, + ) -> Result; + #[allow(clippy::too_many_arguments)] fn send_messages( self: &Client, diff --git a/foreign/cpp/src/messages.rs b/foreign/cpp/src/messages.rs new file mode 100644 index 0000000000..ef437966f3 --- /dev/null +++ b/foreign/cpp/src/messages.rs @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::ffi; +use iggy::prelude::{IggyMessage as RustIggyMessage, PolledMessages as RustPolledMessages}; + +impl From for ffi::IggyMessagePolled { + fn from(m: RustIggyMessage) -> Self { + let id = m.header.id; + ffi::IggyMessagePolled { + id_lo: id as u64, + id_hi: (id >> 64) as u64, + offset: m.header.offset, + timestamp: m.header.timestamp, + origin_timestamp: m.header.origin_timestamp, + payload: m.payload.to_vec(), + user_headers: m.user_headers.map(|h| h.to_vec()).unwrap_or_default(), + } + } +} + +impl From for ffi::PolledMessages { + fn from(p: RustPolledMessages) -> Self { + ffi::PolledMessages { + partition_id: p.partition_id, + current_offset: p.current_offset, + count: p.count, + messages: p.messages.into_iter().map(Into::into).collect(), + } + } +} From 730700cd6d59b532489030928cba3f964e0b5134 Mon Sep 17 00:00:00 2001 From: shin Date: Sun, 29 Mar 2026 18:04:07 +0900 Subject: [PATCH 04/21] style(cpp): apply rustfmt formatting Signed-off-by: shin --- foreign/cpp/src/client.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/foreign/cpp/src/client.rs b/foreign/cpp/src/client.rs index bc2822ed7c..0e88df328f 100644 --- a/foreign/cpp/src/client.rs +++ b/foreign/cpp/src/client.rs @@ -18,8 +18,8 @@ use crate::{RUNTIME, ffi}; use bytes::Bytes; use iggy::prelude::{ - Client as IggyConnectionClient, CompressionAlgorithm as RustCompressionAlgorithm, - Consumer, ConsumerGroupClient, ConsumerKind, Identifier as RustIdentifier, + Client as IggyConnectionClient, CompressionAlgorithm as RustCompressionAlgorithm, Consumer, + ConsumerGroupClient, ConsumerKind, Identifier as RustIdentifier, IggyClient as RustIggyClient, IggyClientBuilder as RustIggyClientBuilder, IggyError, IggyExpiry as RustIggyExpiry, IggyMessage, IggyTimestamp, MaxTopicSize as RustMaxTopicSize, MessageClient, Partitioning, PartitionClient, PollingStrategy, StreamClient, TopicClient, From fca1ac07f50b604d4869fa74ff123d6473000d30 Mon Sep 17 00:00:00 2001 From: shin Date: Sun, 29 Mar 2026 18:05:04 +0900 Subject: [PATCH 05/21] test(cpp): add e2e tests for get_streams, send_messages, and poll_messages Signed-off-by: shin --- foreign/cpp/tests/message/low_level_e2e.cpp | 195 ++++++++++++++++++++ 1 file changed, 195 insertions(+) create mode 100644 foreign/cpp/tests/message/low_level_e2e.cpp diff --git a/foreign/cpp/tests/message/low_level_e2e.cpp b/foreign/cpp/tests/message/low_level_e2e.cpp new file mode 100644 index 0000000000..9103721f48 --- /dev/null +++ b/foreign/cpp/tests/message/low_level_e2e.cpp @@ -0,0 +1,195 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include + +#include + +#include "lib.rs.h" +#include "tests/common/test_helpers.hpp" + +static rust::Vec to_payload(const std::string &s) { + rust::Vec v; + for (const char c : s) { + v.push_back(static_cast(c)); + } + return v; +} + +static rust::Vec partition_id_bytes(std::uint32_t id) { + rust::Vec v; + v.push_back(static_cast(id & 0xFF)); + v.push_back(static_cast((id >> 8) & 0xFF)); + v.push_back(static_cast((id >> 16) & 0xFF)); + v.push_back(static_cast((id >> 24) & 0xFF)); + return v; +} + +TEST(LowLevelE2E_Message, GetStreamsReturnsEmptyInitially) { + RecordProperty("description", "Verifies get_streams returns empty vector on fresh server."); + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + auto streams = client->get_streams(); + ASSERT_EQ(streams.size(), 0); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, GetStreamsReturnsStreamAfterCreation) { + RecordProperty("description", "Verifies created stream appears in get_streams result."); + const std::string stream_name = "cpp-msg-get-streams"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto streams = client->get_streams(); + ASSERT_GE(streams.size(), 1); + + bool found = false; + for (const auto &s : streams) { + if (std::string(s.name) == stream_name) { + found = true; + ASSERT_GT(s.id, 0u); + break; + } + } + ASSERT_TRUE(found) << "Stream '" << stream_name << "' not found in get_streams result"; + + client->delete_stream(make_string_identifier(stream_name)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, SendAndPollMessagesRoundTrip) { + RecordProperty("description", + "Sends 10 messages and polls them back, verifying count, offsets, and payloads."); + const std::string stream_name = "cpp-msg-roundtrip"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, + "never_expire", 0, "server_default"); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 10; i++) { + iggy::ffi::IggyMessageToSend msg; + msg.id_lo = static_cast(i + 1); + msg.id_hi = 0; + msg.payload = to_payload("test message " + std::to_string(i)); + messages.push_back(std::move(msg)); + } + + ASSERT_NO_THROW(client->send_messages(make_numeric_identifier(stream.id), + make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages))); + + auto polled = + client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, + "consumer", make_numeric_identifier(1), "offset", 0, 100, false); + + ASSERT_EQ(polled.count, 10u); + ASSERT_EQ(polled.messages.size(), 10u); + for (std::uint32_t i = 0; i < 10; i++) { + ASSERT_EQ(polled.messages[i].offset, static_cast(i)); + std::string expected = "test message " + std::to_string(i); + std::string actual(polled.messages[i].payload.begin(), polled.messages[i].payload.end()); + ASSERT_EQ(actual, expected) << "Payload mismatch at offset " << i; + } + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesVerifyMessageIds) { + RecordProperty("description", "Verifies that polled message IDs match the sent IDs."); + const std::string stream_name = "cpp-msg-verify-ids"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, + "never_expire", 0, "server_default"); + + rust::Vec messages; + iggy::ffi::IggyMessageToSend msg; + msg.id_lo = 42; + msg.id_hi = 0; + msg.payload = to_payload("id-test-message"); + messages.push_back(std::move(msg)); + + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), + "partition_id", partition_id_bytes(0), std::move(messages)); + + auto polled = + client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, + "consumer", make_numeric_identifier(1), "offset", 0, 100, false); + + ASSERT_EQ(polled.messages.size(), 1u); + ASSERT_EQ(polled.messages[0].id_lo, 42u); + ASSERT_EQ(polled.messages[0].id_hi, 0u); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesFromEmptyPartition) { + RecordProperty("description", "Verifies polling from an empty partition returns zero messages."); + const std::string stream_name = "cpp-msg-empty-poll"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, + "never_expire", 0, "server_default"); + + auto polled = + client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, + "consumer", make_numeric_identifier(1), "offset", 0, 100, false); + + ASSERT_EQ(polled.count, 0u); + ASSERT_EQ(polled.messages.size(), 0u); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, SendMessagesBeforeLoginThrows) { + RecordProperty("description", "Verifies send_messages throws when not authenticated."); + iggy::ffi::Client *client = nullptr; + ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); }); + ASSERT_NE(client, nullptr); + ASSERT_NO_THROW(client->connect()); + + rust::Vec messages; + iggy::ffi::IggyMessageToSend msg; + msg.id_lo = 1; + msg.id_hi = 0; + msg.payload = to_payload("should-fail"); + messages.push_back(std::move(msg)); + + ASSERT_THROW(client->send_messages(make_numeric_identifier(1), make_numeric_identifier(1), + "partition_id", partition_id_bytes(0), std::move(messages)), + std::exception); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} From 3388f8457791267a8ab75480b59fba6ac961e798 Mon Sep 17 00:00:00 2001 From: shin Date: Sun, 29 Mar 2026 18:30:36 +0900 Subject: [PATCH 06/21] fix(cpp): address code review findings for FFI messaging functions - fix error message format consistency in send_messages partitioning - extract to_payload and partition_id_bytes helpers to test_helpers.hpp - fix GetStreamsReturnsEmpty test to clean up before asserting Signed-off-by: shin --- foreign/cpp/src/client.rs | 17 +++++++------- foreign/cpp/tests/common/test_helpers.hpp | 17 ++++++++++++++ foreign/cpp/tests/message/low_level_e2e.cpp | 26 ++++++--------------- 3 files changed, 33 insertions(+), 27 deletions(-) diff --git a/foreign/cpp/src/client.rs b/foreign/cpp/src/client.rs index 0e88df328f..7804e974d6 100644 --- a/foreign/cpp/src/client.rs +++ b/foreign/cpp/src/client.rs @@ -159,17 +159,18 @@ impl Client { "balanced" => Partitioning::balanced(), "partition_id" => { if partitioning_value.len() < 4 { - return Err("partition_id requires 4 bytes".to_string()); + return Err( + "Could not send messages: partition_id requires 4 bytes".to_string() + ); } - let id = u32::from_le_bytes( - partitioning_value[..4] - .try_into() - .map_err(|_| "Invalid partition_id value".to_string())?, - ); + let id = u32::from_le_bytes(partitioning_value[..4].try_into().map_err(|_| { + "Could not send messages: invalid partition_id value".to_string() + })?); Partitioning::partition_id(id) } - "messages_key" => Partitioning::messages_key(&partitioning_value) - .map_err(|error| format!("Invalid messages key: {error}"))?, + "messages_key" => Partitioning::messages_key(&partitioning_value).map_err(|error| { + format!("Could not send messages: invalid messages key: {error}") + })?, _ => return Err(format!("Invalid partitioning kind: {partitioning_kind}")), }; diff --git a/foreign/cpp/tests/common/test_helpers.hpp b/foreign/cpp/tests/common/test_helpers.hpp index 5457c09d7c..15851f08cc 100644 --- a/foreign/cpp/tests/common/test_helpers.hpp +++ b/foreign/cpp/tests/common/test_helpers.hpp @@ -42,3 +42,20 @@ inline iggy::ffi::Client *login_to_server() { client->login_user("iggy", "iggy"); return client; } + +inline rust::Vec to_payload(const std::string &s) { + rust::Vec v; + for (const char c : s) { + v.push_back(static_cast(c)); + } + return v; +} + +inline rust::Vec partition_id_bytes(std::uint32_t id) { + rust::Vec v; + v.push_back(static_cast(id & 0xFF)); + v.push_back(static_cast((id >> 8) & 0xFF)); + v.push_back(static_cast((id >> 16) & 0xFF)); + v.push_back(static_cast((id >> 24) & 0xFF)); + return v; +} diff --git a/foreign/cpp/tests/message/low_level_e2e.cpp b/foreign/cpp/tests/message/low_level_e2e.cpp index 9103721f48..991fde1831 100644 --- a/foreign/cpp/tests/message/low_level_e2e.cpp +++ b/foreign/cpp/tests/message/low_level_e2e.cpp @@ -24,29 +24,17 @@ #include "lib.rs.h" #include "tests/common/test_helpers.hpp" -static rust::Vec to_payload(const std::string &s) { - rust::Vec v; - for (const char c : s) { - v.push_back(static_cast(c)); - } - return v; -} - -static rust::Vec partition_id_bytes(std::uint32_t id) { - rust::Vec v; - v.push_back(static_cast(id & 0xFF)); - v.push_back(static_cast((id >> 8) & 0xFF)); - v.push_back(static_cast((id >> 16) & 0xFF)); - v.push_back(static_cast((id >> 24) & 0xFF)); - return v; -} - -TEST(LowLevelE2E_Message, GetStreamsReturnsEmptyInitially) { - RecordProperty("description", "Verifies get_streams returns empty vector on fresh server."); +TEST(LowLevelE2E_Message, GetStreamsReturnsEmptyAfterCleanup) { + RecordProperty("description", "Verifies get_streams returns empty vector after cleaning up all streams."); iggy::ffi::Client *client = login_to_server(); ASSERT_NE(client, nullptr); auto streams = client->get_streams(); + for (const auto &s : streams) { + client->delete_stream(make_numeric_identifier(s.id)); + } + + streams = client->get_streams(); ASSERT_EQ(streams.size(), 0); ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); From cc6ffc0c4966bfddcd5024f2bd916db76854dda7 Mon Sep 17 00:00:00 2001 From: shin Date: Sun, 29 Mar 2026 19:59:59 +0900 Subject: [PATCH 07/21] fix(cpp): apply clang-format and remove fragile stream id assertion Signed-off-by: shin --- foreign/cpp/tests/message/low_level_e2e.cpp | 44 +++++++++------------ 1 file changed, 19 insertions(+), 25 deletions(-) diff --git a/foreign/cpp/tests/message/low_level_e2e.cpp b/foreign/cpp/tests/message/low_level_e2e.cpp index 991fde1831..cebbbd9007 100644 --- a/foreign/cpp/tests/message/low_level_e2e.cpp +++ b/foreign/cpp/tests/message/low_level_e2e.cpp @@ -54,7 +54,6 @@ TEST(LowLevelE2E_Message, GetStreamsReturnsStreamAfterCreation) { for (const auto &s : streams) { if (std::string(s.name) == stream_name) { found = true; - ASSERT_GT(s.id, 0u); break; } } @@ -65,16 +64,15 @@ TEST(LowLevelE2E_Message, GetStreamsReturnsStreamAfterCreation) { } TEST(LowLevelE2E_Message, SendAndPollMessagesRoundTrip) { - RecordProperty("description", - "Sends 10 messages and polls them back, verifying count, offsets, and payloads."); + RecordProperty("description", "Sends 10 messages and polls them back, verifying count, offsets, and payloads."); const std::string stream_name = "cpp-msg-roundtrip"; iggy::ffi::Client *client = login_to_server(); ASSERT_NE(client, nullptr); client->create_stream(stream_name); auto stream = client->get_stream(make_string_identifier(stream_name)); - client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, - "never_expire", 0, "server_default"); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); rust::Vec messages; for (std::uint32_t i = 0; i < 10; i++) { @@ -85,13 +83,11 @@ TEST(LowLevelE2E_Message, SendAndPollMessagesRoundTrip) { messages.push_back(std::move(msg)); } - ASSERT_NO_THROW(client->send_messages(make_numeric_identifier(stream.id), - make_numeric_identifier(0), "partition_id", - partition_id_bytes(0), std::move(messages))); + ASSERT_NO_THROW(client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), + "partition_id", partition_id_bytes(0), std::move(messages))); - auto polled = - client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, - "consumer", make_numeric_identifier(1), "offset", 0, 100, false); + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", 0, 100, false); ASSERT_EQ(polled.count, 10u); ASSERT_EQ(polled.messages.size(), 10u); @@ -114,8 +110,8 @@ TEST(LowLevelE2E_Message, PollMessagesVerifyMessageIds) { client->create_stream(stream_name); auto stream = client->get_stream(make_string_identifier(stream_name)); - client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, - "never_expire", 0, "server_default"); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); rust::Vec messages; iggy::ffi::IggyMessageToSend msg; @@ -124,12 +120,11 @@ TEST(LowLevelE2E_Message, PollMessagesVerifyMessageIds) { msg.payload = to_payload("id-test-message"); messages.push_back(std::move(msg)); - client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), - "partition_id", partition_id_bytes(0), std::move(messages)); + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); - auto polled = - client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, - "consumer", make_numeric_identifier(1), "offset", 0, 100, false); + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", 0, 100, false); ASSERT_EQ(polled.messages.size(), 1u); ASSERT_EQ(polled.messages[0].id_lo, 42u); @@ -147,12 +142,11 @@ TEST(LowLevelE2E_Message, PollMessagesFromEmptyPartition) { client->create_stream(stream_name); auto stream = client->get_stream(make_string_identifier(stream_name)); - client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, - "never_expire", 0, "server_default"); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); - auto polled = - client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, - "consumer", make_numeric_identifier(1), "offset", 0, 100, false); + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", 0, 100, false); ASSERT_EQ(polled.count, 0u); ASSERT_EQ(polled.messages.size(), 0u); @@ -175,8 +169,8 @@ TEST(LowLevelE2E_Message, SendMessagesBeforeLoginThrows) { msg.payload = to_payload("should-fail"); messages.push_back(std::move(msg)); - ASSERT_THROW(client->send_messages(make_numeric_identifier(1), make_numeric_identifier(1), - "partition_id", partition_id_bytes(0), std::move(messages)), + ASSERT_THROW(client->send_messages(make_numeric_identifier(1), make_numeric_identifier(1), "partition_id", + partition_id_bytes(0), std::move(messages)), std::exception); ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); From b1b347d3d0213e2e0e5546fdd9f540d83ed879d6 Mon Sep 17 00:00:00 2001 From: shin Date: Sun, 29 Mar 2026 20:37:00 +0900 Subject: [PATCH 08/21] refactor(cpp): address review feedback - unify Message struct and improve naming - unify IggyMessageToSend/IggyMessagePolled into single Message struct - add new_message(payload) factory function exported to C++ - add TryFrom for IggyMessage conversion - replace map(Into::into) with explicit map(ffi::X::from) - rename strategy_kind/value to polling_strategy_kind/value Signed-off-by: shin --- foreign/cpp/src/client.rs | 42 ++++++++++--------- foreign/cpp/src/lib.rs | 23 ++++++----- foreign/cpp/src/messages.rs | 46 +++++++++++++++++++-- foreign/cpp/tests/message/low_level_e2e.cpp | 19 +++------ 4 files changed, 83 insertions(+), 47 deletions(-) diff --git a/foreign/cpp/src/client.rs b/foreign/cpp/src/client.rs index 7804e974d6..aec0c23080 100644 --- a/foreign/cpp/src/client.rs +++ b/foreign/cpp/src/client.rs @@ -16,7 +16,6 @@ // under the License. use crate::{RUNTIME, ffi}; -use bytes::Bytes; use iggy::prelude::{ Client as IggyConnectionClient, CompressionAlgorithm as RustCompressionAlgorithm, Consumer, ConsumerGroupClient, ConsumerKind, Identifier as RustIdentifier, @@ -148,7 +147,7 @@ impl Client { topic_id: ffi::Identifier, partitioning_kind: String, partitioning_value: Vec, - messages: Vec, + messages: Vec, ) -> Result<(), String> { let rust_stream_id = RustIdentifier::try_from(stream_id) .map_err(|error| format!("Could not send messages: {error}"))?; @@ -171,21 +170,16 @@ impl Client { "messages_key" => Partitioning::messages_key(&partitioning_value).map_err(|error| { format!("Could not send messages: invalid messages key: {error}") })?, - _ => return Err(format!("Invalid partitioning kind: {partitioning_kind}")), + _ => { + return Err(format!( + "Could not send messages: invalid partitioning kind: {partitioning_kind}" + )); + } }; let mut iggy_messages: Vec = messages .into_iter() - .map(|m| { - let id = ((m.id_hi as u128) << 64) | (m.id_lo as u128); - let payload = Bytes::from(m.payload); - let msg = if id > 0 { - IggyMessage::builder().id(id).payload(payload).build() - } else { - IggyMessage::builder().payload(payload).build() - }; - msg.map_err(|error| format!("Could not build message: {error}")) - }) + .map(IggyMessage::try_from) .collect::, _>>()?; RUNTIME.block_on(async { @@ -210,8 +204,8 @@ impl Client { partition_id: u32, consumer_kind: String, consumer_id: ffi::Identifier, - strategy_kind: String, - strategy_value: u64, + polling_strategy_kind: String, + polling_strategy_value: u64, count: u32, auto_commit: bool, ) -> Result { @@ -226,18 +220,26 @@ impl Client { kind: match consumer_kind.as_str() { "consumer" => ConsumerKind::Consumer, "consumer_group" => ConsumerKind::ConsumerGroup, - _ => return Err(format!("Invalid consumer kind: {consumer_kind}")), + _ => { + return Err(format!( + "Could not poll messages: invalid consumer kind: {consumer_kind}" + )); + } }, id: rust_consumer_id, }; - let strategy = match strategy_kind.as_str() { - "offset" => PollingStrategy::offset(strategy_value), - "timestamp" => PollingStrategy::timestamp(IggyTimestamp::from(strategy_value)), + let strategy = match polling_strategy_kind.as_str() { + "offset" => PollingStrategy::offset(polling_strategy_value), + "timestamp" => PollingStrategy::timestamp(IggyTimestamp::from(polling_strategy_value)), "first" => PollingStrategy::first(), "last" => PollingStrategy::last(), "next" => PollingStrategy::next(), - _ => return Err(format!("Invalid polling strategy: {strategy_kind}")), + _ => { + return Err(format!( + "Could not poll messages: invalid polling strategy: {polling_strategy_kind}" + )); + } }; let opt_partition = if partition_id == u32::MAX { diff --git a/foreign/cpp/src/lib.rs b/foreign/cpp/src/lib.rs index 90a22b40f1..d2600390cf 100644 --- a/foreign/cpp/src/lib.rs +++ b/foreign/cpp/src/lib.rs @@ -22,6 +22,7 @@ mod stream; mod topic; use client::{Client, delete_connection, new_connection}; +use messages::new_message; use std::sync::LazyLock; static RUNTIME: LazyLock = LazyLock::new(|| { @@ -61,18 +62,16 @@ mod ffi { topics_count: u32, } - struct IggyMessageToSend { - id_lo: u64, - id_hi: u64, - payload: Vec, - } - - struct IggyMessagePolled { + struct Message { + checksum: u64, id_lo: u64, id_hi: u64, offset: u64, timestamp: u64, origin_timestamp: u64, + user_headers_length: u32, + payload_length: u32, + reserved: u64, payload: Vec, user_headers: Vec, } @@ -81,7 +80,7 @@ mod ffi { partition_id: u32, current_offset: u64, count: u32, - messages: Vec, + messages: Vec, } struct StreamDetails { @@ -172,12 +171,14 @@ mod ffi { partition_id: u32, consumer_kind: String, consumer_id: Identifier, - strategy_kind: String, - strategy_value: u64, + polling_strategy_kind: String, + polling_strategy_value: u64, count: u32, auto_commit: bool, ) -> Result; + fn new_message(payload: Vec) -> Result; + #[allow(clippy::too_many_arguments)] fn send_messages( self: &Client, @@ -185,7 +186,7 @@ mod ffi { topic_id: Identifier, partitioning_kind: String, partitioning_value: Vec, - messages: Vec, + messages: Vec, ) -> Result<()>; unsafe fn delete_connection(client: *mut Client) -> Result<()>; diff --git a/foreign/cpp/src/messages.rs b/foreign/cpp/src/messages.rs index ef437966f3..7b0c5b0261 100644 --- a/foreign/cpp/src/messages.rs +++ b/foreign/cpp/src/messages.rs @@ -16,30 +16,70 @@ // under the License. use crate::ffi; +use bytes::Bytes; use iggy::prelude::{IggyMessage as RustIggyMessage, PolledMessages as RustPolledMessages}; -impl From for ffi::IggyMessagePolled { +pub fn new_message(payload: Vec) -> Result { + if payload.is_empty() { + return Err("Could not create message: payload must not be empty".to_string()); + } + let payload_length = payload.len() as u32; + Ok(ffi::Message { + checksum: 0, + id_lo: 0, + id_hi: 0, + offset: 0, + timestamp: 0, + origin_timestamp: 0, + user_headers_length: 0, + payload_length, + reserved: 0, + payload, + user_headers: Vec::new(), + }) +} + +impl From for ffi::Message { fn from(m: RustIggyMessage) -> Self { let id = m.header.id; - ffi::IggyMessagePolled { + ffi::Message { + checksum: m.header.checksum, id_lo: id as u64, id_hi: (id >> 64) as u64, offset: m.header.offset, timestamp: m.header.timestamp, origin_timestamp: m.header.origin_timestamp, + user_headers_length: m.header.user_headers_length, + payload_length: m.header.payload_length, + reserved: m.header.reserved, payload: m.payload.to_vec(), user_headers: m.user_headers.map(|h| h.to_vec()).unwrap_or_default(), } } } +impl TryFrom for RustIggyMessage { + type Error = String; + + fn try_from(m: ffi::Message) -> Result { + let id = ((m.id_hi as u128) << 64) | (m.id_lo as u128); + let payload = Bytes::from(m.payload); + let msg = if id > 0 { + RustIggyMessage::builder().id(id).payload(payload).build() + } else { + RustIggyMessage::builder().payload(payload).build() + }; + msg.map_err(|error| format!("Could not convert message: {error}")) + } +} + impl From for ffi::PolledMessages { fn from(p: RustPolledMessages) -> Self { ffi::PolledMessages { partition_id: p.partition_id, current_offset: p.current_offset, count: p.count, - messages: p.messages.into_iter().map(Into::into).collect(), + messages: p.messages.into_iter().map(ffi::Message::from).collect(), } } } diff --git a/foreign/cpp/tests/message/low_level_e2e.cpp b/foreign/cpp/tests/message/low_level_e2e.cpp index cebbbd9007..8576f82860 100644 --- a/foreign/cpp/tests/message/low_level_e2e.cpp +++ b/foreign/cpp/tests/message/low_level_e2e.cpp @@ -74,12 +74,9 @@ TEST(LowLevelE2E_Message, SendAndPollMessagesRoundTrip) { client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, "server_default"); - rust::Vec messages; + rust::Vec messages; for (std::uint32_t i = 0; i < 10; i++) { - iggy::ffi::IggyMessageToSend msg; - msg.id_lo = static_cast(i + 1); - msg.id_hi = 0; - msg.payload = to_payload("test message " + std::to_string(i)); + auto msg = iggy::ffi::new_message(to_payload("test message " + std::to_string(i))); messages.push_back(std::move(msg)); } @@ -113,11 +110,10 @@ TEST(LowLevelE2E_Message, PollMessagesVerifyMessageIds) { client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, "server_default"); - rust::Vec messages; - iggy::ffi::IggyMessageToSend msg; + rust::Vec messages; + auto msg = iggy::ffi::new_message(to_payload("id-test-message")); msg.id_lo = 42; msg.id_hi = 0; - msg.payload = to_payload("id-test-message"); messages.push_back(std::move(msg)); client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", @@ -162,11 +158,8 @@ TEST(LowLevelE2E_Message, SendMessagesBeforeLoginThrows) { ASSERT_NE(client, nullptr); ASSERT_NO_THROW(client->connect()); - rust::Vec messages; - iggy::ffi::IggyMessageToSend msg; - msg.id_lo = 1; - msg.id_hi = 0; - msg.payload = to_payload("should-fail"); + rust::Vec messages; + auto msg = iggy::ffi::new_message(to_payload("should-fail")); messages.push_back(std::move(msg)); ASSERT_THROW(client->send_messages(make_numeric_identifier(1), make_numeric_identifier(1), "partition_id", From 61dd065017b12e823bfb866e6deead00afd59a9b Mon Sep 17 00:00:00 2001 From: shin Date: Sun, 29 Mar 2026 20:40:00 +0900 Subject: [PATCH 09/21] style(cpp): fix clang-format alignment in message test Signed-off-by: shin --- foreign/cpp/tests/message/low_level_e2e.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/foreign/cpp/tests/message/low_level_e2e.cpp b/foreign/cpp/tests/message/low_level_e2e.cpp index 8576f82860..06c12f93bc 100644 --- a/foreign/cpp/tests/message/low_level_e2e.cpp +++ b/foreign/cpp/tests/message/low_level_e2e.cpp @@ -111,9 +111,9 @@ TEST(LowLevelE2E_Message, PollMessagesVerifyMessageIds) { "server_default"); rust::Vec messages; - auto msg = iggy::ffi::new_message(to_payload("id-test-message")); - msg.id_lo = 42; - msg.id_hi = 0; + auto msg = iggy::ffi::new_message(to_payload("id-test-message")); + msg.id_lo = 42; + msg.id_hi = 0; messages.push_back(std::move(msg)); client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", From 100d846b17d8ce94da34eee321d3f32d3c4884ee Mon Sep 17 00:00:00 2001 From: shin Date: Tue, 31 Mar 2026 00:46:08 +0900 Subject: [PATCH 10/21] refactor(cpp): change new_message from free function to impl Message method Signed-off-by: shin --- foreign/cpp/src/lib.rs | 3 +- foreign/cpp/src/messages.rs | 33 ++++++++++----------- foreign/cpp/tests/message/low_level_e2e.cpp | 9 ++++-- 3 files changed, 23 insertions(+), 22 deletions(-) diff --git a/foreign/cpp/src/lib.rs b/foreign/cpp/src/lib.rs index d2600390cf..434801f504 100644 --- a/foreign/cpp/src/lib.rs +++ b/foreign/cpp/src/lib.rs @@ -22,7 +22,6 @@ mod stream; mod topic; use client::{Client, delete_connection, new_connection}; -use messages::new_message; use std::sync::LazyLock; static RUNTIME: LazyLock = LazyLock::new(|| { @@ -177,7 +176,7 @@ mod ffi { auto_commit: bool, ) -> Result; - fn new_message(payload: Vec) -> Result; + fn new_message(self: &mut Message, payload: Vec); #[allow(clippy::too_many_arguments)] fn send_messages( diff --git a/foreign/cpp/src/messages.rs b/foreign/cpp/src/messages.rs index 7b0c5b0261..6b513a6c15 100644 --- a/foreign/cpp/src/messages.rs +++ b/foreign/cpp/src/messages.rs @@ -19,24 +19,23 @@ use crate::ffi; use bytes::Bytes; use iggy::prelude::{IggyMessage as RustIggyMessage, PolledMessages as RustPolledMessages}; -pub fn new_message(payload: Vec) -> Result { - if payload.is_empty() { - return Err("Could not create message: payload must not be empty".to_string()); +impl ffi::Message { + pub fn new_message(&mut self, payload: Vec) { + let payload_length = payload.len() as u32; + *self = Self { + checksum: 0, + id_lo: 0, + id_hi: 0, + offset: 0, + timestamp: 0, + origin_timestamp: 0, + user_headers_length: 0, + payload_length, + reserved: 0, + payload, + user_headers: Vec::new(), + }; } - let payload_length = payload.len() as u32; - Ok(ffi::Message { - checksum: 0, - id_lo: 0, - id_hi: 0, - offset: 0, - timestamp: 0, - origin_timestamp: 0, - user_headers_length: 0, - payload_length, - reserved: 0, - payload, - user_headers: Vec::new(), - }) } impl From for ffi::Message { diff --git a/foreign/cpp/tests/message/low_level_e2e.cpp b/foreign/cpp/tests/message/low_level_e2e.cpp index 06c12f93bc..687b399aab 100644 --- a/foreign/cpp/tests/message/low_level_e2e.cpp +++ b/foreign/cpp/tests/message/low_level_e2e.cpp @@ -76,7 +76,8 @@ TEST(LowLevelE2E_Message, SendAndPollMessagesRoundTrip) { rust::Vec messages; for (std::uint32_t i = 0; i < 10; i++) { - auto msg = iggy::ffi::new_message(to_payload("test message " + std::to_string(i))); + iggy::ffi::Message msg; + msg.new_message(to_payload("test message " + std::to_string(i))); messages.push_back(std::move(msg)); } @@ -111,7 +112,8 @@ TEST(LowLevelE2E_Message, PollMessagesVerifyMessageIds) { "server_default"); rust::Vec messages; - auto msg = iggy::ffi::new_message(to_payload("id-test-message")); + iggy::ffi::Message msg; + msg.new_message(to_payload("id-test-message")); msg.id_lo = 42; msg.id_hi = 0; messages.push_back(std::move(msg)); @@ -159,7 +161,8 @@ TEST(LowLevelE2E_Message, SendMessagesBeforeLoginThrows) { ASSERT_NO_THROW(client->connect()); rust::Vec messages; - auto msg = iggy::ffi::new_message(to_payload("should-fail")); + iggy::ffi::Message msg; + msg.new_message(to_payload("should-fail")); messages.push_back(std::move(msg)); ASSERT_THROW(client->send_messages(make_numeric_identifier(1), make_numeric_identifier(1), "partition_id", From ae52d518287f95825f6b76ce5806b89a1c1223ea Mon Sep 17 00:00:00 2001 From: shin Date: Tue, 31 Mar 2026 00:46:57 +0900 Subject: [PATCH 11/21] test(cpp): move get_streams tests from message/ to stream/ directory Signed-off-by: shin --- foreign/cpp/tests/message/low_level_e2e.cpp | 40 --------------------- foreign/cpp/tests/stream/low_level_e2e.cpp | 40 +++++++++++++++++++++ 2 files changed, 40 insertions(+), 40 deletions(-) diff --git a/foreign/cpp/tests/message/low_level_e2e.cpp b/foreign/cpp/tests/message/low_level_e2e.cpp index 687b399aab..485151062c 100644 --- a/foreign/cpp/tests/message/low_level_e2e.cpp +++ b/foreign/cpp/tests/message/low_level_e2e.cpp @@ -17,52 +17,12 @@ #include #include -#include #include #include "lib.rs.h" #include "tests/common/test_helpers.hpp" -TEST(LowLevelE2E_Message, GetStreamsReturnsEmptyAfterCleanup) { - RecordProperty("description", "Verifies get_streams returns empty vector after cleaning up all streams."); - iggy::ffi::Client *client = login_to_server(); - ASSERT_NE(client, nullptr); - - auto streams = client->get_streams(); - for (const auto &s : streams) { - client->delete_stream(make_numeric_identifier(s.id)); - } - - streams = client->get_streams(); - ASSERT_EQ(streams.size(), 0); - - ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); -} - -TEST(LowLevelE2E_Message, GetStreamsReturnsStreamAfterCreation) { - RecordProperty("description", "Verifies created stream appears in get_streams result."); - const std::string stream_name = "cpp-msg-get-streams"; - iggy::ffi::Client *client = login_to_server(); - ASSERT_NE(client, nullptr); - - client->create_stream(stream_name); - auto streams = client->get_streams(); - ASSERT_GE(streams.size(), 1); - - bool found = false; - for (const auto &s : streams) { - if (std::string(s.name) == stream_name) { - found = true; - break; - } - } - ASSERT_TRUE(found) << "Stream '" << stream_name << "' not found in get_streams result"; - - client->delete_stream(make_string_identifier(stream_name)); - ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); -} - TEST(LowLevelE2E_Message, SendAndPollMessagesRoundTrip) { RecordProperty("description", "Sends 10 messages and polls them back, verifying count, offsets, and payloads."); const std::string stream_name = "cpp-msg-roundtrip"; diff --git a/foreign/cpp/tests/stream/low_level_e2e.cpp b/foreign/cpp/tests/stream/low_level_e2e.cpp index 43d0496dee..a678fabc6f 100644 --- a/foreign/cpp/tests/stream/low_level_e2e.cpp +++ b/foreign/cpp/tests/stream/low_level_e2e.cpp @@ -19,6 +19,7 @@ #include #include +#include #include @@ -293,3 +294,42 @@ TEST(LowLevelE2E_Stream, GetStreamByNumericIdentifierReturnsStreamDetails) { ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); client = nullptr; } + +TEST(LowLevelE2E_Stream, GetStreamsReturnsEmptyAfterCleanup) { + RecordProperty("description", "Verifies get_streams returns empty vector after cleaning up all streams."); + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + auto streams = client->get_streams(); + for (const auto &s : streams) { + client->delete_stream(make_numeric_identifier(s.id)); + } + + streams = client->get_streams(); + ASSERT_EQ(streams.size(), 0); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Stream, GetStreamsReturnsStreamAfterCreation) { + RecordProperty("description", "Verifies created stream appears in get_streams result."); + const std::string stream_name = "cpp-stream-get-streams"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto streams = client->get_streams(); + ASSERT_GE(streams.size(), 1); + + bool found = false; + for (const auto &s : streams) { + if (std::string(s.name) == stream_name) { + found = true; + break; + } + } + ASSERT_TRUE(found) << "Stream '" << stream_name << "' not found in get_streams result"; + + client->delete_stream(make_string_identifier(stream_name)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} From f5bcb5e045bc01ae0b459c099d3180ec0f3fc02c Mon Sep 17 00:00:00 2001 From: shin Date: Tue, 31 Mar 2026 01:03:50 +0900 Subject: [PATCH 12/21] test(cpp): add comprehensive e2e tests for get_streams, send_messages, and poll_messages Signed-off-by: shin --- foreign/cpp/tests/message/low_level_e2e.cpp | 775 ++++++++++++++++++++ foreign/cpp/tests/stream/low_level_e2e.cpp | 118 +++ 2 files changed, 893 insertions(+) diff --git a/foreign/cpp/tests/message/low_level_e2e.cpp b/foreign/cpp/tests/message/low_level_e2e.cpp index 485151062c..52961f5cf2 100644 --- a/foreign/cpp/tests/message/low_level_e2e.cpp +++ b/foreign/cpp/tests/message/low_level_e2e.cpp @@ -131,3 +131,778 @@ TEST(LowLevelE2E_Message, SendMessagesBeforeLoginThrows) { ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); } + +TEST(LowLevelE2E_Message, SendMessagesWithInvalidStreamId) { + RecordProperty("description", "Throws when sending messages with an invalid stream identifier."); + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + rust::Vec messages; + iggy::ffi::Message msg; + msg.new_message(to_payload("test")); + messages.push_back(std::move(msg)); + + iggy::ffi::Identifier invalid_id; + invalid_id.kind = "invalid"; + invalid_id.length = 0; + + ASSERT_THROW(client->send_messages(invalid_id, make_numeric_identifier(1), "partition_id", partition_id_bytes(0), + std::move(messages)), + std::exception); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, SendMessagesToNonExistentStream) { + RecordProperty("description", "Throws when sending messages to a non-existent stream."); + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + rust::Vec messages; + iggy::ffi::Message msg; + msg.new_message(to_payload("test")); + messages.push_back(std::move(msg)); + + ASSERT_THROW(client->send_messages(make_string_identifier("nonexistent-stream-12345"), make_numeric_identifier(0), + "partition_id", partition_id_bytes(0), std::move(messages)), + std::exception); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, SendMessagesWithInvalidPartitioningKind) { + RecordProperty("description", "Throws when sending messages with an invalid partitioning kind."); + const std::string stream_name = "cpp-msg-invalid-part-kind"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + iggy::ffi::Message msg; + msg.new_message(to_payload("test")); + messages.push_back(std::move(msg)); + + ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "invalid_kind", + partition_id_bytes(0), std::move(messages)), + std::exception); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, SendMessagesWithInvalidPartitioningValue) { + RecordProperty("description", "Throws when sending messages with insufficient partitioning value bytes."); + const std::string stream_name = "cpp-msg-invalid-part-val"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + iggy::ffi::Message msg; + msg.new_message(to_payload("test")); + messages.push_back(std::move(msg)); + + rust::Vec short_bytes; + short_bytes.push_back(0x00); + short_bytes.push_back(0x01); + + ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + std::move(short_bytes), std::move(messages)), + std::exception); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, SendMessagesToSpecificPartitionVerified) { + RecordProperty("description", + "Verifies messages sent to a specific partition are only retrievable from that partition."); + const std::string stream_name = "cpp-msg-specific-part"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 3, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 5; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("partition-test-" + std::to_string(i))); + messages.push_back(std::move(msg)); + } + + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + auto polled_part0 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, + "consumer", make_numeric_identifier(1), "offset", 0, 100, false); + ASSERT_EQ(polled_part0.count, 5u); + + auto polled_part1 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 1, + "consumer", make_numeric_identifier(1), "offset", 0, 100, false); + ASSERT_EQ(polled_part1.count, 0u); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, SendEmptyMessageVector) { + RecordProperty("description", "Verifies behavior when sending an empty message vector."); + const std::string stream_name = "cpp-msg-empty-vec"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec empty_messages; + + ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(empty_messages)), + std::exception); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, SendMessageWithEmptyPayload) { + RecordProperty("description", "Throws when sending a message with an empty payload."); + const std::string stream_name = "cpp-msg-empty-payload"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + iggy::ffi::Message msg; + rust::Vec empty_payload; + msg.new_message(std::move(empty_payload)); + messages.push_back(std::move(msg)); + + ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)), + std::exception); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, SendMessageWithOversizedPayload) { + RecordProperty("description", "Throws when sending a message exceeding maximum payload size."); + const std::string stream_name = "cpp-msg-oversized"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec oversized_payload; + for (std::uint32_t i = 0; i < 64000001u; i++) { + oversized_payload.push_back(0x41); + } + + rust::Vec messages; + iggy::ffi::Message msg; + msg.new_message(std::move(oversized_payload)); + messages.push_back(std::move(msg)); + + ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)), + std::exception); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, SendMessagesPreservesOrder) { + RecordProperty("description", "Verifies messages are stored and retrieved in the order they were sent."); + const std::string stream_name = "cpp-msg-order"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 50; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("order-" + std::to_string(i))); + messages.push_back(std::move(msg)); + } + + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", 0, 100, false); + + ASSERT_EQ(polled.count, 50u); + for (std::uint32_t i = 0; i < 50; i++) { + ASSERT_EQ(polled.messages[i].offset, static_cast(i)); + std::string expected = "order-" + std::to_string(i); + std::string actual(polled.messages[i].payload.begin(), polled.messages[i].payload.end()); + EXPECT_EQ(actual, expected) << "Payload mismatch at offset " << i; + } + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, SendMessagesWithDuplicateIds) { + RecordProperty("description", "Verifies sending multiple messages with the same ID succeeds."); + const std::string stream_name = "cpp-msg-dup-ids"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 3; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("dup-id-msg-" + std::to_string(i))); + msg.id_lo = 99; + msg.id_hi = 0; + messages.push_back(std::move(msg)); + } + + ASSERT_NO_THROW(client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), + "partition_id", partition_id_bytes(0), std::move(messages))); + + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", 0, 100, false); + + ASSERT_EQ(polled.count, 3u); + for (std::size_t i = 0; i < polled.messages.size(); i++) { + EXPECT_EQ(polled.messages[i].id_lo, 99u); + } + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, SendMessagesWithVariousPayloads) { + RecordProperty("description", + "Verifies various payload types including null bytes, UTF-8, and binary data are preserved."); + const std::string stream_name = "cpp-msg-various-payloads"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec payload_null; + payload_null.push_back(0x00); + payload_null.push_back(0x01); + payload_null.push_back(0x00); + payload_null.push_back(0xFF); + + rust::Vec payload_binary; + payload_binary.push_back(0xDE); + payload_binary.push_back(0xAD); + payload_binary.push_back(0xBE); + payload_binary.push_back(0xEF); + + rust::Vec messages; + + iggy::ffi::Message msg0; + msg0.new_message(to_payload("simple ascii")); + messages.push_back(std::move(msg0)); + + iggy::ffi::Message msg1; + msg1.new_message(std::move(payload_null)); + messages.push_back(std::move(msg1)); + + iggy::ffi::Message msg2; + msg2.new_message(to_payload("héllo wörld")); + messages.push_back(std::move(msg2)); + + iggy::ffi::Message msg3; + msg3.new_message(std::move(payload_binary)); + messages.push_back(std::move(msg3)); + + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", 0, 100, false); + + ASSERT_EQ(polled.count, 4u); + + std::string ascii_actual(polled.messages[0].payload.begin(), polled.messages[0].payload.end()); + EXPECT_EQ(ascii_actual, "simple ascii"); + + ASSERT_EQ(polled.messages[1].payload.size(), 4u); + EXPECT_EQ(polled.messages[1].payload[0], 0x00); + EXPECT_EQ(polled.messages[1].payload[1], 0x01); + EXPECT_EQ(polled.messages[1].payload[2], 0x00); + EXPECT_EQ(polled.messages[1].payload[3], 0xFF); + + std::string utf8_actual(polled.messages[2].payload.begin(), polled.messages[2].payload.end()); + EXPECT_EQ(utf8_actual, "héllo wörld"); + + ASSERT_EQ(polled.messages[3].payload.size(), 4u); + EXPECT_EQ(polled.messages[3].payload[0], 0xDE); + EXPECT_EQ(polled.messages[3].payload[1], 0xAD); + EXPECT_EQ(polled.messages[3].payload[2], 0xBE); + EXPECT_EQ(polled.messages[3].payload[3], 0xEF); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesBeforeLoginThrows) { + RecordProperty("description", "Throws when polling messages before authentication."); + iggy::ffi::Client *client = nullptr; + ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); }); + ASSERT_NE(client, nullptr); + ASSERT_NO_THROW(client->connect()); + + ASSERT_THROW(client->poll_messages(make_numeric_identifier(1), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", 0, 10, false), + std::exception); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesWithInvalidStreamId) { + RecordProperty("description", "Throws when polling messages with an invalid stream identifier."); + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + iggy::ffi::Identifier invalid_id; + invalid_id.kind = "invalid"; + invalid_id.length = 0; + + ASSERT_THROW(client->poll_messages(invalid_id, make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", 0, 10, false), + std::exception); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesFromNonExistentStream) { + RecordProperty("description", "Throws when polling messages from a non-existent stream."); + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + ASSERT_THROW(client->poll_messages(make_string_identifier("nonexistent-stream-poll"), make_numeric_identifier(0), 0, + "consumer", make_numeric_identifier(1), "offset", 0, 10, false), + std::exception); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesWithInvalidConsumerKind) { + RecordProperty("description", "Throws when polling messages with an invalid consumer kind."); + const std::string stream_name = "cpp-msg-invalid-consumer"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + ASSERT_THROW(client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "invalid", + make_numeric_identifier(1), "offset", 0, 10, false), + std::exception); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesWithInvalidStrategyKind) { + RecordProperty("description", "Throws when polling messages with an invalid polling strategy kind."); + const std::string stream_name = "cpp-msg-invalid-strategy"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + ASSERT_THROW(client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "invalid", 0, 10, false), + std::exception); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesCountLessThanAvailable) { + RecordProperty("description", "Returns only the requested count when fewer messages are requested than available."); + const std::string stream_name = "cpp-msg-count-less"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 10; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("msg-" + std::to_string(i))); + messages.push_back(std::move(msg)); + } + + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", 0, 5, false); + + ASSERT_EQ(polled.count, 5u); + ASSERT_EQ(polled.messages.size(), 5u); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesWithLargeOffset) { + RecordProperty("description", "Returns zero messages when polling with an offset beyond available messages."); + const std::string stream_name = "cpp-msg-large-offset"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 5; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("msg-" + std::to_string(i))); + messages.push_back(std::move(msg)); + } + + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", 999999, 100, false); + + ASSERT_EQ(polled.count, 0u); + ASSERT_EQ(polled.messages.size(), 0u); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesFirstStrategy) { + RecordProperty("description", "Verifies first polling strategy returns messages from the beginning."); + const std::string stream_name = "cpp-msg-first-strategy"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 10; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("msg-" + std::to_string(i))); + messages.push_back(std::move(msg)); + } + + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "first", 0, 3, false); + + ASSERT_EQ(polled.count, 3u); + ASSERT_EQ(polled.messages.size(), 3u); + EXPECT_EQ(polled.messages[0].offset, 0u); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesLastStrategy) { + RecordProperty("description", "Verifies last polling strategy returns messages from the end."); + const std::string stream_name = "cpp-msg-last-strategy"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 10; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("msg-" + std::to_string(i))); + messages.push_back(std::move(msg)); + } + + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "last", 0, 3, false); + + ASSERT_EQ(polled.count, 3u); + ASSERT_EQ(polled.messages.size(), 3u); + EXPECT_EQ(polled.messages[0].offset, 7u); + EXPECT_EQ(polled.messages[2].offset, 9u); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesNextStrategyNoAutoCommit) { + RecordProperty("description", + "Verifies next strategy without auto-commit returns the same messages on repeated calls."); + const std::string stream_name = "cpp-msg-next-no-commit"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 5; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("msg-" + std::to_string(i))); + messages.push_back(std::move(msg)); + } + + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + auto polled1 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "next", 0, 100, false); + ASSERT_EQ(polled1.count, 5u); + + auto polled2 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "next", 0, 100, false); + ASSERT_EQ(polled2.count, 5u); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesNextStrategyAutoCommit) { + RecordProperty("description", "Verifies next strategy with auto-commit advances the offset on subsequent polls."); + const std::string stream_name = "cpp-msg-next-auto-commit"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 10; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("msg-" + std::to_string(i))); + messages.push_back(std::move(msg)); + } + + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + auto polled1 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "next", 0, 5, true); + ASSERT_EQ(polled1.count, 5u); + EXPECT_EQ(polled1.messages[0].offset, 0u); + EXPECT_EQ(polled1.messages[4].offset, 4u); + + auto polled2 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "next", 0, 5, true); + ASSERT_EQ(polled2.count, 5u); + EXPECT_EQ(polled2.messages[0].offset, 5u); + EXPECT_EQ(polled2.messages[4].offset, 9u); + + auto polled3 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "next", 0, 5, true); + ASSERT_EQ(polled3.count, 0u); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesConsumerIdIndependence) { + RecordProperty("description", "Verifies different consumer IDs maintain independent offsets."); + const std::string stream_name = "cpp-msg-consumer-indep"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 5; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("msg-" + std::to_string(i))); + messages.push_back(std::move(msg)); + } + + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + auto polled_c1 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, + "consumer", make_numeric_identifier(1), "next", 0, 3, true); + ASSERT_EQ(polled_c1.count, 3u); + + auto polled_c2 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, + "consumer", make_numeric_identifier(2), "next", 0, 5, true); + ASSERT_EQ(polled_c2.count, 5u); + + auto polled_c1_again = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, + "consumer", make_numeric_identifier(1), "next", 0, 5, true); + ASSERT_EQ(polled_c1_again.count, 2u); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesMultipleSendsThenPollOrder) { + RecordProperty("description", "Verifies message ordering is preserved across multiple send batches."); + const std::string stream_name = "cpp-msg-multi-batch-order"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec batch1; + for (std::uint32_t i = 0; i < 5; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("batch1-" + std::to_string(i))); + batch1.push_back(std::move(msg)); + } + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(batch1)); + + rust::Vec batch2; + for (std::uint32_t i = 0; i < 5; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("batch2-" + std::to_string(i))); + batch2.push_back(std::move(msg)); + } + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(batch2)); + + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", 0, 100, false); + + ASSERT_EQ(polled.count, 10u); + for (std::uint32_t i = 0; i < 5; i++) { + std::string expected = "batch1-" + std::to_string(i); + std::string actual(polled.messages[i].payload.begin(), polled.messages[i].payload.end()); + EXPECT_EQ(actual, expected) << "batch1 payload mismatch at index " << i; + } + for (std::uint32_t i = 0; i < 5; i++) { + std::string expected = "batch2-" + std::to_string(i); + std::string actual(polled.messages[5 + i].payload.begin(), polled.messages[5 + i].payload.end()); + EXPECT_EQ(actual, expected) << "batch2 payload mismatch at index " << i; + } + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesMultipleCustomIds) { + RecordProperty("description", "Verifies multiple messages with distinct custom IDs are all preserved."); + const std::string stream_name = "cpp-msg-multi-custom-ids"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + const std::uint64_t id_values[] = {100, 200, 300, 400, 500}; + rust::Vec messages; + for (std::uint32_t i = 0; i < 5; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("msg-" + std::to_string(i))); + msg.id_lo = id_values[i]; + msg.id_hi = 0; + messages.push_back(std::move(msg)); + } + + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", 0, 100, false); + + ASSERT_EQ(polled.count, 5u); + for (std::uint32_t i = 0; i < 5; i++) { + EXPECT_EQ(polled.messages[i].id_lo, id_values[i]) << "ID mismatch at index " << i; + EXPECT_EQ(polled.messages[i].id_hi, 0u); + } + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesAfterStreamDeleted) { + RecordProperty("description", "Throws when polling messages after the stream has been deleted."); + const std::string stream_name = "cpp-msg-deleted-stream"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + iggy::ffi::Message msg; + msg.new_message(to_payload("test")); + messages.push_back(std::move(msg)); + + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + std::uint32_t saved_stream_id = stream.id; + client->delete_stream(make_numeric_identifier(saved_stream_id)); + + ASSERT_THROW(client->poll_messages(make_numeric_identifier(saved_stream_id), make_numeric_identifier(0), 0, + "consumer", make_numeric_identifier(1), "offset", 0, 10, false), + std::exception); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} diff --git a/foreign/cpp/tests/stream/low_level_e2e.cpp b/foreign/cpp/tests/stream/low_level_e2e.cpp index a678fabc6f..15a5f5a5c6 100644 --- a/foreign/cpp/tests/stream/low_level_e2e.cpp +++ b/foreign/cpp/tests/stream/low_level_e2e.cpp @@ -333,3 +333,121 @@ TEST(LowLevelE2E_Stream, GetStreamsReturnsStreamAfterCreation) { client->delete_stream(make_string_identifier(stream_name)); ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); } + +TEST(LowLevelE2E_Stream, GetStreamsFieldsVerification) { + RecordProperty("description", + "Verifies get_streams returns correct field values after creating stream with topic and messages."); + const std::string stream_name = "cpp-stream-fields-verify"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 5; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("field-verify-message-" + std::to_string(i))); + messages.push_back(std::move(msg)); + } + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + auto streams = client->get_streams(); + ASSERT_GE(streams.size(), 1u); + + bool found = false; + for (const auto &s : streams) { + if (std::string(s.name) == stream_name) { + found = true; + EXPECT_EQ(s.topics_count, 1u); + EXPECT_EQ(s.messages_count, 5u); + break; + } + } + ASSERT_TRUE(found) << "Stream '" << stream_name << "' not found in get_streams result"; + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Stream, GetStreamsBeforeLoginThrows) { + RecordProperty("description", "Throws when get_streams is called before authentication."); + iggy::ffi::Client *client = nullptr; + ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); }); + ASSERT_NE(client, nullptr); + + ASSERT_THROW(client->get_streams(), std::exception); + ASSERT_NO_THROW(client->connect()); + ASSERT_THROW(client->get_streams(), std::exception); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); + client = nullptr; +} + +TEST(LowLevelE2E_Stream, GetStreamsConsistentWithGetStream) { + RecordProperty("description", "Verifies get_streams result is consistent with get_stream for the same stream."); + const std::string stream_name = "cpp-stream-consistency"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + + std::string list_name; + std::uint32_t list_topics_count = 0; + auto streams = client->get_streams(); + for (const auto &s : streams) { + if (std::string(s.name) == stream_name) { + list_name = std::string(s.name); + list_topics_count = s.topics_count; + break; + } + } + ASSERT_FALSE(list_name.empty()) << "Stream '" << stream_name << "' not found in get_streams result"; + + auto single = client->get_stream(make_string_identifier(stream_name)); + auto single_name = std::string(single.name); + auto single_topics = single.topics_count; + + EXPECT_EQ(list_name, single_name); + EXPECT_EQ(list_topics_count, single_topics); + + client->delete_stream(make_string_identifier(stream_name)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); + client = nullptr; +} + +TEST(LowLevelE2E_Stream, GetStreamsRepeatedCallsReturnSameResult) { + RecordProperty("description", "Verifies repeated get_streams calls return consistent results."); + const std::string stream_name = "cpp-stream-repeated"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + + auto streams1 = client->get_streams(); + auto streams2 = client->get_streams(); + auto streams3 = client->get_streams(); + + ASSERT_EQ(streams1.size(), streams2.size()); + ASSERT_EQ(streams2.size(), streams3.size()); + + auto contains_stream = [&](const rust::Vec &vec) { + for (const auto &s : vec) { + if (std::string(s.name) == stream_name) { + return true; + } + } + return false; + }; + + ASSERT_TRUE(contains_stream(streams1)) << "Stream not found in first call"; + ASSERT_TRUE(contains_stream(streams2)) << "Stream not found in second call"; + ASSERT_TRUE(contains_stream(streams3)) << "Stream not found in third call"; + + client->delete_stream(make_string_identifier(stream_name)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); + client = nullptr; +} From 9ffc8c6542bbaccf4996a30be57f7e243ed888a4 Mon Sep 17 00:00:00 2001 From: shin Date: Wed, 1 Apr 2026 09:15:52 +0900 Subject: [PATCH 13/21] test(cpp): add overlooked poll_messages e2e tests for partition, count, timestamp, and offset verification Signed-off-by: shin --- foreign/cpp/tests/message/low_level_e2e.cpp | 176 ++++++++++++++++++++ 1 file changed, 176 insertions(+) diff --git a/foreign/cpp/tests/message/low_level_e2e.cpp b/foreign/cpp/tests/message/low_level_e2e.cpp index 52961f5cf2..b8c5af7f36 100644 --- a/foreign/cpp/tests/message/low_level_e2e.cpp +++ b/foreign/cpp/tests/message/low_level_e2e.cpp @@ -15,8 +15,10 @@ // specific language governing permissions and limitations // under the License. +#include #include #include +#include #include @@ -906,3 +908,177 @@ TEST(LowLevelE2E_Message, PollMessagesAfterStreamDeleted) { ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); } + +TEST(LowLevelE2E_Message, PollMessagesWithInvalidPartitionId) { + RecordProperty("description", "Throws when polling with a non-existent partition ID."); + const std::string stream_name = "cpp-msg-invalid-partition"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + ASSERT_THROW(client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 9999, "consumer", + make_numeric_identifier(1), "offset", 0, 10, false), + std::exception); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesWithCountZero) { + RecordProperty("description", "Verifies polling with count=0 returns zero messages successfully."); + const std::string stream_name = "cpp-msg-count-zero"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 5; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("msg-" + std::to_string(i))); + messages.push_back(std::move(msg)); + } + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", 0, 0, false); + + ASSERT_EQ(polled.count, 0u); + ASSERT_EQ(polled.messages.size(), 0u); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesWithoutSpecifyingPartition) { + RecordProperty("description", + "Verifies polling with partition_id=u32::MAX defaults to partition 0 and returns messages."); + const std::string stream_name = "cpp-msg-no-partition"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 5; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("msg-" + std::to_string(i))); + messages.push_back(std::move(msg)); + } + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), UINT32_MAX, + "consumer", make_numeric_identifier(1), "offset", 0, 100, false); + + ASSERT_EQ(polled.count, 5u); + ASSERT_EQ(polled.messages.size(), 5u); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesTimestampStrategy) { + RecordProperty("description", + "Verifies timestamp polling strategy returns messages with timestamp >= the specified value."); + const std::string stream_name = "cpp-msg-timestamp-strategy"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec batch1; + for (std::uint32_t i = 0; i < 5; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("batch1-" + std::to_string(i))); + batch1.push_back(std::move(msg)); + } + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(batch1)); + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + rust::Vec batch2; + for (std::uint32_t i = 0; i < 5; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("batch2-" + std::to_string(i))); + batch2.push_back(std::move(msg)); + } + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(batch2)); + + auto all = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", 0, 100, false); + ASSERT_EQ(all.count, 10u); + + std::uint64_t batch2_timestamp = all.messages[5].timestamp; + ASSERT_GT(batch2_timestamp, all.messages[0].timestamp); + + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(2), "timestamp", batch2_timestamp, 100, false); + + ASSERT_GE(polled.count, 5u); + for (std::size_t i = 0; i < polled.messages.size(); i++) { + EXPECT_GE(polled.messages[i].timestamp, batch2_timestamp) + << "Message at index " << i << " has earlier timestamp"; + } + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesMonotonicOffsets) { + RecordProperty("description", + "Verifies offsets are monotonically increasing and continuous across multiple polls."); + const std::string stream_name = "cpp-msg-monotonic-offsets"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 20; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("mono-" + std::to_string(i))); + messages.push_back(std::move(msg)); + } + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + std::uint64_t expected_offset = 0; + for (int chunk = 0; chunk < 4; chunk++) { + auto polled = + client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", expected_offset, 5, false); + + ASSERT_EQ(polled.count, 5u) << "Chunk " << chunk; + ASSERT_EQ(polled.messages.size(), 5u) << "Chunk " << chunk; + + for (std::size_t i = 0; i < polled.messages.size(); i++) { + EXPECT_EQ(polled.messages[i].offset, expected_offset) << "Chunk " << chunk << " index " << i; + expected_offset++; + } + } + + ASSERT_EQ(expected_offset, 20u); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} From 9a48392ed3f8531d2e6dcef60185077593281288 Mon Sep 17 00:00:00 2001 From: shin Date: Wed, 1 Apr 2026 10:05:18 +0900 Subject: [PATCH 14/21] test(cpp): add large batch send and large count poll e2e tests Signed-off-by: shin --- foreign/cpp/tests/message/low_level_e2e.cpp | 64 +++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/foreign/cpp/tests/message/low_level_e2e.cpp b/foreign/cpp/tests/message/low_level_e2e.cpp index b8c5af7f36..a38e39f283 100644 --- a/foreign/cpp/tests/message/low_level_e2e.cpp +++ b/foreign/cpp/tests/message/low_level_e2e.cpp @@ -1082,3 +1082,67 @@ TEST(LowLevelE2E_Message, PollMessagesMonotonicOffsets) { client->delete_stream(make_numeric_identifier(stream.id)); ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); } + +TEST(LowLevelE2E_Message, SendMessagesLargeBatch) { + RecordProperty("description", "Verifies sending a large batch of 1000 messages succeeds and all are retrievable."); + const std::string stream_name = "cpp-msg-large-batch"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 1000; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("batch-msg-" + std::to_string(i))); + messages.push_back(std::move(msg)); + } + + ASSERT_NO_THROW(client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), + "partition_id", partition_id_bytes(0), std::move(messages))); + + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", 0, 1000, false); + + ASSERT_EQ(polled.count, 1000u); + ASSERT_EQ(polled.messages.size(), 1000u); + EXPECT_EQ(polled.messages[0].offset, 0u); + EXPECT_EQ(polled.messages[999].offset, 999u); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesLargeCount) { + RecordProperty("description", + "Verifies polling with a very large count returns only the available messages without error."); + const std::string stream_name = "cpp-msg-large-count"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 10; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("msg-" + std::to_string(i))); + messages.push_back(std::move(msg)); + } + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", 0, UINT32_MAX, false); + + ASSERT_EQ(polled.count, 10u); + ASSERT_EQ(polled.messages.size(), 10u); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} From 41a70503fa82bddbac6addcda49e7dd6ba1a940b Mon Sep 17 00:00:00 2001 From: shin Date: Thu, 2 Apr 2026 00:15:37 +0900 Subject: [PATCH 15/21] test(cpp): add unit tests for Message.new_message Signed-off-by: shin --- foreign/cpp/src/client.rs | 9 +- foreign/cpp/src/identifier.rs | 1 + foreign/cpp/tests/message/unit_tests.cpp | 161 +++++++++++++++++++++++ 3 files changed, 166 insertions(+), 5 deletions(-) create mode 100644 foreign/cpp/tests/message/unit_tests.cpp diff --git a/foreign/cpp/src/client.rs b/foreign/cpp/src/client.rs index aec0c23080..a923b9c527 100644 --- a/foreign/cpp/src/client.rs +++ b/foreign/cpp/src/client.rs @@ -18,11 +18,10 @@ use crate::{RUNTIME, ffi}; use iggy::prelude::{ Client as IggyConnectionClient, CompressionAlgorithm as RustCompressionAlgorithm, Consumer, - ConsumerGroupClient, ConsumerKind, Identifier as RustIdentifier, - IggyClient as RustIggyClient, IggyClientBuilder as RustIggyClientBuilder, IggyError, - IggyExpiry as RustIggyExpiry, IggyMessage, IggyTimestamp, MaxTopicSize as RustMaxTopicSize, - MessageClient, Partitioning, PartitionClient, PollingStrategy, StreamClient, TopicClient, - UserClient, + ConsumerGroupClient, ConsumerKind, Identifier as RustIdentifier, IggyClient as RustIggyClient, + IggyClientBuilder as RustIggyClientBuilder, IggyError, IggyExpiry as RustIggyExpiry, + IggyMessage, IggyTimestamp, MaxTopicSize as RustMaxTopicSize, MessageClient, PartitionClient, + Partitioning, PollingStrategy, StreamClient, TopicClient, UserClient, }; use std::str::FromStr; use std::sync::Arc; diff --git a/foreign/cpp/src/identifier.rs b/foreign/cpp/src/identifier.rs index 02c9d8a480..29ed5a3fbd 100644 --- a/foreign/cpp/src/identifier.rs +++ b/foreign/cpp/src/identifier.rs @@ -62,6 +62,7 @@ impl TryFrom for RustIdentifier { } } +#[allow(clippy::wrong_self_convention)] impl ffi::Identifier { pub fn from_string(&mut self, id: String) -> Result<(), String> { *self = RustIdentifier::named(&id) diff --git a/foreign/cpp/tests/message/unit_tests.cpp b/foreign/cpp/tests/message/unit_tests.cpp new file mode 100644 index 0000000000..7e6917b065 --- /dev/null +++ b/foreign/cpp/tests/message/unit_tests.cpp @@ -0,0 +1,161 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include + +#include + +#include "lib.rs.h" + +TEST(MessageTest, NewMessageSetsPayloadAndLength) { + RecordProperty("description", "Verifies new_message sets payload and payload_length correctly."); + iggy::ffi::Message msg; + rust::Vec payload; + const std::string text = "hello world"; + for (const char c : text) { + payload.push_back(static_cast(c)); + } + + msg.new_message(std::move(payload)); + + ASSERT_EQ(msg.payload_length, static_cast(text.size())); + ASSERT_EQ(msg.payload.size(), text.size()); + for (std::size_t i = 0; i < text.size(); i++) { + EXPECT_EQ(msg.payload[i], static_cast(text[i])); + } +} + +TEST(MessageTest, NewMessageZerosHeaderFields) { + RecordProperty("description", "Verifies new_message initializes all header fields to zero."); + iggy::ffi::Message msg; + msg.checksum = 999; + msg.id_lo = 999; + msg.id_hi = 999; + msg.offset = 999; + msg.timestamp = 999; + msg.origin_timestamp = 999; + msg.reserved = 999; + + rust::Vec payload; + payload.push_back(0x42); + msg.new_message(std::move(payload)); + + EXPECT_EQ(msg.checksum, 0u); + EXPECT_EQ(msg.id_lo, 0u); + EXPECT_EQ(msg.id_hi, 0u); + EXPECT_EQ(msg.offset, 0u); + EXPECT_EQ(msg.timestamp, 0u); + EXPECT_EQ(msg.origin_timestamp, 0u); + EXPECT_EQ(msg.user_headers_length, 0u); + EXPECT_EQ(msg.reserved, 0u); + EXPECT_TRUE(msg.user_headers.empty()); +} + +TEST(MessageTest, NewMessageWithEmptyPayload) { + RecordProperty("description", "Verifies new_message accepts an empty payload."); + iggy::ffi::Message msg; + rust::Vec empty_payload; + + msg.new_message(std::move(empty_payload)); + + ASSERT_EQ(msg.payload_length, 0u); + ASSERT_EQ(msg.payload.size(), 0u); +} + +TEST(MessageTest, NewMessageWithSingleByte) { + RecordProperty("description", "Verifies new_message works with a single-byte payload."); + iggy::ffi::Message msg; + rust::Vec payload; + payload.push_back(0xFF); + + msg.new_message(std::move(payload)); + + ASSERT_EQ(msg.payload_length, 1u); + ASSERT_EQ(msg.payload.size(), 1u); + EXPECT_EQ(msg.payload[0], 0xFF); +} + +TEST(MessageTest, NewMessageWithNullBytes) { + RecordProperty("description", "Verifies new_message preserves null bytes in payload."); + iggy::ffi::Message msg; + rust::Vec payload; + payload.push_back(0x00); + payload.push_back(0x01); + payload.push_back(0x00); + + msg.new_message(std::move(payload)); + + ASSERT_EQ(msg.payload_length, 3u); + ASSERT_EQ(msg.payload.size(), 3u); + EXPECT_EQ(msg.payload[0], 0x00); + EXPECT_EQ(msg.payload[1], 0x01); + EXPECT_EQ(msg.payload[2], 0x00); +} + +TEST(MessageTest, NewMessageOverwritesPreviousState) { + RecordProperty("description", "Verifies calling new_message twice overwrites previous payload and fields."); + iggy::ffi::Message msg; + + rust::Vec payload1; + payload1.push_back(0xAA); + payload1.push_back(0xBB); + msg.new_message(std::move(payload1)); + msg.id_lo = 42; + + rust::Vec payload2; + payload2.push_back(0xCC); + msg.new_message(std::move(payload2)); + + ASSERT_EQ(msg.payload_length, 1u); + ASSERT_EQ(msg.payload.size(), 1u); + EXPECT_EQ(msg.payload[0], 0xCC); + EXPECT_EQ(msg.id_lo, 0u); +} + +TEST(MessageTest, NewMessageThenSetCustomId) { + RecordProperty("description", "Verifies custom ID can be set after new_message without affecting payload."); + iggy::ffi::Message msg; + rust::Vec payload; + payload.push_back(0x42); + msg.new_message(std::move(payload)); + + msg.id_lo = 100; + msg.id_hi = 200; + + EXPECT_EQ(msg.id_lo, 100u); + EXPECT_EQ(msg.id_hi, 200u); + ASSERT_EQ(msg.payload_length, 1u); + EXPECT_EQ(msg.payload[0], 0x42); +} + +TEST(MessageTest, NewMessageWithLargePayload) { + RecordProperty("description", "Verifies new_message handles a larger payload correctly."); + iggy::ffi::Message msg; + rust::Vec payload; + for (std::uint32_t i = 0; i < 10000; i++) { + payload.push_back(static_cast(i % 256)); + } + + msg.new_message(std::move(payload)); + + ASSERT_EQ(msg.payload_length, 10000u); + ASSERT_EQ(msg.payload.size(), 10000u); + EXPECT_EQ(msg.payload[0], 0u); + EXPECT_EQ(msg.payload[255], 255u); + EXPECT_EQ(msg.payload[256], 0u); +} From 1dbc51f01f5be7080011f4b83c128fa1f1f45b9b Mon Sep 17 00:00:00 2001 From: shin Date: Thu, 2 Apr 2026 07:33:41 +0900 Subject: [PATCH 16/21] =?UTF-8?q?fix(cpp):=20fix=20PollMessagesWithCountZe?= =?UTF-8?q?ro=20test=20=E2=80=94=20server=20rejects=20count=3D0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: shin --- foreign/cpp/tests/message/low_level_e2e.cpp | 19 ++++--------------- 1 file changed, 4 insertions(+), 15 deletions(-) diff --git a/foreign/cpp/tests/message/low_level_e2e.cpp b/foreign/cpp/tests/message/low_level_e2e.cpp index a38e39f283..f915a8025e 100644 --- a/foreign/cpp/tests/message/low_level_e2e.cpp +++ b/foreign/cpp/tests/message/low_level_e2e.cpp @@ -929,7 +929,7 @@ TEST(LowLevelE2E_Message, PollMessagesWithInvalidPartitionId) { } TEST(LowLevelE2E_Message, PollMessagesWithCountZero) { - RecordProperty("description", "Verifies polling with count=0 returns zero messages successfully."); + RecordProperty("description", "Throws when polling with count=0."); const std::string stream_name = "cpp-msg-count-zero"; iggy::ffi::Client *client = login_to_server(); ASSERT_NE(client, nullptr); @@ -939,20 +939,9 @@ TEST(LowLevelE2E_Message, PollMessagesWithCountZero) { client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, "server_default"); - rust::Vec messages; - for (std::uint32_t i = 0; i < 5; i++) { - iggy::ffi::Message msg; - msg.new_message(to_payload("msg-" + std::to_string(i))); - messages.push_back(std::move(msg)); - } - client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", - partition_id_bytes(0), std::move(messages)); - - auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", - make_numeric_identifier(1), "offset", 0, 0, false); - - ASSERT_EQ(polled.count, 0u); - ASSERT_EQ(polled.messages.size(), 0u); + ASSERT_THROW(client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", 0, 0, false), + std::exception); client->delete_stream(make_numeric_identifier(stream.id)); ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); From 9656b73ac27f616e0247aaa8de5790e4582660fc Mon Sep 17 00:00:00 2001 From: shin Date: Sun, 5 Apr 2026 00:43:40 +0900 Subject: [PATCH 17/21] =?UTF-8?q?refactor(cpp):=20address=20review=20feedb?= =?UTF-8?q?ack=20=E2=80=94=20rename=20tests,=20strengthen=20assertions,=20?= =?UTF-8?q?remove=20redundant=20test?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Rename 10 error-throwing tests with `Throws` suffix for naming consistency - Add payload/offset verification to 7 tests that only checked counts - Delete PollMessagesLargeCount test (not explicitly testing limits) Signed-off-by: shin --- foreign/cpp/tests/message/low_level_e2e.cpp | 99 ++++++++++++--------- 1 file changed, 57 insertions(+), 42 deletions(-) diff --git a/foreign/cpp/tests/message/low_level_e2e.cpp b/foreign/cpp/tests/message/low_level_e2e.cpp index f915a8025e..efe65fbac6 100644 --- a/foreign/cpp/tests/message/low_level_e2e.cpp +++ b/foreign/cpp/tests/message/low_level_e2e.cpp @@ -258,8 +258,8 @@ TEST(LowLevelE2E_Message, SendMessagesToSpecificPartitionVerified) { ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); } -TEST(LowLevelE2E_Message, SendEmptyMessageVector) { - RecordProperty("description", "Verifies behavior when sending an empty message vector."); +TEST(LowLevelE2E_Message, SendEmptyMessageVectorThrows) { + RecordProperty("description", "Throws when sending an empty message vector."); const std::string stream_name = "cpp-msg-empty-vec"; iggy::ffi::Client *client = login_to_server(); ASSERT_NE(client, nullptr); @@ -279,7 +279,7 @@ TEST(LowLevelE2E_Message, SendEmptyMessageVector) { ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); } -TEST(LowLevelE2E_Message, SendMessageWithEmptyPayload) { +TEST(LowLevelE2E_Message, SendMessageWithEmptyPayloadThrows) { RecordProperty("description", "Throws when sending a message with an empty payload."); const std::string stream_name = "cpp-msg-empty-payload"; iggy::ffi::Client *client = login_to_server(); @@ -304,7 +304,7 @@ TEST(LowLevelE2E_Message, SendMessageWithEmptyPayload) { ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); } -TEST(LowLevelE2E_Message, SendMessageWithOversizedPayload) { +TEST(LowLevelE2E_Message, SendMessageWithOversizedPayloadThrows) { RecordProperty("description", "Throws when sending a message exceeding maximum payload size."); const std::string stream_name = "cpp-msg-oversized"; iggy::ffi::Client *client = login_to_server(); @@ -490,7 +490,7 @@ TEST(LowLevelE2E_Message, PollMessagesBeforeLoginThrows) { ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); } -TEST(LowLevelE2E_Message, PollMessagesWithInvalidStreamId) { +TEST(LowLevelE2E_Message, PollMessagesWithInvalidStreamIdThrows) { RecordProperty("description", "Throws when polling messages with an invalid stream identifier."); iggy::ffi::Client *client = login_to_server(); ASSERT_NE(client, nullptr); @@ -506,7 +506,7 @@ TEST(LowLevelE2E_Message, PollMessagesWithInvalidStreamId) { ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); } -TEST(LowLevelE2E_Message, PollMessagesFromNonExistentStream) { +TEST(LowLevelE2E_Message, PollMessagesFromNonExistentStreamThrows) { RecordProperty("description", "Throws when polling messages from a non-existent stream."); iggy::ffi::Client *client = login_to_server(); ASSERT_NE(client, nullptr); @@ -518,7 +518,7 @@ TEST(LowLevelE2E_Message, PollMessagesFromNonExistentStream) { ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); } -TEST(LowLevelE2E_Message, PollMessagesWithInvalidConsumerKind) { +TEST(LowLevelE2E_Message, PollMessagesWithInvalidConsumerKindThrows) { RecordProperty("description", "Throws when polling messages with an invalid consumer kind."); const std::string stream_name = "cpp-msg-invalid-consumer"; iggy::ffi::Client *client = login_to_server(); @@ -537,7 +537,7 @@ TEST(LowLevelE2E_Message, PollMessagesWithInvalidConsumerKind) { ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); } -TEST(LowLevelE2E_Message, PollMessagesWithInvalidStrategyKind) { +TEST(LowLevelE2E_Message, PollMessagesWithInvalidStrategyKindThrows) { RecordProperty("description", "Throws when polling messages with an invalid polling strategy kind."); const std::string stream_name = "cpp-msg-invalid-strategy"; iggy::ffi::Client *client = login_to_server(); @@ -645,6 +645,12 @@ TEST(LowLevelE2E_Message, PollMessagesFirstStrategy) { ASSERT_EQ(polled.count, 3u); ASSERT_EQ(polled.messages.size(), 3u); EXPECT_EQ(polled.messages[0].offset, 0u); + for (std::uint32_t i = 0; i < 3; i++) { + EXPECT_EQ(polled.messages[i].offset, static_cast(i)); + std::string expected = "msg-" + std::to_string(i); + std::string actual(polled.messages[i].payload.begin(), polled.messages[i].payload.end()); + EXPECT_EQ(actual, expected) << "Payload mismatch at offset " << i; + } client->delete_stream(make_numeric_identifier(stream.id)); ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); @@ -678,6 +684,11 @@ TEST(LowLevelE2E_Message, PollMessagesLastStrategy) { ASSERT_EQ(polled.messages.size(), 3u); EXPECT_EQ(polled.messages[0].offset, 7u); EXPECT_EQ(polled.messages[2].offset, 9u); + for (std::uint32_t i = 0; i < 3; i++) { + std::string expected = "msg-" + std::to_string(7 + i); + std::string actual(polled.messages[i].payload.begin(), polled.messages[i].payload.end()); + EXPECT_EQ(actual, expected) << "Payload mismatch at index " << i; + } client->delete_stream(make_numeric_identifier(stream.id)); ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); @@ -712,6 +723,18 @@ TEST(LowLevelE2E_Message, PollMessagesNextStrategyNoAutoCommit) { auto polled2 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", make_numeric_identifier(1), "next", 0, 100, false); ASSERT_EQ(polled2.count, 5u); + for (std::uint32_t i = 0; i < 5; i++) { + EXPECT_EQ(polled1.messages[i].offset, static_cast(i)); + std::string expected = "msg-" + std::to_string(i); + std::string actual(polled1.messages[i].payload.begin(), polled1.messages[i].payload.end()); + EXPECT_EQ(actual, expected) << "polled1 payload mismatch at index " << i; + } + for (std::uint32_t i = 0; i < 5; i++) { + EXPECT_EQ(polled2.messages[i].offset, static_cast(i)); + std::string expected = "msg-" + std::to_string(i); + std::string actual(polled2.messages[i].payload.begin(), polled2.messages[i].payload.end()); + EXPECT_EQ(actual, expected) << "polled2 payload mismatch at index " << i; + } client->delete_stream(make_numeric_identifier(stream.id)); ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); @@ -749,6 +772,16 @@ TEST(LowLevelE2E_Message, PollMessagesNextStrategyAutoCommit) { ASSERT_EQ(polled2.count, 5u); EXPECT_EQ(polled2.messages[0].offset, 5u); EXPECT_EQ(polled2.messages[4].offset, 9u); + for (std::uint32_t i = 0; i < 5; i++) { + std::string expected1 = "msg-" + std::to_string(i); + std::string actual1(polled1.messages[i].payload.begin(), polled1.messages[i].payload.end()); + EXPECT_EQ(actual1, expected1) << "polled1 payload mismatch at index " << i; + } + for (std::uint32_t i = 0; i < 5; i++) { + std::string expected2 = "msg-" + std::to_string(5 + i); + std::string actual2(polled2.messages[i].payload.begin(), polled2.messages[i].payload.end()); + EXPECT_EQ(actual2, expected2) << "polled2 payload mismatch at index " << i; + } auto polled3 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", make_numeric_identifier(1), "next", 0, 5, true); @@ -828,6 +861,9 @@ TEST(LowLevelE2E_Message, PollMessagesMultipleSendsThenPollOrder) { make_numeric_identifier(1), "offset", 0, 100, false); ASSERT_EQ(polled.count, 10u); + for (std::uint32_t i = 0; i < 10; i++) { + EXPECT_EQ(polled.messages[i].offset, static_cast(i)) << "Offset mismatch at index " << i; + } for (std::uint32_t i = 0; i < 5; i++) { std::string expected = "batch1-" + std::to_string(i); std::string actual(polled.messages[i].payload.begin(), polled.messages[i].payload.end()); @@ -880,7 +916,7 @@ TEST(LowLevelE2E_Message, PollMessagesMultipleCustomIds) { ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); } -TEST(LowLevelE2E_Message, PollMessagesAfterStreamDeleted) { +TEST(LowLevelE2E_Message, PollMessagesAfterStreamDeletedThrows) { RecordProperty("description", "Throws when polling messages after the stream has been deleted."); const std::string stream_name = "cpp-msg-deleted-stream"; iggy::ffi::Client *client = login_to_server(); @@ -909,7 +945,7 @@ TEST(LowLevelE2E_Message, PollMessagesAfterStreamDeleted) { ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); } -TEST(LowLevelE2E_Message, PollMessagesWithInvalidPartitionId) { +TEST(LowLevelE2E_Message, PollMessagesWithInvalidPartitionIdThrows) { RecordProperty("description", "Throws when polling with a non-existent partition ID."); const std::string stream_name = "cpp-msg-invalid-partition"; iggy::ffi::Client *client = login_to_server(); @@ -928,7 +964,7 @@ TEST(LowLevelE2E_Message, PollMessagesWithInvalidPartitionId) { ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); } -TEST(LowLevelE2E_Message, PollMessagesWithCountZero) { +TEST(LowLevelE2E_Message, PollMessagesWithCountZeroThrows) { RecordProperty("description", "Throws when polling with count=0."); const std::string stream_name = "cpp-msg-count-zero"; iggy::ffi::Client *client = login_to_server(); @@ -973,6 +1009,11 @@ TEST(LowLevelE2E_Message, PollMessagesWithoutSpecifyingPartition) { ASSERT_EQ(polled.count, 5u); ASSERT_EQ(polled.messages.size(), 5u); + for (std::uint32_t i = 0; i < 5; i++) { + std::string expected = "msg-" + std::to_string(i); + std::string actual(polled.messages[i].payload.begin(), polled.messages[i].payload.end()); + EXPECT_EQ(actual, expected) << "Payload mismatch at index " << i; + } client->delete_stream(make_numeric_identifier(stream.id)); ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); @@ -1025,6 +1066,11 @@ TEST(LowLevelE2E_Message, PollMessagesTimestampStrategy) { EXPECT_GE(polled.messages[i].timestamp, batch2_timestamp) << "Message at index " << i << " has earlier timestamp"; } + for (std::size_t i = 0; i < polled.messages.size(); i++) { + std::string expected = "batch2-" + std::to_string(i); + std::string actual(polled.messages[i].payload.begin(), polled.messages[i].payload.end()); + EXPECT_EQ(actual, expected) << "Payload mismatch at index " << i; + } client->delete_stream(make_numeric_identifier(stream.id)); ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); @@ -1104,34 +1150,3 @@ TEST(LowLevelE2E_Message, SendMessagesLargeBatch) { client->delete_stream(make_numeric_identifier(stream.id)); ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); } - -TEST(LowLevelE2E_Message, PollMessagesLargeCount) { - RecordProperty("description", - "Verifies polling with a very large count returns only the available messages without error."); - const std::string stream_name = "cpp-msg-large-count"; - iggy::ffi::Client *client = login_to_server(); - ASSERT_NE(client, nullptr); - - client->create_stream(stream_name); - auto stream = client->get_stream(make_string_identifier(stream_name)); - client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, - "server_default"); - - rust::Vec messages; - for (std::uint32_t i = 0; i < 10; i++) { - iggy::ffi::Message msg; - msg.new_message(to_payload("msg-" + std::to_string(i))); - messages.push_back(std::move(msg)); - } - client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", - partition_id_bytes(0), std::move(messages)); - - auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", - make_numeric_identifier(1), "offset", 0, UINT32_MAX, false); - - ASSERT_EQ(polled.count, 10u); - ASSERT_EQ(polled.messages.size(), 10u); - - client->delete_stream(make_numeric_identifier(stream.id)); - ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); -} From 596b9d8dcacd151b60321fa94363f0cc2d902c83 Mon Sep 17 00:00:00 2001 From: shin Date: Sun, 5 Apr 2026 01:07:23 +0900 Subject: [PATCH 18/21] fix(cpp): remove unneeded clippy allow on identifier.rs Signed-off-by: shin --- foreign/cpp/src/identifier.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/foreign/cpp/src/identifier.rs b/foreign/cpp/src/identifier.rs index 29ed5a3fbd..02c9d8a480 100644 --- a/foreign/cpp/src/identifier.rs +++ b/foreign/cpp/src/identifier.rs @@ -62,7 +62,6 @@ impl TryFrom for RustIdentifier { } } -#[allow(clippy::wrong_self_convention)] impl ffi::Identifier { pub fn from_string(&mut self, id: String) -> Result<(), String> { *self = RustIdentifier::named(&id) From 41a3f03c8fd3b51c876a559069d4a723a5a1991e Mon Sep 17 00:00:00 2001 From: shin Date: Wed, 8 Apr 2026 00:00:15 +0900 Subject: [PATCH 19/21] feat(cpp): add join/leave consumer group FFI and address review feedback - Add join_consumer_group and leave_consumer_group FFI functions - Add field assertions to GetStreamsReturnsStreamAfterCreation test - Add field comparison to GetStreamsConsistentWithGetStream test - Add SendMessagesWithInvalidTopicIdThrows test - Add PollMessagesWithInvalidTopicIdThrows test - Add PollMessagesWithInvalidConsumerIdThrows test - Add ConsumerGroupCreateJoinAndPollMessages e2e test Signed-off-by: shin --- foreign/cpp/src/client.rs | 60 +++++++++++ foreign/cpp/src/lib.rs | 12 +++ foreign/cpp/tests/message/low_level_e2e.cpp | 114 ++++++++++++++++++++ foreign/cpp/tests/stream/low_level_e2e.cpp | 10 ++ 4 files changed, 196 insertions(+) diff --git a/foreign/cpp/src/client.rs b/foreign/cpp/src/client.rs index a923b9c527..def43b84a2 100644 --- a/foreign/cpp/src/client.rs +++ b/foreign/cpp/src/client.rs @@ -516,6 +516,66 @@ impl Client { Ok(()) }) } + + pub fn join_consumer_group( + &self, + stream_id: ffi::Identifier, + topic_id: ffi::Identifier, + group_id: ffi::Identifier, + ) -> Result<(), String> { + let rust_stream_id = RustIdentifier::try_from(stream_id).map_err(|error| { + format!("Could not join consumer group: invalid stream identifier: {error}") + })?; + let rust_topic_id = RustIdentifier::try_from(topic_id).map_err(|error| { + format!("Could not join consumer group: invalid topic identifier: {error}") + })?; + let rust_group_id = RustIdentifier::try_from(group_id).map_err(|error| { + format!("Could not join consumer group: invalid group identifier: {error}") + })?; + + RUNTIME.block_on(async { + self.inner + .join_consumer_group(&rust_stream_id, &rust_topic_id, &rust_group_id) + .await + .map_err(|error| { + format!( + "Could not join consumer group '{}' for topic '{}' on stream '{}': {error}", + rust_group_id, rust_topic_id, rust_stream_id + ) + })?; + Ok(()) + }) + } + + pub fn leave_consumer_group( + &self, + stream_id: ffi::Identifier, + topic_id: ffi::Identifier, + group_id: ffi::Identifier, + ) -> Result<(), String> { + let rust_stream_id = RustIdentifier::try_from(stream_id).map_err(|error| { + format!("Could not leave consumer group: invalid stream identifier: {error}") + })?; + let rust_topic_id = RustIdentifier::try_from(topic_id).map_err(|error| { + format!("Could not leave consumer group: invalid topic identifier: {error}") + })?; + let rust_group_id = RustIdentifier::try_from(group_id).map_err(|error| { + format!("Could not leave consumer group: invalid group identifier: {error}") + })?; + + RUNTIME.block_on(async { + self.inner + .leave_consumer_group(&rust_stream_id, &rust_topic_id, &rust_group_id) + .await + .map_err(|error| { + format!( + "Could not leave consumer group '{}' for topic '{}' on stream '{}': {error}", + rust_group_id, rust_topic_id, rust_stream_id + ) + })?; + Ok(()) + }) + } } pub unsafe fn delete_connection(client: *mut Client) -> Result<(), String> { diff --git a/foreign/cpp/src/lib.rs b/foreign/cpp/src/lib.rs index 434801f504..d65e3339db 100644 --- a/foreign/cpp/src/lib.rs +++ b/foreign/cpp/src/lib.rs @@ -161,6 +161,18 @@ mod ffi { topic_id: Identifier, group_id: Identifier, ) -> Result<()>; + fn join_consumer_group( + self: &Client, + stream_id: Identifier, + topic_id: Identifier, + group_id: Identifier, + ) -> Result<()>; + fn leave_consumer_group( + self: &Client, + stream_id: Identifier, + topic_id: Identifier, + group_id: Identifier, + ) -> Result<()>; #[allow(clippy::too_many_arguments)] fn poll_messages( diff --git a/foreign/cpp/tests/message/low_level_e2e.cpp b/foreign/cpp/tests/message/low_level_e2e.cpp index efe65fbac6..c092929ef5 100644 --- a/foreign/cpp/tests/message/low_level_e2e.cpp +++ b/foreign/cpp/tests/message/low_level_e2e.cpp @@ -1150,3 +1150,117 @@ TEST(LowLevelE2E_Message, SendMessagesLargeBatch) { client->delete_stream(make_numeric_identifier(stream.id)); ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); } + +TEST(LowLevelE2E_Message, SendMessagesWithInvalidTopicIdThrows) { + RecordProperty("description", "Throws when sending messages with an invalid topic identifier."); + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + rust::Vec messages; + iggy::ffi::Message msg; + msg.new_message(to_payload("test")); + messages.push_back(std::move(msg)); + + iggy::ffi::Identifier invalid_id; + invalid_id.kind = "invalid"; + invalid_id.length = 0; + + ASSERT_THROW(client->send_messages(make_numeric_identifier(1), invalid_id, "partition_id", partition_id_bytes(0), + std::move(messages)), + std::exception); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesWithInvalidTopicIdThrows) { + RecordProperty("description", "Throws when polling messages with an invalid topic identifier."); + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + iggy::ffi::Identifier invalid_id; + invalid_id.kind = "invalid"; + invalid_id.length = 0; + + ASSERT_THROW(client->poll_messages(make_numeric_identifier(1), invalid_id, 0, "consumer", + make_numeric_identifier(1), "offset", 0, 10, false), + std::exception); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesWithInvalidConsumerIdThrows) { + RecordProperty("description", "Throws when polling messages with an invalid consumer identifier."); + const std::string stream_name = "cpp-msg-invalid-consumer-id"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + iggy::ffi::Identifier invalid_id; + invalid_id.kind = "invalid"; + invalid_id.length = 0; + + ASSERT_THROW(client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + invalid_id, "offset", 0, 10, false), + std::exception); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, ConsumerGroupCreateJoinAndPollMessages) { + RecordProperty("description", + "Creates a consumer group, joins it, sends messages, and polls them using consumer_group kind."); + const std::string stream_name = "cpp-msg-consumer-group"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + auto group = + client->create_consumer_group(make_numeric_identifier(stream.id), make_numeric_identifier(0), "test-group"); + ASSERT_EQ(group.members_count, 0u); + + ASSERT_NO_THROW(client->join_consumer_group(make_numeric_identifier(stream.id), make_numeric_identifier(0), + make_numeric_identifier(group.id))); + + auto group_after_join = client->get_consumer_group(make_numeric_identifier(stream.id), make_numeric_identifier(0), + make_numeric_identifier(group.id)); + ASSERT_EQ(group_after_join.members_count, 1u); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 10; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("cg-msg-" + std::to_string(i))); + messages.push_back(std::move(msg)); + } + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, + "consumer_group", make_numeric_identifier(group.id), "offset", 0, 100, false); + + ASSERT_EQ(polled.count, 10u); + ASSERT_EQ(polled.messages.size(), 10u); + for (std::uint32_t i = 0; i < 10; i++) { + std::string expected = "cg-msg-" + std::to_string(i); + std::string actual(polled.messages[i].payload.begin(), polled.messages[i].payload.end()); + EXPECT_EQ(actual, expected) << "Payload mismatch at offset " << i; + } + + ASSERT_NO_THROW(client->leave_consumer_group(make_numeric_identifier(stream.id), make_numeric_identifier(0), + make_numeric_identifier(group.id))); + + auto group_after_leave = client->get_consumer_group(make_numeric_identifier(stream.id), make_numeric_identifier(0), + make_numeric_identifier(group.id)); + ASSERT_EQ(group_after_leave.members_count, 0u); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} diff --git a/foreign/cpp/tests/stream/low_level_e2e.cpp b/foreign/cpp/tests/stream/low_level_e2e.cpp index 15a5f5a5c6..f59c7934f4 100644 --- a/foreign/cpp/tests/stream/low_level_e2e.cpp +++ b/foreign/cpp/tests/stream/low_level_e2e.cpp @@ -325,6 +325,10 @@ TEST(LowLevelE2E_Stream, GetStreamsReturnsStreamAfterCreation) { for (const auto &s : streams) { if (std::string(s.name) == stream_name) { found = true; + EXPECT_GT(s.created_at, static_cast(0)); + EXPECT_EQ(s.size_bytes, static_cast(0)); + EXPECT_EQ(s.messages_count, static_cast(0)); + EXPECT_EQ(s.topics_count, 0u); break; } } @@ -397,11 +401,15 @@ TEST(LowLevelE2E_Stream, GetStreamsConsistentWithGetStream) { std::string list_name; std::uint32_t list_topics_count = 0; + std::uint64_t list_created_at = 0; + std::uint64_t list_size_bytes = 0; auto streams = client->get_streams(); for (const auto &s : streams) { if (std::string(s.name) == stream_name) { list_name = std::string(s.name); list_topics_count = s.topics_count; + list_created_at = s.created_at; + list_size_bytes = s.size_bytes; break; } } @@ -413,6 +421,8 @@ TEST(LowLevelE2E_Stream, GetStreamsConsistentWithGetStream) { EXPECT_EQ(list_name, single_name); EXPECT_EQ(list_topics_count, single_topics); + EXPECT_EQ(list_created_at, single.created_at); + EXPECT_EQ(list_size_bytes, single.size_bytes); client->delete_stream(make_string_identifier(stream_name)); ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); From 64d9d08778242a15eb11fd1afbba85992c057941 Mon Sep 17 00:00:00 2001 From: shin Date: Wed, 8 Apr 2026 20:34:52 +0900 Subject: [PATCH 20/21] chore(cpp): add TODO for join/leave consumer group tests Signed-off-by: shin --- foreign/cpp/tests/client/low_level_e2e.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/foreign/cpp/tests/client/low_level_e2e.cpp b/foreign/cpp/tests/client/low_level_e2e.cpp index 26b82b82f8..7306f0c953 100644 --- a/foreign/cpp/tests/client/low_level_e2e.cpp +++ b/foreign/cpp/tests/client/low_level_e2e.cpp @@ -16,6 +16,7 @@ // under the License. // TODO(slbotbm): create fixture for setup/teardown. +// TODO(slbotbm): Add tests for join_consumer_group() and leave_consumer_group() #include From af1c9e675182c7a9d948d226d9ca1903cb0f678e Mon Sep 17 00:00:00 2001 From: shin Date: Wed, 8 Apr 2026 21:59:46 +0900 Subject: [PATCH 21/21] fix(cpp): add id comparison to GetStreamsConsistentWithGetStream test Signed-off-by: shin --- foreign/cpp/tests/stream/low_level_e2e.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/foreign/cpp/tests/stream/low_level_e2e.cpp b/foreign/cpp/tests/stream/low_level_e2e.cpp index f59c7934f4..30d8a55a1c 100644 --- a/foreign/cpp/tests/stream/low_level_e2e.cpp +++ b/foreign/cpp/tests/stream/low_level_e2e.cpp @@ -400,6 +400,7 @@ TEST(LowLevelE2E_Stream, GetStreamsConsistentWithGetStream) { client->create_stream(stream_name); std::string list_name; + std::uint32_t list_id = 0; std::uint32_t list_topics_count = 0; std::uint64_t list_created_at = 0; std::uint64_t list_size_bytes = 0; @@ -407,6 +408,7 @@ TEST(LowLevelE2E_Stream, GetStreamsConsistentWithGetStream) { for (const auto &s : streams) { if (std::string(s.name) == stream_name) { list_name = std::string(s.name); + list_id = s.id; list_topics_count = s.topics_count; list_created_at = s.created_at; list_size_bytes = s.size_bytes; @@ -420,6 +422,7 @@ TEST(LowLevelE2E_Stream, GetStreamsConsistentWithGetStream) { auto single_topics = single.topics_count; EXPECT_EQ(list_name, single_name); + EXPECT_EQ(list_id, single.id); EXPECT_EQ(list_topics_count, single_topics); EXPECT_EQ(list_created_at, single.created_at); EXPECT_EQ(list_size_bytes, single.size_bytes);