Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 121 additions & 23 deletions common/rst/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,43 @@
"context"
"errors"
"fmt"
"path"
"path/filepath"
"reflect"
"runtime"
"sort"
"sync"
"time"
"unsafe"

"github.com/thinkparq/beegfs-go/common/filesystem"
"github.com/thinkparq/protobuf/go/beeremote"
"github.com/thinkparq/protobuf/go/flex"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
)

// JobBuilderClient is a special RST client that builds new job requests based on the information
// provided via flex.JobRequestCfg.
type JobBuilderClient struct {
// ctx is the context handed to NewJobBuilderClient.
ctx context.Context
// log is the logger used by the builder; NewJobBuilderClient substitutes a no-op logger when
// none is supplied and tags it with a "component" field.
log *zap.Logger
// rstMap holds the providers that job requests are built against, keyed by uint32
// (presumably the RST ID; confirm against callers).
rstMap map[uint32]Provider
// mountPoint is the filesystem provider passed along when building job requests.
mountPoint filesystem.Provider
}

var _ Provider = &JobBuilderClient{}

func NewJobBuilderClient(ctx context.Context, rstMap map[uint32]Provider, mountPoint filesystem.Provider) *JobBuilderClient {
func NewJobBuilderClient(ctx context.Context, log *zap.Logger, rstMap map[uint32]Provider, mountPoint filesystem.Provider) *JobBuilderClient {
if log == nil {
log = zap.NewNop()
}
log = log.With(zap.String("component", path.Base(reflect.TypeFor[JobBuilderClient]().PkgPath())))

return &JobBuilderClient{
ctx: ctx,
log: log,
rstMap: rstMap,
mountPoint: mountPoint,
}
Expand Down Expand Up @@ -150,8 +163,8 @@
}

// GetRemotePathInfo is not implemented and should never be called.
func (c *JobBuilderClient) GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (int64, time.Time, bool, bool, error) {
return 0, time.Time{}, false, false, ErrUnsupportedOpForRST
func (c *JobBuilderClient) GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (*RemotePathInfo, error) {
return nil, ErrUnsupportedOpForRST
}

// GenerateExternalId is not implemented and should never be called.
Expand Down Expand Up @@ -179,19 +192,81 @@
isPathDir = err == nil && stat.IsDir()
}

reschedule := false
var (
// jobRequestsBuffer holds job requests that need to be sorted before streaming to the caller.
// In practice, a thousand job requests occupy tens to a few hundred megabytes contingent on
// string lengths.
jobRequestsBuffer []*beeremote.JobRequest
// jobRequestsBufferSize maintains the estimated byte size of buffered job requests.
jobRequestsBufferSize int64
// jobRequestsBufferSizeThreshold forces the jobRequestsBuffer to be flushed; this is
// intended as a safety valve to ensure memory utilization remains controlled. This value
// should be large enough to never be reached.
jobRequestsBufferSizeThreshold = int64(100 * 1024 * 1024)
// jobRequestsBufferMu mutex guards jobRequestsBuffer updates.
jobRequestsBufferMu = sync.Mutex{}
)

// submitJobRequests sends jobRequests to jobSubmissionChan unless skipBuffering is true. When
// buffering, requests with SortValues go into jobRequestsBuffer, which is sorted and flushed
// after all requests have been received.
//
// Only the parent context should be used to ensure the job requests are submitted and only fail
// when the caller's context is cancelled.
var submitJobRequests func(jobRequests []*beeremote.JobRequest, skipBuffering bool) (submitCount int, errorCount int)
submitJobRequests = func(jobRequests []*beeremote.JobRequest, skipBuffering bool) (submitCount int, errorCount int) {
for _, jobRequest := range jobRequests {
if !skipBuffering && len(jobRequest.SortValues) > 0 {

Check failure on line 219 in common/rst/builder.go

View workflow job for this annotation

GitHub Actions / checks

jobRequest.SortValues undefined (type *beeremote.JobRequest has no field or method SortValues)

Check failure on line 219 in common/rst/builder.go

View workflow job for this annotation

GitHub Actions / checks

jobRequest.SortValues undefined (type *beeremote.JobRequest has no field or method SortValues)
jobRequestsBufferMu.Lock()
jobRequestsBuffer = append(jobRequestsBuffer, jobRequest)
jobRequestsBufferSize += sizeOfJobRequest(jobRequest)

// Sort and flush jobRequestsBuffer when memory usage exceeds threshold. This will
// block other processes attempting to defer jobRequests until complete.
if jobRequestsBufferSize >= jobRequestsBufferSizeThreshold {
c.log.Warn("exceeded job request buffer memory usage threshold", zap.Int("jobRequests", len(jobRequestsBuffer)),
zap.Int64("bufferSizeBytes", jobRequestsBufferSize), zap.Int64("thresholdBytes", jobRequestsBufferSizeThreshold))

sortJobRequests(jobRequestsBuffer)
submits, errors := submitJobRequests(jobRequestsBuffer, true)
errorCount += errors
submitCount += submits
jobRequestsBuffer = []*beeremote.JobRequest{}
jobRequestsBufferSize = 0
}
jobRequestsBufferMu.Unlock()
continue
}

status := jobRequest.GetGenerationStatus()
if status != nil && (status.State == beeremote.JobRequest_GenerationStatus_ERROR || status.State == beeremote.JobRequest_GenerationStatus_FAILED_PRECONDITION) {
errorCount++
}
select {
case <-ctx.Done():
case jobSubmissionChan <- jobRequest:
submitCount++
}
}
return
}

g, gCtx := errgroup.WithContext(ctx)
builderStateMu := sync.Mutex{}
reschedule := false

maxWorkers := runtime.GOMAXPROCS(0)
walkDoneChan := make(chan struct{}, maxWorkers)
defer close(walkDoneChan)
createJobRequests := func() error {
var err error
var inMountPath string
var remotePath string

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-gCtx.Done():
return gCtx.Err()
case walkResp, ok := <-walkChan:
if !ok {
select {
Expand Down Expand Up @@ -238,28 +313,17 @@
}
}

jobRequests, err := BuildJobRequests(ctx, c.rstMap, c.mountPoint, inMountPath, remotePath, cfg)
jobRequests, err := BuildJobRequests(gCtx, c.rstMap, c.mountPoint, inMountPath, remotePath, cfg)
if err != nil {
// BuildJobRequest should only return fatal errors, or if there are no RSTs
// specified/configured on an entry and there is no other way to return the
// error other than aborting the builder job entirely.
return err
}

errorCount := 0
for _, jobRequest := range jobRequests {
status := jobRequest.GetGenerationStatus()
if status != nil && (status.State == beeremote.JobRequest_GenerationStatus_ERROR || status.State == beeremote.JobRequest_GenerationStatus_FAILED_PRECONDITION) {
errorCount++
}
select {
case <-ctx.Done():
case jobSubmissionChan <- jobRequest:
}
}

submitCount, errorCount := submitJobRequests(jobRequests, false)
builderStateMu.Lock()
builder.Submitted += int32(len(jobRequests))
builder.Submitted += int32(submitCount)
builder.Errors += int32(errorCount)
builderStateMu.Unlock()
}
Expand All @@ -269,14 +333,13 @@
// (up to GOMAXPROCS) when the job submission channel stays near empty, indicating the consumer is
// draining faster than we can fill it. This keeps throughput balanced without over saturating
// the system.
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
workers := 1
lowThresholdTicks := 0
g.Go(createJobRequests)
for {
select {
case <-ctx.Done():
case <-gCtx.Done():
return nil
case <-walkDoneChan:
return nil
Expand All @@ -300,9 +363,21 @@
}
}
})
if err := g.Wait(); err != nil {
err := g.Wait()

// Sort and submit any remaining job requests. These requests need to be submitted regardless of
// any errors that may have occurred since the job request was successfully created.
if len(jobRequestsBuffer) > 0 {
sortJobRequests(jobRequestsBuffer)
submitCount, errorCount := submitJobRequests(jobRequestsBuffer, true)
builder.Submitted += int32(submitCount)
builder.Errors += int32(errorCount)
}

if err != nil {
return false, fmt.Errorf("job builder request was aborted: %w", err)
}

if reschedule {
return true, nil
}
Expand Down Expand Up @@ -333,6 +408,29 @@
return false, nil
}

