diff --git a/clients/consensus/rpc/beaconapi.go b/clients/consensus/rpc/beaconapi.go index 6768091b..45f930dd 100644 --- a/clients/consensus/rpc/beaconapi.go +++ b/clients/consensus/rpc/beaconapi.go @@ -17,9 +17,14 @@ import ( v1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/attestantio/go-eth2-client/http" "github.com/attestantio/go-eth2-client/spec" + "github.com/attestantio/go-eth2-client/spec/altair" + "github.com/attestantio/go-eth2-client/spec/bellatrix" "github.com/attestantio/go-eth2-client/spec/capella" "github.com/attestantio/go-eth2-client/spec/deneb" + "github.com/attestantio/go-eth2-client/spec/electra" + "github.com/attestantio/go-eth2-client/spec/fulu" "github.com/attestantio/go-eth2-client/spec/phase0" + dynssz "github.com/pk910/dynamic-ssz" "github.com/rs/zerolog" "github.com/sirupsen/logrus" "golang.org/x/crypto/ssh" @@ -406,23 +411,190 @@ func (bc *BeaconClient) GetBlockBodyByBlockroot(ctx context.Context, blockroot p return result.Data, nil } +// GetState fetches a beacon state using streaming decoding to avoid buffering +// the full response (200MB+) in memory. Both SSZ and JSON responses are decoded +// directly from the HTTP response body. func (bc *BeaconClient) GetState(ctx context.Context, stateRef string) (*spec.VersionedBeaconState, error) { - provider, isProvider := bc.clientSvc.(eth2client.BeaconStateProvider) - if !isProvider { - return nil, fmt.Errorf("get beacon state not supported") + reqURL := fmt.Sprintf("%s/eth/v2/debug/beacon/states/%s", bc.endpoint, stateRef) + + req, err := nethttp.NewRequestWithContext(ctx, nethttp.MethodGet, reqURL, nil) + if err != nil { + return nil, fmt.Errorf("failed to create beacon state request: %w", err) + } + + for headerKey, headerVal := range bc.headers { + req.Header.Set(headerKey, headerVal) + } + + if bc.disableSSZ { + req.Header.Set("Accept", "application/json") + } else { + req.Header.Set("Accept", "application/octet-stream;q=1,application/json;q=0.9") + } + + client := &nethttp.Client{Timeout: 10 * time.Minute} + + resp, err := client.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to request beacon state: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != nethttp.StatusOK { + data, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("failed to load beacon state (status %s): %s", resp.Status, string(data)) + } + + // Parse consensus version from Eth-Consensus-Version header. + consensusVersion, err := parseConsensusVersionHeader(resp) + if err != nil { + return nil, fmt.Errorf("failed to parse consensus version: %w", err) + } + + state := &spec.VersionedBeaconState{ + Version: consensusVersion, + } + + // Determine content type from response header. + contentType := resp.Header.Get("Content-Type") + isSSZ := strings.HasPrefix(contentType, "application/octet-stream") + + if isSSZ { + err = bc.streamDecodeStateSSZ(ctx, resp, state) + } else { + err = streamDecodeStateJSON(resp.Body, state) } - result, err := provider.BeaconState(ctx, &api.BeaconStateOpts{ - State: stateRef, - Common: api.CommonOpts{ - Timeout: 0, - }, - }) if err != nil { return nil, err } - return result.Data, nil + return state, nil +} + +// parseConsensusVersionHeader extracts the consensus version from the +// Eth-Consensus-Version HTTP response header. +func parseConsensusVersionHeader(resp *nethttp.Response) (spec.DataVersion, error) { + versionHeader := resp.Header.Get("Eth-Consensus-Version") + if versionHeader == "" { + return spec.DataVersionUnknown, fmt.Errorf("missing Eth-Consensus-Version header") + } + + version, err := spec.DataVersionFromString(versionHeader) + if err != nil { + return spec.DataVersionUnknown, fmt.Errorf("unrecognized consensus version %q: %w", versionHeader, err) + } + + return version, nil +} + +// streamDecodeStateSSZ decodes a beacon state from an SSZ-encoded HTTP response +// body using streaming deserialization via dynamic-ssz. +func (bc *BeaconClient) streamDecodeStateSSZ(ctx context.Context, resp *nethttp.Response, state *spec.VersionedBeaconState) error { + // Fetch spec data for dynamic SSZ decoding. + specProvider, isProvider := bc.clientSvc.(eth2client.SpecProvider) + if !isProvider { + return fmt.Errorf("spec provider not available for dynamic SSZ decoding") + } + + specs, err := specProvider.Spec(ctx, &api.SpecOpts{}) + if err != nil { + return fmt.Errorf("failed to fetch specs for SSZ decoding: %w", err) + } + + ds := dynssz.NewDynSsz(specs.Data, dynssz.WithLogCb(func(format string, args ...any) { + bc.logger.Infof(format, args...) + }), dynssz.WithVerbose()) + sszSize := int(resp.ContentLength) // -1 if unknown + + switch state.Version { + case spec.DataVersionPhase0: + state.Phase0 = &phase0.BeaconState{} + err = ds.UnmarshalSSZReader(state.Phase0, resp.Body, sszSize) + case spec.DataVersionAltair: + state.Altair = &altair.BeaconState{} + err = ds.UnmarshalSSZReader(state.Altair, resp.Body, sszSize) + case spec.DataVersionBellatrix: + state.Bellatrix = &bellatrix.BeaconState{} + err = ds.UnmarshalSSZReader(state.Bellatrix, resp.Body, sszSize) + case spec.DataVersionCapella: + state.Capella = &capella.BeaconState{} + err = ds.UnmarshalSSZReader(state.Capella, resp.Body, sszSize) + case spec.DataVersionDeneb: + state.Deneb = &deneb.BeaconState{} + err = ds.UnmarshalSSZReader(state.Deneb, resp.Body, sszSize) + case spec.DataVersionElectra: + state.Electra = &electra.BeaconState{} + err = ds.UnmarshalSSZReader(state.Electra, resp.Body, sszSize) + case spec.DataVersionFulu: + state.Fulu = &fulu.BeaconState{} + err = ds.UnmarshalSSZReader(state.Fulu, resp.Body, sszSize) + default: + return fmt.Errorf("unsupported SSZ state version: %s", state.Version) + } + + if err != nil { + return fmt.Errorf("failed to decode %s beacon state from SSZ: %w", state.Version, err) + } + + return nil +} + +// streamDecodeStateJSON decodes a beacon state from a JSON-encoded HTTP response +// body using streaming JSON decoding. The response has the standard beacon API +// envelope: {"version":"...","data":{...}}. +// The decoder streams from the reader and populates the target struct directly, +// avoiding an intermediate raw-bytes buffer for the data field. +func streamDecodeStateJSON(body io.Reader, state *spec.VersionedBeaconState) error { + var err error + + switch state.Version { + case spec.DataVersionPhase0: + state.Phase0 = &phase0.BeaconState{} + err = decodeJSONEnvelope(body, state.Phase0) + case spec.DataVersionAltair: + state.Altair = &altair.BeaconState{} + err = decodeJSONEnvelope(body, state.Altair) + case spec.DataVersionBellatrix: + state.Bellatrix = &bellatrix.BeaconState{} + err = decodeJSONEnvelope(body, state.Bellatrix) + case spec.DataVersionCapella: + state.Capella = &capella.BeaconState{} + err = decodeJSONEnvelope(body, state.Capella) + case spec.DataVersionDeneb: + state.Deneb = &deneb.BeaconState{} + err = decodeJSONEnvelope(body, state.Deneb) + case spec.DataVersionElectra: + state.Electra = &electra.BeaconState{} + err = decodeJSONEnvelope(body, state.Electra) + case spec.DataVersionFulu: + state.Fulu = &fulu.BeaconState{} + err = decodeJSONEnvelope(body, state.Fulu) + default: + return fmt.Errorf("unsupported JSON state version: %s", state.Version) + } + + if err != nil { + return fmt.Errorf("failed to decode %s beacon state from JSON: %w", state.Version, err) + } + + return nil +} + +// decodeJSONEnvelope decodes a beacon API JSON envelope {"data": ...} directly +// into the target struct via streaming decoder, avoiding buffering the raw JSON. +func decodeJSONEnvelope[T any](body io.Reader, target T) error { + envelope := struct { + Data T `json:"data"` + }{ + Data: target, + } + + if err := json.NewDecoder(body).Decode(&envelope); err != nil { + return fmt.Errorf("failed to decode beacon state JSON: %w", err) + } + + return nil } func (bc *BeaconClient) GetBlobSidecarsByBlockroot(ctx context.Context, blockroot []byte) ([]*deneb.BlobSidecar, error) { diff --git a/go.mod b/go.mod index bb5afdfb..cb5036e1 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/mashingan/smapping v0.1.19 github.com/minio/minio-go/v7 v7.0.99 github.com/mitchellh/mapstructure v1.5.0 - github.com/pk910/dynamic-ssz v1.2.3-0.20260318065836-323b83c1a387 + github.com/pk910/dynamic-ssz v1.3.1-0.20260407212738-e97de623fd84 github.com/pressly/goose/v3 v3.27.0 github.com/probe-lab/eth-das-guardian v0.2.2 github.com/protolambda/bls12-381-util v0.1.0 diff --git a/go.sum b/go.sum index e07ccd85..37067095 100644 --- a/go.sum +++ b/go.sum @@ -555,8 +555,8 @@ github.com/pion/turn/v4 v4.1.1 h1:9UnY2HB99tpDyz3cVVZguSxcqkJ1DsTSZ+8TGruh4fc= github.com/pion/turn/v4 v4.1.1/go.mod h1:2123tHk1O++vmjI5VSD0awT50NywDAq5A2NNNU4Jjs8= github.com/pion/webrtc/v4 v4.1.4 h1:/gK1ACGHXQmtyVVbJFQDxNoODg4eSRiFLB7t9r9pg8M= github.com/pion/webrtc/v4 v4.1.4/go.mod h1:Oab9npu1iZtQRMic3K3toYq5zFPvToe/QBw7dMI2ok4= -github.com/pk910/dynamic-ssz v1.2.3-0.20260318065836-323b83c1a387 h1:XkL2iLFDP6/NKAOF0fysZrhaa/qtcAtqzhLUxSIy79s= -github.com/pk910/dynamic-ssz v1.2.3-0.20260318065836-323b83c1a387/go.mod h1:NmeFF4jxzVwWC8cnEhUB7xMI++8hd/0OZvZHFrUvFfs= +github.com/pk910/dynamic-ssz v1.3.1-0.20260407212738-e97de623fd84 h1:J3H3PiaO4+ej5HTK/nG/wnAj0jx+Ek2+0s8o+zujI4I= +github.com/pk910/dynamic-ssz v1.3.1-0.20260407212738-e97de623fd84/go.mod h1:NmeFF4jxzVwWC8cnEhUB7xMI++8hd/0OZvZHFrUvFfs= github.com/pk910/hashtree-bindings v0.1.0 h1:w7NyRWFi2OaYEFvo9ADcE/QU6PMuVLl3hBgx92KiH9c= github.com/pk910/hashtree-bindings v0.1.0/go.mod h1:zrWt88783JmhBfcgni6kkIMYRdXTZi/FL//OyI5T/l4= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=