diff --git a/dojo/finding/deduplication.py b/dojo/finding/deduplication.py index c55d473a96..8410c979a5 100644 --- a/dojo/finding/deduplication.py +++ b/dojo/finding/deduplication.py @@ -764,3 +764,267 @@ def dedupe_batch_of_findings(findings, *args, **kwargs): else: deduplicationLogger.debug("dedupe: skipping dedupe because it's disabled in system settings get()") return None + + +# --------------------------------------------------------------------------- +# False-positive history helpers +# --------------------------------------------------------------------------- + + +def _fp_candidates_qs(scope_filter, dedup_alg, findings, exclude_ids=None): + """ + Build and return a lazy QuerySet of existing findings that could be FP matches + for the given list of findings under the specified algorithm and scope. + + Single source of truth for the algorithm dispatch, shared between + match_finding_to_existing_findings (returns the QS directly for chaining) and + _fetch_fp_candidates_for_batch (evaluates it into a keyed dict). + + For the legacy algorithm, exclude_ids is intentionally ignored — this matches + the original match_finding_to_existing_findings behaviour. + """ + if dedup_alg == "hash_code": + hash_codes = {f.hash_code for f in findings if getattr(f, "hash_code", None)} + if not hash_codes: + return Finding.objects.none() + qs = Finding.objects.filter(**scope_filter, hash_code__in=hash_codes).exclude(hash_code=None) + if exclude_ids: + qs = qs.exclude(id__in=exclude_ids) + return qs.order_by("id") + + if dedup_alg == "unique_id_from_tool": + uids = {f.unique_id_from_tool for f in findings if getattr(f, "unique_id_from_tool", None)} + if not uids: + return Finding.objects.none() + qs = Finding.objects.filter(**scope_filter, unique_id_from_tool__in=uids).exclude(unique_id_from_tool=None) + if exclude_ids: + qs = qs.exclude(id__in=exclude_ids) + return qs.order_by("id") + + if dedup_alg == "unique_id_from_tool_or_hash_code": + hash_codes = {f.hash_code for f in findings if getattr(f, "hash_code", None)} + uids = {f.unique_id_from_tool for f in findings if getattr(f, "unique_id_from_tool", None)} + if not hash_codes and not uids: + return Finding.objects.none() + cond = Q() + if hash_codes: + cond |= Q(hash_code__isnull=False, hash_code__in=hash_codes) + if uids: + cond |= Q(unique_id_from_tool__isnull=False, unique_id_from_tool__in=uids) + qs = Finding.objects.filter(Q(**scope_filter)).filter(cond) + if exclude_ids: + qs = qs.exclude(id__in=exclude_ids) + return qs.order_by("id") + + if dedup_alg == "legacy": + pairs = { + (f.title, f.severity, Finding.get_numerical_severity(f.severity)) + for f in findings + if getattr(f, "title", None) + } + if not pairs: + return Finding.objects.none() + cond = Q() + for title, severity, num_sev in pairs: + cond |= Q(title__iexact=title, severity=severity, numerical_severity=num_sev) + # Legacy does not exclude by id — matches the original match_finding_to_existing_findings behaviour. + return Finding.objects.filter(**scope_filter).filter(cond).order_by("id") + + logger.error( + "FALSE_POSITIVE_HISTORY: unexpected deduplication_algorithm '%s', returning empty candidates", + dedup_alg, + ) + return Finding.objects.none() + + +def _fetch_fp_candidates_for_batch(findings, product, dedup_alg): + """ + Fetch all existing findings in the product that could be FP matches for a batch, + returning a dict keyed by match identifier for in-memory lookup. + + For unique_id_from_tool_or_hash_code the return value is a tuple (by_uid, by_hash). + For all other algorithms it is a plain dict. + """ + scope_filter = {"test__engagement__product": product} + exclude_ids = {f.id for f in findings if f.id} + qs = _fp_candidates_qs(scope_filter, dedup_alg, findings, exclude_ids).only( + # Keep this list in sync with every field read from candidate objects in this function. + # Accessing a field not listed here causes Django to issue an extra SELECT per object, + # silently negating the .only() optimisation. + "id", "false_p", "active", "hash_code", "unique_id_from_tool", "title", "severity", + ) + + if dedup_alg == "unique_id_from_tool_or_hash_code": + by_hash: dict = {} + by_uid: dict = {} + for ef in qs: + if ef.hash_code: + by_hash.setdefault(ef.hash_code, []).append(ef) + if ef.unique_id_from_tool: + by_uid.setdefault(ef.unique_id_from_tool, []).append(ef) + return by_uid, by_hash + + if dedup_alg == "hash_code": + result: dict = {} + for ef in qs: + result.setdefault(ef.hash_code, []).append(ef) + return result + + if dedup_alg == "unique_id_from_tool": + result = {} + for ef in qs: + result.setdefault(ef.unique_id_from_tool, []).append(ef) + return result + + if dedup_alg == "legacy": + result = {} + for ef in qs: + result.setdefault((ef.title.lower(), ef.severity), []).append(ef) + return result + + return {} + + +def do_false_positive_history_batch(findings): + """ + Batch version of do_false_positive_history. + + Processes a list of findings from the same product in a single DB round-trip + rather than one query per finding. All findings are expected to share the + same test (i.e. same deduplication_algorithm and same product), which is + guaranteed by both callers (post_process_findings_batch and bulk-edit). + + Args: + findings: list of :model:`dojo.Finding` instances + + """ + if not findings: + return + + system_settings = System_Settings.objects.get() + + product = findings[0].test.engagement.product + dedup_alg = findings[0].test.deduplication_algorithm + + # Fetch all candidate existing findings with one DB query + candidates = _fetch_fp_candidates_for_batch(findings, product, dedup_alg) + + to_mark_as_fp_ids: set = set() + + for finding in findings: + # Resolve candidate list(s) for this finding + if dedup_alg == "unique_id_from_tool_or_hash_code": + by_uid, by_hash = candidates # type: ignore[misc] + uid_matches = by_uid.get(finding.unique_id_from_tool, []) if finding.unique_id_from_tool else [] + hash_matches = by_hash.get(finding.hash_code, []) if finding.hash_code else [] + # Deduplicate by id while preserving both uid and hash matches + seen: dict = {} + for ef in uid_matches + hash_matches: + seen.setdefault(ef.id, ef) + existing = list(seen.values()) + elif dedup_alg == "hash_code": + existing = candidates.get(finding.hash_code, []) if finding.hash_code else [] + elif dedup_alg == "unique_id_from_tool": + existing = candidates.get(finding.unique_id_from_tool, []) if finding.unique_id_from_tool else [] + elif dedup_alg == "legacy": + key = (finding.title.lower(), finding.severity) if finding.title else None + existing = candidates.get(key, []) if key else [] + else: + existing = [] + + existing_fps = [ef for ef in existing if ef.false_p] + + if existing_fps: + finding.false_p = True + if finding.id: + to_mark_as_fp_ids.add(finding.id) + + if system_settings.retroactive_false_positive_history and finding.false_p: + for ef in existing: + if ef.active and not ef.false_p: + to_mark_as_fp_ids.add(ef.id) + + if to_mark_as_fp_ids: + deduplicationLogger.debug( + "FALSE_POSITIVE_HISTORY (batch): marking %i finding(s) as false positive: %s", + len(to_mark_as_fp_ids), + sorted(to_mark_as_fp_ids), + ) + # QuerySet.update() bypasses Django signals — intentional, mimicking the previous + # super(Finding, find).save(skip_validation=True) calls that also skipped all post-save processing. + # Note: .only() does not constrain update() — Django generates the UPDATE SQL independently. + Finding.objects.filter(id__in=to_mark_as_fp_ids).update(false_p=True, active=False, verified=False) + + +def do_false_positive_history(finding, *args, **kwargs): + """ + Replicate false positives across product. + + Mark finding as false positive if the same finding was previously marked + as false positive in the same product, beyond that, retroactively mark + all equal findings in the product as false positive (if they weren't already). + The retroactively replication will be also trigerred if the finding passed as + an argument already is a false positive. With this feature we can assure that + on each call of this method all findings in the product complies to the rule + (if one finding is a false positive, all equal findings in the same product also are). + + Args: + finding (:model:`dojo.Finding`): Finding to be replicated + + """ + do_false_positive_history_batch([finding]) + + +def match_finding_to_existing_findings(finding, product=None, engagement=None, test=None): + """ + Customizable lookup that returns all existing findings for a given finding. + + Takes one finding as an argument and returns all findings that are equal to it + on the same product, engagement or test. For now, only one custom filter can + be used, so you should choose between product, engagement or test. + The lookup is done based on the deduplication_algorithm of the given finding test. + + Args: + finding (:model:`dojo.Finding`): Finding to be matched + product (:model:`dojo.Product`, optional): Product to filter findings by + engagement (:model:`dojo.Engagement`, optional): Engagement to filter findings by + test (:model:`dojo.Test`, optional): Test to filter findings by + + """ + if product: + custom_filter_type = "product" + custom_filter = {"test__engagement__product": product} + + elif engagement: + custom_filter_type = "engagement" + custom_filter = {"test__engagement": engagement} + + elif test: + custom_filter_type = "test" + custom_filter = {"test": test} + + else: + msg = "No product, engagement or test provided as argument." + raise ValueError(msg) + + deduplication_algorithm = finding.test.deduplication_algorithm + + deduplicationLogger.debug( + "Matching finding %i:%s to existing findings in %s %s using %s as deduplication algorithm.", + finding.id, finding.title, custom_filter_type, list(custom_filter.values())[0], deduplication_algorithm, + ) + + if deduplication_algorithm == "legacy": + # This is the legacy reimport behavior. Although it's pretty flawed and + # doesn't match the legacy algorithm for deduplication, this is left as is for simplicity. + # Re-writing the legacy deduplication here would be complicated and counter-productive. + # If you have use cases going through this section, you're advised to create a deduplication configuration for your parser + logger.debug("Legacy dedupe. In case of issue, you're advised to create a deduplication configuration in order not to go through this section") + + exclude_ids = {finding.id} if finding.id else set() + qs = _fp_candidates_qs(custom_filter, deduplication_algorithm, [finding], exclude_ids=exclude_ids) + + if deduplication_algorithm == "unique_id_from_tool_or_hash_code": + deduplicationLogger.debug(qs.query) + + return qs diff --git a/dojo/finding/helper.py b/dojo/finding/helper.py index 9d44896bbc..f9551e767d 100644 --- a/dojo/finding/helper.py +++ b/dojo/finding/helper.py @@ -21,6 +21,8 @@ from dojo.finding.deduplication import ( dedupe_batch_of_findings, do_dedupe_finding_task_internal, + do_false_positive_history, + do_false_positive_history_batch, get_finding_models_for_deduplication, ) from dojo.jira_link.helper import is_keep_in_sync_with_jira @@ -46,7 +48,6 @@ from dojo.utils import ( calculate_grade, close_external_issue, - do_false_positive_history, get_current_user, get_object_or_none, mass_model_updater, @@ -501,8 +502,7 @@ def post_process_findings_batch( if system_settings.enable_deduplication: deduplicationLogger.warning("skipping false positive history because deduplication is also enabled") else: - for finding in findings: - do_false_positive_history(finding, *args, **kwargs) + do_false_positive_history_batch(findings) # Non-status changing tasks if issue_updater_option: diff --git a/dojo/finding/views.py b/dojo/finding/views.py index 607d3daf2c..87d35f1658 100644 --- a/dojo/finding/views.py +++ b/dojo/finding/views.py @@ -50,6 +50,11 @@ TestImportFilter, TestImportFindingActionFilter, ) +from dojo.finding.deduplication import ( + _fetch_fp_candidates_for_batch, + do_false_positive_history_batch, + match_finding_to_existing_findings, +) from dojo.finding.queries import get_authorized_findings, get_authorized_findings_for_queryset, prefetch_for_findings from dojo.forms import ( ApplyFindingTemplateForm, @@ -112,14 +117,12 @@ add_field_errors_to_response, add_success_message_to_response, calculate_grade, - do_false_positive_history, get_page_items, get_page_items_and_count, get_return_url, get_system_setting, get_visible_scan_types, get_words_for_field, - match_finding_to_existing_findings, process_tag_notifications, redirect, redirect_to_return_url_or_else, @@ -880,26 +883,27 @@ def process_mitigated_data(self, request: HttpRequest, finding: Finding, context status.last_modified = timezone.now() status.save() - def process_false_positive_history(self, finding: Finding): + def process_false_positive_history(self, finding: Finding, *, old_false_p: bool = False): if get_system_setting("false_positive_history", False): # If the finding is being marked as a false positive we dont need to call the - # fp history function because it will be called by the save function - # If finding was a false positive and is being reactivated: retroactively reactivates all equal findings - if finding.false_p and not finding.false_p and get_system_setting("retroactive_false_positive_history"): + # fp history function because it will be called by the save function. + # If finding was a false positive and is being reactivated: retroactively reactivates all equal findings. + # old_false_p must be captured before form.save(commit=False) mutates the finding in place. + if old_false_p and not finding.false_p and get_system_setting("retroactive_false_positive_history"): logger.debug("FALSE_POSITIVE_HISTORY: Reactivating existing findings based on: %s", finding) - - existing_fp_findings = match_finding_to_existing_findings( + # QuerySet.update() bypasses Django signals, which is intentional here — it mirrors + # the previous save_no_options() calls that also disabled all post-save processing. + # match_finding_to_existing_findings returns a lazy QS with no .only() applied, + # so any field can be added here without needing a corresponding .only() change in deduplication.py#_fetch_fp_candidates_for_batch. + match_finding_to_existing_findings( finding, product=finding.test.engagement.product, - ).filter(false_p=True) - - for fp in existing_fp_findings: - logger.debug("FALSE_POSITIVE_HISTORY: Reactivating false positive %i: %s", fp.id, fp) - fp.active = finding.active - fp.verified = finding.verified - fp.false_p = False - fp.out_of_scope = finding.out_of_scope - fp.is_mitigated = finding.is_mitigated - fp.save_no_options() + ).filter(false_p=True).update( + false_p=False, + active=finding.active, + verified=finding.verified, + out_of_scope=finding.out_of_scope, + is_mitigated=finding.is_mitigated, + ) def process_burp_request_response(self, finding: Finding, context: dict): if "request" in context["form"].cleaned_data or "response" in context["form"].cleaned_data: @@ -919,6 +923,9 @@ def process_burp_request_response(self, finding: Finding, context: dict): def process_finding_form(self, request: HttpRequest, finding: Finding, context: dict): if context["form"].is_valid(): # process some of the easy stuff first + # Capture false_p before form.save(commit=False) mutates the finding in place, + # so process_false_positive_history can detect a false-positive → active transition. + old_false_p = finding.false_p new_finding = context["form"].save(commit=False) new_finding.test = finding.test new_finding.numerical_severity = Finding.get_numerical_severity(new_finding.severity) @@ -950,7 +957,7 @@ def process_finding_form(self, request: HttpRequest, finding: Finding, context: endpoint_status.delete() # Handle some of the other steps self.process_mitigated_data(request, new_finding, context) - self.process_false_positive_history(new_finding) + self.process_false_positive_history(new_finding, old_false_p=old_false_p) self.process_burp_request_response(new_finding, context) # Save the vulnerability IDs finding_helper.save_vulnerability_ids(new_finding, context["form"].cleaned_data["vulnerability_ids"].split()) @@ -2758,6 +2765,10 @@ def _bulk_update_finding_status_and_severity(finds, form, request, system_settin actually_updated_count = 0 if form.cleaned_data["severity"] or form.cleaned_data["status"]: + # Accumulate findings for batched FP-history processing after the per-finding loop + fp_findings = [] # findings being marked as FP + reactivation_findings = [] # findings being un-FP'd (retroactive reactivation) + for find in finds: old_find = copy.deepcopy(find) @@ -2797,27 +2808,70 @@ def _bulk_update_finding_status_and_severity(finds, form, request, system_settin actually_updated_count += 1 if system_settings.false_positive_history: - # If finding is being marked as false positive if find.false_p: - do_false_positive_history(find) - - # If finding was a false positive and is being reactivated: retroactively reactivates all equal findings + fp_findings.append(find) elif old_find.false_p and not find.false_p: - if system_settings.retroactive_false_positive_history: - logger.debug("FALSE_POSITIVE_HISTORY: Reactivating existing findings based on: %s", find) - - existing_fp_findings = match_finding_to_existing_findings( - find, product=find.test.engagement.product, - ).filter(false_p=True) - - for fp in existing_fp_findings: - logger.debug("FALSE_POSITIVE_HISTORY: Reactivating false positive %i: %s", fp.id, fp) - fp.active = find.active - fp.verified = find.verified - fp.false_p = False - fp.out_of_scope = find.out_of_scope - fp.is_mitigated = find.is_mitigated - fp.save_no_options() + reactivation_findings.append(find) + + # --- Batch FP history: one DB query per (product, algorithm) group instead of one per finding --- + if system_settings.false_positive_history and fp_findings: + groups: dict = defaultdict(list) + for find in fp_findings: + groups[find.test.engagement.product_id, find.test.deduplication_algorithm].append(find) + for group_findings in groups.values(): + do_false_positive_history_batch(group_findings) + + # --- Batch retroactive reactivation --- + if ( + system_settings.false_positive_history + and system_settings.retroactive_false_positive_history + and reactivation_findings + ): + all_fp_ids_to_reactivate: set = set() + groups = defaultdict(list) + for find in reactivation_findings: + groups[find.test.engagement.product_id, find.test.deduplication_algorithm].append(find) + for (_, dedup_alg), group_findings in groups.items(): + product = group_findings[0].test.engagement.product + candidates = _fetch_fp_candidates_for_batch(group_findings, product, dedup_alg) + for find in group_findings: + if dedup_alg == "unique_id_from_tool_or_hash_code": + by_uid, by_hash = candidates + uid_matches = by_uid.get(find.unique_id_from_tool, []) if find.unique_id_from_tool else [] + hash_matches = by_hash.get(find.hash_code, []) if find.hash_code else [] + seen: dict = {} + for ef in uid_matches + hash_matches: + seen.setdefault(ef.id, ef) + existing = list(seen.values()) + elif dedup_alg == "hash_code": + existing = candidates.get(find.hash_code, []) if find.hash_code else [] + elif dedup_alg == "unique_id_from_tool": + existing = candidates.get(find.unique_id_from_tool, []) if find.unique_id_from_tool else [] + elif dedup_alg == "legacy": + lookup_key = (find.title.lower(), find.severity) if find.title else None + existing = candidates.get(lookup_key, []) if lookup_key else [] + else: + existing = [] + for ef in existing: + if ef.false_p: + all_fp_ids_to_reactivate.add(ef.id) + + if all_fp_ids_to_reactivate: + logger.debug( + "FALSE_POSITIVE_HISTORY: Reactivating %i finding(s): %s", + len(all_fp_ids_to_reactivate), + sorted(all_fp_ids_to_reactivate), + ) + # All reactivation findings received the same form values, so a single bulk update covers all. + # QuerySet.update() bypasses Django signals, which is intentional here — it mirrors + # the previous save_no_options() calls that also disabled all post-save processing. + Finding.objects.filter(id__in=all_fp_ids_to_reactivate).update( + false_p=False, + active=form.cleaned_data["active"], + verified=form.cleaned_data["verified"], + out_of_scope=form.cleaned_data["out_of_scope"], + is_mitigated=form.cleaned_data["is_mitigated"], + ) for prod in prods: calculate_grade(prod.id) diff --git a/dojo/utils.py b/dojo/utils.py index a5d8a13ed8..0a7e4f58db 100644 --- a/dojo/utils.py +++ b/dojo/utils.py @@ -95,154 +95,6 @@ def get_visible_scan_types(): return Test_Type.objects.filter(active=True) -def do_false_positive_history(finding, *args, **kwargs): - """ - Replicate false positives across product. - - Mark finding as false positive if the same finding was previously marked - as false positive in the same product, beyond that, retroactively mark - all equal findings in the product as false positive (if they weren't already). - The retroactively replication will be also trigerred if the finding passed as - an argument already is a false positive. With this feature we can assure that - on each call of this method all findings in the product complies to the rule - (if one finding is a false positive, all equal findings in the same product also are). - - Args: - finding (:model:`dojo.Finding`): Finding to be replicated - - """ - to_mark_as_fp = set() - - existing_findings = match_finding_to_existing_findings(finding, product=finding.test.engagement.product) - deduplicationLogger.debug( - "FALSE_POSITIVE_HISTORY: Found %i existing findings in the same product", - len(existing_findings), - ) - - existing_fp_findings = existing_findings.filter(false_p=True) - deduplicationLogger.debug( - ( - "FALSE_POSITIVE_HISTORY: Found %i existing findings in the same product " - "that were previously marked as false positive" - ), - len(existing_fp_findings), - ) - - if existing_fp_findings: - finding.false_p = True - to_mark_as_fp.add(finding) - - system_settings = System_Settings.objects.get() - if system_settings.retroactive_false_positive_history: - # Retroactively mark all active existing findings as false positive if this one - # is being (or already was) marked as a false positive - if finding.false_p: - existing_non_fp_findings = existing_findings.filter(active=True).exclude(false_p=True) - to_mark_as_fp.update(set(existing_non_fp_findings)) - - for find in to_mark_as_fp: - deduplicationLogger.debug( - "FALSE_POSITIVE_HISTORY: Marking Finding %i:%s from %s as false positive", - find.id, find.title, find.test.engagement, - ) - try: - find.false_p = True - find.active = False - find.verified = False - super(Finding, find).save(skip_validation=True, *args, **kwargs) - except Exception as e: - deduplicationLogger.debug(str(e)) - - -def match_finding_to_existing_findings(finding, product=None, engagement=None, test=None): - """ - Customizable lookup that returns all existing findings for a given finding. - - Takes one finding as an argument and returns all findings that are equal to it - on the same product, engagement or test. For now, only one custom filter can - be used, so you should choose between product, engagement or test. - The lookup is done based on the deduplication_algorithm of the given finding test. - - Args: - finding (:model:`dojo.Finding`): Finding to be matched - product (:model:`dojo.Product`, optional): Product to filter findings by - engagement (:model:`dojo.Engagement`, optional): Engagement to filter findings by - test (:model:`dojo.Test`, optional): Test to filter findings by - - """ - if product: - custom_filter_type = "product" - custom_filter = {"test__engagement__product": product} - - elif engagement: - custom_filter_type = "engagement" - custom_filter = {"test__engagement": engagement} - - elif test: - custom_filter_type = "test" - custom_filter = {"test": test} - - else: - msg = "No product, engagement or test provided as argument." - raise ValueError(msg) - - deduplication_algorithm = finding.test.deduplication_algorithm - - deduplicationLogger.debug( - "Matching finding %i:%s to existing findings in %s %s using %s as deduplication algorithm.", - finding.id, finding.title, custom_filter_type, list(custom_filter.values())[0], deduplication_algorithm, - ) - - if deduplication_algorithm == "hash_code": - return ( - Finding.objects.filter( - **custom_filter, - hash_code=finding.hash_code, - ).exclude(hash_code=None) - .exclude(id=finding.id) - .order_by("id") - ) - - if deduplication_algorithm == "unique_id_from_tool": - return ( - Finding.objects.filter( - **custom_filter, - unique_id_from_tool=finding.unique_id_from_tool, - ).exclude(unique_id_from_tool=None) - .exclude(id=finding.id) - .order_by("id") - ) - - if deduplication_algorithm == "unique_id_from_tool_or_hash_code": - query = Finding.objects.filter( - Q(**custom_filter), - ( - (Q(hash_code__isnull=False) & Q(hash_code=finding.hash_code)) - | (Q(unique_id_from_tool__isnull=False) & Q(unique_id_from_tool=finding.unique_id_from_tool)) - ), - ).exclude(id=finding.id).order_by("id") - deduplicationLogger.debug(query.query) - return query - - if deduplication_algorithm == "legacy": - # This is the legacy reimport behavior. Although it's pretty flawed and - # doesn't match the legacy algorithm for deduplication, this is left as is for simplicity. - # Re-writing the legacy deduplication here would be complicated and counter-productive. - # If you have use cases going through this section, you're advised to create a deduplication configuration for your parser - logger.debug("Legacy dedupe. In case of issue, you're advised to create a deduplication configuration in order not to go through this section") - return ( - Finding.objects.filter( - **custom_filter, - title__iexact=finding.title, - severity=finding.severity, - numerical_severity=Finding.get_numerical_severity(finding.severity), - ).order_by("id") - ) - - logger.error("Internal error: unexpected deduplication_algorithm: '%s' ", deduplication_algorithm) - return None - - def count_findings(findings: QuerySet) -> tuple[dict["Product", list[int]], dict[str, int]]: agg = ( findings.values(prod_id=F("test__engagement__product_id")) diff --git a/unittests/test_false_positive_history_logic.py b/unittests/test_false_positive_history_logic.py index 564aff8f0c..8748239bed 100644 --- a/unittests/test_false_positive_history_logic.py +++ b/unittests/test_false_positive_history_logic.py @@ -1,9 +1,12 @@ import logging from datetime import datetime +from unittest.mock import patch from crum import impersonate from django.conf import settings +from dojo.finding.deduplication import do_false_positive_history_batch +from dojo.finding.views import EditFinding from dojo.location.models import Location, LocationFindingReference from dojo.models import ( Endpoint, @@ -1654,6 +1657,162 @@ def test_fp_history_different_legacy_different_product(self): self.assert_finding(find_created_before_mark_diff_severity, false_p=False, not_pk=22, not_product_id=2, title=find_22.title, not_severity=find_22.severity) self.assert_finding(find_created_after_mark_diff_severity, false_p=False, not_pk=22, not_product_id=2, title=find_22.title, not_severity=find_22.severity) + # -------------------------------------------------------------------- # + # Batch function tests # + # -------------------------------------------------------------------- # + + def test_fp_history_batch_issues_single_candidate_query(self): + """do_false_positive_history_batch must call the candidate-fetch helper once for the whole batch.""" + # Create two copies of finding 2 in the same test (hash_code algorithm). + find_a, _f = self.copy_and_reset_finding(find_id=2) + find_a.save() + find_b, _f = self.copy_and_reset_finding(find_id=2) + find_b.save() + + # Mark finding 2 as FP so the batch function has something to match against. + find_2 = Finding.objects.get(id=2) + find_2.false_p = True + find_2.active = False + find_2.verified = False + find_2.save() + + batch = [Finding.objects.get(id=find_a.id), Finding.objects.get(id=find_b.id)] + + with patch("dojo.finding.deduplication._fetch_fp_candidates_for_batch", wraps=__import__("dojo.finding.deduplication", fromlist=["_fetch_fp_candidates_for_batch"])._fetch_fp_candidates_for_batch) as mock_fetch: + # 7 queries regardless of batch size: + # 1 System_Settings SELECT + # 4 lazy-load chain: findings[0].test / .engagement / .product / .test_type + # 1 candidates SELECT (with .only()) + # 1 bulk UPDATE + with self.assertNumQueries(7): + do_false_positive_history_batch(batch) + # One candidate-fetch call for the whole batch — not one per finding. + self.assertEqual(mock_fetch.call_count, 1, "Expected exactly one call to _fetch_fp_candidates_for_batch") + + # Functional check: both findings should now be marked as FP. + self.assert_finding(find_a, false_p=True) + self.assert_finding(find_b, false_p=True) + + def test_fp_history_batch_retroactive_marks_existing_active_fp(self): + """do_false_positive_history_batch retroactively marks pre-existing active findings as FP.""" + # Create a finding before the batch import so it pre-exists. + find_pre, _f = self.copy_and_reset_finding(find_id=2) + find_pre.save() + self.assert_finding(find_pre, false_p=False) + + # Simulate an incoming batch finding that already carries false_p=True + # (e.g. because the scanner reported it as a FP). + find_incoming, _f = self.copy_and_reset_finding(find_id=2) + find_incoming.false_p = True + find_incoming.active = False + find_incoming.save() + + batch = [Finding.objects.get(id=find_incoming.id)] + # 7 queries regardless of how many findings are retroactively marked: + # 1 System_Settings SELECT + # 4 lazy-load chain: findings[0].test / .engagement / .product / .test_type + # 1 candidates SELECT (with .only()) + # 1 bulk UPDATE + with self.assertNumQueries(7): + do_false_positive_history_batch(batch) + + # The pre-existing active finding must now be retroactively marked FP. + self.assert_finding(find_pre, false_p=True) + + def test_fp_history_batch_query_count_does_not_grow_with_affected_findings(self): + """ + Query count must stay flat (7) no matter how many findings are retroactively marked. + + With the old per-finding approach this would have been 7 + N queries where N is the + number of pre-existing findings that get marked as FP. With the batch approach it is + always 7: System_Settings, 4 lazy-load chain, candidates SELECT, one bulk UPDATE. + """ + NUM_PRE_EXISTING = 5 + + # Create several pre-existing active findings with the same hash_code. + pre_existing = [] + for _ in range(NUM_PRE_EXISTING): + find, _f = self.copy_and_reset_finding(find_id=2) + find.save() + pre_existing.append(find) + + # Incoming batch finding already carries false_p=True — triggers retroactive marking. + find_incoming, _f = self.copy_and_reset_finding(find_id=2) + find_incoming.false_p = True + find_incoming.active = False + find_incoming.save() + + batch = [Finding.objects.get(id=find_incoming.id)] + # 7 queries regardless of NUM_PRE_EXISTING: + # 1 System_Settings SELECT + # 4 lazy-load chain: findings[0].test / .engagement / .product / .test_type + # 1 candidates SELECT (with .only()) + # 1 bulk UPDATE covering all retroactively marked findings + with self.assertNumQueries(7): + do_false_positive_history_batch(batch) + + # All pre-existing findings must now be marked as FP. + for find in pre_existing: + self.assert_finding(find, false_p=True) + + # -------------------------------------------------------------------- # + # Single-finding edit: retroactive reactivation (was dead code pre-fix) # + # -------------------------------------------------------------------- # + + def test_process_false_positive_history_reactivation(self): + """EditFinding.process_false_positive_history reactivates FP matches when old_false_p=True.""" + # Set up a known-FP finding and a pre-existing match that is also FP. + find_2 = Finding.objects.get(id=2) + find_2.false_p = True + find_2.active = False + find_2.verified = False + find_2.save() + + find_match, _f = self.copy_and_reset_finding(find_id=2) + find_match.false_p = True + find_match.active = False + find_match.verified = False + find_match.save() + + # Now simulate unmarking find_2 as FP (same as a user editing the finding). + find_2.false_p = False + find_2.active = True + find_2.verified = True + find_2.save() + + # old_false_p=True reflects the state BEFORE form.save(commit=False). + view = EditFinding() + view.process_false_positive_history(find_2, old_false_p=True) + + # The matching finding that was FP should now be reactivated. + find_match.refresh_from_db() + self.assertFalse(find_match.false_p) + self.assertEqual(find_match.active, find_2.active) + self.assertEqual(find_match.verified, find_2.verified) + + def test_process_false_positive_history_no_reactivation_without_old_false_p(self): + """EditFinding.process_false_positive_history must not reactivate when old_false_p is False.""" + find_2 = Finding.objects.get(id=2) + find_2.false_p = True + find_2.active = False + find_2.save() + + find_match, _f = self.copy_and_reset_finding(find_id=2) + find_match.false_p = True + find_match.active = False + find_match.save() + + find_2.false_p = False + find_2.active = True + find_2.save() + + view = EditFinding() + # old_false_p defaults to False — reactivation must NOT fire. + view.process_false_positive_history(find_2) + + find_match.refresh_from_db() + self.assertTrue(find_match.false_p, "Match should remain FP when old_false_p=False") + # --------------- # # Utility Methods # # --------------- #