func sortJobRequests(buffer []*beeremote.JobRequest) {
sort.SliceStable(buffer, func(i, j int) bool {
a := buffer[i].SortValues

Check failure on line 413 in common/rst/builder.go

View workflow job for this annotation

GitHub Actions / checks

buffer[i].SortValues undefined (type *beeremote.JobRequest has no field or method SortValues)

Check failure on line 413 in common/rst/builder.go

View workflow job for this annotation

GitHub Actions / checks

buffer[i].SortValues undefined (type *beeremote.JobRequest has no field or method SortValues)
b := buffer[j].SortValues

Check failure on line 414 in common/rst/builder.go

View workflow job for this annotation

GitHub Actions / checks

buffer[j].SortValues undefined (type *beeremote.JobRequest has no field or method SortValues)

Check failure on line 414 in common/rst/builder.go

View workflow job for this annotation

GitHub Actions / checks

buffer[j].SortValues undefined (type *beeremote.JobRequest has no field or method SortValues)
for k := 0; k < len(a) && k < len(b); k++ {
if a[k] == b[k] {
continue
}
return a[k] < b[k]
}
return len(a) < len(b)
})
}

// sizeOfJobRequest approximates the size of jobRequest. The actual heap usage may end up higher or
// lower than the estimate based on runtime overhead and whether heap usages were reused.
func sizeOfJobRequest(jobRequests *beeremote.JobRequest) int64 {
size := int64(proto.Size(jobRequests))
size += int64(unsafe.Sizeof(*jobRequests))
size += int64(len(jobRequests.SortValues)) * int64(unsafe.Sizeof(""))

Check failure on line 430 in common/rst/builder.go

View workflow job for this annotation

GitHub Actions / checks

jobRequests.SortValues undefined (type *beeremote.JobRequest has no field or method SortValues)
return size
}

