Description
Search before asking
- I searched in the issues and found nothing similar.
Motivation
Background
Semi-structured data (e.g., JSON) is increasingly common in modern data pipelines. Many query engines and storage systems (such as Apache Spark, Apache Iceberg, and Apache Paimon) have adopted a VARIANT data type to efficiently represent and query semi-structured data using a compact binary encoding, rather than storing raw JSON strings.
Currently, Fluss treats VARIANT internally as plain `byte[]`, which has several limitations:
- Loss of semantic structure: A single `byte[]` conflates the variant's value and metadata (string dictionary) into one opaque blob. Downstream consumers must know the internal wire format (`[4-byte value length][value bytes][metadata bytes]`) to decode it correctly.
- Inconsistent API: All other complex types in Fluss (e.g., `InternalArray`, `InternalMap`, `InternalRow`) have dedicated first-class types in the row infrastructure, while VARIANT does not.
- Poor interoperability with lake formats: When writing to lake formats (Paimon, Iceberg, Lance), the VARIANT data must be split into separate `value` and `metadata` components. Using `byte[]` forces every integration point to re-implement the split/merge logic.
- No alignment with industry standards: Apache Paimon has already introduced a full `Variant` interface with `value()` and `metadata()` accessors, following the Variant Binary Encoding spec. Fluss should align with this design for ecosystem consistency.
Use Case
- Users ingesting JSON or semi-structured data into Fluss tables should benefit from efficient binary encoding and per-path access without full deserialization.
- Lake connector writers (Paimon, Iceberg, Lance) need structured access to `value` and `metadata` separately.
- A first-class `Variant` type enables future optimizations like predicate pushdown on variant paths.
Solution
Proposed Design
Introduce a first-class Variant interface and GenericVariant implementation throughout Fluss's row infrastructure, following the same pattern as Apache Paimon's Variant design.
1. Core Types
- `Variant` interface (`fluss-common/.../row/Variant.java`)
  - `byte[] value()` — returns the binary-encoded variant value (header + data)
  - `byte[] metadata()` — returns the string dictionary (version + deduplicated object key names)
  - `long sizeInBytes()` — total byte size
  - `Variant copy()` — deep copy
  - Static helpers: `bytesToVariant(byte[])` and `variantToBytes(Variant)` for backward-compatible wire format conversion
- `GenericVariant` class (`fluss-common/.../row/GenericVariant.java`)
  - Implements `Variant` and `Serializable`
  - Stores two `byte[]` fields: `value` and `metadata`
  - Proper `equals()`, `hashCode()`, `toString()`
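A rough Java sketch of the interface and implementation described above (signatures follow the bullet list; the `equals()`/`hashCode()`/`toString()` details are illustrative assumptions, not Fluss's actual code):

```java
import java.io.Serializable;
import java.util.Arrays;

// Sketch of the proposed Variant interface; names follow the proposal.
interface Variant {
    byte[] value();     // binary-encoded variant value (header + data)
    byte[] metadata();  // string dictionary (version + deduplicated key names)
    long sizeInBytes(); // total byte size of value + metadata
    Variant copy();     // deep copy
}

// Minimal GenericVariant backed by two byte[] fields, as the proposal outlines.
class GenericVariant implements Variant, Serializable {
    private final byte[] value;
    private final byte[] metadata;

    GenericVariant(byte[] value, byte[] metadata) {
        this.value = value;
        this.metadata = metadata;
    }

    @Override public byte[] value() { return value; }
    @Override public byte[] metadata() { return metadata; }
    @Override public long sizeInBytes() { return value.length + metadata.length; }

    @Override
    public Variant copy() {
        return new GenericVariant(
                Arrays.copyOf(value, value.length),
                Arrays.copyOf(metadata, metadata.length));
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof GenericVariant)) return false;
        GenericVariant other = (GenericVariant) o;
        return Arrays.equals(value, other.value)
                && Arrays.equals(metadata, other.metadata);
    }

    @Override
    public int hashCode() {
        return 31 * Arrays.hashCode(value) + Arrays.hashCode(metadata);
    }

    @Override
    public String toString() {
        return "GenericVariant(value=" + value.length
                + " bytes, metadata=" + metadata.length + " bytes)";
    }
}
```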
2. Row Infrastructure Changes
| Layer | Change |
|---|---|
| `DataGetters` | Add `Variant getVariant(int pos)` |
| `BinaryWriter` | Add `writeVariant(int pos, Variant value)` |
| All `InternalRow` implementations | Implement `getVariant()` — `GenericRow`, `BinaryRow`, `CompactedRow`, `IndexedRow`, `ProjectedRow`, `PaddingRow`, `ColumnarRow`, etc. |
| All `InternalArray` implementations | Implement `getVariant()` — `GenericArray`, `BinaryArray`, `ColumnarArray` |
| Readers/Writers | `CompactedRowReader`/`Writer`, `IndexedRowReader`/`Writer` — add `readVariant()`/`writeVariant(Variant)` |
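To illustrate where the new accessor slots in, here is a simplified stand-in for the `DataGetters` layer and a `GenericRow`-style implementation (these types are assumptions for illustration, not Fluss's real definitions):

```java
// Simplified stand-in for the proposed Variant type (assumption).
interface Variant {
    byte[] value();
    byte[] metadata();
}

// Simplified stand-in for Fluss's DataGetters interface.
interface DataGetters {
    Object getField(int pos);
    // Proposed addition: a typed accessor alongside the existing getters.
    Variant getVariant(int pos);
}

// GenericRow-style implementation: fields are stored as Objects,
// so the typed getter is a position lookup plus a cast.
class SimpleRow implements DataGetters {
    private final Object[] fields;

    SimpleRow(Object... fields) {
        this.fields = fields;
    }

    @Override public Object getField(int pos) { return fields[pos]; }
    @Override public Variant getVariant(int pos) { return (Variant) fields[pos]; }
}
```

Binary implementations (`BinaryRow`, `CompactedRow`, `IndexedRow`, …) would instead decode the variant from their backing memory segments, but the accessor signature stays the same.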
3. Binary Storage Format (Backward Compatible)
The on-wire format remains unchanged for compatibility: `[4-byte value length][value bytes][metadata bytes]`. `Variant.variantToBytes()` and `Variant.bytesToVariant()` handle the conversion between this format and the structured `Variant` object.
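A minimal sketch of that conversion, assuming the length-prefixed layout described above (the `VariantWireFormat` class name is illustrative; in the proposal these would be static helpers on `Variant`):

```java
import java.nio.ByteBuffer;

// Sketch of the backward-compatible wire format:
// [4-byte value length][value bytes][metadata bytes]
class VariantWireFormat {

    // Serialize: prefix the value with its 4-byte length, then append metadata.
    static byte[] variantToBytes(byte[] value, byte[] metadata) {
        ByteBuffer buf = ByteBuffer.allocate(4 + value.length + metadata.length);
        buf.putInt(value.length);
        buf.put(value);
        buf.put(metadata);
        return buf.array();
    }

    // Deserialize: read the length prefix, then split value and metadata.
    // Returns {value, metadata}.
    static byte[][] bytesToVariant(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        byte[] value = new byte[buf.getInt()];
        buf.get(value);
        byte[] metadata = new byte[buf.remaining()];
        buf.get(metadata);
        return new byte[][] {value, metadata};
    }
}
```

Because the blob layout is unchanged, rows written by older Fluss versions remain readable, and readers that still expect a raw `byte[]` keep working.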
4. Integration Points
- Lake connectors (Paimon, Iceberg, Lance): Encoders/decoders use `Variant` directly instead of raw `byte[]`
- Flink bridge: `FlussRowToFlinkRowConverter` converts `Variant` → `byte[]` for Flink compatibility
- Client converters: `PojoToRowConverter`/`RowToPojoConverter` support both `byte[]` and `Variant` inputs
- Utilities: `InternalRowUtils`, `TypeUtils`, `PartitionUtils` updated accordingly
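One way the dual-input client-converter boundary could look, sketched under the assumption that legacy callers still pass the length-prefixed `byte[]` wire format (all class and method names here are illustrative, not Fluss's actual API):

```java
import java.nio.ByteBuffer;

// Illustrative structured variant holder (assumption, stands in for Variant).
class VariantInput {
    final byte[] value;
    final byte[] metadata;

    VariantInput(byte[] value, byte[] metadata) {
        this.value = value;
        this.metadata = metadata;
    }
}

// Sketch of a converter boundary that accepts both representations,
// so callers can migrate from byte[] to Variant incrementally.
class PojoFieldConverter {
    static VariantInput normalize(Object field) {
        if (field instanceof VariantInput) {
            return (VariantInput) field;
        }
        if (field instanceof byte[]) {
            // Legacy wire format: [4-byte value length][value bytes][metadata bytes]
            ByteBuffer buf = ByteBuffer.wrap((byte[]) field);
            byte[] value = new byte[buf.getInt()];
            buf.get(value);
            byte[] metadata = new byte[buf.remaining()];
            buf.get(metadata);
            return new VariantInput(value, metadata);
        }
        throw new IllegalArgumentException("Unsupported variant input: " + field);
    }
}
```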
5. References
- Variant Binary Encoding Spec (Parquet)
- Apache Paimon Variant Implementation
- Apache Spark VARIANT FLIP
Anything else?
No response
Willingness to contribute
- I'm willing to submit a PR!