Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 1 addition & 35 deletions pkg/services/policer/ec.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)

Expand All @@ -43,8 +42,7 @@ func (p *Policer) processECPart(ctx context.Context, addr oid.Address, parent oi
return
}

p.tryScheduleCheckECPartsTask(ctx, addr.Container(), parent, rule, addr.Object(), pi)

p.checkECParts(ctx, addr.Container(), parent, rule, pi.RuleIndex, pi.Index, addr.Object())
p.processECPartByRule(ctx, rule, addr, pi.Index, nodeLists[pi.RuleIndex])
}

Expand Down Expand Up @@ -144,38 +142,6 @@ func (x *singleReplication) SubmitSuccessfulReplication(node netmap.NodeInfo) {
x.netAddresses = slices.Collect(node.NetworkEndpoints())
}

func (p *Policer) tryScheduleCheckECPartsTask(ctx context.Context, cnr cid.ID, parent oid.ID, rule iec.Rule, localPartID oid.ID, localPartInfo iec.PartInfo) {
p.checkECPartsProgressMtx.Lock()
defer p.checkECPartsProgressMtx.Unlock()

addr := oid.NewAddress(cnr, parent)
if _, ok := p.checkECPartsProgressMap[addr]; ok {
return
}

err := p.checkECPartsWorkerPool.Submit(func() {
defer func() {
p.checkECPartsProgressMtx.Lock()
delete(p.checkECPartsProgressMap, addr)
p.checkECPartsProgressMtx.Unlock()
}()

p.checkECParts(ctx, cnr, parent, rule, localPartInfo.RuleIndex, localPartInfo.Index, localPartID)
})
if err != nil {
if errors.Is(err, ants.ErrPoolOverload) {
p.log.Info("pool of workers for EC parts checking is full, skip the task",
zap.Stringer("container", cnr), zap.Stringer("parent", parent), zap.Stringer("local_part_id", localPartID))
return
}

p.log.Warn("unexpected error returned from pool of workers for EC part checking", zap.Error(err))
return
}

p.checkECPartsProgressMap[addr] = struct{}{}
}

func (p *Policer) checkECParts(ctx context.Context, cnr cid.ID, parent oid.ID, rule iec.Rule, ruleIdx, localPartIdx int, localPartID oid.ID) {
parentAddr := oid.NewAddress(cnr, parent)

Expand Down
17 changes: 2 additions & 15 deletions pkg/services/policer/policer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package policer

import (
"context"
"fmt"
"io"
"sync"
"sync/atomic"
Expand All @@ -18,7 +17,6 @@ import (
netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -56,10 +54,6 @@ type Policer struct {
hadPlacementMismatch atomic.Bool

signer neofscrypto.Signer

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this removal be a problem? in policer, it was added for a certain reason: #1410

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i meant checkECPartsProgressMtx, GH links a strange line

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means having at least two parts of the same object on the same node. Not the most probable scenario normally, only temporary in case of object redistribution. Then also these objects should have sufficiently large common OID prefix to appear in the same batch taken from metabase which can also happen, but just not very likely.

checkECPartsProgressMtx sync.Mutex
checkECPartsProgressMap map[oid.Address]struct{}
checkECPartsWorkerPool *ants.Pool
}

// Option is an option for Policer constructor.
Expand Down Expand Up @@ -147,11 +141,6 @@ func defaultCfg() *cfg {

// New creates, initializes and returns Policer instance.
func New(signer neofscrypto.Signer, opts ...Option) *Policer {
checkECPartsWorkerPool, err := ants.NewPool(100, ants.WithNonblocking(true))
if err != nil {
panic(fmt.Errorf("ants.NewPool: %w", err))
}

c := defaultCfg()

for i := range opts {
Expand All @@ -161,10 +150,8 @@ func New(signer neofscrypto.Signer, opts ...Option) *Policer {
c.log = c.log.With(zap.String("component", "Object Policer"))

return &Policer{
cfg: c,
signer: signer,
checkECPartsProgressMap: make(map[oid.Address]struct{}, checkECPartsWorkerPool.Cap()),
checkECPartsWorkerPool: checkECPartsWorkerPool,
cfg: c,
signer: signer,
}
}

Expand Down
1 change: 0 additions & 1 deletion pkg/services/policer/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (

func (p *Policer) Run(ctx context.Context) {
defer func() {
p.checkECPartsWorkerPool.Release()
p.log.Info("routine stopped")
}()

Expand Down
Loading