prevent excessive buffer allocation in stream marshal/unmarshal#166
prevent excessive buffer allocation in stream marshal/unmarshal#166
Conversation
| newBuf, err := marshaller.MarshalSSZTo(encoder.GetBuffer()) | ||
| if err != nil { | ||
| return err | ||
| if int(sourceType.Size) <= encoder.MaxEncodeBufferSize() || sourceType.SszType == ssztypes.SszCustomType { |
| newBuf, err := marshaller.MarshalSSZDyn(ctx.ds, encoder.GetBuffer()) | ||
| if err != nil { | ||
| return err | ||
| if encoder.Seekable() || (sourceType.Size > 0 && int(sourceType.Size) <= encoder.MaxEncodeBufferSize()) { |
| newBuf, err := marshalFn(ctx.ds, encoder.GetBuffer()) | ||
| if err != nil { | ||
| return true, err | ||
| if encoder.Seekable() || (sourceType.Size > 0 && int(sourceType.Size) <= encoder.MaxEncodeBufferSize()) { |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #166 +/- ##
==========================================
- Coverage 95.75% 95.70% -0.05%
==========================================
Files 47 47
Lines 10921 10947 +26
==========================================
+ Hits 10457 10477 +20
- Misses 290 296 +6
Partials 174 174
🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
This PR aims to preserve the benefits of streaming SSZ marshal/unmarshal by preventing the reflection layer from delegating to buffer-based (whole-object) marshal/unmarshal methods when doing so would force large temporary allocations.
Changes:
- Added
MaxEncodeBufferSize()/MaxDecodeBufferSize()toEncoder/Decoderand implemented them for buffer vs stream encoders/decoders. - Split StreamEncoder/StreamDecoder configuration into (1) internal I/O buffer size and (2) max delegation buffer size (default 200KB).
- Added delegation guards in reflection marshal/unmarshal paths and new DynSsz options to configure the max delegation buffer.
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| sszutils/encoder.go | Extends Encoder interface with max-delegation buffer sizing. |
| sszutils/encoder_stream.go | Adds max delegation buffer config + new StreamEncoder constructor signature. |
| sszutils/encoder_buffer.go | Implements MaxEncodeBufferSize() as unlimited for in-memory encoding. |
| sszutils/decoder.go | Extends Decoder interface with max-delegation buffer sizing. |
| sszutils/decoder_stream.go | Splits internal buffer size vs max delegation buffer size + new constructor signature. |
| sszutils/decoder_buffer.go | Implements MaxDecodeBufferSize() as unlimited for in-memory decoding. |
| reflection/marshal.go | Adds size-based guards to avoid large buffer-based delegation on stream encoders. |
| reflection/unmarshal.go | Adds size-based guards to avoid large buffer-based delegation on stream decoders. |
| options.go | Adds DynSsz options for stream max delegation buffer sizes. |
| dynssz.go | Wires new stream encoder/decoder constructor args from DynSsz options. |
| sszutils/stream_test.go | Updates constructor call sites to new signatures. |
| reflection/overflow_internal_test.go | Updates StreamEncoder constructor call site. |
| reflection/marshal_test.go | Updates StreamEncoder constructor call site. |
| docs/streaming.md | Updates docs for new constructor signatures and max-delegation buffer behavior. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if useFastSsz { | ||
| if marshaller, ok := getPtr(sourceValue).Interface().(sszutils.FastsszMarshaler); ok { | ||
| newBuf, err := marshaller.MarshalSSZTo(encoder.GetBuffer()) | ||
| if err != nil { | ||
| return err | ||
| if int(sourceType.Size) <= encoder.MaxEncodeBufferSize() || sourceType.SszType == ssztypes.SszCustomType { | ||
| newBuf, err := marshaller.MarshalSSZTo(encoder.GetBuffer()) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| encoder.SetBuffer(newBuf) | ||
| return nil | ||
| } |
There was a problem hiding this comment.
The FastSSZ delegation guard uses sourceType.Size to decide whether to call MarshalSSZTo, but for dynamic containers TypeDescriptor.Size is typically 0 (unknown), which will always pass the <= MaxEncodeBufferSize() check and still trigger a full temporary buffer allocation on stream encoders (the issue this PR is trying to prevent). Consider using the fastssz SizeSSZ() result (with an overflow check) when sourceType.Size == 0 (and encoder is non-seekable), or conservatively skipping delegation when the size is unknown.
|
|
||
| if useDynamicMarshal { | ||
| if marshaller, ok := getPtr(sourceValue).Interface().(sszutils.DynamicMarshaler); ok { | ||
| newBuf, err := marshaller.MarshalSSZDyn(ctx.ds, encoder.GetBuffer()) | ||
| if err != nil { | ||
| return err | ||
| if encoder.Seekable() || (sourceType.Size > 0 && int(sourceType.Size) <= encoder.MaxEncodeBufferSize()) { | ||
| newBuf, err := marshaller.MarshalSSZDyn(ctx.ds, encoder.GetBuffer()) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| encoder.SetBuffer(newBuf) | ||
| return nil | ||
| } |
There was a problem hiding this comment.
This block converts sourceType.Size (uint32) to int when comparing against MaxEncodeBufferSize() without guarding against platform overflow. On 32-bit platforms (or very large size hints), int(sourceType.Size) can wrap and incorrectly allow delegation. Consider checking sourceType.Size > math.MaxInt first (returning ErrPlatformOverflowFn) or performing comparisons in int64/uint64 safely.
| if useViewMarshaler { | ||
| if marshaller, ok := getPtr(sourceValue).Interface().(sszutils.DynamicViewMarshaler); ok { | ||
| if marshalFn := marshaller.MarshalSSZDynView(*sourceType.CodegenInfo); marshalFn != nil { | ||
| newBuf, err := marshalFn(ctx.ds, encoder.GetBuffer()) | ||
| if err != nil { | ||
| return true, err | ||
| if encoder.Seekable() || (sourceType.Size > 0 && int(sourceType.Size) <= encoder.MaxEncodeBufferSize()) { | ||
| newBuf, err := marshalFn(ctx.ds, encoder.GetBuffer()) | ||
| if err != nil { | ||
| return true, err | ||
| } | ||
| encoder.SetBuffer(newBuf) | ||
| return true, nil |
There was a problem hiding this comment.
Same potential uint32 -> int overflow issue here: int(sourceType.Size) is used implicitly via int(...) <= MaxEncodeBufferSize() in the condition. To avoid incorrect delegation decisions on platforms where int is smaller than uint32, add an explicit overflow guard (or compare in a wider type) before converting.
| @@ -94,12 +96,14 @@ func (ctx *ReflectionCtx) marshalType(sourceType *ssztypes.TypeDescriptor, sourc | |||
|
|
|||
| if useDynamicMarshal { | |||
| if marshaller, ok := getPtr(sourceValue).Interface().(sszutils.DynamicMarshaler); ok { | |||
| newBuf, err := marshaller.MarshalSSZDyn(ctx.ds, encoder.GetBuffer()) | |||
| if err != nil { | |||
| return err | |||
| if encoder.Seekable() || (sourceType.Size > 0 && int(sourceType.Size) <= encoder.MaxEncodeBufferSize()) { | |||
| newBuf, err := marshaller.MarshalSSZDyn(ctx.ds, encoder.GetBuffer()) | |||
| if err != nil { | |||
| return err | |||
| } | |||
| encoder.SetBuffer(newBuf) | |||
| return nil | |||
| } | |||
There was a problem hiding this comment.
The new MaxEncodeBufferSize()-based delegation behavior (skipping buffer-based marshal methods on stream encoders when the payload is large) doesn’t appear to be covered by tests. Adding a unit test that asserts large values do not invoke FastsszMarshaler/DynamicMarshaler on StreamEncoder (but still do on BufferEncoder) would help prevent regressions.
| if useViewUnmarshaler { | ||
| if unmarshaler, ok := targetValue.Addr().Interface().(sszutils.DynamicViewUnmarshaler); ok { | ||
| if unmarshalFn := unmarshaler.UnmarshalSSZDynView(*targetType.CodegenInfo); unmarshalFn != nil { | ||
| bufLen := decoder.GetLength() | ||
| buf, err := decoder.DecodeBytesBuf(bufLen) | ||
| if err != nil { | ||
| return err | ||
| if bufLen <= decoder.MaxDecodeBufferSize() { | ||
| buf, err := decoder.DecodeBytesBuf(bufLen) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| return unmarshalFn(ctx.ds, buf) | ||
| } | ||
| return unmarshalFn(ctx.ds, buf) | ||
| } |
There was a problem hiding this comment.
The new MaxDecodeBufferSize()-based delegation guards in the view/fastssz/dynamic unmarshaler paths don’t appear to have test coverage. Consider adding a test where a type implements Dynamic(Un)marshaler (or view unmarshaler) and the input length is above/below the threshold to verify delegation is correctly skipped only when expected.
Benchmark ResultsLibrary BenchmarksPerformance Benchmarks |
Prevent excessive buffer allocation in stream marshal/unmarshal
Problem
When using
MarshalSSZWriterorUnmarshalSSZReaderwith types that have generated buffer-based methods (DynamicMarshaler/DynamicUnmarshaler) but no streaming methods (DynamicEncoder/DynamicDecoder), the reflection layer immediately delegates to the buffer-based method for the entire object. For large types like BeaconState (200MB+), this defeats the purpose of streaming by allocating the full object size into a temporary buffer.Solution
Introduce a max delegation buffer size threshold (default 200KB) that controls when the stream encoder/decoder skips buffer-based delegation and falls through to reflection-based field-by-field processing instead. This is decoupled from the existing initial buffer size (default 2KB) which controls the internal I/O buffer.
When a type's serialized size exceeds the max delegation buffer, individual fields are processed via reflection — each field may still delegate to buffer-based methods if it fits within the threshold, so only the top-level container avoids the large allocation.
Changes
New
Decoder/Encoderinterface methods:MaxDecodeBufferSize() int— returns the configured max buffer for delegation decisionsMaxEncodeBufferSize() int— returns the configured max buffer for delegation decisionsBufferDecoder/BufferEncoderreturnmath.MaxInt(data already in memory, no concern)StreamDecoder/StreamEncoderreturn the configured max (default 200KB)Decoupled buffer sizes in
StreamDecoderandStreamEncoder:NewStreamDecoder(reader, totalLen, bufSize, maxBufSize)— separate initial read buffer from max delegation bufferNewStreamEncoder(writer, bufSize, maxBufSize)— separate internal write buffer from max delegation bufferDefaultStreamDecoderMaxBufSize,DefaultStreamEncoderMaxBufSize(200KB)New configuration options:
WithStreamReaderMaxBufferSize(size)— controls max delegation buffer forUnmarshalSSZReaderWithStreamWriterMaxBufferSize(size)— controls max delegation buffer forMarshalSSZWriterGuard checks in
reflection/unmarshal.go(3 delegation points):DynamicViewUnmarshaler: skip ifbufLen > decoder.MaxDecodeBufferSize()FastsszUnmarshaler: skip ifsszLen > decoder.MaxDecodeBufferSize()(exceptSszCustomType)DynamicUnmarshaler: skip ifsszLen > decoder.MaxDecodeBufferSize()Guard checks in
reflection/marshal.go(3 delegation points):FastsszMarshaler: skip ifsourceType.Size > encoder.MaxEncodeBufferSize()(exceptSszCustomType)DynamicMarshaler: skip unless encoder is seekable or static size fits within limitDynamicViewMarshaler: skip unless encoder is seekable or static size fits within limitFor the marshal path, dynamic types (unknown output size) on stream encoders conservatively skip delegation since the output size cannot be determined without marshalling. Buffer-based encoders (
Seekable() == true) always delegate since the data is already in memory.Breaking changes
NewStreamDecodersignature changed from(reader, totalLen, maxBufSize)to(reader, totalLen, bufSize, maxBufSize)NewStreamEncodersignature changed from(writer, bufSize)to(writer, bufSize, maxBufSize)0for defaults need an additional0argument