// walkLocalPathInsteadOfRemote reports whether cfg carries no remote path, i.e. its RemotePath
// field is the empty string.
func walkLocalPathInsteadOfRemote(cfg *flex.JobRequestCfg) bool {
	hasRemotePath := cfg.RemotePath != ""
	return !hasRemotePath
}
4 changes: 2 additions & 2 deletions common/rst/jobrequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func prepareJobRequests(ctx context.Context, remote beeremote.BeeRemoteClient, c
}

if jobBuilder {
client := NewJobBuilderClient(ctx, nil, nil)
client := NewJobBuilderClient(ctx, nil, nil, nil)
request := client.GetJobRequest(cfg)
return []*beeremote.JobRequest{request}, nil
}
Expand Down Expand Up @@ -196,7 +196,7 @@ func prepareJobRequests(ctx context.Context, remote beeremote.BeeRemoteClient, c
return []*beeremote.JobRequest{request}, nil
}

client := NewJobBuilderClient(ctx, nil, nil)
client := NewJobBuilderClient(ctx, nil, nil, nil)
request := client.GetJobRequest(cfg)
return []*beeremote.JobRequest{request}, nil
}
Expand Down
4 changes: 2 additions & 2 deletions common/rst/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ func (r *MockClient) SanitizeRemotePath(remotePath string) string {
return remotePath
}

