Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
264 changes: 264 additions & 0 deletions dojo/finding/deduplication.py
Original file line number Diff line number Diff line change
Expand Up @@ -764,3 +764,267 @@ def dedupe_batch_of_findings(findings, *args, **kwargs):
else:
deduplicationLogger.debug("dedupe: skipping dedupe because it's disabled in system settings get()")
return None


# ---------------------------------------------------------------------------
# False-positive history helpers
# ---------------------------------------------------------------------------


def _fp_candidates_qs(scope_filter, dedup_alg, findings, exclude_ids=None):
    """
    Build and return a lazy QuerySet of existing findings that could be FP matches
    for the given list of findings under the specified algorithm and scope.

    Single source of truth for the algorithm dispatch, shared between
    match_finding_to_existing_findings (returns the QS directly for chaining) and
    _fetch_fp_candidates_for_batch (evaluates it into a keyed dict).

    Args:
        scope_filter (dict): ORM filter kwargs restricting the search scope
            (product, engagement or test).
        dedup_alg (str): one of "hash_code", "unique_id_from_tool",
            "unique_id_from_tool_or_hash_code" or "legacy".
        findings (list): findings whose match keys (hash_code, unique_id_from_tool
            or title/severity) are collected for the lookup.
        exclude_ids (set, optional): finding ids excluded from the result.
            For the legacy algorithm this is intentionally ignored — this matches
            the original match_finding_to_existing_findings behaviour.

    Returns:
        An unevaluated QuerySet ordered by id; empty when no usable match keys
        exist or the algorithm is unknown.

    """
    if dedup_alg == "hash_code":
        hash_codes = {f.hash_code for f in findings if getattr(f, "hash_code", None)}
        if not hash_codes:
            return Finding.objects.none()
        qs = Finding.objects.filter(**scope_filter, hash_code__in=hash_codes).exclude(hash_code=None)

    elif dedup_alg == "unique_id_from_tool":
        uids = {f.unique_id_from_tool for f in findings if getattr(f, "unique_id_from_tool", None)}
        if not uids:
            return Finding.objects.none()
        qs = Finding.objects.filter(**scope_filter, unique_id_from_tool__in=uids).exclude(unique_id_from_tool=None)

    elif dedup_alg == "unique_id_from_tool_or_hash_code":
        hash_codes = {f.hash_code for f in findings if getattr(f, "hash_code", None)}
        uids = {f.unique_id_from_tool for f in findings if getattr(f, "unique_id_from_tool", None)}
        if not hash_codes and not uids:
            return Finding.objects.none()
        cond = Q()
        if hash_codes:
            cond |= Q(hash_code__isnull=False, hash_code__in=hash_codes)
        if uids:
            cond |= Q(unique_id_from_tool__isnull=False, unique_id_from_tool__in=uids)
        qs = Finding.objects.filter(**scope_filter).filter(cond)

    elif dedup_alg == "legacy":
        pairs = {
            (f.title, f.severity, Finding.get_numerical_severity(f.severity))
            for f in findings
            if getattr(f, "title", None)
        }
        if not pairs:
            return Finding.objects.none()
        cond = Q()
        for title, severity, num_sev in pairs:
            cond |= Q(title__iexact=title, severity=severity, numerical_severity=num_sev)
        # Legacy does not exclude by id — matches the original match_finding_to_existing_findings behaviour.
        return Finding.objects.filter(**scope_filter).filter(cond).order_by("id")

    else:
        logger.error(
            "FALSE_POSITIVE_HISTORY: unexpected deduplication_algorithm '%s', returning empty candidates",
            dedup_alg,
        )
        return Finding.objects.none()

    # Common tail for the non-legacy algorithms: optional id exclusion plus a
    # deterministic ordering so batch processing is stable across runs.
    if exclude_ids:
        qs = qs.exclude(id__in=exclude_ids)
    return qs.order_by("id")


