Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ public class BinlogDeserializationSchema implements FlussDeserializationSchema<R
*/
private transient BinlogRowConverter converter;

/** The current split ID used for per-split UPDATE_BEFORE/UPDATE_AFTER pairing. */
private transient String currentSplitId;

/** Creates a new BinlogDeserializationSchema; the converter is initialized later in open(). */
public BinlogDeserializationSchema() {}

Expand All @@ -57,6 +60,15 @@ public void open(InitializationContext context) throws Exception {
}
}

/**
 * Records which split's records are about to be deserialized. Callers must invoke this before
 * {@link #deserialize(LogRecord)} whenever records from several splits flow through a single
 * deserializer instance, so that UPDATE_BEFORE/UPDATE_AFTER pairing stays scoped to one split.
 */
public void setCurrentSplitId(String splitId) {
    // Plain assignment; the value is consumed on the next deserialize() call.
    currentSplitId = splitId;
}

/**
* Deserializes a {@link LogRecord} into a Flink {@link RowData} object with nested before/after
* structure.
Expand All @@ -68,7 +80,7 @@ public RowData deserialize(LogRecord record) throws Exception {
throw new IllegalStateException(
"Converter not initialized. The open() method must be called before deserializing records.");
}
return converter.toBinlogRowData(record);
return converter.toBinlogRowData(record, currentSplitId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.fluss.client.table.scanner.ScanRecord;
import org.apache.fluss.flink.lake.LakeRecordRecordEmitter;
import org.apache.fluss.flink.source.deserializer.BinlogDeserializationSchema;
import org.apache.fluss.flink.source.deserializer.FlussDeserializationSchema;
import org.apache.fluss.flink.source.reader.FlinkSourceReader;
import org.apache.fluss.flink.source.reader.RecordAndPos;
Expand Down Expand Up @@ -63,6 +64,7 @@ public void emitRecord(

ScanRecord scanRecord = recordAndPosition.record();
if (scanRecord.logOffset() >= 0) {
setCurrentSplitIdIfNeeded(splitState);
// record is with a valid offset, means it's in incremental phase,
// update the log offset
hybridSnapshotLogSplitState.setNextOffset(scanRecord.logOffset() + 1);
Expand All @@ -73,6 +75,7 @@ public void emitRecord(
}
processAndEmitRecord(scanRecord, sourceOutput);
} else if (splitState.isLogSplitState()) {
setCurrentSplitIdIfNeeded(splitState);
// Attempt to process and emit the record.
// For $binlog, this returns true only when a complete row (or the final part of
// a split) is emitted.
Expand All @@ -97,6 +100,16 @@ public void emitRecord(
}
}

/**
 * Propagates the active split's ID to the deserializer when it is a
 * {@link BinlogDeserializationSchema}, so that UPDATE_BEFORE/UPDATE_AFTER pairing is tracked
 * per split instead of globally when one source reader serves multiple splits.
 */
private void setCurrentSplitIdIfNeeded(SourceSplitState splitState) {
    // Only the binlog schema keeps per-split pairing state; other schemas need no context.
    if (!(deserializationSchema instanceof BinlogDeserializationSchema)) {
        return;
    }
    BinlogDeserializationSchema binlogSchema =
            (BinlogDeserializationSchema) deserializationSchema;
    binlogSchema.setCurrentSplitId(splitState.splitId());
}

/**
* Processes and emits a record.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ public final LogSplitState asLogSplitState() {
return (LogSplitState) this;
}

/**
 * Returns the ID of the underlying split. Delegates directly to the wrapped split rather than
 * going through {@link #toSourceSplit()}, so no new split object is created on this hot path.
 */
public String splitId() {
    return split.splitId();
}

public abstract SourceSplitBase toSourceSplit();

