[flink] Fix $binlog update pairing across log splits #2880

Draft
luoyuxia wants to merge 7 commits into apache:release-0.9 from luoyuxia:fix-binlog-update-pairing-release-0.9

@luoyuxia luoyuxia commented Mar 16, 2026

Purpose

Linked issue: close #2879

When one Flink SourceReader processes records from multiple log splits, $binlog
can interleave UPDATE_BEFORE / UPDATE_AFTER records from different splits.
The previous implementation kept only one global pending UPDATE_BEFORE record,
which could be overwritten by another split and then fail with:

Received UPDATE_AFTER (+U) without a preceding UPDATE_BEFORE (-U) record.
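
To make the failure mode concrete, here is a minimal, self-contained sketch of a converter that keeps a single global pending UPDATE_BEFORE slot. It is an illustration only: the class `GlobalSlotPairingDemo`, its `convert` method, and the string record values are hypothetical and are not the actual BinlogRowConverter code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the pre-fix behavior: ONE pending slot shared by all splits.
final class GlobalSlotPairingDemo {
    enum Kind { UPDATE_BEFORE, UPDATE_AFTER }

    // The single global buffer; any split's -U overwrites whatever was pending.
    private String pendingBefore;

    // Returns "before->after" for a +U record, or null while a -U record is buffered.
    String convert(Kind kind, String value) {
        if (kind == Kind.UPDATE_BEFORE) {
            pendingBefore = value;
            return null;
        }
        if (pendingBefore == null) {
            throw new IllegalStateException(
                    "Received UPDATE_AFTER (+U) without a preceding UPDATE_BEFORE (-U) record.");
        }
        String paired = pendingBefore + "->" + value;
        pendingBefore = null;
        return paired;
    }

    // Replays an interleaving of two splits A and B: -U(A), -U(B), +U(A), +U(B).
    static List<String> replayInterleaved() {
        GlobalSlotPairingDemo converter = new GlobalSlotPairingDemo();
        List<String> out = new ArrayList<>();
        Object[][] records = {
            {Kind.UPDATE_BEFORE, "a0"}, // split A
            {Kind.UPDATE_BEFORE, "b0"}, // split B overwrites A's pending -U
            {Kind.UPDATE_AFTER, "a1"},  // split A is paired with B's before image
            {Kind.UPDATE_AFTER, "b1"},  // split B finds the slot empty and fails
        };
        for (Object[] r : records) {
            try {
                String paired = converter.convert((Kind) r[0], (String) r[1]);
                if (paired != null) {
                    out.add(paired);
                }
            } catch (IllegalStateException e) {
                out.add("ERROR: " + e.getMessage());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        replayInterleaved().forEach(System.out::println);
    }
}
```

In this sketch the first emitted pair mixes split B's before image with split A's after image, and the second +U hits the empty slot and raises the error quoted above.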

Brief change log

  • keep pending UPDATE_BEFORE state per split instead of using one global buffer;
  • pass the current split id through the emitter/deserialization path for $binlog;
  • pair UPDATE_BEFORE and UPDATE_AFTER only within the same split;
  • add a regression test for cross-split interleaving in BinlogRowConverterTest.
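
The per-split idea in the list above can be sketched roughly as follows. This is a simplified illustration under assumptions, not the code in this PR: `PerSplitPairingSketch`, the `String` split ids, and the string record values are hypothetical stand-ins for the real converter and row types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: pending -U state is keyed by split id instead of one global slot.
final class PerSplitPairingSketch {
    enum Kind { UPDATE_BEFORE, UPDATE_AFTER }

    // One pending UPDATE_BEFORE per split; splits cannot overwrite each other.
    private final Map<String, String> pendingBeforeBySplit = new HashMap<>();

    // Returns "before->after" for a +U record, or null while a -U record is buffered.
    String convert(String splitId, Kind kind, String value) {
        if (kind == Kind.UPDATE_BEFORE) {
            pendingBeforeBySplit.put(splitId, value);
            return null;
        }
        // remove() both fetches and clears the pending entry for this split only.
        String before = pendingBeforeBySplit.remove(splitId);
        if (before == null) {
            throw new IllegalStateException(
                    "Received UPDATE_AFTER (+U) without a preceding UPDATE_BEFORE (-U) record.");
        }
        return before + "->" + value;
    }

    // Replays the interleaving -U(A), -U(B), +U(A), +U(B) and collects the emitted pairs.
    static List<String> replayInterleaved() {
        PerSplitPairingSketch converter = new PerSplitPairingSketch();
        List<String> out = new ArrayList<>();
        String[][] records = {
            {"split-A", "UPDATE_BEFORE", "a0"},
            {"split-B", "UPDATE_BEFORE", "b0"},
            {"split-A", "UPDATE_AFTER", "a1"},
            {"split-B", "UPDATE_AFTER", "b1"},
        };
        for (String[] r : records) {
            String paired = converter.convert(r[0], Kind.valueOf(r[1]), r[2]);
            if (paired != null) {
                out.add(paired);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        replayInterleaved().forEach(System.out::println);
    }
}
```

With state keyed by split, the same interleaving yields one pair per split, each built from that split's own before and after images.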

Tests

  • Added BinlogRowConverterTest#testCrossSplitInterleavingDoesNotCorrupt
  • Not run locally

To do: also verify in a production environment.

API and Format

No API or storage format changes.

Documentation

No.

Copilot AI left a comment

Pull request overview

This PR improves correctness of the Flink $binlog virtual table conversion by preventing UPDATE_BEFORE/UPDATE_AFTER pairing from being corrupted when records from multiple log splits are interleaved within the same source reader.

Changes:

  • Add per-split buffering in BinlogRowConverter and a split-aware toBinlogRowData(record, splitId) overload.
  • Propagate split context to BinlogDeserializationSchema via FlinkRecordEmitter (using a new SourceSplitState.splitId() accessor).
  • Add a unit test covering cross-split interleaving behavior.
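
As a rough illustration of the propagation described in the list above, the sketch below shows an emitter setting the current split id on a deserializer before each record is deserialized, so that a split-aware converter receives it. All types here (`Emitter`, `Deserializer`, `SplitAwareConverter`) and the `setCurrentSplitId` method are hypothetical stand-ins; the real FlinkRecordEmitter and BinlogDeserializationSchema signatures may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of passing split context from the emitter to the converter.
final class SplitContextPropagationSketch {

    // Stand-in for a split-aware conversion method: (record, splitId) -> output.
    interface SplitAwareConverter {
        String convert(String record, String splitId);
    }

    // Stand-in for the deserializer: remembers the split id most recently set by the emitter.
    static final class Deserializer {
        private final SplitAwareConverter converter;
        private String currentSplitId;

        Deserializer(SplitAwareConverter converter) {
            this.converter = converter;
        }

        void setCurrentSplitId(String splitId) {
            this.currentSplitId = splitId;
        }

        String deserialize(String record) {
            if (currentSplitId == null) {
                throw new IllegalStateException("split id must be set before deserializing");
            }
            return converter.convert(record, currentSplitId);
        }
    }

    // Stand-in for the emitter: sets the split id, then deserializes the record.
    static final class Emitter {
        private final Deserializer deserializer;
        private final List<String> emitted = new ArrayList<>();

        Emitter(Deserializer deserializer) {
            this.deserializer = deserializer;
        }

        void emitRecord(String record, String splitId) {
            deserializer.setCurrentSplitId(splitId);
            String out = deserializer.deserialize(record);
            if (out != null) {
                emitted.add(out);
            }
        }

        List<String> emitted() {
            return emitted;
        }
    }

    // Emits three records from two splits; the toy converter just tags each with its split id.
    static List<String> demo() {
        Emitter emitter =
                new Emitter(new Deserializer((record, splitId) -> splitId + ":" + record));
        emitter.emitRecord("r1", "split-A");
        emitter.emitRecord("r2", "split-B");
        emitter.emitRecord("r3", "split-A");
        return emitter.emitted();
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Setting the split id as state right before each call keeps the deserialize method's signature unchanged in this sketch; it relies on records being emitted one at a time from a single thread.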

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.

| File | Description |
| --- | --- |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/BinlogRowConverterTest.java | Adds a test ensuring interleaved splits don’t cross-pair update before/after images. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/BinlogRowConverter.java | Switches update buffering from a single slot to a per-split map and adds a split-aware conversion API. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitState.java | Adds a splitId() accessor to avoid reconstructing split objects just to get the id. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitter.java | Sets the current split id into the binlog deserializer before deserializing log records. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/deserializer/BinlogDeserializationSchema.java | Tracks the current split id and routes records to the split-aware converter method. |

@luoyuxia luoyuxia marked this pull request as draft March 16, 2026 09:49