def _fetch_fp_candidates_for_batch(findings, product, dedup_alg):
    """
    Fetch all existing findings in the product that could be FP matches for a batch,
    returning a dict keyed by match identifier for in-memory lookup.

    For unique_id_from_tool_or_hash_code the return value is a tuple (by_uid, by_hash).
    For all other algorithms it is a plain dict.
    """
    scope_filter = {"test__engagement__product": product}
    batch_ids = {f.id for f in findings if f.id}
    candidate_qs = _fp_candidates_qs(scope_filter, dedup_alg, findings, batch_ids).only(
        # Keep this list in sync with every field read from candidate objects in this function.
        # Accessing a field not listed here causes Django to issue an extra SELECT per object,
        # silently negating the .only() optimisation.
        "id", "false_p", "active", "hash_code", "unique_id_from_tool", "title", "severity",
    )

    def grouped_by(key_of):
        # Evaluate the queryset once and bucket candidates under key_of(candidate).
        buckets: dict = {}
        for candidate in candidate_qs:
            buckets.setdefault(key_of(candidate), []).append(candidate)
        return buckets

    if dedup_alg == "unique_id_from_tool_or_hash_code":
        # A candidate may carry both identifiers, so it can appear in both maps.
        by_hash: dict = {}
        by_uid: dict = {}
        for candidate in candidate_qs:
            if candidate.hash_code:
                by_hash.setdefault(candidate.hash_code, []).append(candidate)
            if candidate.unique_id_from_tool:
                by_uid.setdefault(candidate.unique_id_from_tool, []).append(candidate)
        return by_uid, by_hash

    if dedup_alg == "hash_code":
        return grouped_by(lambda c: c.hash_code)

    if dedup_alg == "unique_id_from_tool":
        return grouped_by(lambda c: c.unique_id_from_tool)

    if dedup_alg == "legacy":
        # Case-insensitive title match, mirroring the title__iexact legacy lookup.
        return grouped_by(lambda c: (c.title.lower(), c.severity))

    return {}


def do_false_positive_history_batch(findings):
    """
    Batch version of do_false_positive_history.

    Processes a list of findings from the same product in a single DB round-trip
    rather than one query per finding. All findings are expected to share the
    same test (i.e. same deduplication_algorithm and same product), which is
    guaranteed by both callers (post_process_findings_batch and bulk-edit).

    Args:
        findings: list of :model:`dojo.Finding` instances

    """
    if not findings:
        return

    system_settings = System_Settings.objects.get()

    # Batch invariant: all findings share a test, so the first one determines
    # product scope and deduplication algorithm for the whole batch.
    product = findings[0].test.engagement.product
    dedup_alg = findings[0].test.deduplication_algorithm

    # Fetch all candidate existing findings with one DB query
    # (tuple of (by_uid, by_hash) for unique_id_from_tool_or_hash_code, plain dict otherwise).
    candidates = _fetch_fp_candidates_for_batch(findings, product, dedup_alg)

    # Ids of findings (batch members and pre-existing ones) to flip to false
    # positive in a single bulk UPDATE at the end.
    to_mark_as_fp_ids: set[int] = set()

    for finding in findings:
        # Resolve candidate list(s) for this finding
        if dedup_alg == "unique_id_from_tool_or_hash_code":
            by_uid, by_hash = candidates  # type: ignore[misc]
            uid_matches = by_uid.get(finding.unique_id_from_tool, []) if finding.unique_id_from_tool else []
            hash_matches = by_hash.get(finding.hash_code, []) if finding.hash_code else []
            # Deduplicate by id while preserving both uid and hash matches
            # (setdefault keeps the first occurrence, so uid matches take precedence).
            seen: dict = {}
            for ef in uid_matches + hash_matches:
                seen.setdefault(ef.id, ef)
            existing = list(seen.values())
        elif dedup_alg == "hash_code":
            existing = candidates.get(finding.hash_code, []) if finding.hash_code else []
        elif dedup_alg == "unique_id_from_tool":
            existing = candidates.get(finding.unique_id_from_tool, []) if finding.unique_id_from_tool else []
        elif dedup_alg == "legacy":
            # Lowercased title mirrors the case-insensitive legacy candidate keying.
            key = (finding.title.lower(), finding.severity) if finding.title else None
            existing = candidates.get(key, []) if key else []
        else:
            existing = []

        # Any matching finding already flagged false positive makes this one FP too.
        existing_fps = [ef for ef in existing if ef.false_p]

        if existing_fps:
            finding.false_p = True
            # Unsaved findings (no id) keep the flag in memory only — presumably
            # persisted by the caller's subsequent save; TODO confirm.
            if finding.id:
                to_mark_as_fp_ids.add(finding.id)

        # Retroactive replication: if this finding is FP (pre-existing flag or
        # just set above), spread the flag to all active non-FP equals.
        if system_settings.retroactive_false_positive_history and finding.false_p:
            for ef in existing:
                if ef.active and not ef.false_p:
                    to_mark_as_fp_ids.add(ef.id)

    if to_mark_as_fp_ids:
        deduplicationLogger.debug(
            "FALSE_POSITIVE_HISTORY (batch): marking %i finding(s) as false positive: %s",
            len(to_mark_as_fp_ids),
            sorted(to_mark_as_fp_ids),
        )
        # QuerySet.update() bypasses Django signals — intentional, mimicking the previous
        # super(Finding, find).save(skip_validation=True) calls that also skipped all post-save processing.
        # Note: .only() does not constrain update() — Django generates the UPDATE SQL independently.
        # NOTE(review): the DB rows get active=False/verified=False here, but the
        # in-memory batch objects only had false_p set — confirm callers don't
        # rely on finding.active/verified after this returns.
        Finding.objects.filter(id__in=to_mark_as_fp_ids).update(false_p=True, active=False, verified=False)


