diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/deserializer/BinlogDeserializationSchema.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/deserializer/BinlogDeserializationSchema.java index 1114febb9c..b5d838fc3a 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/deserializer/BinlogDeserializationSchema.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/deserializer/BinlogDeserializationSchema.java @@ -46,6 +46,9 @@ public class BinlogDeserializationSchema implements FlussDeserializationSchema= 0) { + setCurrentSplitIdIfNeeded(splitState); // record is with a valid offset, means it's in incremental phase, // update the log offset hybridSnapshotLogSplitState.setNextOffset(scanRecord.logOffset() + 1); @@ -73,6 +75,7 @@ public void emitRecord( } processAndEmitRecord(scanRecord, sourceOutput); } else if (splitState.isLogSplitState()) { + setCurrentSplitIdIfNeeded(splitState); // Attempt to process and emit the record. // For $binlog, this returns true only when a complete row (or the final part of // a split) is emitted. @@ -97,6 +100,16 @@ public void emitRecord( } } + private void setCurrentSplitIdIfNeeded(SourceSplitState splitState) { + // Set split context for BinlogDeserializationSchema to ensure per-split + // UPDATE_BEFORE/UPDATE_AFTER pairing when multiple splits are + // processed by the same source reader. + if (deserializationSchema instanceof BinlogDeserializationSchema) { + ((BinlogDeserializationSchema) deserializationSchema) + .setCurrentSplitId(splitState.splitId()); + } + } + /** * Processes and emits a record. * diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitState.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitState.java index 32f93491e9..ebe5f5d8ea 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitState.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitState.java @@ -46,6 +46,11 @@ public final LogSplitState asLogSplitState() { return (LogSplitState) this; } + /** Returns the split ID without creating a new split object. */ + public String splitId() { + return split.splitId(); + } + public abstract SourceSplitBase toSourceSplit(); public boolean isLakeSplit() { diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/BinlogRowConverter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/BinlogRowConverter.java index 3a9f2c9411..7133ec8d3b 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/BinlogRowConverter.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/BinlogRowConverter.java @@ -34,7 +34,9 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * A converter that transforms Fluss's {@link LogRecord} to Flink's {@link RowData} with nested @@ -46,10 +48,11 @@ public class BinlogRowConverter implements RecordToFlinkRowConverter { private final org.apache.flink.table.types.logical.RowType producedType; /** - * Buffer for the UPDATE_BEFORE (-U) record pending merge with the next UPDATE_AFTER (+U) - * record. Null when no update is in progress. + * Per-split buffer for UPDATE_BEFORE (-U) records pending merge with the next UPDATE_AFTER (+U) + * record. Keyed by split ID to prevent cross-split state corruption when multiple bucket splits + * are processed by the same source reader. */ - @Nullable private LogRecord pendingUpdateBefore; + private final Map pendingUpdateBeforeMap = new HashMap<>(); /** Creates a new BinlogRowConverter. */ public BinlogRowConverter(RowType rowType) { @@ -60,6 +63,15 @@ public BinlogRowConverter(RowType rowType) { /** Converts a LogRecord to a binlog RowData with nested before/after structure. */ @Nullable public RowData toBinlogRowData(LogRecord record) { + return toBinlogRowData(record, null); + } + + /** + * Converts a LogRecord to a binlog RowData with nested before/after structure, using a + * split-specific buffer for UPDATE_BEFORE/UPDATE_AFTER pairing. + */ + @Nullable + public RowData toBinlogRowData(LogRecord record, @Nullable String splitId) { ChangeType changeType = record.getChangeType(); switch (changeType) { @@ -72,13 +84,14 @@ public RowData toBinlogRowData(LogRecord record) { baseConverter.toFlinkRowData(record.getRow())); case UPDATE_BEFORE: - // Buffer the -U record and return null. + // Buffer the -U record per split and return null. // FlinkRecordEmitter.processAndEmitRecord() skips null results. - this.pendingUpdateBefore = record; + pendingUpdateBeforeMap.put(splitId, record); return null; case UPDATE_AFTER: - // Merge with the buffered -U record + // Merge with the buffered -U record for this split + LogRecord pendingUpdateBefore = pendingUpdateBeforeMap.remove(splitId); if (pendingUpdateBefore == null) { throw new IllegalStateException( "Received UPDATE_AFTER (+U) without a preceding UPDATE_BEFORE (-U) record. " @@ -89,7 +102,6 @@ public RowData toBinlogRowData(LogRecord record) { // Use offset and timestamp from the -U record (first entry of update pair) long offset = pendingUpdateBefore.logOffset(); long timestamp = pendingUpdateBefore.timestamp(); - pendingUpdateBefore = null; return buildBinlogRow("update", offset, timestamp, beforeRow, afterRow); case DELETE: diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitterTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitterTest.java index d2a647d041..bcd897edb3 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitterTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitterTest.java @@ -18,6 +18,7 @@ package org.apache.fluss.flink.source.emitter; import org.apache.fluss.client.table.scanner.ScanRecord; +import org.apache.fluss.flink.source.deserializer.BinlogDeserializationSchema; import org.apache.fluss.flink.source.deserializer.DeserializerInitContextImpl; import org.apache.fluss.flink.source.deserializer.RowDataDeserializationSchema; import org.apache.fluss.flink.source.reader.RecordAndPos; @@ -36,6 +37,7 @@ import org.apache.flink.api.common.eventtime.Watermark; import org.apache.flink.api.connector.source.SourceOutput; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; import org.junit.jupiter.api.Test; import java.util.ArrayList; @@ -89,6 +91,71 @@ void testEmitRowDataRecordWithHybridSplitInSnapshotPhase() throws Exception { assertThat(results).isEqualTo(expectedResult); } + @Test + void testHybridIncrementalBinlogUpdatesDoNotCorruptAcrossSplits() throws Exception { + RowType sourceOutputType = + RowType.of( + new DataType[] {DataTypes.INT(), DataTypes.STRING(), DataTypes.BIGINT()}, + new String[] {"id", "name", "amount"}); + + BinlogDeserializationSchema deserializationSchema = new BinlogDeserializationSchema(); + deserializationSchema.open(new DeserializerInitContextImpl(null, null, sourceOutputType)); + FlinkRecordEmitter emitter = new FlinkRecordEmitter<>(deserializationSchema); + + HybridSnapshotLogSplitState splitStateA = + new HybridSnapshotLogSplitState( + new HybridSnapshotLogSplit(new TableBucket(1L, 0), null, 0L, 0L)); + HybridSnapshotLogSplitState splitStateB = + new HybridSnapshotLogSplitState( + new HybridSnapshotLogSplit(new TableBucket(1L, 1), null, 0L, 0L)); + + TestSourceOutput sourceOutput = new TestSourceOutput<>(); + + // Interleave -U/+U across two hybrid splits and verify pairing stays split-scoped. + emitter.emitRecord( + new RecordAndPos( + new ScanRecord( + 100L, 1000L, ChangeType.UPDATE_BEFORE, row(1, "A-old", 100L)), + 1L), + sourceOutput, + splitStateA); + emitter.emitRecord( + new RecordAndPos( + new ScanRecord( + 200L, 2000L, ChangeType.UPDATE_BEFORE, row(2, "B-old", 200L)), + 1L), + sourceOutput, + splitStateB); + emitter.emitRecord( + new RecordAndPos( + new ScanRecord(201L, 2000L, ChangeType.UPDATE_AFTER, row(2, "B-new", 300L)), + 1L), + sourceOutput, + splitStateB); + emitter.emitRecord( + new RecordAndPos( + new ScanRecord(101L, 1000L, ChangeType.UPDATE_AFTER, row(1, "A-new", 150L)), + 1L), + sourceOutput, + splitStateA); + + List results = sourceOutput.getRecords(); + assertThat(results).hasSize(2); + + // Each emitted update should keep its own split's before/after rows and offsets. + RowData resultB = results.get(0); + assertThat(resultB.getString(0)).isEqualTo(StringData.fromString("update")); + assertThat(resultB.getLong(1)).isEqualTo(200L); + assertThat(resultB.getRow(3, 3).getString(1)).isEqualTo(StringData.fromString("B-old")); + assertThat(resultB.getRow(4, 3).getString(1)).isEqualTo(StringData.fromString("B-new")); + + RowData resultA = results.get(1); + assertThat(resultA.getString(0)).isEqualTo(StringData.fromString("update")); + assertThat(resultA.getLong(1)).isEqualTo(100L); + assertThat(resultA.getRow(3, 3).getString(1)).isEqualTo(StringData.fromString("A-old")); + assertThat(resultA.getRow(4, 3).getString(1)).isEqualTo(StringData.fromString("A-new")); + } + @Test void testEmitPojoRecordWithHybridSplitInSnapshotPhase() throws Exception { // Setup