diff --git a/core/integration/tests/server/scenarios/message_deduplication_scenario.rs b/core/integration/tests/server/scenarios/message_deduplication_scenario.rs new file mode 100644 index 0000000000..ddeab330a2 --- /dev/null +++ b/core/integration/tests/server/scenarios/message_deduplication_scenario.rs @@ -0,0 +1,221 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use bytes::Bytes; +use iggy::prelude::*; +use integration::harness::TestHarness; +use std::collections::HashSet; +use std::time::Duration; + +const STREAM_NAME: &str = "dedup-test-stream"; +const TOPIC_NAME: &str = "dedup-test-topic"; +const MESSAGES_PER_BATCH: u32 = 10; +const DEDUP_TTL_SECS: u64 = 2; + +pub async fn run(harness: &TestHarness) { + let client = harness + .root_client() + .await + .expect("Failed to get root client"); + + client.create_stream(STREAM_NAME).await.unwrap(); + client + .create_topic( + &Identifier::named(STREAM_NAME).unwrap(), + TOPIC_NAME, + 1, + CompressionAlgorithm::default(), + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ) + .await + .unwrap(); + + let stream_id = Identifier::named(STREAM_NAME).unwrap(); + let topic_id = Identifier::named(TOPIC_NAME).unwrap(); + let partitioning = Partitioning::partition_id(0); + let consumer = Consumer::default(); + + // Step 1: Send 10 messages with id=0 (server auto-generates UUID) + let mut auto_messages = build_messages("auto-id", &[0; MESSAGES_PER_BATCH as usize]); + client + .send_messages(&stream_id, &topic_id, &partitioning, &mut auto_messages) + .await + .unwrap(); + + let polled = poll_all(&client, &stream_id, &topic_id, &consumer).await; + assert_eq!(polled.messages.len(), MESSAGES_PER_BATCH as usize); + let auto_ids: HashSet<u128> = polled.messages.iter().map(|m| m.header.id).collect(); + assert_eq!( + auto_ids.len(), + MESSAGES_PER_BATCH as usize, + "All auto-generated IDs must be unique" + ); + + // Step 2: Send 10 messages with explicit IDs 1-10 + let explicit_ids: Vec<u128> = (1..=MESSAGES_PER_BATCH as u128).collect(); + let mut original_messages = build_messages("original", &explicit_ids); + client + .send_messages(&stream_id, &topic_id, &partitioning, &mut original_messages) + .await + .unwrap(); + + let polled = poll_all(&client, &stream_id, &topic_id, &consumer).await; + assert_eq!(polled.messages.len(), (MESSAGES_PER_BATCH * 2) as usize); + + // Step 3:
Re-send IDs 1-10 with different payload — duplicates should be dropped + let mut duplicate_messages = build_messages("duplicate", &explicit_ids); + client + .send_messages( + &stream_id, + &topic_id, + &partitioning, + &mut duplicate_messages, + ) + .await + .unwrap(); + + let polled = poll_all(&client, &stream_id, &topic_id, &consumer).await; + assert_eq!( + polled.messages.len(), + (MESSAGES_PER_BATCH * 2) as usize, + "Duplicate messages must not increase count" + ); + for msg in &polled.messages[MESSAGES_PER_BATCH as usize..] { + let payload = std::str::from_utf8(&msg.payload).unwrap(); + assert!( + payload.starts_with("original-"), + "Original payload must be preserved, got: {payload}" + ); + } + + // Step 4: Send all-duplicate batch (regression test for empty batch panic) + let mut all_dup_messages = build_messages("all-dup", &explicit_ids); + client + .send_messages(&stream_id, &topic_id, &partitioning, &mut all_dup_messages) + .await + .unwrap(); + + let polled = poll_all(&client, &stream_id, &topic_id, &consumer).await; + assert_eq!( + polled.messages.len(), + (MESSAGES_PER_BATCH * 2) as usize, + "All-duplicate batch must not change count" + ); + + // Step 5: Verify monotonically increasing offsets + for window in polled.messages.windows(2) { + assert!( + window[1].header.offset > window[0].header.offset, + "Offsets not monotonically increasing: {} followed by {}", + window[0].header.offset, + window[1].header.offset + ); + } + + // Step 6: Wait for TTL expiry, then re-send IDs 1-10 + tokio::time::sleep(Duration::from_secs(DEDUP_TTL_SECS + 1)).await; + + let mut after_ttl_messages = build_messages("after-ttl", &explicit_ids); + client + .send_messages( + &stream_id, + &topic_id, + &partitioning, + &mut after_ttl_messages, + ) + .await + .unwrap(); + + let expected_after_ttl = (MESSAGES_PER_BATCH * 3) as usize; // 30 + let polled = poll_all(&client, &stream_id, &topic_id, &consumer).await; + assert_eq!( + polled.messages.len(), + expected_after_ttl, + "After 
TTL expiry, previously seen IDs must be accepted again" + ); + + let ttl_start = (MESSAGES_PER_BATCH * 2) as usize; + for msg in &polled.messages[ttl_start..] { + let payload = std::str::from_utf8(&msg.payload).unwrap(); + assert!( + payload.starts_with("after-ttl-"), + "After-TTL payload mismatch: {payload}" + ); + } + + for window in polled.messages.windows(2) { + assert!( + window[1].header.offset > window[0].header.offset, + "Offsets not monotonically increasing after TTL: {} followed by {}", + window[0].header.offset, + window[1].header.offset + ); + } + + client + .delete_stream(&Identifier::named(STREAM_NAME).unwrap()) + .await + .unwrap(); +} + +fn build_messages(prefix: &str, ids: &[u128]) -> Vec<IggyMessage> { + ids.iter() + .enumerate() + .map(|(idx, &id)| { + let payload = if id == 0 { + format!("{prefix}-auto-{idx}") + } else { + format!("{prefix}-{id}") + }; + if id != 0 { + IggyMessage::builder() + .id(id) + .payload(Bytes::from(payload)) + .build() + .expect("Failed to build message") + } else { + IggyMessage::builder() + .payload(Bytes::from(payload)) + .build() + .expect("Failed to build message") + } + }) + .collect() +} + +async fn poll_all( + client: &IggyClient, + stream_id: &Identifier, + topic_id: &Identifier, + consumer: &Consumer, +) -> PolledMessages { + client + .poll_messages( + stream_id, + topic_id, + Some(0), + consumer, + &PollingStrategy::offset(0), + 1000, + false, + ) + .await + .unwrap() +} diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs index 234572887a..91034f6f78 100644 --- a/core/integration/tests/server/scenarios/mod.rs +++ b/core/integration/tests/server/scenarios/mod.rs @@ -33,6 +33,7 @@ pub mod encryption_scenario; pub mod invalid_consumer_offset_scenario; pub mod log_rotation_scenario; pub mod message_cleanup_scenario; +pub mod message_deduplication_scenario; pub mod message_headers_scenario; pub mod message_size_scenario; pub mod offset_scenario; diff --git 
a/core/integration/tests/server/specific.rs b/core/integration/tests/server/specific.rs index 49c2241509..49cf3d191a 100644 --- a/core/integration/tests/server/specific.rs +++ b/core/integration/tests/server/specific.rs @@ -18,9 +18,9 @@ */ use crate::server::scenarios::{ - message_size_scenario, reconnect_after_restart_scenario, restart_offset_skip_scenario, - segment_rotation_race_scenario, single_message_per_batch_scenario, tcp_tls_scenario, - websocket_tls_scenario, + message_deduplication_scenario, message_size_scenario, reconnect_after_restart_scenario, + restart_offset_skip_scenario, segment_rotation_race_scenario, + single_message_per_batch_scenario, tcp_tls_scenario, websocket_tls_scenario, }; use integration::iggy_harness; @@ -147,3 +147,12 @@ async fn restart_offset_skip(harness: &mut TestHarness) { async fn segment_rotation_scenario(harness: &TestHarness) { segment_rotation_race_scenario::run(harness).await; } + +#[iggy_harness(server( + message_deduplication.enabled = true, + message_deduplication.expiry = "2s", + partition.messages_required_to_save = "1" +))] +async fn message_deduplication(harness: &TestHarness) { + message_deduplication_scenario::run(harness).await; +} diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs index 82a5d25b06..6e1721742a 100644 --- a/core/server/src/shard/system/messages.rs +++ b/core/server/src/shard/system/messages.rs @@ -355,6 +355,10 @@ impl IggyShard { ) .await; + if batch.count() == 0 { + return Ok(()); + } + let (journal_messages_count, journal_size, is_full) = { let mut partitions = self.local_partitions.borrow_mut(); let partition = partitions @@ -380,15 +384,11 @@ impl IggyShard { segment.end_timestamp = batch.last_timestamp().unwrap(); segment.end_offset = batch.last_offset().unwrap(); + let last_offset = segment.end_offset; + let (journal_messages_count, journal_size) = partition.log.journal_mut().append(batch)?; - let last_offset = if batch_messages_count == 0 { - 
current_offset - } else { - current_offset + batch_messages_count as u64 - 1 - }; - if partition.should_increment_offset { partition.offset.store(last_offset, Ordering::Relaxed); } else { diff --git a/core/server/src/shard/system/partitions.rs b/core/server/src/shard/system/partitions.rs index f0831828ad..5e21a8ff3f 100644 --- a/core/server/src/shard/system/partitions.rs +++ b/core/server/src/shard/system/partitions.rs @@ -23,6 +23,7 @@ use crate::shard::transmission::event::PartitionInfo; use crate::shard::transmission::message::ResolvedTopic; use crate::streaming::partitions::consumer_group_offsets::ConsumerGroupOffsets; use crate::streaming::partitions::consumer_offsets::ConsumerOffsets; +use crate::streaming::partitions::helpers::create_message_deduplicator; use crate::streaming::partitions::local_partition::LocalPartition; use crate::streaming::partitions::storage::create_partition_file_hierarchy; use crate::streaming::partitions::storage::delete_partitions_from_disk; @@ -350,13 +351,14 @@ impl IggyShard { } } + let message_deduplicator = create_message_deduplicator(&self.config.system).map(Arc::new); let partition = LocalPartition::with_log( loaded_log, stats, std::sync::Arc::new(std::sync::atomic::AtomicU64::new(current_offset)), consumer_offsets, consumer_group_offsets, - None, + message_deduplicator, created_at, revision_id, should_increment_offset, diff --git a/core/server/src/streaming/partitions/journal.rs b/core/server/src/streaming/partitions/journal.rs index 87bd9cfcb5..ebd044e861 100644 --- a/core/server/src/streaming/partitions/journal.rs +++ b/core/server/src/streaming/partitions/journal.rs @@ -84,12 +84,14 @@ impl Journal for MemoryMessageJournal { && let Some(first_offset) = entry.first_offset() { // Allow disagreement when either side is 0 (fresh partition or - // reset after purge). Only flag when both are non-zero and differ. 
+ // reset after purge), or when first_offset > base_offset (offset + // gap from deduplicated batches that skipped journal append). + // Only flag when first_offset < base_offset (offset regression). debug_assert!( self.inner.base_offset == 0 || first_offset == 0 - || self.inner.base_offset == first_offset, - "journal base_offset ({}) disagrees with batch first_offset ({})", + || self.inner.base_offset <= first_offset, + "journal base_offset ({}) is ahead of batch first_offset ({})", self.inner.base_offset, first_offset ); @@ -99,6 +101,7 @@ impl Journal for MemoryMessageJournal { let batch_size = entry.size(); let first_timestamp = entry.first_timestamp().unwrap(); let last_timestamp = entry.last_timestamp().unwrap(); + let last_offset = entry.last_offset().unwrap(); self.batches.add_batch(entry); if self.inner.first_timestamp == 0 { @@ -106,7 +109,7 @@ impl Journal for MemoryMessageJournal { } self.inner.end_timestamp = last_timestamp; self.inner.messages_count += batch_messages_count; - self.inner.current_offset = self.inner.base_offset + self.inner.messages_count as u64 - 1; + self.inner.current_offset = last_offset; self.inner.size = IggyByteSize::from(self.inner.size.as_bytes_u64() + batch_size as u64); Ok((self.inner.messages_count, self.inner.size.as_bytes_u32()))