def do_false_positive_history(finding, *args, **kwargs):
    """
    Replicate false positives across product.

    Mark the given finding as false positive when an equal finding in the same
    product was previously marked as such; additionally, retroactively flag all
    equal findings in the product as false positive if they were not already.
    The retroactive replication also triggers when the finding passed in is
    itself already a false positive. This guarantees that after every call the
    product obeys the rule: if one finding is a false positive, every equal
    finding in the same product is too.

    Single-finding convenience wrapper around do_false_positive_history_batch.

    Args:
        finding (:model:`dojo.Finding`): Finding to be replicated

    """
    single_finding_batch = [finding]
    do_false_positive_history_batch(single_finding_batch)


def match_finding_to_existing_findings(finding, product=None, engagement=None, test=None):
    """
    Customizable lookup that returns all existing findings for a given finding.

    Takes one finding as an argument and returns all findings that are equal to it
    on the same product, engagement or test. For now, only one custom filter can
    be used, so you should choose between product, engagement or test.
    The lookup is done based on the deduplication_algorithm of the given finding test.

    Args:
        finding (:model:`dojo.Finding`): Finding to be matched
        product (:model:`dojo.Product`, optional): Product to filter findings by
        engagement (:model:`dojo.Engagement`, optional): Engagement to filter findings by
        test (:model:`dojo.Test`, optional): Test to filter findings by

    Returns:
        QuerySet of matching findings (excluding the finding itself when saved).

    Raises:
        ValueError: when none of product, engagement or test is provided.

    """
    if product:
        custom_filter_type = "product"
        custom_filter = {"test__engagement__product": product}

    elif engagement:
        custom_filter_type = "engagement"
        custom_filter = {"test__engagement": engagement}

    elif test:
        custom_filter_type = "test"
        custom_filter = {"test": test}

    else:
        msg = "No product, engagement or test provided as argument."
        raise ValueError(msg)

    deduplication_algorithm = finding.test.deduplication_algorithm

    # %s (not %i) for the id: unsaved findings have id=None, which %i would
    # reject with a TypeError when debug logging is enabled.
    deduplicationLogger.debug(
        "Matching finding %s:%s to existing findings in %s %s using %s as deduplication algorithm.",
        finding.id, finding.title, custom_filter_type, next(iter(custom_filter.values())), deduplication_algorithm,
    )

    if deduplication_algorithm == "legacy":
        # This is the legacy reimport behavior. Although it's pretty flawed and
        # doesn't match the legacy algorithm for deduplication, this is left as is for simplicity.
        # Re-writing the legacy deduplication here would be complicated and counter-productive.
        # If you have use cases going through this section, you're advised to create a deduplication configuration for your parser
        logger.debug("Legacy dedupe. In case of issue, you're advised to create a deduplication configuration in order not to go through this section")

    # Exclude the finding itself from its own match list; unsaved findings have
    # no id yet, so there is nothing to exclude.
    exclude_ids = {finding.id} if finding.id else set()
    qs = _fp_candidates_qs(custom_filter, deduplication_algorithm, [finding], exclude_ids=exclude_ids)

    if deduplication_algorithm == "unique_id_from_tool_or_hash_code":
        deduplicationLogger.debug(qs.query)

    return qs
6 changes: 3 additions & 3 deletions dojo/finding/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
from dojo.finding.deduplication import (
dedupe_batch_of_findings,
do_dedupe_finding_task_internal,
do_false_positive_history,
do_false_positive_history_batch,
get_finding_models_for_deduplication,
)
from dojo.jira_link.helper import is_keep_in_sync_with_jira
Expand All @@ -46,7 +48,6 @@
from dojo.utils import (
calculate_grade,
close_external_issue,
do_false_positive_history,
get_current_user,
get_object_or_none,
mass_model_updater,
Expand Down Expand Up @@ -501,8 +502,7 @@ def post_process_findings_batch(
if system_settings.enable_deduplication:
deduplicationLogger.warning("skipping false positive history because deduplication is also enabled")
else:
for finding in findings:
do_false_positive_history(finding, *args, **kwargs)
do_false_positive_history_batch(findings)

# Non-status changing tasks
if issue_updater_option:
Expand Down
Loading