From 1e464250d2dec74600b195dfc3b00b263ab84bdc Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 31 Mar 2026 22:37:28 +0300 Subject: [PATCH] policer: drop ants pool for EC checks Same reasons as in 4bcea7d3458e6552fdfbb66e222038e02bab4d10. Signed-off-by: Roman Khimov --- pkg/services/policer/ec.go | 36 +-------------------------------- pkg/services/policer/policer.go | 17 ++-------------- pkg/services/policer/process.go | 1 - 3 files changed, 3 insertions(+), 51 deletions(-) diff --git a/pkg/services/policer/ec.go b/pkg/services/policer/ec.go index 8cbcbf9195..198d8412b6 100644 --- a/pkg/services/policer/ec.go +++ b/pkg/services/policer/ec.go @@ -17,7 +17,6 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/netmap" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" - "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) @@ -43,8 +42,7 @@ func (p *Policer) processECPart(ctx context.Context, addr oid.Address, parent oi return } - p.tryScheduleCheckECPartsTask(ctx, addr.Container(), parent, rule, addr.Object(), pi) - + p.checkECParts(ctx, addr.Container(), parent, rule, pi.RuleIndex, pi.Index, addr.Object()) p.processECPartByRule(ctx, rule, addr, pi.Index, nodeLists[pi.RuleIndex]) } @@ -144,38 +142,6 @@ func (x *singleReplication) SubmitSuccessfulReplication(node netmap.NodeInfo) { x.netAddresses = slices.Collect(node.NetworkEndpoints()) } -func (p *Policer) tryScheduleCheckECPartsTask(ctx context.Context, cnr cid.ID, parent oid.ID, rule iec.Rule, localPartID oid.ID, localPartInfo iec.PartInfo) { - p.checkECPartsProgressMtx.Lock() - defer p.checkECPartsProgressMtx.Unlock() - - addr := oid.NewAddress(cnr, parent) - if _, ok := p.checkECPartsProgressMap[addr]; ok { - return - } - - err := p.checkECPartsWorkerPool.Submit(func() { - defer func() { - p.checkECPartsProgressMtx.Lock() - delete(p.checkECPartsProgressMap, addr) - p.checkECPartsProgressMtx.Unlock() - }() - - p.checkECParts(ctx, cnr, parent, rule, localPartInfo.RuleIndex, localPartInfo.Index, localPartID) - }) - if err != nil { - if errors.Is(err, ants.ErrPoolOverload) { - p.log.Info("pool of workers for EC parts checking is full, skip the task", - zap.Stringer("container", cnr), zap.Stringer("parent", parent), zap.Stringer("local_part_id", localPartID)) - return - } - - p.log.Warn("unexpected error returned from pool of workers for EC part checking", zap.Error(err)) - return - } - - p.checkECPartsProgressMap[addr] = struct{}{} -} - func (p *Policer) checkECParts(ctx context.Context, cnr cid.ID, parent oid.ID, rule iec.Rule, ruleIdx, localPartIdx int, localPartID oid.ID) { parentAddr := oid.NewAddress(cnr, parent) diff --git a/pkg/services/policer/policer.go b/pkg/services/policer/policer.go index b6c0885d16..d0c5202e9b 100644 --- a/pkg/services/policer/policer.go +++ b/pkg/services/policer/policer.go @@ -2,7 +2,6 @@ package policer import ( "context" - "fmt" "io" "sync" "sync/atomic" @@ -18,7 +17,6 @@ import ( netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" - "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) @@ -56,10 +54,6 @@ type Policer struct { hadPlacementMismatch atomic.Bool signer neofscrypto.Signer - - checkECPartsProgressMtx sync.Mutex - checkECPartsProgressMap map[oid.Address]struct{} - checkECPartsWorkerPool *ants.Pool } // Option is an option for Policer constructor. @@ -147,11 +141,6 @@ func defaultCfg() *cfg { // New creates, initializes and returns Policer instance. func New(signer neofscrypto.Signer, opts ...Option) *Policer { - checkECPartsWorkerPool, err := ants.NewPool(100, ants.WithNonblocking(true)) - if err != nil { - panic(fmt.Errorf("ants.NewPool: %w", err)) - } - c := defaultCfg() for i := range opts { @@ -161,10 +150,8 @@ func New(signer neofscrypto.Signer, opts ...Option) *Policer { c.log = c.log.With(zap.String("component", "Object Policer")) return &Policer{ - cfg: c, - signer: signer, - checkECPartsProgressMap: make(map[oid.Address]struct{}, checkECPartsWorkerPool.Cap()), - checkECPartsWorkerPool: checkECPartsWorkerPool, + cfg: c, + signer: signer, } } diff --git a/pkg/services/policer/process.go b/pkg/services/policer/process.go index c50bfc43f7..1601737a73 100644 --- a/pkg/services/policer/process.go +++ b/pkg/services/policer/process.go @@ -18,7 +18,6 @@ import ( func (p *Policer) Run(ctx context.Context) { defer func() { - p.checkECPartsWorkerPool.Release() p.log.Info("routine stopped") }()