From cb3cb95d2ef392df28edc7279d55747534967862 Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Fri, 13 Mar 2026 10:05:51 +0800 Subject: [PATCH 1/7] fix update_before issue --- .../BinlogDeserializationSchema.java | 14 +++++- .../source/emitter/FlinkRecordEmitter.java | 8 ++++ .../flink/source/split/SourceSplitState.java | 5 ++ .../fluss/flink/utils/BinlogRowConverter.java | 28 ++++++++--- .../flink/utils/BinlogRowConverterTest.java | 46 +++++++++++++++++++ 5 files changed, 93 insertions(+), 8 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/deserializer/BinlogDeserializationSchema.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/deserializer/BinlogDeserializationSchema.java index 1114febb9c..b5d838fc3a 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/deserializer/BinlogDeserializationSchema.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/deserializer/BinlogDeserializationSchema.java @@ -46,6 +46,9 @@ public class BinlogDeserializationSchema implements FlussDeserializationSchema pendingUpdateBeforeMap = new HashMap<>(); + + private static final String DEFAULT_SPLIT_ID = "__default__"; /** Creates a new BinlogRowConverter. */ public BinlogRowConverter(RowType rowType) { @@ -60,6 +65,15 @@ public BinlogRowConverter(RowType rowType) { /** Converts a LogRecord to a binlog RowData with nested before/after structure. */ @Nullable public RowData toBinlogRowData(LogRecord record) { + return toBinlogRowData(record, DEFAULT_SPLIT_ID); + } + + /** + * Converts a LogRecord to a binlog RowData with nested before/after structure, using a + * split-specific buffer for UPDATE_BEFORE/UPDATE_AFTER pairing. + */ + @Nullable + public RowData toBinlogRowData(LogRecord record, String splitId) { ChangeType changeType = record.getChangeType(); switch (changeType) { @@ -72,13 +86,14 @@ public RowData toBinlogRowData(LogRecord record) { baseConverter.toFlinkRowData(record.getRow())); case UPDATE_BEFORE: - // Buffer the -U record and return null. + // Buffer the -U record per split and return null. // FlinkRecordEmitter.processAndEmitRecord() skips null results. - this.pendingUpdateBefore = record; + pendingUpdateBeforeMap.put(splitId, record); return null; case UPDATE_AFTER: - // Merge with the buffered -U record + // Merge with the buffered -U record for this split + LogRecord pendingUpdateBefore = pendingUpdateBeforeMap.remove(splitId); if (pendingUpdateBefore == null) { throw new IllegalStateException( "Received UPDATE_AFTER (+U) without a preceding UPDATE_BEFORE (-U) record. " @@ -89,7 +104,6 @@ public RowData toBinlogRowData(LogRecord record) { // Use offset and timestamp from the -U record (first entry of update pair) long offset = pendingUpdateBefore.logOffset(); long timestamp = pendingUpdateBefore.timestamp(); - pendingUpdateBefore = null; return buildBinlogRow("update", offset, timestamp, beforeRow, afterRow); case DELETE: diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/BinlogRowConverterTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/BinlogRowConverterTest.java index a09dd7c419..5de61de483 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/BinlogRowConverterTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/BinlogRowConverterTest.java @@ -229,6 +229,52 @@ void testMultipleUpdatesInSequence() throws Exception { assertThat(result2.getLong(1)).isEqualTo(20L); // offset from second -U } + @Test + void testCrossSplitInterleavingDoesNotCorrupt() throws Exception { + String splitA = "log-1-0"; + String splitB = "log-1-1"; + + // Split A: send -U (buffered per split A) + RowData resultA1 = + converter.toBinlogRowData( + createLogRecord(ChangeType.UPDATE_BEFORE, 100L, 1000L, 1, "A-old", 100L), + splitA); + assertThat(resultA1).isNull(); + + // Split B: send -U then +U (should pair correctly within split B) + RowData resultB1 = + converter.toBinlogRowData( + createLogRecord(ChangeType.UPDATE_BEFORE, 200L, 2000L, 2, "B-old", 200L), + splitB); + assertThat(resultB1).isNull(); + + RowData resultB2 = + converter.toBinlogRowData( + createLogRecord(ChangeType.UPDATE_AFTER, 201L, 2000L, 2, "B-new", 300L), + splitB); + assertThat(resultB2).isNotNull(); + assertThat(resultB2.getString(0)).isEqualTo(StringData.fromString("update")); + // Verify it paired with split B's -U + RowData beforeRowB = resultB2.getRow(3, 3); + assertThat(beforeRowB.getString(1).toString()).isEqualTo("B-old"); + RowData afterRowB = resultB2.getRow(4, 3); + assertThat(afterRowB.getString(1).toString()).isEqualTo("B-new"); + + // Split A: send +U (should pair with split A's buffered -U, not corrupted by split B) + RowData resultA2 = + converter.toBinlogRowData( + createLogRecord(ChangeType.UPDATE_AFTER, 101L, 1000L, 1, "A-new", 150L), + splitA); + assertThat(resultA2).isNotNull(); + assertThat(resultA2.getString(0)).isEqualTo(StringData.fromString("update")); + assertThat(resultA2.getLong(1)).isEqualTo(100L); // offset from split A's -U + // Verify it paired with split A's -U + RowData beforeRowA = resultA2.getRow(3, 3); + assertThat(beforeRowA.getString(1).toString()).isEqualTo("A-old"); + RowData afterRowA = resultA2.getRow(4, 3); + assertThat(afterRowA.getString(1).toString()).isEqualTo("A-new"); + } + private LogRecord createLogRecord( ChangeType changeType, long offset, long timestamp, int id, String name, long amount) throws Exception { From c0dc5236aa81b79aa829cea4f2b3f9d2640b0356 Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Mon, 16 Mar 2026 17:59:03 +0800 Subject: [PATCH 2/7] [flink] Fix hybrid split binlog update pairing --- .../source/emitter/FlinkRecordEmitter.java | 19 +++--- .../fluss/flink/utils/BinlogRowConverter.java | 5 +- .../emitter/FlinkRecordEmitterTest.java | 63 +++++++++++++++++++ 3 files changed, 77 insertions(+), 10 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitter.java index 366c8ee4b4..c9c177ade8 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitter.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitter.java @@ -64,6 +64,7 @@ public void emitRecord( ScanRecord scanRecord = recordAndPosition.record(); if (scanRecord.logOffset() >= 0) { + setCurrentSplitIdIfNeeded(splitState); // record is with a valid offset, means it's in incremental phase, // update the log offset hybridSnapshotLogSplitState.setNextOffset(scanRecord.logOffset() + 1); @@ -74,13 +75,7 @@ public void emitRecord( } processAndEmitRecord(scanRecord, sourceOutput); } else if (splitState.isLogSplitState()) { - // Set split context for BinlogDeserializationSchema to ensure per-split - // UPDATE_BEFORE/UPDATE_AFTER pairing when multiple bucket splits are - // processed by the same source reader. - if (deserializationSchema instanceof BinlogDeserializationSchema) { - ((BinlogDeserializationSchema) deserializationSchema) - .setCurrentSplitId(splitState.splitId()); - } + setCurrentSplitIdIfNeeded(splitState); // Attempt to process and emit the record. // For $binlog, this returns true only when a complete row (or the final part of // a split) is emitted. @@ -105,6 +100,16 @@ public void emitRecord( } } + private void setCurrentSplitIdIfNeeded(SourceSplitState splitState) { + // Set split context for BinlogDeserializationSchema to ensure per-split + // UPDATE_BEFORE/UPDATE_AFTER pairing when multiple splits are + // processed by the same source reader. + if (deserializationSchema instanceof BinlogDeserializationSchema) { + ((BinlogDeserializationSchema) deserializationSchema) + .setCurrentSplitId(splitState.splitId()); + } + } + /** * Processes and emits a record. * diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/BinlogRowConverter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/BinlogRowConverter.java index 9e97648ae4..dcbafbebe2 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/BinlogRowConverter.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/BinlogRowConverter.java @@ -54,7 +54,6 @@ public class BinlogRowConverter implements RecordToFlinkRowConverter { */ private final Map pendingUpdateBeforeMap = new HashMap<>(); - private static final String DEFAULT_SPLIT_ID = "__default__"; /** Creates a new BinlogRowConverter. */ public BinlogRowConverter(RowType rowType) { @@ -65,7 +64,7 @@ public BinlogRowConverter(RowType rowType) { /** Converts a LogRecord to a binlog RowData with nested before/after structure. */ @Nullable public RowData toBinlogRowData(LogRecord record) { - return toBinlogRowData(record, DEFAULT_SPLIT_ID); + return toBinlogRowData(record, null); } /** @@ -73,7 +72,7 @@ public RowData toBinlogRowData(LogRecord record) { * split-specific buffer for UPDATE_BEFORE/UPDATE_AFTER pairing. */ @Nullable - public RowData toBinlogRowData(LogRecord record, String splitId) { + public RowData toBinlogRowData(LogRecord record, @Nullable String splitId) { ChangeType changeType = record.getChangeType(); switch (changeType) { diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitterTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitterTest.java index d2a647d041..bd27f77a2a 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitterTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitterTest.java @@ -18,6 +18,7 @@ package org.apache.fluss.flink.source.emitter; import org.apache.fluss.client.table.scanner.ScanRecord; +import org.apache.fluss.flink.source.deserializer.BinlogDeserializationSchema; import org.apache.fluss.flink.source.deserializer.DeserializerInitContextImpl; import org.apache.fluss.flink.source.deserializer.RowDataDeserializationSchema; import org.apache.fluss.flink.source.reader.RecordAndPos; @@ -36,6 +37,7 @@ import org.apache.flink.api.common.eventtime.Watermark; import org.apache.flink.api.connector.source.SourceOutput; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; import org.junit.jupiter.api.Test; import java.util.ArrayList; @@ -89,6 +91,67 @@ void testEmitRowDataRecordWithHybridSplitInSnapshotPhase() throws Exception { assertThat(results).isEqualTo(expectedResult); } + @Test + void testEmitBinlogRecordWithHybridSplitInIncrementalPhase() throws Exception { + RowType sourceOutputType = + RowType.of( + new DataType[] {DataTypes.INT(), DataTypes.STRING(), DataTypes.BIGINT()}, + new String[] {"id", "name", "amount"}); + + BinlogDeserializationSchema deserializationSchema = new BinlogDeserializationSchema(); + deserializationSchema.open(new DeserializerInitContextImpl(null, null, sourceOutputType)); + FlinkRecordEmitter emitter = new FlinkRecordEmitter<>(deserializationSchema); + + HybridSnapshotLogSplitState splitStateA = + new HybridSnapshotLogSplitState( + new HybridSnapshotLogSplit(new TableBucket(1L, 0), null, 0L, 0L)); + HybridSnapshotLogSplitState splitStateB = + new HybridSnapshotLogSplitState( + new HybridSnapshotLogSplit(new TableBucket(1L, 1), null, 0L, 0L)); + + TestSourceOutput sourceOutput = new TestSourceOutput<>(); + + emitter.emitRecord( + new RecordAndPos( + new ScanRecord(100L, 1000L, ChangeType.UPDATE_BEFORE, row(1, "A-old", 100L)), + 1L), + sourceOutput, + splitStateA); + emitter.emitRecord( + new RecordAndPos( + new ScanRecord(200L, 2000L, ChangeType.UPDATE_BEFORE, row(2, "B-old", 200L)), + 1L), + sourceOutput, + splitStateB); + emitter.emitRecord( + new RecordAndPos( + new ScanRecord(201L, 2000L, ChangeType.UPDATE_AFTER, row(2, "B-new", 300L)), + 1L), + sourceOutput, + splitStateB); + emitter.emitRecord( + new RecordAndPos( + new ScanRecord(101L, 1000L, ChangeType.UPDATE_AFTER, row(1, "A-new", 150L)), + 1L), + sourceOutput, + splitStateA); + + List results = sourceOutput.getRecords(); + assertThat(results).hasSize(2); + + RowData resultB = results.get(0); + assertThat(resultB.getString(0)).isEqualTo(StringData.fromString("update")); + assertThat(resultB.getLong(1)).isEqualTo(200L); + assertThat(resultB.getRow(3, 3).getString(1)).isEqualTo(StringData.fromString("B-old")); + assertThat(resultB.getRow(4, 3).getString(1)).isEqualTo(StringData.fromString("B-new")); + + RowData resultA = results.get(1); + assertThat(resultA.getString(0)).isEqualTo(StringData.fromString("update")); + assertThat(resultA.getLong(1)).isEqualTo(100L); + assertThat(resultA.getRow(3, 3).getString(1)).isEqualTo(StringData.fromString("A-old")); + assertThat(resultA.getRow(4, 3).getString(1)).isEqualTo(StringData.fromString("A-new")); + } + @Test void testEmitPojoRecordWithHybridSplitInSnapshotPhase() throws Exception { // Setup From 187cafc8c1c2708c75d448b0db16d05bd4115e7a Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Mon, 16 Mar 2026 19:03:27 +0800 Subject: [PATCH 3/7] [flink] Remove redundant binlog converter test --- .../flink/utils/BinlogRowConverterTest.java | 46 ------------------- 1 file changed, 46 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/BinlogRowConverterTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/BinlogRowConverterTest.java index 5de61de483..a09dd7c419 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/BinlogRowConverterTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/BinlogRowConverterTest.java @@ -229,52 +229,6 @@ void testMultipleUpdatesInSequence() throws Exception { assertThat(result2.getLong(1)).isEqualTo(20L); // offset from second -U } - @Test - void testCrossSplitInterleavingDoesNotCorrupt() throws Exception { - String splitA = "log-1-0"; - String splitB = "log-1-1"; - - // Split A: send -U (buffered per split A) - RowData resultA1 = - converter.toBinlogRowData( - createLogRecord(ChangeType.UPDATE_BEFORE, 100L, 1000L, 1, "A-old", 100L), - splitA); - assertThat(resultA1).isNull(); - - // Split B: send -U then +U (should pair correctly within split B) - RowData resultB1 = - converter.toBinlogRowData( - createLogRecord(ChangeType.UPDATE_BEFORE, 200L, 2000L, 2, "B-old", 200L), - splitB); - assertThat(resultB1).isNull(); - - RowData resultB2 = - converter.toBinlogRowData( - createLogRecord(ChangeType.UPDATE_AFTER, 201L, 2000L, 2, "B-new", 300L), - splitB); - assertThat(resultB2).isNotNull(); - assertThat(resultB2.getString(0)).isEqualTo(StringData.fromString("update")); - // Verify it paired with split B's -U - RowData beforeRowB = resultB2.getRow(3, 3); - assertThat(beforeRowB.getString(1).toString()).isEqualTo("B-old"); - RowData afterRowB = resultB2.getRow(4, 3); - assertThat(afterRowB.getString(1).toString()).isEqualTo("B-new"); - - // Split A: send +U (should pair with split A's buffered -U, not corrupted by split B) - RowData resultA2 = - converter.toBinlogRowData( - createLogRecord(ChangeType.UPDATE_AFTER, 101L, 1000L, 1, "A-new", 150L), - splitA); - assertThat(resultA2).isNotNull(); - assertThat(resultA2.getString(0)).isEqualTo(StringData.fromString("update")); - assertThat(resultA2.getLong(1)).isEqualTo(100L); // offset from split A's -U - // Verify it paired with split A's -U - RowData beforeRowA = resultA2.getRow(3, 3); - assertThat(beforeRowA.getString(1).toString()).isEqualTo("A-old"); - RowData afterRowA = resultA2.getRow(4, 3); - assertThat(afterRowA.getString(1).toString()).isEqualTo("A-new"); - } - private LogRecord createLogRecord( ChangeType changeType, long offset, long timestamp, int id, String name, long amount) throws Exception { From d5e6818d1da3f6d3cf6bb5c51d9743ae58a4d89c Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Mon, 16 Mar 2026 19:09:54 +0800 Subject: [PATCH 4/7] [flink] Rename binlog hybrid split emitter test --- .../fluss/flink/source/emitter/FlinkRecordEmitterTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitterTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitterTest.java index bd27f77a2a..cc3621696e 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitterTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitterTest.java @@ -92,7 +92,7 @@ void testEmitRowDataRecordWithHybridSplitInSnapshotPhase() throws Exception { } @Test - void testEmitBinlogRecordWithHybridSplitInIncrementalPhase() throws Exception { + void testHybridIncrementalBinlogUpdatesDoNotCorruptAcrossSplits() throws Exception { RowType sourceOutputType = RowType.of( new DataType[] {DataTypes.INT(), DataTypes.STRING(), DataTypes.BIGINT()}, From 855071e1bd3164f796487fa9e67119569af4f331 Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Mon, 16 Mar 2026 19:15:10 +0800 Subject: [PATCH 5/7] [flink] Tighten binlog hybrid split test comments --- .../fluss/flink/source/emitter/FlinkRecordEmitterTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitterTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitterTest.java index cc3621696e..e9ee57ceb0 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitterTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitterTest.java @@ -111,6 +111,7 @@ void testHybridIncrementalBinlogUpdatesDoNotCorruptAcrossSplits() throws Excepti TestSourceOutput sourceOutput = new TestSourceOutput<>(); + // Interleave -U/+U across two hybrid splits and verify pairing stays split-scoped. emitter.emitRecord( new RecordAndPos( new ScanRecord(100L, 1000L, ChangeType.UPDATE_BEFORE, row(1, "A-old", 100L)), @@ -139,6 +140,7 @@ void testHybridIncrementalBinlogUpdatesDoNotCorruptAcrossSplits() throws Excepti List results = sourceOutput.getRecords(); assertThat(results).hasSize(2); + // Each emitted update should keep its own split's before/after rows and offsets. RowData resultB = results.get(0); assertThat(resultB.getString(0)).isEqualTo(StringData.fromString("update")); assertThat(resultB.getLong(1)).isEqualTo(200L); From e5029d79b766c4a7de7d91c93f1e45946a5214b0 Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Mon, 16 Mar 2026 19:25:12 +0800 Subject: [PATCH 6/7] [flink] Fix binlog emitter test style --- .../fluss/flink/source/emitter/FlinkRecordEmitterTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitterTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitterTest.java index e9ee57ceb0..bcd897edb3 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitterTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitterTest.java @@ -114,13 +114,15 @@ void testHybridIncrementalBinlogUpdatesDoNotCorruptAcrossSplits() throws Excepti // Interleave -U/+U across two hybrid splits and verify pairing stays split-scoped. emitter.emitRecord( new RecordAndPos( - new ScanRecord(100L, 1000L, ChangeType.UPDATE_BEFORE, row(1, "A-old", 100L)), + new ScanRecord( + 100L, 1000L, ChangeType.UPDATE_BEFORE, row(1, "A-old", 100L)), 1L), sourceOutput, splitStateA); emitter.emitRecord( new RecordAndPos( - new ScanRecord(200L, 2000L, ChangeType.UPDATE_BEFORE, row(2, "B-old", 200L)), + new ScanRecord( + 200L, 2000L, ChangeType.UPDATE_BEFORE, row(2, "B-old", 200L)), 1L), sourceOutput, splitStateB); From 354fefe450a557bdede45e4a639ff5e6654ec36e Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Mon, 16 Mar 2026 19:40:59 +0800 Subject: [PATCH 7/7] [flink] Fix binlog converter spacing --- .../java/org/apache/fluss/flink/utils/BinlogRowConverter.java | 1 - 1 file changed, 1 deletion(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/BinlogRowConverter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/BinlogRowConverter.java index dcbafbebe2..7133ec8d3b 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/BinlogRowConverter.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/BinlogRowConverter.java @@ -54,7 +54,6 @@ public class BinlogRowConverter implements RecordToFlinkRowConverter { */ private final Map pendingUpdateBeforeMap = new HashMap<>(); - /** Creates a new BinlogRowConverter. */ public BinlogRowConverter(RowType rowType) { this.baseConverter = new FlussRowToFlinkRowConverter(rowType);