Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 182 additions & 10 deletions clients/consensus/rpc/beaconapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@ import (
v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/http"
"github.com/attestantio/go-eth2-client/spec"
"github.com/attestantio/go-eth2-client/spec/altair"
"github.com/attestantio/go-eth2-client/spec/bellatrix"
"github.com/attestantio/go-eth2-client/spec/capella"
"github.com/attestantio/go-eth2-client/spec/deneb"
"github.com/attestantio/go-eth2-client/spec/electra"
"github.com/attestantio/go-eth2-client/spec/fulu"
"github.com/attestantio/go-eth2-client/spec/phase0"
dynssz "github.com/pk910/dynamic-ssz"
"github.com/rs/zerolog"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
Expand Down Expand Up @@ -406,23 +411,190 @@ func (bc *BeaconClient) GetBlockBodyByBlockroot(ctx context.Context, blockroot p
return result.Data, nil
}

// GetState fetches a beacon state using streaming decoding to avoid buffering
// the full response (200MB+) in memory. Both SSZ and JSON responses are decoded
// directly from the HTTP response body.
func (bc *BeaconClient) GetState(ctx context.Context, stateRef string) (*spec.VersionedBeaconState, error) {
provider, isProvider := bc.clientSvc.(eth2client.BeaconStateProvider)
if !isProvider {
return nil, fmt.Errorf("get beacon state not supported")
reqURL := fmt.Sprintf("%s/eth/v2/debug/beacon/states/%s", bc.endpoint, stateRef)

req, err := nethttp.NewRequestWithContext(ctx, nethttp.MethodGet, reqURL, nil)
if err != nil {
return nil, fmt.Errorf("failed to create beacon state request: %w", err)
}

for headerKey, headerVal := range bc.headers {
req.Header.Set(headerKey, headerVal)
}

if bc.disableSSZ {
req.Header.Set("Accept", "application/json")
} else {
req.Header.Set("Accept", "application/octet-stream;q=1,application/json;q=0.9")
}

client := &nethttp.Client{Timeout: 10 * time.Minute}

resp, err := client.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to request beacon state: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != nethttp.StatusOK {
data, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("failed to load beacon state (status %s): %s", resp.Status, string(data))
}

// Parse consensus version from Eth-Consensus-Version header.
consensusVersion, err := parseConsensusVersionHeader(resp)
if err != nil {
return nil, fmt.Errorf("failed to parse consensus version: %w", err)
}

state := &spec.VersionedBeaconState{
Version: consensusVersion,
}

// Determine content type from response header.
contentType := resp.Header.Get("Content-Type")
isSSZ := strings.HasPrefix(contentType, "application/octet-stream")

if isSSZ {
err = bc.streamDecodeStateSSZ(ctx, resp, state)
} else {
err = streamDecodeStateJSON(resp.Body, state)
}

result, err := provider.BeaconState(ctx, &api.BeaconStateOpts{
State: stateRef,
Common: api.CommonOpts{
Timeout: 0,
},
})
if err != nil {
return nil, err
}

return result.Data, nil
return state, nil
}

// parseConsensusVersionHeader extracts the consensus version from the
// Eth-Consensus-Version HTTP response header.
func parseConsensusVersionHeader(resp *nethttp.Response) (spec.DataVersion, error) {
versionHeader := resp.Header.Get("Eth-Consensus-Version")
if versionHeader == "" {
return spec.DataVersionUnknown, fmt.Errorf("missing Eth-Consensus-Version header")
}

version, err := spec.DataVersionFromString(versionHeader)
if err != nil {
return spec.DataVersionUnknown, fmt.Errorf("unrecognized consensus version %q: %w", versionHeader, err)
}

return version, nil
}

// streamDecodeStateSSZ decodes a beacon state from an SSZ-encoded HTTP response
// body using streaming deserialization via dynamic-ssz.
func (bc *BeaconClient) streamDecodeStateSSZ(ctx context.Context, resp *nethttp.Response, state *spec.VersionedBeaconState) error {
// Fetch spec data for dynamic SSZ decoding.
specProvider, isProvider := bc.clientSvc.(eth2client.SpecProvider)
if !isProvider {
return fmt.Errorf("spec provider not available for dynamic SSZ decoding")
}

specs, err := specProvider.Spec(ctx, &api.SpecOpts{})
if err != nil {
return fmt.Errorf("failed to fetch specs for SSZ decoding: %w", err)
}

ds := dynssz.NewDynSsz(specs.Data, dynssz.WithLogCb(func(format string, args ...any) {
bc.logger.Infof(format, args...)
}), dynssz.WithVerbose())
sszSize := int(resp.ContentLength) // -1 if unknown

switch state.Version {
case spec.DataVersionPhase0:
state.Phase0 = &phase0.BeaconState{}
err = ds.UnmarshalSSZReader(state.Phase0, resp.Body, sszSize)
case spec.DataVersionAltair:
state.Altair = &altair.BeaconState{}
err = ds.UnmarshalSSZReader(state.Altair, resp.Body, sszSize)
case spec.DataVersionBellatrix:
state.Bellatrix = &bellatrix.BeaconState{}
err = ds.UnmarshalSSZReader(state.Bellatrix, resp.Body, sszSize)
case spec.DataVersionCapella:
state.Capella = &capella.BeaconState{}
err = ds.UnmarshalSSZReader(state.Capella, resp.Body, sszSize)
case spec.DataVersionDeneb:
state.Deneb = &deneb.BeaconState{}
err = ds.UnmarshalSSZReader(state.Deneb, resp.Body, sszSize)
case spec.DataVersionElectra:
state.Electra = &electra.BeaconState{}
err = ds.UnmarshalSSZReader(state.Electra, resp.Body, sszSize)
case spec.DataVersionFulu:
state.Fulu = &fulu.BeaconState{}
err = ds.UnmarshalSSZReader(state.Fulu, resp.Body, sszSize)
default:
return fmt.Errorf("unsupported SSZ state version: %s", state.Version)
}

if err != nil {
return fmt.Errorf("failed to decode %s beacon state from SSZ: %w", state.Version, err)
}

return nil
}

// streamDecodeStateJSON decodes a beacon state from a JSON-encoded HTTP response
// body using streaming JSON decoding. The response has the standard beacon API
// envelope: {"version":"...","data":{...}}.
// The decoder streams from the reader and populates the target struct directly,
// avoiding an intermediate raw-bytes buffer for the data field.
func streamDecodeStateJSON(body io.Reader, state *spec.VersionedBeaconState) error {
var err error

switch state.Version {
case spec.DataVersionPhase0:
state.Phase0 = &phase0.BeaconState{}
err = decodeJSONEnvelope(body, state.Phase0)
case spec.DataVersionAltair:
state.Altair = &altair.BeaconState{}
err = decodeJSONEnvelope(body, state.Altair)
case spec.DataVersionBellatrix:
state.Bellatrix = &bellatrix.BeaconState{}
err = decodeJSONEnvelope(body, state.Bellatrix)
case spec.DataVersionCapella:
state.Capella = &capella.BeaconState{}
err = decodeJSONEnvelope(body, state.Capella)
case spec.DataVersionDeneb:
state.Deneb = &deneb.BeaconState{}
err = decodeJSONEnvelope(body, state.Deneb)
case spec.DataVersionElectra:
state.Electra = &electra.BeaconState{}
err = decodeJSONEnvelope(body, state.Electra)
case spec.DataVersionFulu:
state.Fulu = &fulu.BeaconState{}
err = decodeJSONEnvelope(body, state.Fulu)
default:
return fmt.Errorf("unsupported JSON state version: %s", state.Version)
}

if err != nil {
return fmt.Errorf("failed to decode %s beacon state from JSON: %w", state.Version, err)
}

return nil
}

// decodeJSONEnvelope decodes a beacon API JSON envelope {"data": ...} directly
// into the target struct via streaming decoder, avoiding buffering the raw JSON.
func decodeJSONEnvelope[T any](body io.Reader, target T) error {
envelope := struct {
Data T `json:"data"`
}{
Data: target,
}

if err := json.NewDecoder(body).Decode(&envelope); err != nil {
return fmt.Errorf("failed to decode beacon state JSON: %w", err)
}

return nil
}

func (bc *BeaconClient) GetBlobSidecarsByBlockroot(ctx context.Context, blockroot []byte) ([]*deneb.BlobSidecar, error) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/mashingan/smapping v0.1.19
github.com/minio/minio-go/v7 v7.0.99
github.com/mitchellh/mapstructure v1.5.0
github.com/pk910/dynamic-ssz v1.2.3-0.20260318065836-323b83c1a387
github.com/pk910/dynamic-ssz v1.3.1-0.20260407212738-e97de623fd84
github.com/pressly/goose/v3 v3.27.0
github.com/probe-lab/eth-das-guardian v0.2.2
github.com/protolambda/bls12-381-util v0.1.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -555,8 +555,8 @@ github.com/pion/turn/v4 v4.1.1 h1:9UnY2HB99tpDyz3cVVZguSxcqkJ1DsTSZ+8TGruh4fc=
github.com/pion/turn/v4 v4.1.1/go.mod h1:2123tHk1O++vmjI5VSD0awT50NywDAq5A2NNNU4Jjs8=
github.com/pion/webrtc/v4 v4.1.4 h1:/gK1ACGHXQmtyVVbJFQDxNoODg4eSRiFLB7t9r9pg8M=
github.com/pion/webrtc/v4 v4.1.4/go.mod h1:Oab9npu1iZtQRMic3K3toYq5zFPvToe/QBw7dMI2ok4=
github.com/pk910/dynamic-ssz v1.2.3-0.20260318065836-323b83c1a387 h1:XkL2iLFDP6/NKAOF0fysZrhaa/qtcAtqzhLUxSIy79s=
github.com/pk910/dynamic-ssz v1.2.3-0.20260318065836-323b83c1a387/go.mod h1:NmeFF4jxzVwWC8cnEhUB7xMI++8hd/0OZvZHFrUvFfs=
github.com/pk910/dynamic-ssz v1.3.1-0.20260407212738-e97de623fd84 h1:J3H3PiaO4+ej5HTK/nG/wnAj0jx+Ek2+0s8o+zujI4I=
github.com/pk910/dynamic-ssz v1.3.1-0.20260407212738-e97de623fd84/go.mod h1:NmeFF4jxzVwWC8cnEhUB7xMI++8hd/0OZvZHFrUvFfs=
github.com/pk910/hashtree-bindings v0.1.0 h1:w7NyRWFi2OaYEFvo9ADcE/QU6PMuVLl3hBgx92KiH9c=
github.com/pk910/hashtree-bindings v0.1.0/go.mod h1:zrWt88783JmhBfcgni6kkIMYRdXTZi/FL//OyI5T/l4=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
Expand Down