From 863f6fd380c2da5156cb30fb70b20fd0a71413b8 Mon Sep 17 00:00:00 2001 From: Nathan Swartz <112000252+swartzn@users.noreply.github.com> Date: Fri, 9 Jan 2026 12:43:59 -0600 Subject: [PATCH 1/2] feat(rst): add s3 placement hints and optional builder job request sorting Add support for JobRequest SortValues. - Generically allows RST clients to optionally provide an order string slice to sort job requests. Add support for s3 storage class placement hints. - Supports string, integer and float s3 metadata value types. - Includes unit tests Unify rst.GetRemotePathInfo return values into RemotePathInfo type. --- common/rst/builder.go | 129 ++++++++++++++++++++++++----- common/rst/mock.go | 4 +- common/rst/rst.go | 28 +++++-- common/rst/s3.go | 170 +++++++++++++++++++++++++++++++------- common/rst/s3_test.go | 120 +++++++++++++++++++++++++++ ctl/pkg/ctl/rst/status.go | 6 +- 6 files changed, 391 insertions(+), 66 deletions(-) diff --git a/common/rst/builder.go b/common/rst/builder.go index 007ce8411..ad8196b4d 100644 --- a/common/rst/builder.go +++ b/common/rst/builder.go @@ -6,13 +6,16 @@ import ( "fmt" "path/filepath" "runtime" + "sort" "sync" "time" + "unsafe" "github.com/thinkparq/beegfs-go/common/filesystem" "github.com/thinkparq/protobuf/go/beeremote" "github.com/thinkparq/protobuf/go/flex" "golang.org/x/sync/errgroup" + "google.golang.org/protobuf/proto" ) // JobBuilderClient is a special RST client that builders new job requests based on the information @@ -150,8 +153,8 @@ func (c *JobBuilderClient) SanitizeRemotePath(remotePath string) string { } // GetRemotePathInfo is not implemented and should never be called. 
-func (c *JobBuilderClient) GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (int64, time.Time, bool, bool, error) { - return 0, time.Time{}, false, false, ErrUnsupportedOpForRST +func (c *JobBuilderClient) GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (*RemotePathInfo, error) { + return nil, ErrUnsupportedOpForRST } // GenerateExternalId is not implemented and should never be called. @@ -179,8 +182,66 @@ func (c *JobBuilderClient) executeJobBuilderRequest( isPathDir = err == nil && stat.IsDir() } - reschedule := false + var ( + // jobRequestsBuffer holds job requests that need to be sorted before streaming to the caller. + // In practice, a thousand job requests occupy tens to a few hundred megabytes contingent on + // string lengths. + jobRequestsBuffer []*beeremote.JobRequest + // jobRequestsBufferSize maintains the estimated byte size of buffered job requests. + jobRequestsBufferSize int64 + // jobRequestsBufferSizeThreshold forces the jobRequestsBuffer to be flushed; this is + // intended as a safety valve to ensure memory utilization remains controlled. This value + // should be large enough to never be reached. + jobRequestsBufferSizeThreshold = int64(100 * 1024 * 1024) + // jobRequestsBufferMu mutex guards jobRequestsBuffer updates. + jobRequestsBufferMu = sync.Mutex{} + ) + + // submitJobRequests sends jobRequests to jobSubmissionChan unless skipBuffering is true. When + // buffering, requests with SortValues go into jobRequestsBuffer, which is sorted and flushed + // after all requests have been received. + // + // Only the parent context should be used to ensure the job requests are submitted and only fail + // when the caller's context is cancelled.
+ var submitJobRequests func(jobRequests []*beeremote.JobRequest, skipBuffering bool) (submitCount int, errorCount int) + submitJobRequests = func(jobRequests []*beeremote.JobRequest, skipBuffering bool) (submitCount int, errorCount int) { + for _, jobRequest := range jobRequests { + if !skipBuffering && len(jobRequest.SortValues) > 0 { + jobRequestsBufferMu.Lock() + jobRequestsBuffer = append(jobRequestsBuffer, jobRequest) + jobRequestsBufferSize += sizeOfJobRequest(jobRequest) + + // Sort and flush jobRequestsBuffer when memory usage exceeds threshold. This will + // block other processes attempting to defer jobRequests until complete. + if jobRequestsBufferSize >= jobRequestsBufferSizeThreshold { + sortJobRequests(jobRequestsBuffer) + submits, errors := submitJobRequests(jobRequestsBuffer, true) + errorCount += errors + submitCount += submits + jobRequestsBuffer = []*beeremote.JobRequest{} + jobRequestsBufferSize = 0 + } + jobRequestsBufferMu.Unlock() + continue + } + + status := jobRequest.GetGenerationStatus() + if status != nil && (status.State == beeremote.JobRequest_GenerationStatus_ERROR || status.State == beeremote.JobRequest_GenerationStatus_FAILED_PRECONDITION) { + errorCount++ + } + select { + case <-ctx.Done(): + case jobSubmissionChan <- jobRequest: + submitCount++ + } + } + return + } + + g, gCtx := errgroup.WithContext(ctx) builderStateMu := sync.Mutex{} + reschedule := false + maxWorkers := runtime.GOMAXPROCS(0) walkDoneChan := make(chan struct{}, maxWorkers) defer close(walkDoneChan) @@ -188,10 +249,11 @@ func (c *JobBuilderClient) executeJobBuilderRequest( var err error var inMountPath string var remotePath string + for { select { - case <-ctx.Done(): - return ctx.Err() + case <-gCtx.Done(): + return gCtx.Err() case walkResp, ok := <-walkChan: if !ok { select { @@ -238,7 +300,7 @@ func (c *JobBuilderClient) executeJobBuilderRequest( } } - jobRequests, err := BuildJobRequests(ctx, c.rstMap, c.mountPoint, inMountPath, remotePath, cfg) + 
jobRequests, err := BuildJobRequests(gCtx, c.rstMap, c.mountPoint, inMountPath, remotePath, cfg) if err != nil { // BuildJobRequest should only return fatal errors, or if there are no RSTs // specified/configured on an entry and there is no other way to return the @@ -246,20 +308,9 @@ func (c *JobBuilderClient) executeJobBuilderRequest( return err } - errorCount := 0 - for _, jobRequest := range jobRequests { - status := jobRequest.GetGenerationStatus() - if status != nil && (status.State == beeremote.JobRequest_GenerationStatus_ERROR || status.State == beeremote.JobRequest_GenerationStatus_FAILED_PRECONDITION) { - errorCount++ - } - select { - case <-ctx.Done(): - case jobSubmissionChan <- jobRequest: - } - } - + submitCount, errorCount := submitJobRequests(jobRequests, false) builderStateMu.Lock() - builder.Submitted += int32(len(jobRequests)) + builder.Submitted += int32(submitCount) builder.Errors += int32(errorCount) builderStateMu.Unlock() } @@ -269,14 +320,13 @@ func (c *JobBuilderClient) executeJobBuilderRequest( // (up to GOMAXPROCS) when the job submission channel stays near empty, indicating the consumer is // draining faster than we can fill it. This keeps throughput balanced without over saturating // the system. - g, ctx := errgroup.WithContext(ctx) g.Go(func() error { workers := 1 lowThresholdTicks := 0 g.Go(createJobRequests) for { select { - case <-ctx.Done(): + case <-gCtx.Done(): return nil case <-walkDoneChan: return nil @@ -300,9 +350,21 @@ func (c *JobBuilderClient) executeJobBuilderRequest( } } }) - if err := g.Wait(); err != nil { + err := g.Wait() + + // Sort and submit any remaining job requests. These requests need to be submitted regardless of + // any errors that may have occurred since the job request was successfully created. 
+ if len(jobRequestsBuffer) > 0 { + sortJobRequests(jobRequestsBuffer) + submitCount, errorCount := submitJobRequests(jobRequestsBuffer, true) + builder.Submitted += int32(submitCount) + builder.Errors += int32(errorCount) + } + + if err != nil { return false, fmt.Errorf("job builder request was aborted: %w", err) } + if reschedule { return true, nil } @@ -333,6 +395,29 @@ func (c *JobBuilderClient) executeJobBuilderRequest( return false, nil } +func sortJobRequests(buffer []*beeremote.JobRequest) { + sort.SliceStable(buffer, func(i, j int) bool { + a := buffer[i].SortValues + b := buffer[j].SortValues + for k := 0; k < len(a) && k < len(b); k++ { + if a[k] == b[k] { + continue + } + return a[k] < b[k] + } + return len(a) < len(b) + }) +} + +// sizeOfJobRequest approximates the size of jobRequest. The actual heap usage may end up higher or +// lower than the estimate based on runtime overhead and whether heap usages were reused. +func sizeOfJobRequest(jobRequests *beeremote.JobRequest) int64 { + size := int64(proto.Size(jobRequests)) + size += int64(unsafe.Sizeof(*jobRequests)) + size += int64(len(jobRequests.SortValues)) * int64(unsafe.Sizeof("")) + return size +} + func walkLocalPathInsteadOfRemote(cfg *flex.JobRequestCfg) bool { return cfg.RemotePath == "" } diff --git a/common/rst/mock.go b/common/rst/mock.go index f404b4983..8c80fd4ab 100644 --- a/common/rst/mock.go +++ b/common/rst/mock.go @@ -120,8 +120,8 @@ func (r *MockClient) SanitizeRemotePath(remotePath string) string { return remotePath } -func (r *MockClient) GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (int64, time.Time, bool, bool, error) { - return 0, time.Time{}, false, false, ErrUnsupportedOpForRST +func (r *MockClient) GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (*RemotePathInfo, error) { + return nil, ErrUnsupportedOpForRST } func (r *MockClient) GenerateExternalId(ctx context.Context, cfg *flex.JobRequestCfg) (string, error) { diff --git 
a/common/rst/rst.go b/common/rst/rst.go index ceaffa7cb..d2e13f49c 100644 --- a/common/rst/rst.go +++ b/common/rst/rst.go @@ -64,6 +64,14 @@ var SupportedRSTTypes = map[string]func() (any, any){ // Mock could be included here if it ever made sense to allow configuration using a file. } +type RemotePathInfo struct { + Size int64 + Mtime time.Time + IsArchived bool + IsArchiveRestoreAllowed bool + SortValues []string +} + type Provider interface { // GetJobRequest builds a provider-specific job request. GetJobRequest(cfg *flex.JobRequestCfg) *beeremote.JobRequest @@ -109,12 +117,12 @@ type Provider interface { GetWalk(ctx context.Context, path string, chanSize int, resumeToken string, maxRequests int) (<-chan *filesystem.StreamPathResult, error) // SanitizeRemotePath normalizes the remote path format for the provider. SanitizeRemotePath(remotePath string) string - // GetRemotePathInfo must return the remote file or object's size, last beegfs-mtime. + // GetRemotePathInfo must return the remote file or object's size and last beegfs-mtime. // // It is important for providers to maintain beegfs-mtime which is the file's last modification // time of the prior upload operation. Beegfs-mtime is used in conjunction with the file's size // to determine whether the file is sync. - GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (remoteSize int64, remoteMtime time.Time, isArchived bool, isArchiveRestoreAllowed bool, err error) + GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (*RemotePathInfo, error) // GenerateExternalId can be used to generate an identifier for remote operations. 
GenerateExternalId(ctx context.Context, cfg *flex.JobRequestCfg) (externalId string, err error) // IsWorkRequestReady is used to indicate when the work request is ready and will be used to @@ -438,18 +446,22 @@ func BuildJobRequest(ctx context.Context, client Provider, mountPoint filesystem } } - remoteSize, remoteMtime, isArchived, isArchiveRestoreAllowed, err := client.GetRemotePathInfo(ctx, cfg) + remoteInfo, err := client.GetRemotePathInfo(ctx, cfg) if err != nil && (cfg.Download || !errors.Is(err, os.ErrNotExist)) { return getRequestWithFailedPrecondition(fmt.Sprintf("unable to retrieve remote path information: %s", err.Error())) } - if cfg.Download && isArchived && !isArchiveRestoreAllowed { + if cfg.Download && remoteInfo.IsArchived && !remoteInfo.IsArchiveRestoreAllowed { return getRequestWithFailedPrecondition(fmt.Sprintf("remote object is archived and restore is not permitted; rerun with --%s to continue", AllowRestoreFlag)) } - lockedInfo.SetRemoteSize(remoteSize) - lockedInfo.SetRemoteMtime(timestamppb.New(remoteMtime)) - lockedInfo.SetIsArchived(isArchived) + lockedInfo.SetRemoteSize(remoteInfo.Size) + lockedInfo.SetRemoteMtime(timestamppb.New(remoteInfo.Mtime)) + lockedInfo.SetIsArchived(remoteInfo.IsArchived) - return client.GetJobRequest(cfg) + request := client.GetJobRequest(cfg) + if len(remoteInfo.SortValues) > 0 { + request.SetSortValues(remoteInfo.SortValues) + } + return request } // updateRstConfig applies the RST configuration from the job request to the file entry by calling SetFileRstIds directly. 
diff --git a/common/rst/s3.go b/common/rst/s3.go index 514553b95..8b9daa1f1 100644 --- a/common/rst/s3.go +++ b/common/rst/s3.go @@ -9,10 +9,13 @@ import ( "fmt" "io" "io/fs" + "math" "net/url" "os" "path/filepath" + "regexp" "sort" + "strconv" "strings" "sync" "time" @@ -35,13 +38,19 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" ) +type s3PlacementHint struct { + name string + hintType s3PlacementHintType +} + type S3StorageClass struct { - retrievalTier types.Tier - archival bool - retentionDays int32 // defines how long the retrieved object will be available in days. - checkTime time.Duration // defines retry time after initiating the restore. - recheckTime time.Duration // defines retry time when restore was previously initiated. - autoRestore bool // defines whether archived objects should be permitted to be restored. + retrievalTier types.Tier + archival bool + retentionDays int32 // defines how long the retrieved object will be available in days. + checkTime time.Duration // defines retry time after initiating the restore. + recheckTime time.Duration // defines retry time when restore was previously initiated. + autoRestore bool // defines whether archived objects should be permitted to be restored. + placementHints []s3PlacementHint // defines the metadata keys to sort by. 
} type S3Client struct { @@ -127,13 +136,25 @@ func newS3(ctx context.Context, rstConfig *flex.RemoteStorageTarget, mountPoint return nil, fmt.Errorf("storage class, %s, must specify recheckTime >= '1s'", name) } + var placementHints []s3PlacementHint + if archive.PlacementHints != "" { + for hint := range strings.SplitSeq(archive.PlacementHints, ",") { + s3PlacementHint, err := news3PlacementHint(hint) + if err != nil { + return nil, fmt.Errorf("storage class, %s, has invalid placement hint: %w", name, err) + } + placementHints = append(placementHints, s3PlacementHint) + } + } + s3Client.storageClasses[name] = S3StorageClass{ - retrievalTier: retrievalTier, - archival: true, - retentionDays: retentionsDays, - checkTime: checkTime, - recheckTime: recheckTime, - autoRestore: archive.GetAutoRestore(), + retrievalTier: retrievalTier, + archival: true, + retentionDays: retentionsDays, + checkTime: checkTime, + recheckTime: recheckTime, + autoRestore: archive.GetAutoRestore(), + placementHints: placementHints, } } @@ -279,7 +300,7 @@ func (r *S3Client) IsWorkRequestReady(ctx context.Context, request *flex.WorkReq lockedInfo := sync.GetLockedInfo() if sync.Operation == flex.SyncJob_DOWNLOAD && lockedInfo.IsArchived { - _, _, archiveStatus, err := r.getObjectMetadata(ctx, sync.RemotePath, true) + _, _, _, archiveStatus, err := r.getObjectMetadata(ctx, sync.RemotePath, true) if err != nil { return false, 0, err } @@ -586,16 +607,19 @@ func decodeResumeToken(s string) (s3ResumeToken, error) { return token, nil } -func (r *S3Client) GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (int64, time.Time, bool, bool, error) { - remoteSize, remoteMtime, archiveStatus, err := r.getObjectMetadata(ctx, cfg.RemotePath, cfg.Download) - if archiveStatus == nil { - return remoteSize, remoteMtime, false, false, err +func (r *S3Client) GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (*RemotePathInfo, error) { + remoteSize, remoteMtime, sortValues, 
archiveStatus, err := r.getObjectMetadata(ctx, cfg.RemotePath, cfg.Download) + remotePathInfo := &RemotePathInfo{ + Size: remoteSize, + Mtime: remoteMtime, + SortValues: sortValues, } - isArchived := (*archiveStatus).IsArchived - isArchiveRestoreAllowed := (cfg.AllowRestore != nil && *cfg.AllowRestore) || (cfg.AllowRestore == nil && archiveStatus.Info.autoRestore) - - return remoteSize, remoteMtime, isArchived, isArchiveRestoreAllowed, err + if archiveStatus != nil { + remotePathInfo.IsArchived = archiveStatus.IsArchived + remotePathInfo.IsArchiveRestoreAllowed = (cfg.AllowRestore != nil && *cfg.AllowRestore) || (cfg.AllowRestore == nil && archiveStatus.Info.autoRestore) + } + return remotePathInfo, err } func (r *S3Client) GenerateExternalId(ctx context.Context, cfg *flex.JobRequestCfg) (string, error) { @@ -711,7 +735,7 @@ func (r *S3Client) completeSyncWorkRequests_Download(ctx context.Context, job *b request := job.GetRequest() sync := request.GetSync() - _, mtime, _, err := r.getObjectMetadata(ctx, sync.RemotePath, false) + _, mtime, _, _, err := r.getObjectMetadata(ctx, sync.RemotePath, false) if err != nil { return fmt.Errorf("unable to verify the remote object has not changed: %w", err) } @@ -821,16 +845,17 @@ func (r *S3Client) archiveStatus(storageClass types.StorageClass, restoreMsg *st } // getObjectMetadata returns the object's size in bytes, modification time if it exists. -func (r *S3Client) getObjectMetadata(ctx context.Context, key string, keyMustExist bool) (int64, time.Time, *s3ArchiveInfo, error) { +func (r *S3Client) getObjectMetadata(ctx context.Context, key string, keyMustExist bool) (int64, time.Time, []string, *s3ArchiveInfo, error) { if key == "" { if keyMustExist { - return 0, time.Time{}, nil, fmt.Errorf("unable to retrieve object metadata! --%s must be specified", RemotePathFlag) + return 0, time.Time{}, nil, nil, fmt.Errorf("unable to retrieve object metadata! 
--%s must be specified", RemotePathFlag) } - return 0, time.Time{}, nil, nil + return 0, time.Time{}, nil, nil, nil } + config := r.config.GetS3() headObjectInput := &s3.HeadObjectInput{ - Bucket: aws.String(r.config.GetS3().Bucket), + Bucket: aws.String(config.Bucket), Key: aws.String(key), } @@ -839,25 +864,34 @@ func (r *S3Client) getObjectMetadata(ctx context.Context, key string, keyMustExi var apiErr smithy.APIError if errors.As(err, &apiErr) { if apiErr.ErrorCode() == "NotFound" || apiErr.ErrorCode() == "NoSuchKey" { - return 0, time.Time{}, nil, os.ErrNotExist + return 0, time.Time{}, nil, nil, os.ErrNotExist } } - return 0, time.Time{}, nil, err + return 0, time.Time{}, nil, nil, err } archivedStatus := r.archiveStatus(resp.StorageClass, resp.Restore) + var placementHintValues []string // contains the metadata values for each placement hint metadata keys. + for _, hint := range r.storageClasses[resp.StorageClass].placementHints { + value, ok := resp.Metadata[hint.name] + if !ok { + continue + } + placementHintValues = append(placementHintValues, hint.SortableKey(value)) + } + beegfsMtime, ok := resp.Metadata["beegfs-mtime"] if !ok { - return *resp.ContentLength, *resp.LastModified, archivedStatus, nil + return *resp.ContentLength, *resp.LastModified, placementHintValues, archivedStatus, nil } mtime, err := time.Parse(time.RFC3339, beegfsMtime) if err != nil { - return *resp.ContentLength, *resp.LastModified, archivedStatus, fmt.Errorf("unable to parse remote object's beegfs-mtime") + return *resp.ContentLength, *resp.LastModified, placementHintValues, archivedStatus, fmt.Errorf("unable to parse remote object's beegfs-mtime") } - return *resp.ContentLength, mtime, archivedStatus, nil + return *resp.ContentLength, mtime, placementHintValues, archivedStatus, nil } func (r *S3Client) createUpload(ctx context.Context, path string, mtime time.Time, metadata map[string]string, tagging *string, storageClass *string) (uploadID string, err error) { @@ -1066,3 
+1100,77 @@ func (r *S3Client) recommendedSegments(fileSize int64) (int64, int32) { // consideration file size and number of workers for this RST type. return 4, 1 } + +type s3PlacementHintType int + +const ( + s3PlacementHintString s3PlacementHintType = 0 + s3PlacementHintInteger s3PlacementHintType = 1 + s3PlacementHintFloat s3PlacementHintType = 2 +) + +// s3PlacementHintRe permits the common metadata key nomenclature but is stricter than S3’s +// HTTP-token rules, which would additionally permit the character set [!#$%&'*+^`\|~]. +var s3PlacementHintRe = regexp.MustCompile(`^([a-zA-Z0-9_.-]+):(?:(str|int|float))$`) + +func news3PlacementHint(definition string) (s3PlacementHint, error) { + definition = strings.TrimSpace(definition) + definition = strings.ToLower(definition) + parts := s3PlacementHintRe.FindStringSubmatch(definition) + if len(parts) != 3 { + return s3PlacementHint{}, fmt.Errorf("invalid placement hint definition! Must be in the form :[str|int|float]") + } + + hint := s3PlacementHint{name: parts[1], hintType: s3PlacementHintString} + switch parts[2] { + case "str": + hint.hintType = s3PlacementHintString + case "int": + hint.hintType = s3PlacementHintInteger + case "float": + hint.hintType = s3PlacementHintFloat + default: + return s3PlacementHint{}, fmt.Errorf("invalid placement hint definition! Must be in the form :[str|int|float]") + } + + return hint, nil +} + +// SortableKey returns the value as a lexicographically sortable string based on the hint's type. +// If a numerical parsing error occurs for an integer or float hint, the original value is +// returned unchanged.
+func (p s3PlacementHint) SortableKey(value string) string { + switch p.hintType { + case s3PlacementHintString: + return value + case s3PlacementHintInteger: + integer, err := strconv.ParseInt(value, 10, 64) + if err != nil { + return value + } + // Convert the int64 into a lexicographically sortable hexadecimal based on the + // transformation of the unsigned 64-bit representation. The transformation simply flips the + // int64's sign bit to preserve the numerical order. + unsignedBits := uint64(integer) ^ (1 << 63) + return fmt.Sprintf("%016x", unsignedBits) + + case s3PlacementHintFloat: + float, err := strconv.ParseFloat(value, 64) + if err != nil { + return value + } + + // Convert the float into a lexicographically sortable hexadecimal based on the float's + // unsigned 64-bit integer representation. The bitwise operations below simply invert the + // negative values and move the positive values above them. + unsignedBits := math.Float64bits(float) + if unsignedBits&(1<<63) != 0 { + unsignedBits = ^unsignedBits + } else { + unsignedBits ^= 1 << 63 + } + return fmt.Sprintf("%016x", unsignedBits) + } + + return value +} diff --git a/common/rst/s3_test.go b/common/rst/s3_test.go index 107b1de29..eae3070f9 100644 --- a/common/rst/s3_test.go +++ b/common/rst/s3_test.go @@ -104,3 +104,123 @@ func TestCompleteRequests(t *testing.T) { err = testS3Client.CompleteWorkRequests(context.Background(), jobSyncInvalid, workResponses, true) assert.ErrorIs(t, err, ErrUnsupportedOpForRST) } + +func TestNewS3PlacementHint(t *testing.T) { + tests := []struct { + name string + definition string + wantName string + wantType s3PlacementHintType + wantErr bool + }{ + { + name: "StringType", + definition: "user:str", + wantName: "user", + wantType: s3PlacementHintString, + }, + { + name: "IntegerTypeTrimmedAndLowercased", + definition: " COUNT:INT ", + wantName: "count", + wantType: s3PlacementHintInteger, + }, + { + name: "FloatType", + definition: "ratio:float", + wantName: 
"ratio", + wantType: s3PlacementHintFloat, + }, + { + name: "InvalidDefinition", + definition: "missingType", + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + hint, err := news3PlacementHint(tt.definition) + + if tt.wantErr { + require.Error(t, err) + return + } + + require.NoError(t, err) + assert.Equal(t, tt.wantName, hint.name) + assert.Equal(t, tt.wantType, hint.hintType) + }) + } +} + +func TestS3PlacementHintString(t *testing.T) { + tests := []struct { + name string + hintType s3PlacementHintType + value string + expected string + }{ + { + name: "StringPassthrough", + hintType: s3PlacementHintString, + value: "MiXeDCasE", + expected: "MiXeDCasE", + }, + { + name: "IntegerPositive", + hintType: s3PlacementHintInteger, + value: "10", + expected: "800000000000000a", + }, + { + name: "IntegerZero", + hintType: s3PlacementHintInteger, + value: "0", + expected: "8000000000000000", + }, + { + name: "IntegerNegative", + hintType: s3PlacementHintInteger, + value: "-10", + expected: "7ffffffffffffff6", + }, + { + name: "IntegerParseError", + hintType: s3PlacementHintInteger, + value: "not-an-int", + expected: "not-an-int", + }, + { + name: "FloatPositive", + hintType: s3PlacementHintFloat, + value: "1.5", + expected: "bff8000000000000", + }, + { + name: "FloatZero", + hintType: s3PlacementHintFloat, + value: "0", + expected: "8000000000000000", + }, + { + name: "FloatNegative", + hintType: s3PlacementHintFloat, + value: "-1.5", + expected: "4007ffffffffffff", + }, + { + name: "FloatParseError", + hintType: s3PlacementHintFloat, + value: "abc", + expected: "abc", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + hint := s3PlacementHint{hintType: test.hintType} + assert.Equal(t, test.expected, hint.SortableKey(test.value)) + }) + } +} diff --git a/ctl/pkg/ctl/rst/status.go b/ctl/pkg/ctl/rst/status.go index af0aa9443..95adc0b1a 100644 --- a/ctl/pkg/ctl/rst/status.go +++ 
b/ctl/pkg/ctl/rst/status.go @@ -491,7 +491,7 @@ func getPathStatusFromTarget( continue } - remoteSize, remoteMtime, _, _, err := client.GetRemotePathInfo(ctx, &flex.JobRequestCfg{Path: fsPath, RemotePath: client.SanitizeRemotePath(fsPath)}) + remoteInfo, err := client.GetRemotePathInfo(ctx, &flex.JobRequestCfg{Path: fsPath, RemotePath: client.SanitizeRemotePath(fsPath)}) if err != nil { if errors.Is(err, os.ErrNotExist) { result.SyncStatus = NotAttempted @@ -500,8 +500,8 @@ func getPathStatusFromTarget( } return nil, fmt.Errorf("unable to get remote resource info: %w", err) } - lockedInfo.SetRemoteSize(remoteSize) - lockedInfo.SetRemoteMtime(timestamppb.New(remoteMtime)) + lockedInfo.SetRemoteSize(remoteInfo.Size) + lockedInfo.SetRemoteMtime(timestamppb.New(remoteInfo.Mtime)) if rst.IsFileAlreadySynced(lockedInfo) { syncReason.WriteString(fmt.Sprintf("Target %d: File is synced based on the remote storage target.\n", tgt)) From 1fc7795839e534366e8db10cebd9677a9701a468 Mon Sep 17 00:00:00 2001 From: Nathan Swartz <112000252+swartzn@users.noreply.github.com> Date: Fri, 9 Jan 2026 12:49:40 -0600 Subject: [PATCH 2/2] feat(rst): add optional logging to JobBuilderClient Add warning message when builder's job request buffer exceeds memory utilization threshold. 
--- common/rst/builder.go | 15 ++++++++++++++- common/rst/jobrequest.go | 4 ++-- common/rst/store.go | 2 +- rst/remote/internal/workermgr/manager.go | 2 +- 4 files changed, 18 insertions(+), 5 deletions(-) diff --git a/common/rst/builder.go b/common/rst/builder.go index ad8196b4d..c5ac60ce6 100644 --- a/common/rst/builder.go +++ b/common/rst/builder.go @@ -4,7 +4,9 @@ import ( "context" "errors" "fmt" + "path" "path/filepath" + "reflect" "runtime" "sort" "sync" @@ -14,6 +16,7 @@ import ( "github.com/thinkparq/beegfs-go/common/filesystem" "github.com/thinkparq/protobuf/go/beeremote" "github.com/thinkparq/protobuf/go/flex" + "go.uber.org/zap" "golang.org/x/sync/errgroup" "google.golang.org/protobuf/proto" ) @@ -22,15 +25,22 @@ import ( // provided via flex.JobRequestCfg. type JobBuilderClient struct { ctx context.Context + log *zap.Logger rstMap map[uint32]Provider mountPoint filesystem.Provider } var _ Provider = &JobBuilderClient{} -func NewJobBuilderClient(ctx context.Context, rstMap map[uint32]Provider, mountPoint filesystem.Provider) *JobBuilderClient { +func NewJobBuilderClient(ctx context.Context, log *zap.Logger, rstMap map[uint32]Provider, mountPoint filesystem.Provider) *JobBuilderClient { + if log == nil { + log = zap.NewNop() + } + log = log.With(zap.String("component", path.Base(reflect.TypeFor[JobBuilderClient]().PkgPath()))) + return &JobBuilderClient{ ctx: ctx, + log: log, rstMap: rstMap, mountPoint: mountPoint, } @@ -214,6 +224,9 @@ func (c *JobBuilderClient) executeJobBuilderRequest( // Sort and flush jobRequestsBuffer when memory usage exceeds threshold. This will // block other processes attempting to defer jobRequests until complete. 
if jobRequestsBufferSize >= jobRequestsBufferSizeThreshold { + c.log.Warn("exceeded job request buffer memory usage threshold", zap.Int("jobRequests", len(jobRequestsBuffer)), + zap.Int64("bufferSizeBytes", jobRequestsBufferSize), zap.Int64("thresholdBytes", jobRequestsBufferSizeThreshold)) + sortJobRequests(jobRequestsBuffer) submits, errors := submitJobRequests(jobRequestsBuffer, true) errorCount += errors diff --git a/common/rst/jobrequest.go b/common/rst/jobrequest.go index afc2aa891..62d9915bd 100644 --- a/common/rst/jobrequest.go +++ b/common/rst/jobrequest.go @@ -145,7 +145,7 @@ func prepareJobRequests(ctx context.Context, remote beeremote.BeeRemoteClient, c } if jobBuilder { - client := NewJobBuilderClient(ctx, nil, nil) + client := NewJobBuilderClient(ctx, nil, nil, nil) request := client.GetJobRequest(cfg) return []*beeremote.JobRequest{request}, nil } @@ -196,7 +196,7 @@ func prepareJobRequests(ctx context.Context, remote beeremote.BeeRemoteClient, c return []*beeremote.JobRequest{request}, nil } - client := NewJobBuilderClient(ctx, nil, nil) + client := NewJobBuilderClient(ctx, nil, nil, nil) request := client.GetJobRequest(cfg) return []*beeremote.JobRequest{request}, nil } diff --git a/common/rst/store.go b/common/rst/store.go index a511e10d5..a994289d6 100644 --- a/common/rst/store.go +++ b/common/rst/store.go @@ -86,7 +86,7 @@ func (s *ClientStore) UpdateConfig(ctx context.Context, rstConfigs []*flex.Remot } rstMap[config.Id] = rst } - rstMap[JobBuilderRstId] = NewJobBuilderClient(ctx, rstMap, s.mountPoint) + rstMap[JobBuilderRstId] = NewJobBuilderClient(ctx, nil, rstMap, s.mountPoint) s.clients = rstMap } return nil diff --git a/rst/remote/internal/workermgr/manager.go b/rst/remote/internal/workermgr/manager.go index 089df1baf..19f2587cb 100644 --- a/rst/remote/internal/workermgr/manager.go +++ b/rst/remote/internal/workermgr/manager.go @@ -91,7 +91,7 @@ func NewManager(ctx context.Context, log *zap.Logger, managerConfig Config, work 
rstMap[configId] = rst } - rstMap[rst.JobBuilderRstId] = rst.NewJobBuilderClient(ctx, rstMap, mountPoint) + rstMap[rst.JobBuilderRstId] = rst.NewJobBuilderClient(ctx, log, rstMap, mountPoint) nodePools := make(map[worker.Type]*Pool, 0) nodes, err := worker.NewWorkerNodesFromConfig(log, workerConfigs)