Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 58 additions & 46 deletions blockdb/pebble/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/binary"
"fmt"
"math"
"time"

"github.com/cockroachdb/pebble"
Expand Down Expand Up @@ -65,6 +66,12 @@ func makeKey(root []byte, blockType uint16) []byte {
return key
}

func makeKeyRange(root []byte) ([]byte, []byte) {
start := makeKey(root, 0)
end := makeKey(root, math.MaxUint16)
return start, end
}

// getComponent retrieves a single component from the database.
// Returns (data, version, timestamp, error). Returns nil data if not found.
func (e *PebbleEngine) getComponent(root []byte, blockType uint16) ([]byte, uint64, time.Time, error) {
Expand Down Expand Up @@ -92,48 +99,58 @@ func (e *PebbleEngine) getComponent(root []byte, blockType uint16) ([]byte, uint
return data, version, timestamp, nil
}

// setComponent stores a single component in the database.
func (e *PebbleEngine) setComponent(root []byte, blockType uint16, version uint64, data []byte) error {
key := makeKey(root, blockType)

// encodeComponentValue serializes a block component with its version and write timestamp.
func encodeComponentValue(version uint64, data []byte) []byte {
value := make([]byte, valueHeaderSize+len(data))
binary.BigEndian.PutUint64(value[:8], version)
binary.BigEndian.PutUint64(value[8:16], uint64(time.Now().UnixNano()))
copy(value[valueHeaderSize:], data)

return e.db.Set(key, value, nil)
return value
}

// componentExists checks if a component exists in the database.
func (e *PebbleEngine) componentExists(root []byte, blockType uint16) bool {
key := makeKey(root, blockType)
// getStoredComponents scans the component key range for a block and returns the stored flags.
func (e *PebbleEngine) getStoredComponents(root []byte) (types.BlockDataFlags, error) {
lowerBound, upperBound := makeKeyRange(root)

res, closer, err := e.db.Get(key)
if err == nil && len(res) >= valueHeaderSize {
closer.Close()
return true
iter, err := e.db.NewIter(&pebble.IterOptions{
LowerBound: lowerBound,
UpperBound: upperBound,
})
if err != nil {
return 0, err
}
return false
}
defer iter.Close()

// GetStoredComponents returns which components exist for a block.
func (e *PebbleEngine) GetStoredComponents(_ context.Context, _ uint64, root []byte) (types.BlockDataFlags, error) {
var flags types.BlockDataFlags
for iter.First(); iter.Valid(); iter.Next() {
key := iter.Key()
if len(key) < 4 || len(iter.Value()) < valueHeaderSize {
continue
}

if e.componentExists(root, BlockTypeHeader) {
flags |= types.BlockDataFlagHeader
}
if e.componentExists(root, BlockTypeBody) {
flags |= types.BlockDataFlagBody
}
if e.componentExists(root, BlockTypePayload) {
flags |= types.BlockDataFlagPayload
}
if e.componentExists(root, BlockTypeBal) {
flags |= types.BlockDataFlagBal
switch binary.BigEndian.Uint16(key[len(key)-2:]) {
case BlockTypeHeader:
flags |= types.BlockDataFlagHeader
case BlockTypeBody:
flags |= types.BlockDataFlagBody
case BlockTypePayload:
flags |= types.BlockDataFlagPayload
case BlockTypeBal:
flags |= types.BlockDataFlagBal
}

if flags == types.BlockDataFlagAll {
break
}
}

return flags, nil
return flags, iter.Error()
}

// GetStoredComponents returns which components exist for a block.
func (e *PebbleEngine) GetStoredComponents(_ context.Context, _ uint64, root []byte) (types.BlockDataFlags, error) {
return e.getStoredComponents(root)
}

// GetBlock retrieves block data with selective loading based on flags.
Expand Down Expand Up @@ -229,7 +246,7 @@ func (e *PebbleEngine) AddBlock(
dataCb func() (*types.BlockData, error),
) (bool, bool, error) {
// Check what components already exist
existingFlags, err := e.GetStoredComponents(context.Background(), 0, root)
existingFlags, err := e.getStoredComponents(root)
if err != nil {
return false, false, fmt.Errorf("failed to check existing components: %w", err)
}
Expand All @@ -241,19 +258,7 @@ func (e *PebbleEngine) AddBlock(
}

// Determine what new components we have
var newFlags types.BlockDataFlags
if len(blockData.HeaderData) > 0 {
newFlags |= types.BlockDataFlagHeader
}
if len(blockData.BodyData) > 0 {
newFlags |= types.BlockDataFlagBody
}
if blockData.PayloadVersion != 0 && len(blockData.PayloadData) > 0 {
newFlags |= types.BlockDataFlagPayload
}
if blockData.BalVersion != 0 && len(blockData.BalData) > 0 {
newFlags |= types.BlockDataFlagBal
}
newFlags := types.StoredFlagsFromBlockData(blockData)

// Calculate components to add (new components not in existing)
toAdd := newFlags &^ existingFlags
Expand All @@ -266,31 +271,38 @@ func (e *PebbleEngine) AddBlock(
isNew := existingFlags == 0
isUpdated := !isNew

batch := e.db.NewBatch()
defer batch.Close()

// Store new components
if toAdd.Has(types.BlockDataFlagHeader) {
if err := e.setComponent(root, BlockTypeHeader, blockData.HeaderVersion, blockData.HeaderData); err != nil {
if err := batch.Set(makeKey(root, BlockTypeHeader), encodeComponentValue(blockData.HeaderVersion, blockData.HeaderData), nil); err != nil {
return false, false, fmt.Errorf("failed to store header: %w", err)
}
}

if toAdd.Has(types.BlockDataFlagBody) {
if err := e.setComponent(root, BlockTypeBody, blockData.BodyVersion, blockData.BodyData); err != nil {
if err := batch.Set(makeKey(root, BlockTypeBody), encodeComponentValue(blockData.BodyVersion, blockData.BodyData), nil); err != nil {
return false, false, fmt.Errorf("failed to store body: %w", err)
}
}

if toAdd.Has(types.BlockDataFlagPayload) {
if err := e.setComponent(root, BlockTypePayload, blockData.PayloadVersion, blockData.PayloadData); err != nil {
if err := batch.Set(makeKey(root, BlockTypePayload), encodeComponentValue(blockData.PayloadVersion, blockData.PayloadData), nil); err != nil {
return false, false, fmt.Errorf("failed to store payload: %w", err)
}
}

if toAdd.Has(types.BlockDataFlagBal) {
if err := e.setComponent(root, BlockTypeBal, blockData.BalVersion, blockData.BalData); err != nil {
if err := batch.Set(makeKey(root, BlockTypeBal), encodeComponentValue(blockData.BalVersion, blockData.BalData), nil); err != nil {
return false, false, fmt.Errorf("failed to store BAL: %w", err)
}
}

if err := batch.Commit(nil); err != nil {
return false, false, fmt.Errorf("failed to commit block components: %w", err)
}

return isNew, isUpdated, nil
}

Expand Down
16 changes: 7 additions & 9 deletions blockdb/s3/execdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package s3
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"io"
"path"
"strconv"
"strings"

"github.com/minio/minio-go/v7"
Expand All @@ -17,7 +17,7 @@ import (
// getExecDataKey builds the S3 object key for execution data.
// Format: {pathPrefix}/{slot/10000}/{slot_padded}_{blockRootHex}_exec
func (e *S3Engine) getExecDataKey(slot uint64, blockRoot []byte) string {
rootHex := hex.EncodeToString(blockRoot[:4])
rootHex := encodeRootPrefix(blockRoot)
return path.Join(
e.pathPrefix,
fmt.Sprintf("%06d", slot/10000),
Expand Down Expand Up @@ -282,19 +282,17 @@ func isNotFound(err error) bool {
// Expected format: .../NNNNNNNNNN_HHHHHHHH[_exec]
// Returns 0 if parsing fails (safe default - won't match any pruning threshold).
func parseSlotFromKey(key string) uint64 {
parts := strings.Split(key, "/")
if len(parts) == 0 {
return 0
filenameStart := strings.LastIndexByte(key, '/')
if filenameStart >= 0 {
key = key[filenameStart+1:]
}
filename := parts[len(parts)-1]

underscoreIdx := strings.Index(filename, "_")
underscoreIdx := strings.IndexByte(key, '_')
if underscoreIdx <= 0 {
return 0
}

var slot uint64
_, err := fmt.Sscanf(filename[:underscoreIdx], "%d", &slot)
slot, err := strconv.ParseUint(key[:underscoreIdx], 10, 64)
if err != nil {
return 0
}
Expand Down
27 changes: 13 additions & 14 deletions blockdb/s3/s3store.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,21 @@ func (e *S3Engine) Close() error {
}

func (e *S3Engine) getObjectKey(root []byte, slot uint64) string {
rootHex := hex.EncodeToString(root[:4]) // First 4 bytes
rootHex := encodeRootPrefix(root)
return path.Join(e.pathPrefix, fmt.Sprintf("%06d", slot/10000), fmt.Sprintf("%010d_%s", slot, rootHex))
}

// encodeRootPrefix returns the hex-encoded 4-byte root prefix used in S3 object keys.
func encodeRootPrefix(root []byte) string {
if len(root) < 4 {
return hex.EncodeToString(root)
}

buf := make([]byte, 8)
hex.Encode(buf, root[:4])
return string(buf)
}

// GetStoredComponents returns which components exist for a block by reading metadata.
func (e *S3Engine) GetStoredComponents(ctx context.Context, slot uint64, root []byte) (types.BlockDataFlags, error) {
key := e.getObjectKey(root, slot)
Expand Down Expand Up @@ -446,19 +457,7 @@ func (e *S3Engine) AddBlock(
}

// Calculate what the new data provides
var newFlags types.BlockDataFlags
if len(blockData.HeaderData) > 0 {
newFlags |= types.BlockDataFlagHeader
}
if len(blockData.BodyData) > 0 {
newFlags |= types.BlockDataFlagBody
}
if blockData.PayloadVersion != 0 && len(blockData.PayloadData) > 0 {
newFlags |= types.BlockDataFlagPayload
}
if blockData.BalVersion != 0 && len(blockData.BalData) > 0 {
newFlags |= types.BlockDataFlagBal
}
newFlags := types.StoredFlagsFromBlockData(blockData)

// Check if we need to update (new data has more components)
needsUpdate := (newFlags &^ existingFlags) != 0
Expand Down
14 changes: 1 addition & 13 deletions blockdb/tiered/tiered.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,19 +207,7 @@ func (e *TieredEngine) AddBlock(
existingFlags, _ := e.GetStoredComponents(ctx, slot, root)

// Determine what new data provides
var newFlags types.BlockDataFlags
if len(data.HeaderData) > 0 {
newFlags |= types.BlockDataFlagHeader
}
if len(data.BodyData) > 0 {
newFlags |= types.BlockDataFlagBody
}
if data.PayloadVersion != 0 && len(data.PayloadData) > 0 {
newFlags |= types.BlockDataFlagPayload
}
if data.BalVersion != 0 && len(data.BalData) > 0 {
newFlags |= types.BlockDataFlagBal
}
newFlags := types.StoredFlagsFromBlockData(data)

// Check if we need to update
needsUpdate := (newFlags &^ existingFlags) != 0
Expand Down
23 changes: 23 additions & 0 deletions blockdb/types/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,26 @@ func (f BlockDataFlags) Add(flag BlockDataFlags) BlockDataFlags {
func (f BlockDataFlags) Remove(flag BlockDataFlags) BlockDataFlags {
return f &^ flag
}

// StoredFlagsFromBlockData returns which block components are present in data.
func StoredFlagsFromBlockData(data *BlockData) BlockDataFlags {
if data == nil {
return 0
}

var flags BlockDataFlags
if len(data.HeaderData) > 0 {
flags |= BlockDataFlagHeader
}
if len(data.BodyData) > 0 {
flags |= BlockDataFlagBody
}
if data.PayloadVersion != 0 && len(data.PayloadData) > 0 {
flags |= BlockDataFlagPayload
}
if data.BalVersion != 0 && len(data.BalData) > 0 {
flags |= BlockDataFlagBal
}

return flags
}