public boolean isLakeSplit() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* A converter that transforms Fluss's {@link LogRecord} to Flink's {@link RowData} with nested
Expand All @@ -46,10 +48,11 @@ public class BinlogRowConverter implements RecordToFlinkRowConverter {
private final org.apache.flink.table.types.logical.RowType producedType;

/**
* Buffer for the UPDATE_BEFORE (-U) record pending merge with the next UPDATE_AFTER (+U)
* record. Null when no update is in progress.
* Per-split buffer for UPDATE_BEFORE (-U) records pending merge with the next UPDATE_AFTER (+U)
* record. Keyed by split ID to prevent cross-split state corruption when multiple bucket splits
* are processed by the same source reader.
*/
@Nullable private LogRecord pendingUpdateBefore;
private final Map<String, LogRecord> pendingUpdateBeforeMap = new HashMap<>();

/** Creates a new BinlogRowConverter. */
public BinlogRowConverter(RowType rowType) {
Expand All @@ -60,6 +63,15 @@ public BinlogRowConverter(RowType rowType) {
/**
 * Converts a LogRecord to a binlog RowData with nested before/after structure.
 *
 * <p>Delegates to the two-argument overload with a {@code null} split ID, which buffers any
 * pending UPDATE_BEFORE under the single {@code null} map key — appropriate only when all
 * records pass through this converter as one logical stream (no split interleaving).
 */
@Nullable
public RowData toBinlogRowData(LogRecord record) {
    return toBinlogRowData(record, null);
}

/**
* Converts a LogRecord to a binlog RowData with nested before/after structure, using a
* split-specific buffer for UPDATE_BEFORE/UPDATE_AFTER pairing.
*/
@Nullable
public RowData toBinlogRowData(LogRecord record, @Nullable String splitId) {
ChangeType changeType = record.getChangeType();

switch (changeType) {
Expand All @@ -72,13 +84,14 @@ public RowData toBinlogRowData(LogRecord record) {
baseConverter.toFlinkRowData(record.getRow()));

case UPDATE_BEFORE:
// Buffer the -U record and return null.
// Buffer the -U record per split and return null.
// FlinkRecordEmitter.processAndEmitRecord() skips null results.
this.pendingUpdateBefore = record;
pendingUpdateBeforeMap.put(splitId, record);
return null;

case UPDATE_AFTER:
// Merge with the buffered -U record
// Merge with the buffered -U record for this split
LogRecord pendingUpdateBefore = pendingUpdateBeforeMap.remove(splitId);
if (pendingUpdateBefore == null) {
throw new IllegalStateException(
"Received UPDATE_AFTER (+U) without a preceding UPDATE_BEFORE (-U) record. "
Expand All @@ -89,7 +102,6 @@ public RowData toBinlogRowData(LogRecord record) {
// Use offset and timestamp from the -U record (first entry of update pair)
long offset = pendingUpdateBefore.logOffset();
long timestamp = pendingUpdateBefore.timestamp();
pendingUpdateBefore = null;
return buildBinlogRow("update", offset, timestamp, beforeRow, afterRow);

case DELETE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.fluss.flink.source.emitter;

import org.apache.fluss.client.table.scanner.ScanRecord;
import org.apache.fluss.flink.source.deserializer.BinlogDeserializationSchema;
import org.apache.fluss.flink.source.deserializer.DeserializerInitContextImpl;
import org.apache.fluss.flink.source.deserializer.RowDataDeserializationSchema;
import org.apache.fluss.flink.source.reader.RecordAndPos;
Expand All @@ -36,6 +37,7 @@
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
Expand Down Expand Up @@ -89,6 +91,71 @@ void testEmitRowDataRecordWithHybridSplitInSnapshotPhase() throws Exception {
assertThat(results).isEqualTo(expectedResult);
}

@Test
void testHybridIncrementalBinlogUpdatesDoNotCorruptAcrossSplits() throws Exception {
    // Three physical columns; the binlog wrapper nests them as before/after rows.
    RowType rowType =
            RowType.of(
                    new DataType[] {DataTypes.INT(), DataTypes.STRING(), DataTypes.BIGINT()},
                    new String[] {"id", "name", "amount"});

    BinlogDeserializationSchema schema = new BinlogDeserializationSchema();
    schema.open(new DeserializerInitContextImpl(null, null, rowType));
    FlinkRecordEmitter<RowData> emitter = new FlinkRecordEmitter<>(schema);

    // Two hybrid splits on different buckets of the same table, served by one emitter.
    HybridSnapshotLogSplitState stateA =
            new HybridSnapshotLogSplitState(
                    new HybridSnapshotLogSplit(new TableBucket(1L, 0), null, 0L, 0L));
    HybridSnapshotLogSplitState stateB =
            new HybridSnapshotLogSplitState(
                    new HybridSnapshotLogSplit(new TableBucket(1L, 1), null, 0L, 0L));

    TestSourceOutput<RowData> output = new TestSourceOutput<>();

    // Interleave the pairs as A(-U), B(-U), B(+U), A(+U): correct per-split buffering
    // must match each +U with the -U from the SAME split, not the most recent one.
    emitter.emitRecord(
            new RecordAndPos(
                    new ScanRecord(
                            100L, 1000L, ChangeType.UPDATE_BEFORE, row(1, "A-old", 100L)),
                    1L),
            output,
            stateA);
    emitter.emitRecord(
            new RecordAndPos(
                    new ScanRecord(
                            200L, 2000L, ChangeType.UPDATE_BEFORE, row(2, "B-old", 200L)),
                    1L),
            output,
            stateB);
    emitter.emitRecord(
            new RecordAndPos(
                    new ScanRecord(201L, 2000L, ChangeType.UPDATE_AFTER, row(2, "B-new", 300L)),
                    1L),
            output,
            stateB);
    emitter.emitRecord(
            new RecordAndPos(
                    new ScanRecord(101L, 1000L, ChangeType.UPDATE_AFTER, row(1, "A-new", 150L)),
                    1L),
            output,
            stateA);

    List<RowData> emitted = output.getRecords();
    assertThat(emitted).hasSize(2);

    // B's pair completed first so it is emitted first; the offset (field 1) comes from
    // each split's own -U record, and before/after rows must stay split-local.
    RowData first = emitted.get(0);
    assertThat(first.getString(0)).isEqualTo(StringData.fromString("update"));
    assertThat(first.getLong(1)).isEqualTo(200L);
    assertThat(first.getRow(3, 3).getString(1)).isEqualTo(StringData.fromString("B-old"));
    assertThat(first.getRow(4, 3).getString(1)).isEqualTo(StringData.fromString("B-new"));

    RowData second = emitted.get(1);
    assertThat(second.getString(0)).isEqualTo(StringData.fromString("update"));
    assertThat(second.getLong(1)).isEqualTo(100L);
    assertThat(second.getRow(3, 3).getString(1)).isEqualTo(StringData.fromString("A-old"));
    assertThat(second.getRow(4, 3).getString(1)).isEqualTo(StringData.fromString("A-new"));
}

@Test
void testEmitPojoRecordWithHybridSplitInSnapshotPhase() throws Exception {
// Setup
Expand Down
Loading