func (r *MockClient) GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (int64, time.Time, bool, bool, error) {
return 0, time.Time{}, false, false, ErrUnsupportedOpForRST
func (r *MockClient) GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (*RemotePathInfo, error) {
return nil, ErrUnsupportedOpForRST
}

func (r *MockClient) GenerateExternalId(ctx context.Context, cfg *flex.JobRequestCfg) (string, error) {
Expand Down
28 changes: 20 additions & 8 deletions common/rst/rst.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@
// Mock could be included here if it ever made sense to allow configuration using a file.
}

// RemotePathInfo bundles what a Provider reports about a remote file or object from
// GetRemotePathInfo.
type RemotePathInfo struct {
// Size is the remote file or object's size.
Size int64
// Mtime is the remote file or object's last beegfs-mtime.
Mtime time.Time
// IsArchived reports whether the remote object is archived.
IsArchived bool
// IsArchiveRestoreAllowed reports whether restoring an archived object is permitted; a
// download of an archived object is rejected as a failed precondition when this is false.
IsArchiveRestoreAllowed bool
// SortValues, when non-empty, is set on the job request generated for this path.
SortValues []string
}

type Provider interface {
// GetJobRequest builds a provider-specific job request.
GetJobRequest(cfg *flex.JobRequestCfg) *beeremote.JobRequest
Expand Down Expand Up @@ -109,12 +117,12 @@
GetWalk(ctx context.Context, path string, chanSize int, resumeToken string, maxRequests int) (<-chan *filesystem.StreamPathResult, error)
// SanitizeRemotePath normalizes the remote path format for the provider.
SanitizeRemotePath(remotePath string) string
// GetRemotePathInfo must return the remote file or object's size, last beegfs-mtime.
// GetRemotePathInfo must return the remote file or object's size and last beegfs-mtime.
//
// It is important for providers to maintain beegfs-mtime which is the file's last modification
// time of the prior upload operation. Beegfs-mtime is used in conjunction with the file's size
// to determine whether the file is in sync.
GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (remoteSize int64, remoteMtime time.Time, isArchived bool, isArchiveRestoreAllowed bool, err error)
GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (*RemotePathInfo, error)
// GenerateExternalId can be used to generate an identifier for remote operations.
GenerateExternalId(ctx context.Context, cfg *flex.JobRequestCfg) (externalId string, err error)
// IsWorkRequestReady is used to indicate when the work request is ready and will be used to
Expand Down Expand Up @@ -438,18 +446,22 @@
}
}

remoteSize, remoteMtime, isArchived, isArchiveRestoreAllowed, err := client.GetRemotePathInfo(ctx, cfg)
remoteInfo, err := client.GetRemotePathInfo(ctx, cfg)
if err != nil && (cfg.Download || !errors.Is(err, os.ErrNotExist)) {
return getRequestWithFailedPrecondition(fmt.Sprintf("unable to retrieve remote path information: %s", err.Error()))
}
if cfg.Download && isArchived && !isArchiveRestoreAllowed {
if cfg.Download && remoteInfo.IsArchived && !remoteInfo.IsArchiveRestoreAllowed {
return getRequestWithFailedPrecondition(fmt.Sprintf("remote object is archived and restore is not permitted; rerun with --%s to continue", AllowRestoreFlag))
}
lockedInfo.SetRemoteSize(remoteSize)
lockedInfo.SetRemoteMtime(timestamppb.New(remoteMtime))
lockedInfo.SetIsArchived(isArchived)
lockedInfo.SetRemoteSize(remoteInfo.Size)
lockedInfo.SetRemoteMtime(timestamppb.New(remoteInfo.Mtime))
lockedInfo.SetIsArchived(remoteInfo.IsArchived)

return client.GetJobRequest(cfg)
request := client.GetJobRequest(cfg)
if len(remoteInfo.SortValues) > 0 {
request.SetSortValues(remoteInfo.SortValues)

Check failure on line 462 in common/rst/rst.go

View workflow job for this annotation

GitHub Actions / checks

request.SetSortValues undefined (type *beeremote.JobRequest has no field or method SetSortValues)
}
return request
}

// updateRstConfig applies the RST configuration from the job request to the file entry by calling SetFileRstIds directly.
Expand Down
Loading
Loading