
Replace Docker-based Kafka with embedded KRaft broker for integration tests #17790

Open

xiangfu0 wants to merge 7 commits into apache:master from xiangfu0:flaky-kafka-startable
Conversation

xiangfu0 (Contributor) commented Mar 1, 2026

Summary

  • Replace KafkaServerStartable (Docker CLI-based) with EmbeddedKafkaCluster, which uses Kafka's KafkaClusterTestKit in KRaft mode, for integration tests
  • Eliminate the Docker dependency for tests, improving startup speed (~5s vs ~30-60s) and removing the primary source of CI flakiness on GitHub Actions (image pulls, network-creation races, port-mapping issues, container timeouts)
  • Simplify the BaseClusterIntegrationTest Kafka lifecycle — no retry loop, no Docker container management, no port scanning

Changes

  • New: EmbeddedKafkaCluster — in-process KRaft cluster wrapper implementing StreamDataServerStartable, supporting multi-broker clusters and Kafka transactions
  • Modified: BaseClusterIntegrationTest — rewrote Kafka start/stop to use embedded cluster; fixed replication factor calculation for multi-broker clusters; removed Docker-specific dead code (~170 lines)
  • Modified: KafkaPartitionLevelConsumerTest — use EmbeddedKafkaCluster instead of Docker
  • Modified: pom.xml (root, pinot-kafka-3.0, pinot-integration-test-base, pinot-integration-tests) — added kafka test-jar dependencies for embedded broker support
  • Unchanged: KafkaServerStartable (production Docker-based class for QuickStart) — not modified
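For orientation, the kind of bootstrap EmbeddedKafkaCluster performs can be sketched with Kafka's own test kit. This is illustrative only: the class and method names below come from Kafka's `kafka.testkit` test-jar as shipped in Kafka 3.x (the package moved in later releases), and exact signatures vary across versions.

```java
import kafka.testkit.KafkaClusterTestKit;
import kafka.testkit.TestKitNodes;

// Illustrative sketch: start a one-broker KRaft cluster in-process.
// API surface is from Kafka's test-jar and differs between Kafka versions.
public class EmbeddedKRaftSketch {
    public static void main(String[] args) throws Exception {
        TestKitNodes nodes = new TestKitNodes.Builder()
                .setNumBrokerNodes(1)      // single broker for consumer tests
                .setNumControllerNodes(1)  // KRaft controller quorum of one
                .build();
        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).build()) {
            cluster.format();              // write KRaft metadata to temp log dirs
            cluster.startup();             // start controllers, then brokers
            cluster.waitForReadyBrokers(); // block until brokers are registered
            String bootstrap = cluster.bootstrapServers();
            System.out.println("bootstrap.servers=" + bootstrap);
        }
    }
}
```

Because the whole cluster lives in the test JVM, there is no image pull, no port mapping, and teardown is an ordinary `close()`.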

Test plan

  • KafkaPartitionLevelConsumerTest — 9 tests pass (single broker)
  • LLCRealtimeClusterIntegrationTest — passes (2 brokers, non-transactional)
  • ExactlyOnceKafkaRealtimeClusterIntegrationTest — passes (3 brokers, Kafka transactions with read_committed)
  • Full integration test suite via CI

🤖 Generated with Claude Code

codecov-commenter commented Mar 1, 2026

❌ 3 Tests Failed:

Tests completed: 10929 | Failed: 3 | Passed: 10926 | Skipped: 68
View the top 3 failed test(s) by shortest run time
org.apache.pinot.controller.helix.core.util.SegmentDeletionManagerTest::testRemoveDeletedSegments
Stack Traces | 101s run time
Failed to meet condition in 100000ms, error message: dummyDir2 still exists
org.apache.pinot.integration.tests.UpsertTableIntegrationTest::testUpsertCompactionWithSoftDelete
Stack Traces | 607s run time
Failed to meet condition in 600000ms, error message: Failed to load all documents
org.apache.pinot.integration.tests.BaseRealtimeClusterIntegrationTest::@BeforeClass setUp
Stack Traces | 1247s run time
Failed to load 115545 documents; current count=0 for table=mytable expected [115545] but found [0]
View the full list of 1 ❄️ flaky test(s)
org.apache.pinot.integration.tests.ExactlyOnceKafkaRealtimeClusterIntegrationTest::setUp

Flake rate in main: 100.00% (Passed 0 times, Failed 44 times)

Stack Traces | 1247s run time
Failed to load 115545 documents; current count=0 for table=mytable expected [115545] but found [0]


@xiangfu0 xiangfu0 force-pushed the flaky-kafka-startable branch 6 times, most recently from ba33bd3 to 1fcba20 Compare March 2, 2026 04:46
@xiangfu0 xiangfu0 requested a review from Copilot March 2, 2026 05:12
Copilot AI (Contributor) left a comment

Pull request overview

This PR modernizes Pinot's Kafka-backed integration testing by replacing Docker-managed Kafka startup with an in-process Kafka KRaft cluster (via KafkaClusterTestKit), aiming to reduce CI flakiness and improve test startup time.

Changes:

  • Introduces EmbeddedKafkaCluster (KRaft, in-process) implementing StreamDataServerStartable for integration tests.
  • Refactors Kafka lifecycle in BaseClusterIntegrationTest and updates Kafka-based tests to use the embedded broker.
  • Updates Maven + GitHub Actions integration test partitioning to run ExactlyOnceKafkaRealtimeClusterIntegrationTest in its own test set with required Kafka test dependencies.

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 3 comments.

Summary per file:

  • pom.xml: Adds dependency management entries for Kafka test artifacts, the Pinot Kafka test-jar, and the JUnit Jupiter API needed by the Kafka testkit.
  • pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/server/EmbeddedKafkaCluster.java: New embedded Kafka (KRaft) cluster wrapper for test usage.
  • pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java: Switches test Kafka setup from the Docker-based starter to the embedded cluster.
  • pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/pom.xml: Adds Kafka testkit and JUnit Jupiter API dependencies required to compile and run the embedded broker tests.
  • pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java: Replaces Docker Kafka startup with the embedded cluster and adjusts topic readiness/replication handling.
  • pinot-integration-test-base/pom.xml: Adds test dependencies on the pinot-kafka-3.0 test-jar and Kafka test artifacts.
  • pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java: Updates the cleanup strategy (drop/recreate table and recreate topic) and adjusts lease extender executor handling after stopping servers.
  • pinot-integration-tests/pom.xml: Adds a new integration-tests-set-3 profile for the ExactlyOnce test and pulls in the embedded Kafka dependencies.
  • .github/workflows/scripts/pr-tests/.pinot_tests_integration.sh: Adds support for running integration test set 3.
  • .github/workflows/pinot_tests.yml: Expands the integration test matrix to include test set 3.

@@ -154,6 +154,7 @@ public void testRebalance()

serverStarter1.stop();
serverStarter2.stop();
Copilot AI commented Mar 2, 2026
SegmentBuildTimeLeaseExtender.initExecutor() here is non-obvious (it’s effectively compensating for the static executor being shut down when servers stop). Adding a short inline comment (like the one in UpsertTableIntegrationTest) would help future maintainers understand why this re-initialization is required and avoid accidental removal.

Suggested change:
- serverStarter2.stop();
+ serverStarter2.stop();
+ // Re-init the static executor because stopping servers shuts it down; required for subsequent operations.

Comment on lines +62 to +67
int replicationFactor = Math.min(3, _brokerCount);
Map<String, String> serverProps = new HashMap<>();
serverProps.put("offsets.topic.replication.factor", String.valueOf(replicationFactor));
serverProps.put("transaction.state.log.replication.factor", String.valueOf(replicationFactor));
serverProps.put("transaction.state.log.min.isr", String.valueOf(Math.min(2, replicationFactor)));

Copilot AI commented Mar 2, 2026
In start(), the local serverProps map is populated but never used. This looks like leftover code and can be removed, or wired into the TestKit configuration (e.g., via per-server properties) to avoid dead code and confusion about which configs are actually applied.

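The review comments above hinge on a small piece of arithmetic: broker-level defaults clamp the replication factor to min(3, brokerCount) with a transaction-log min ISR of min(2, replicationFactor), while per-topic creation clamps the requested factor into [1, brokerCount]. A minimal sketch of that arithmetic (helper names here are hypothetical, not the PR's actual method names):

```java
// Hypothetical helpers mirroring the clamping arithmetic quoted in the review.
public class ReplicationClamp {
    // Broker-level default replication: at most 3-way, bounded by cluster size.
    static int defaultReplicationFactor(int brokerCount) {
        return Math.min(3, brokerCount);
    }

    // transaction.state.log.min.isr: 2 when replication allows it, else 1.
    static int transactionMinIsr(int replicationFactor) {
        return Math.min(2, replicationFactor);
    }

    // Per-topic creation: clamp the requested factor into [1, brokerCount].
    static short topicReplicationFactor(int brokerCount, int requested) {
        return (short) Math.max(1, Math.min(brokerCount, requested));
    }

    public static void main(String[] args) {
        // A single-broker cluster degrades everything to 1.
        System.out.println(defaultReplicationFactor(1));  // prints 1
        System.out.println(transactionMinIsr(1));         // prints 1
        // A 3-broker cluster gets 3-way replication and min ISR 2.
        System.out.println(defaultReplicationFactor(3));  // prints 3
        System.out.println(transactionMinIsr(3));         // prints 2
        // Over-requested per-topic replication is capped at the broker count.
        System.out.println(topicReplicationFactor(3, 5)); // prints 3
    }
}
```

This is why a 1-broker transactional test still works: every factor collapses to 1, so no multi-broker ISR is ever required.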
Comment on lines +124 to +185
@Override
public void createTopic(String topic, Properties topicProps) {
  int numPartitions = Integer.parseInt(String.valueOf(topicProps.getOrDefault("partition", "1")));
  int requestedReplicationFactor = Integer.parseInt(
      String.valueOf(topicProps.getOrDefault("replicationFactor", "1")));
  short replicationFactor = (short) Math.max(1, Math.min(_brokerCount, requestedReplicationFactor));
  try (AdminClient adminClient = createAdminClient()) {
    NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor);
    runAdminWithRetry(() -> adminClient.createTopics(Collections.singletonList(newTopic)).all().get(),
        "create topic: " + topic);
  } catch (Exception e) {
    if (e instanceof ExecutionException
        && e.getCause() instanceof org.apache.kafka.common.errors.TopicExistsException) {
      return;
    }
    throw new RuntimeException("Failed to create topic: " + topic, e);
  }
}

@Override
public void deleteTopic(String topic) {
  try (AdminClient adminClient = createAdminClient()) {
    runAdminWithRetry(() -> adminClient.deleteTopics(Collections.singletonList(topic)).all().get(),
        "delete topic: " + topic);
  } catch (Exception e) {
    throw new RuntimeException("Failed to delete topic: " + topic, e);
  }
}

@Override
public void createPartitions(String topic, int numPartitions) {
  try (AdminClient adminClient = createAdminClient()) {
    runAdminWithRetry(() -> {
      adminClient.createPartitions(Collections.singletonMap(topic, NewPartitions.increaseTo(numPartitions)))
          .all().get();
      return null;
    }, "create partitions for topic: " + topic);
  } catch (Exception e) {
    throw new RuntimeException("Failed to create partitions for topic: " + topic, e);
  }
}

@Override
public void deleteRecordsBeforeOffset(String topic, int partition, long offset) {
  TopicPartition topicPartition = new TopicPartition(topic, partition);
  try (AdminClient adminClient = createAdminClient()) {
    runAdminWithRetry(() -> {
      adminClient.deleteRecords(Collections.singletonMap(topicPartition, RecordsToDelete.beforeOffset(offset)))
          .all().get();
      return null;
    }, "delete records before offset for topic: " + topic + ", partition: " + partition);
  } catch (Exception e) {
    throw new RuntimeException("Failed to delete records before offset for topic: " + topic
        + ", partition: " + partition + ", offset: " + offset, e);
  }
}

private AdminClient createAdminClient() {
  Properties props = new Properties();
  props.put("bootstrap.servers", _bootstrapServers);
  return AdminClient.create(props);
}
Copilot AI commented Mar 2, 2026
Topic admin operations (createTopic/deleteTopic/createPartitions/deleteRecordsBeforeOffset) rely on _bootstrapServers being set, but they don't guard against being called before start() or after stop(). Consider adding an explicit started check (similar to getPort()) and throwing a clear IllegalStateException to avoid null-based failures and make misuse easier to diagnose.

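The guard the reviewer suggests could look something like the following. This is a hypothetical sketch, not EmbeddedKafkaCluster's actual code; field and method names are illustrative.

```java
// Hypothetical sketch of the started-state guard suggested in the review.
public class StartedGuard {
    private volatile String _bootstrapServers; // null until start() succeeds

    public void start(String bootstrapServers) {
        _bootstrapServers = bootstrapServers;
    }

    public void stop() {
        _bootstrapServers = null;
    }

    // Admin operations would call this first, failing fast with a clear
    // message instead of a NullPointerException deep inside AdminClient.
    public String requireStarted() {
        String servers = _bootstrapServers;
        if (servers == null) {
            throw new IllegalStateException(
                "Embedded Kafka cluster is not started (or already stopped)");
        }
        return servers;
    }
}
```

With this in place, `createAdminClient()` would use `requireStarted()` rather than reading the field directly, so misuse before `start()` or after `stop()` surfaces as a descriptive IllegalStateException.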
xiangfu0 and others added 5 commits March 2, 2026 13:43
…integration tests

Replace KafkaServerStartable (Docker-based) with EmbeddedKafkaCluster using
Kafka's KafkaClusterTestKit for integration tests. This eliminates Docker
dependency, improves startup speed (~5s vs ~30-60s), and removes the primary
source of CI flakiness on GitHub Actions.

Key changes:
- Add EmbeddedKafkaCluster wrapping KafkaClusterTestKit in KRaft mode
- Simplify BaseClusterIntegrationTest Kafka lifecycle (no retry loop needed)
- Fix replication factor calculation for multi-broker embedded clusters
- Add kafka test-jar dependencies for embedded broker support

Verified: KafkaPartitionLevelConsumerTest (9 tests),
LLCRealtimeClusterIntegrationTest (2 brokers, non-transactional), and
ExactlyOnceKafkaRealtimeClusterIntegrationTest (3 brokers, transactional)
all pass.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…st set

Run ExactlyOnceKafkaRealtimeClusterIntegrationTest in its own integration
test set (set 3) to isolate the 3-broker transactional Kafka test from
the rest of set 1, preventing it from causing flakiness in that suite.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace the complex afterMethod cleanup cycle (pause/drop-segments/
stop-kafka/restart-servers/start-kafka/resume) with a simpler
drop-table/recreate approach. The old approach broke after 2 cycles
because the controller/server state became stale. The new approach
uses Pinot's standard table lifecycle: drop the table (clears all
segments and server-side upsert state), delete/recreate the Kafka
topic, then recreate the table (triggers fresh consuming segment
creation via setUpNewTable).

Also add SegmentBuildTimeLeaseExtender.initExecutor() calls after
stopping extra servers in test methods to fix the static executor
NPE caused by extra servers' stop() nulling the shared executor.

Simplify BaseClusterIntegrationTest.stopKafka() to a simple loop.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…bedded broker

With 3 embedded KRaft brokers in the same JVM, the transaction
coordinator's async WriteTxnMarkers to partition leaders gets delayed
in resource-constrained CI, preventing the Last Stable Offset from
advancing. The read_committed consumer then sees zero documents.

Use 1 broker for transaction tests since exactly-once semantics
(aborted txns skipped, committed txns visible) don't require
multi-broker replication. Also remove unused serverProps map in
EmbeddedKafkaCluster and redundant getNumKafkaBrokers override.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… embedded KRaft

The root cause of the ExactlyOnce test failure is that
MetadataVersion.latestTesting() returns IBP_4_0_IV1 (Kafka 4.0
metadata) on a 3.9.x broker, enabling experimental protocol changes
(new transaction protocol, new consumer group protocol) that break
transaction marker propagation. The read_committed consumer never
sees committed records because the LSO fails to advance under the
unstable 4.0 metadata version.

Switch to latestProduction() which returns IBP_3_9_IV0, a stable
metadata version. Restore 3-broker configuration for transaction
tests to properly exercise multi-broker transaction semantics.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@xiangfu0 xiangfu0 force-pushed the flaky-kafka-startable branch from 655816f to d3f883b Compare March 2, 2026 21:44
xiangfu0 and others added 2 commits March 2, 2026 15:52
…l ID

The test was using two separate producers with different transactional IDs
for the abort and commit transactions. Since abort markers are written
asynchronously after abortTransaction() returns, the second producer's
committed data could be produced before abort markers reach the data
partitions, leaving the LSO stuck at offset 0 and making all records
invisible to read_committed consumers (count=0).

Fix: Use the same transactional ID for both producers. When the second
producer calls initTransactions(), the coordinator must complete the
first transaction's abort (including writing all abort markers) before
assigning a new epoch, guaranteeing the LSO has advanced.

Also reduce transaction.state.log.min.isr to 1 in EmbeddedKafkaCluster
for more robust testing with embedded KRaft brokers.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…diagnostics

Use a single KafkaProducer for both abort and commit transactions instead
of two separate producers. With a single producer, the coordinator's state
machine ensures abort markers are fully written before the next transaction
can proceed (returns CONCURRENT_TRANSACTIONS until done).

Also elevate all diagnostic logging to WARN level so results are visible
in CI logs to help diagnose any remaining issues.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
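The single-producer pattern from the commit above can be sketched with the standard KafkaProducer transactional API. This is illustrative, not the test's actual code: the bootstrap address, topic name, and transactional ID are placeholders, and it needs a running broker to execute.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

// Illustrative sketch of the single-transactional-producer pattern.
public class SingleProducerTxnSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "itest-txn");       // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();

            // Transaction 1: produced, then aborted.
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("mytopic", "aborted".getBytes()));
            producer.abortTransaction();

            // Transaction 2 on the SAME producer (same transactional ID): the
            // coordinator cannot start it until the abort, including its
            // markers, is complete, so the Last Stable Offset must advance
            // before the committed records are produced.
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("mytopic", "committed".getBytes()));
            producer.commitTransaction();
        }
    }
}
```

A `read_committed` consumer then sees only the second transaction's records, which is exactly what the ExactlyOnce test asserts.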