Description
Search before asking
- I searched in the issues and found nothing similar.
Fluss version
main (development)
Please describe the bug 🐞
When reading a `$binlog` table, the Flink source can fail with the following exception if one SourceReader processes records from multiple log splits and update events are interleaved across splits:

```
Cause: Received UPDATE_AFTER (+U) without a preceding UPDATE_BEFORE (-U) record. This indicates a corrupted log sequence.
```
The root cause is that `BinlogRowConverter` keeps only one global pending `UPDATE_BEFORE` record. This works if records are strictly processed from a single split, but it breaks once the same reader interleaves records from different bucket log splits.
A problematic sequence looks like this:
- split A emits `UPDATE_BEFORE`
- split B emits `UPDATE_BEFORE`
- split B emits `UPDATE_AFTER`
- split A emits `UPDATE_AFTER`
With a single global pending buffer:
- split B's `UPDATE_BEFORE` overwrites split A's pending state;
- split B's `UPDATE_AFTER` consumes that global state;
- split A's `UPDATE_AFTER` no longer finds its matching `UPDATE_BEFORE` and throws the exception above.
Depending on the exact interleaving, this can also temporarily pair the wrong before/after rows across splits before eventually failing.
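To make the failure mode concrete, here is a minimal, self-contained sketch of a converter with a single global pending slot. The class and method names are illustrative only, not the actual Fluss `BinlogRowConverter` API:

```java
import java.util.Objects;

/**
 * Simplified model of the buggy behavior: one global pending
 * UPDATE_BEFORE slot shared by all splits. The splitId parameter is
 * accepted but ignored, which is exactly the problem.
 */
class GlobalPendingDemo {
    private String pendingBefore; // one slot for ALL splits

    /** Returns "before -> after" when a +U completes a -U; returns null for a buffered -U. */
    String process(String splitId, String kind, String row) {
        if (Objects.equals(kind, "-U")) {
            pendingBefore = row; // silently overwrites another split's pending record
            return null;
        }
        if (Objects.equals(kind, "+U")) {
            if (pendingBefore == null) {
                throw new IllegalStateException(
                        "Received UPDATE_AFTER (+U) without a preceding UPDATE_BEFORE (-U) record.");
            }
            String paired = pendingBefore + " -> " + row;
            pendingBefore = null;
            return paired;
        }
        return row; // inserts/deletes pass through unchanged in this sketch
    }
}
```

Replaying the sequence `A:-U -> B:-U -> B:+U -> A:+U` against this model reproduces the exception: split B's `-U` overwrites split A's pending record, so split A's `+U` finds nothing to pair with.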
This is especially relevant for `$binlog` on primary key tables with multiple buckets, where one reader may own multiple log splits at the same time.
Expected behavior
`UPDATE_BEFORE` / `UPDATE_AFTER` pairing should be isolated per split, so interleaving records from different splits does not corrupt update reconstruction.
Actual behavior
The pairing state is shared across splits, which can corrupt update reconstruction and fail the source task with:
`Received UPDATE_AFTER (+U) without a preceding UPDATE_BEFORE (-U) record.`
How to reproduce
A minimal reproduction is:
- Create a primary key table with multiple buckets.
- Read from the corresponding `$binlog` table with the Flink source.
- Make one reader consume multiple log splits.
- Produce interleaved updates from different buckets/splits in the order `A:-U -> B:-U -> B:+U -> A:+U`.
The reader can then fail because one split's pending `UPDATE_BEFORE` state is overwritten by another split before its matching `UPDATE_AFTER` arrives.
Solution
Use split-scoped buffering for update pairing instead of a single global pending record.
Concretely:
- change `BinlogRowConverter` from a single pending `UPDATE_BEFORE` field to a `Map<splitId, LogRecord>`;
- pass the current split id through the deserialization path before converting a log record;
- merge `UPDATE_BEFORE` / `UPDATE_AFTER` only within the same split;
- add regression tests for interleaved cross-split update sequences, ideally both at the converter level and through the emitter/deserializer path.
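The same simplified model with split-scoped buffering shows how keying the pending record by split id fixes the interleaving. Again, the names are illustrative and not the actual Fluss API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Sketch of the proposed fix: one pending UPDATE_BEFORE per split id
 * instead of a single global field, so splits cannot clobber each other.
 */
class SplitScopedPendingDemo {
    private final Map<String, String> pendingBySplit = new HashMap<>();

    /** Returns "before -> after" once a +U completes the -U of the SAME split; null for a buffered -U. */
    String process(String splitId, String kind, String row) {
        if (Objects.equals(kind, "-U")) {
            pendingBySplit.put(splitId, row); // buffered per split, no cross-split overwrite
            return null;
        }
        if (Objects.equals(kind, "+U")) {
            String before = pendingBySplit.remove(splitId);
            if (before == null) {
                throw new IllegalStateException(
                        "Received UPDATE_AFTER (+U) without a preceding UPDATE_BEFORE (-U) record.");
            }
            return before + " -> " + row;
        }
        return row; // inserts/deletes pass through unchanged in this sketch
    }
}
```

With this change, the interleaved sequence `A:-U -> B:-U -> B:+U -> A:+U` pairs each split's before/after rows correctly instead of throwing.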
This keeps `$binlog` update reconstruction correct even when multiple splits are processed by the same source reader.
Are you willing to submit a PR?
- I'm willing to submit a PR!