Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/streaming.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ Creates a new stream encoder:
```go
import "github.com/pk910/dynamic-ssz/sszutils"

encoder := sszutils.NewStreamEncoder(writer, 0) // 0 = default 2KB buffer
encoder := sszutils.NewStreamEncoder(writer, 0, 0) // 0 = default 2KB write buffer, 0 = default 200KB max delegation buffer
```

The `StreamEncoder`:
Expand All @@ -182,7 +182,7 @@ The `StreamEncoder`:
Creates a new stream decoder:

```go
decoder := sszutils.NewStreamDecoder(reader, totalSize, 0) // 0 = default 2KB buffer
decoder := sszutils.NewStreamDecoder(reader, totalSize, 0, 0) // 0 = default 2KB read buffer, 0 = default 200KB max delegation buffer
```

The `StreamDecoder`:
Expand Down
4 changes: 2 additions & 2 deletions dynssz.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ func (d *DynSsz) MarshalSSZTo(source any, buf []byte, opts ...CallOption) ([]byt
// err = ds.MarshalSSZWriter(block, conn)
func (d *DynSsz) MarshalSSZWriter(source any, w io.Writer, opts ...CallOption) error {
cfg := applyCallOptions(opts)
encoder := sszutils.NewStreamEncoder(w, d.options.StreamWriterBufferSize)
encoder := sszutils.NewStreamEncoder(w, d.options.StreamWriterBufferSize, d.options.StreamWriterMaxBufferSize)

// Skip view descriptor logic for types implementing DynamicEncoder
if cfg == nil || cfg.viewDescriptor == nil {
Expand Down Expand Up @@ -662,7 +662,7 @@ func (d *DynSsz) UnmarshalSSZ(target any, ssz []byte, opts ...CallOption) error
// err = ds.UnmarshalSSZReader(&block, conn, -1)
func (d *DynSsz) UnmarshalSSZReader(target any, r io.Reader, size int, opts ...CallOption) error {
cfg := applyCallOptions(opts)
decoder := sszutils.NewStreamDecoder(r, size, d.options.StreamReaderBufferSize)
decoder := sszutils.NewStreamDecoder(r, size, d.options.StreamReaderBufferSize, d.options.StreamReaderMaxBufferSize)
decoder.PushLimit(size)

// Skip view descriptor logic for types implementing DynamicDecoder
Expand Down
42 changes: 33 additions & 9 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ type DynSszOption func(*DynSszOptions)

// DynSszOptions holds the configuration options for a DynSsz instance.
type DynSszOptions struct {
NoFastSsz bool
NoFastHash bool
ExtendedTypes bool
Verbose bool
LogCb func(format string, args ...any)
StreamWriterBufferSize int
StreamReaderBufferSize int
NoFastSsz bool
NoFastHash bool
ExtendedTypes bool
Verbose bool
LogCb func(format string, args ...any)
StreamWriterBufferSize int
StreamWriterMaxBufferSize int
StreamReaderBufferSize int
StreamReaderMaxBufferSize int
}

// WithNoFastSsz disables fastssz fallback for types that implement fastssz
Expand Down Expand Up @@ -68,14 +70,36 @@ func WithStreamWriterBufferSize(size int) DynSszOption {
}
}

// WithStreamReaderBufferSize sets the maximum internal buffer size for the
// streaming SSZ decoder used by UnmarshalSSZReader. Defaults to 2KB if not set.
// WithStreamWriterMaxBufferSize sets the maximum buffer size for delegating to
// buffer-based marshal methods during streaming SSZ encoding. When a type's
// serialized size exceeds this limit, the encoder falls through to
// reflection-based field-by-field marshalling instead of buffering the entire
// object. Defaults to 200KB if not set.
func WithStreamWriterMaxBufferSize(size int) DynSszOption {
return func(opts *DynSszOptions) {
opts.StreamWriterMaxBufferSize = size
}
}

// WithStreamReaderBufferSize sets the internal buffer size for the streaming
// SSZ decoder used by UnmarshalSSZReader. Defaults to 2KB if not set.
func WithStreamReaderBufferSize(size int) DynSszOption {
return func(opts *DynSszOptions) {
opts.StreamReaderBufferSize = size
}
}

// WithStreamReaderMaxBufferSize sets the maximum buffer size for delegating to
// buffer-based unmarshal methods during streaming SSZ decoding. When a type's
// serialized size exceeds this limit, the decoder falls through to
// reflection-based field-by-field unmarshalling instead of buffering the entire
// object. Defaults to 200KB if not set.
func WithStreamReaderMaxBufferSize(size int) DynSszOption {
return func(opts *DynSszOptions) {
opts.StreamReaderMaxBufferSize = size
}
}

// CallOption is a functional option for per-call configuration of MarshalSSZ,
// UnmarshalSSZ, and HashTreeRoot operations. These options allow runtime
// customization of SSZ encoding behavior without modifying the DynSsz instance.
Expand Down
36 changes: 21 additions & 15 deletions reflection/marshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,14 @@

if useFastSsz {
if marshaller, ok := getPtr(sourceValue).Interface().(sszutils.FastsszMarshaler); ok {
newBuf, err := marshaller.MarshalSSZTo(encoder.GetBuffer())
if err != nil {
return err
if int(sourceType.Size) <= encoder.MaxEncodeBufferSize() || sourceType.SszType == ssztypes.SszCustomType {

Check failure

Code scanning / CodeQL

Incorrect conversion between integer types High

Incorrect conversion of an unsigned 32-bit integer from
strconv.ParseUint
to a lower bit size type int without an upper bound check.
Incorrect conversion of an unsigned 32-bit integer from
strconv.ParseUint
to a lower bit size type int without an upper bound check.
Incorrect conversion of an unsigned 32-bit integer from
strconv.ParseUint
to a lower bit size type int without an upper bound check.
Incorrect conversion of an unsigned 32-bit integer from
strconv.ParseUint
to a lower bit size type int without an upper bound check.
Incorrect conversion of an unsigned 32-bit integer from
strconv.ParseUint
to a lower bit size type int without an upper bound check.
Incorrect conversion of an unsigned 32-bit integer from
strconv.ParseUint
to a lower bit size type int without an upper bound check.
newBuf, err := marshaller.MarshalSSZTo(encoder.GetBuffer())
if err != nil {
return err
}
encoder.SetBuffer(newBuf)
return nil
}
Comment on lines 75 to 84
Copy link

Copilot AI Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The FastSSZ delegation guard uses sourceType.Size to decide whether to call MarshalSSZTo, but for dynamic containers TypeDescriptor.Size is typically 0 (unknown), which will always pass the <= MaxEncodeBufferSize() check and still trigger a full temporary buffer allocation on stream encoders (the issue this PR is trying to prevent). Consider using the fastssz SizeSSZ() result (with an overflow check) when sourceType.Size == 0 (and encoder is non-seekable), or conservatively skipping delegation when the size is unknown.

Copilot uses AI. Check for mistakes.
encoder.SetBuffer(newBuf)
return nil
}
}

Expand All @@ -94,12 +96,14 @@

if useDynamicMarshal {
if marshaller, ok := getPtr(sourceValue).Interface().(sszutils.DynamicMarshaler); ok {
newBuf, err := marshaller.MarshalSSZDyn(ctx.ds, encoder.GetBuffer())
if err != nil {
return err
if encoder.Seekable() || (sourceType.Size > 0 && int(sourceType.Size) <= encoder.MaxEncodeBufferSize()) {

Check failure

Code scanning / CodeQL

Incorrect conversion between integer types High

Incorrect conversion of an unsigned 32-bit integer from
strconv.ParseUint
to a lower bit size type int without an upper bound check.
Incorrect conversion of an unsigned 32-bit integer from
strconv.ParseUint
to a lower bit size type int without an upper bound check.
Incorrect conversion of an unsigned 32-bit integer from
strconv.ParseUint
to a lower bit size type int without an upper bound check.
Incorrect conversion of an unsigned 32-bit integer from
strconv.ParseUint
to a lower bit size type int without an upper bound check.
Incorrect conversion of an unsigned 32-bit integer from
strconv.ParseUint
to a lower bit size type int without an upper bound check.
Incorrect conversion of an unsigned 32-bit integer from
strconv.ParseUint
to a lower bit size type int without an upper bound check.
newBuf, err := marshaller.MarshalSSZDyn(ctx.ds, encoder.GetBuffer())
if err != nil {
return err
}
encoder.SetBuffer(newBuf)
return nil
}
Comment on lines 96 to 106
Copy link

Copilot AI Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This block converts sourceType.Size (uint32) to int when comparing against MaxEncodeBufferSize() without guarding against platform overflow. On 32-bit platforms (or very large size hints), int(sourceType.Size) can wrap and incorrectly allow delegation. Consider checking sourceType.Size > math.MaxInt first (returning ErrPlatformOverflowFn) or performing comparisons in int64/uint64 safely.

Copilot uses AI. Check for mistakes.
Comment on lines 75 to 106
Copy link

Copilot AI Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new MaxEncodeBufferSize()-based delegation behavior (skipping buffer-based marshal methods on stream encoders when the payload is large) doesn’t appear to be covered by tests. Adding a unit test that asserts large values do not invoke FastsszMarshaler/DynamicMarshaler on StreamEncoder (but still do on BufferEncoder) would help prevent regressions.

Copilot uses AI. Check for mistakes.
encoder.SetBuffer(newBuf)
return nil
}
}
}
Expand Down Expand Up @@ -214,12 +218,14 @@
if useViewMarshaler {
if marshaller, ok := getPtr(sourceValue).Interface().(sszutils.DynamicViewMarshaler); ok {
if marshalFn := marshaller.MarshalSSZDynView(*sourceType.CodegenInfo); marshalFn != nil {
newBuf, err := marshalFn(ctx.ds, encoder.GetBuffer())
if err != nil {
return true, err
if encoder.Seekable() || (sourceType.Size > 0 && int(sourceType.Size) <= encoder.MaxEncodeBufferSize()) {

Check failure

Code scanning / CodeQL

Incorrect conversion between integer types High

Incorrect conversion of an unsigned 32-bit integer from
strconv.ParseUint
to a lower bit size type int without an upper bound check.
Incorrect conversion of an unsigned 32-bit integer from
strconv.ParseUint
to a lower bit size type int without an upper bound check.
Incorrect conversion of an unsigned 32-bit integer from
strconv.ParseUint
to a lower bit size type int without an upper bound check.
Incorrect conversion of an unsigned 32-bit integer from
strconv.ParseUint
to a lower bit size type int without an upper bound check.
Incorrect conversion of an unsigned 32-bit integer from
strconv.ParseUint
to a lower bit size type int without an upper bound check.
Incorrect conversion of an unsigned 32-bit integer from
strconv.ParseUint
to a lower bit size type int without an upper bound check.
newBuf, err := marshalFn(ctx.ds, encoder.GetBuffer())
if err != nil {
return true, err
}
encoder.SetBuffer(newBuf)
return true, nil
Comment on lines 218 to +227
Copy link

Copilot AI Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same potential uint32 -> int overflow issue here: int(sourceType.Size) is used implicitly via int(...) <= MaxEncodeBufferSize() in the condition. To avoid incorrect delegation decisions on platforms where int is smaller than uint32, add an explicit overflow guard (or compare in a wider type) before converting.

Copilot uses AI. Check for mistakes.
}
encoder.SetBuffer(newBuf)
return true, nil
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion reflection/marshal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1504,7 +1504,7 @@ func TestMarshalDynamicListNonSeekableSizeError(t *testing.T) {
listDesc.ElemDesc = &elemDescCopy

ctx := reflection.NewReflectionCtx(nil, nil, false, true)
encoder := sszutils.NewStreamEncoder(bytes.NewBuffer(nil), 0)
encoder := sszutils.NewStreamEncoder(bytes.NewBuffer(nil), 0, 0)
data := []DynElem{{Value: 1}, {Value: 2}}
err = ctx.MarshalSSZ(listDesc, reflect.ValueOf(data), encoder)
if err == nil {
Expand Down
2 changes: 1 addition & 1 deletion reflection/overflow_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func TestMarshalLargeVectorStreaming(t *testing.T) {
val := reflect.ValueOf(src)

// Use StreamEncoder writing to io.Discard so we don't buffer output.
enc := sszutils.NewStreamEncoder(io.Discard, 4096)
enc := sszutils.NewStreamEncoder(io.Discard, 4096, 0)

err := ctx.marshalType(td, val, enc, 0)
if err != nil {
Expand Down
30 changes: 18 additions & 12 deletions reflection/unmarshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,13 @@ func (ctx *ReflectionCtx) unmarshalType(targetType *ssztypes.TypeDescriptor, tar
if unmarshaler, ok := targetValue.Addr().Interface().(sszutils.DynamicViewUnmarshaler); ok {
if unmarshalFn := unmarshaler.UnmarshalSSZDynView(*targetType.CodegenInfo); unmarshalFn != nil {
bufLen := decoder.GetLength()
buf, err := decoder.DecodeBytesBuf(bufLen)
if err != nil {
return err
if bufLen <= decoder.MaxDecodeBufferSize() {
buf, err := decoder.DecodeBytesBuf(bufLen)
if err != nil {
return err
}
return unmarshalFn(ctx.ds, buf)
}
return unmarshalFn(ctx.ds, buf)
}
}
}
Expand All @@ -109,11 +111,13 @@ func (ctx *ReflectionCtx) unmarshalType(targetType *ssztypes.TypeDescriptor, tar
}
sszLen = int(typeSize)
}
sszBuf, err := decoder.DecodeBytesBuf(sszLen)
if err != nil {
return err
if sszLen <= decoder.MaxDecodeBufferSize() || targetType.SszType == ssztypes.SszCustomType {
sszBuf, err := decoder.DecodeBytesBuf(sszLen)
if err != nil {
return err
}
return unmarshaller.UnmarshalSSZ(sszBuf)
}
return unmarshaller.UnmarshalSSZ(sszBuf)
}
}

Expand All @@ -136,11 +140,13 @@ func (ctx *ReflectionCtx) unmarshalType(targetType *ssztypes.TypeDescriptor, tar
}
sszLen = int(typeSize)
}
sszBuf, err := decoder.DecodeBytesBuf(sszLen)
if err != nil {
return err
if sszLen <= decoder.MaxDecodeBufferSize() {
sszBuf, err := decoder.DecodeBytesBuf(sszLen)
if err != nil {
return err
}
return unmarshaller.UnmarshalSSZDyn(ctx.ds, sszBuf)
}
return unmarshaller.UnmarshalSSZDyn(ctx.ds, sszBuf)
}
}
}
Expand Down
1 change: 1 addition & 0 deletions sszutils/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ type Decoder interface {
DecodeOffset() (uint32, error)
DecodeOffsetAt(pos int) uint32
SkipBytes(n int)
MaxDecodeBufferSize() int // max size for efficient DecodeBytesBuf without excessive allocation
}
7 changes: 7 additions & 0 deletions sszutils/decoder_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package sszutils

import (
"encoding/binary"
"math"
)

// BufferDecoder is a seekable Decoder implementation backed by an in-memory
Expand Down Expand Up @@ -39,6 +40,12 @@ func (e *BufferDecoder) Seekable() bool {
return true
}

// MaxDecodeBufferSize returns math.MaxInt because the entire buffer is already
// in memory and DecodeBytesBuf only returns a slice—no allocation needed.
func (e *BufferDecoder) MaxDecodeBufferSize() int {
return math.MaxInt
}

// GetPosition returns the current read position in the buffer.
func (e *BufferDecoder) GetPosition() int {
return e.position
Expand Down
52 changes: 35 additions & 17 deletions sszutils/decoder_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,15 @@ import (
"io"
)

// DefaultStreamDecoderBufSize is the default maximum buffer size for
// DefaultStreamDecoderBufSize is the default internal read buffer size for
// StreamDecoder (2KB).
const DefaultStreamDecoderBufSize = 2 * 1024

// DefaultStreamDecoderMaxBufSize is the default maximum buffer size for
// delegation to buffer-based unmarshal methods (200KB). Reads exceeding this
// threshold fall through to reflection-based field-by-field unmarshalling.
const DefaultStreamDecoderMaxBufSize = 200 * 1024

// StreamDecoder is a non-seekable Decoder implementation that reads SSZ data
// from an io.Reader. It uses an internal buffer for efficient sequential reads
// but does not support DecodeOffsetAt or SkipBytes.
Expand All @@ -24,23 +29,28 @@ type StreamDecoder struct {
position int

// Internal buffer for reading from stream
buffer []byte
bufferPos int // Current read position within buffer
bufferLen int // Amount of valid data in buffer
buffer []byte
bufferPos int // Current read position within buffer
bufferLen int // Amount of valid data in buffer
maxBufSize int // Configured maximum buffer size
}

var _ Decoder = (*StreamDecoder)(nil)

// NewStreamDecoder creates a new StreamDecoder that reads SSZ data from the
// provided io.Reader. totalLen specifies the total expected byte length of the
// SSZ payload. maxBufSize controls the maximum internal read buffer size; if
// <= 0, DefaultStreamDecoderBufSize is used.
func NewStreamDecoder(reader io.Reader, totalLen, maxBufSize int) *StreamDecoder {
// SSZ payload. bufSize controls the internal read buffer size (defaults to 2KB
// if <= 0). maxBufSize controls the maximum buffer size for delegation to
// buffer-based unmarshal methods (defaults to 200KB if <= 0).
func NewStreamDecoder(reader io.Reader, totalLen, bufSize, maxBufSize int) *StreamDecoder {
if bufSize <= 0 {
bufSize = DefaultStreamDecoderBufSize
}
if maxBufSize <= 0 {
maxBufSize = DefaultStreamDecoderBufSize
maxBufSize = DefaultStreamDecoderMaxBufSize
}
// Use smaller buffer for small streams
bufferSize := maxBufSize
bufferSize := bufSize
if totalLen < bufferSize {
bufferSize = totalLen
}
Expand All @@ -49,21 +59,29 @@ func NewStreamDecoder(reader io.Reader, totalLen, maxBufSize int) *StreamDecoder
}

return &StreamDecoder{
reader: reader,
limits: make([]int, 0, 16),
lastLimit: totalLen,
streamLen: totalLen,
position: 0,
buffer: make([]byte, bufferSize),
bufferPos: 0,
bufferLen: 0,
reader: reader,
limits: make([]int, 0, 16),
lastLimit: totalLen,
streamLen: totalLen,
position: 0,
buffer: make([]byte, bufferSize),
bufferPos: 0,
bufferLen: 0,
maxBufSize: maxBufSize,
}
}

func (e *StreamDecoder) Seekable() bool {
return false
}

// MaxDecodeBufferSize returns the configured maximum buffer size. Callers
// should avoid DecodeBytesBuf calls that exceed this size to prevent the
// stream decoder from allocating large temporary buffers.
func (e *StreamDecoder) MaxDecodeBufferSize() int {
return e.maxBufSize
}

func (e *StreamDecoder) GetPosition() int {
return e.position
}
Expand Down
1 change: 1 addition & 0 deletions sszutils/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ type Encoder interface {
EncodeOffset(v uint32)
EncodeOffsetAt(pos int, v uint32)
EncodeZeroPadding(n int)
MaxEncodeBufferSize() int // max size for efficient GetBuffer/SetBuffer delegation
}
7 changes: 7 additions & 0 deletions sszutils/encoder_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package sszutils

import (
"encoding/binary"
"math"
)

// BufferEncoder is a seekable Encoder implementation backed by an in-memory
Expand Down Expand Up @@ -33,6 +34,12 @@ func (e *BufferEncoder) Seekable() bool {
return true
}

// MaxEncodeBufferSize returns math.MaxInt because the entire output is already
// buffered in memory—no additional allocation overhead from delegation.
func (e *BufferEncoder) MaxEncodeBufferSize() int {
return math.MaxInt
}

// GetPosition returns the current write position in the buffer.
func (e *BufferEncoder) GetPosition() int {
return e.pos
Expand Down
Loading
Loading