From 18f34fc6ef7497ed704167cb8c1457246d10d6c4 Mon Sep 17 00:00:00 2001 From: Jack Langley Date: Wed, 1 Apr 2026 15:20:50 -0500 Subject: [PATCH 1/2] feat(secops): add get_stats aggregation tool with unit tests Adds a new `get_stats` MCP tool that executes Chronicle UDM stats/aggregation queries (| stats pipeline), returning structured columns and rows. Supports the same flexible time-range parameters (start_time, end_time, hours_back) as the existing search tools via the shared parse_time_range utility. Also adds: - Unit tests covering time parsing, parameter forwarding, error handling, and API failure recovery for get_stats - Integration tests verifying the response contract and numeric type casting of count() values against a live Chronicle instance --- server/secops/secops_mcp/tools/__init__.py | 1 + server/secops/secops_mcp/tools/stats.py | 172 ++++++++++++++++++ server/secops/tests/test_secops_mcp.py | 69 +++++++ server/secops/tests/test_secops_tools_unit.py | 112 +++++++++++- 4 files changed, 351 insertions(+), 3 deletions(-) create mode 100644 server/secops/secops_mcp/tools/stats.py diff --git a/server/secops/secops_mcp/tools/__init__.py b/server/secops/secops_mcp/tools/__init__.py index 1b16e316..a9875f62 100644 --- a/server/secops/secops_mcp/tools/__init__.py +++ b/server/secops/secops_mcp/tools/__init__.py @@ -25,6 +25,7 @@ from .rule_exclusions import * from .search import * from .security_alerts import * +from .stats import * from .security_events import * from .security_rules import * from .threat_intel import * diff --git a/server/secops/secops_mcp/tools/stats.py b/server/secops/secops_mcp/tools/stats.py new file mode 100644 index 00000000..8381ec26 --- /dev/null +++ b/server/secops/secops_mcp/tools/stats.py @@ -0,0 +1,172 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Security Operations MCP tools for Chronicle stats/aggregation queries.""" + +import logging +from typing import Any, Dict, Optional + +from secops_mcp.server import get_chronicle_client, server +from secops_mcp.utils import parse_time_range + +logger = logging.getLogger("secops-mcp") + + +@server.tool() +async def get_stats( + query: str, + hours_back: int = 24, + start_time: Optional[str] = None, + end_time: Optional[str] = None, + max_values: int = 60, + max_events: int = 10000, + case_insensitive: bool = True, + timeout: int = 120, + project_id: str = None, + customer_id: str = None, + region: str = None, +) -> Dict[str, Any]: + """Run a stats/aggregation query against Chronicle UDM events. + + Executes a Chronicle UDM query containing a `| stats` pipeline operator and + returns aggregated results as structured columns and rows. Stats queries are + significantly faster than full event fetches because they return summary data + only — ideal for long-tail analysis, frequency counts, top-N rankings, and + time-bucketed roll-ups. + + **When to use get_stats vs search_udm:** + - Use `get_stats` when you need counts, aggregations, or summaries (e.g., top + source IPs, event frequency by user, distinct process names per host). + - Use `search_udm` when you need the raw event details themselves. + + **Stats Query Syntax:** + Stats queries use the standard UDM filter syntax followed by a `| stats` + pipeline. The stats operator supports: + - `count()` — total number of matching events + - `count_distinct(field)` — unique values of a field + - `min(field)`, `max(field)`, `sum(field)`, `avg(field)` — numeric aggregates + - `array_distinct(field)` — collect unique values into a list + - `by field1, field2` — group results by one or more fields + - `| sort column desc` — sort results + - `| head N` — limit to top N rows + + **Example Queries:** + ``` + # Top 10 source IPs by login failure count + metadata.event_type = "USER_LOGIN" AND security_result.action = "BLOCK" + | stats count() as failures by principal.ip + | sort failures desc + | head 10 + + # Unique users per hostname over the window + | stats count_distinct(principal.user.userid) as unique_users by target.hostname + + # Process execution frequency (LOLBin hunting) + metadata.event_type = "PROCESS_LAUNCH" + | stats count() as launches, array_distinct(principal.hostname) as hosts + by target.process.file.full_path + | sort launches desc + + # Event volume bucketed by type + | stats count() as total by metadata.event_type | sort total desc + ``` + + **Workflow Integration:** + - Use to triage alerts by frequency before deep-diving into raw events. + - Identify rare/anomalous values (long-tail) — low-count rows in `count() by field`. + - Scope the blast radius of an incident (how many hosts? how many users?). + - Feed results into `search_udm` with specific values to get the full event context. + + Args: + query (str): UDM query with a `| stats` operator. Filter predicates before + the pipe are optional — an empty filter returns stats over all + events in the time window. + hours_back (int): Hours to look back from now when start_time is not given. + Defaults to 24. + start_time (Optional[str]): Start of time range in ISO 8601 format + (e.g. "2024-01-15T00:00:00Z"). Overrides hours_back. + end_time (Optional[str]): End of time range in ISO 8601 format. Defaults to now. + max_values (int): Maximum number of aggregated rows (buckets) to return. + Defaults to 60. Increase for high-cardinality group-by fields. + max_events (int): Maximum raw events scanned during aggregation. Defaults to + 10000. Increase for more complete results on busy environments. + case_insensitive (bool): Whether string comparisons are case-insensitive. + Defaults to True. + timeout (int): Request timeout in seconds. Defaults to 120. Increase for + complex queries over large time windows. + project_id (Optional[str]): Google Cloud project ID. Defaults to env config. + customer_id (Optional[str]): Chronicle customer ID. Defaults to env config. + region (Optional[str]): Chronicle region (e.g. "us", "europe"). Defaults to env config. + + Returns: + Dict[str, Any]: A dictionary with: + - "columns" (List[str]): Ordered list of column names from the stats query. + - "rows" (List[Dict]): Each row is a dict mapping column name to its value. + Values are typed: int, float, str, datetime, list, or None. + - "total_rows" (int): Number of rows returned. + Returns {"error": str, "columns": [], "rows": [], "total_rows": 0} on failure. + + Example Output: + { + "columns": ["principal.ip", "failures"], + "rows": [ + {"principal.ip": "10.1.2.3", "failures": 412}, + {"principal.ip": "192.168.0.55", "failures": 87} + ], + "total_rows": 2 + } + + Next Steps (using MCP-enabled tools): + - Pivot on high-count rows by passing specific values to `search_udm` for + full event context. + - Use `lookup_entity` on suspicious IPs or hostnames surfaced by stats. + - Build detection rules targeting aggregation patterns identified here. + - Export raw matching events with `export_udm_search_csv` for the top offenders. + """ + try: + try: + start_dt, end_dt = parse_time_range(start_time, end_time, hours_back) + except ValueError as e: + logger.error(f"Error parsing date format: {str(e)}", exc_info=True) + return { + "error": f"Error parsing date format: {str(e)}. Use ISO 8601 format (e.g., 2024-01-15T12:00:00Z)", + "columns": [], + "rows": [], + "total_rows": 0, + } + + logger.info( + f"Running stats query - Query: {query}, " + f"Time Range: {start_dt} to {end_dt}, max_values: {max_values}" + ) + + chronicle = get_chronicle_client(project_id, customer_id, region) + + results = chronicle.get_stats( + query=query, + start_time=start_dt, + end_time=end_dt, + max_values=max_values, + timeout=timeout, + max_events=max_events, + case_insensitive=case_insensitive, + ) + + total_rows = results.get("total_rows", 0) + logger.info(f"Stats query returned {total_rows} rows across {len(results.get('columns', []))} columns") + + return results + + except Exception as e: + logger.error(f"Error running stats query: {str(e)}", exc_info=True) + return {"error": str(e), "columns": [], "rows": [], "total_rows": 0} diff --git a/server/secops/tests/test_secops_mcp.py b/server/secops/tests/test_secops_mcp.py index 05ef0fbf..0dce67b3 100644 --- a/server/secops/tests/test_secops_mcp.py +++ b/server/secops/tests/test_secops_mcp.py @@ -36,6 +36,7 @@ from secops_mcp.tools.ioc_matches import get_ioc_matches from secops_mcp.tools.threat_intel import get_threat_intel from secops_mcp.tools.search import search_udm +from secops_mcp.tools.stats import get_stats from secops_mcp.tools.udm_search import ( export_udm_search_csv, find_udm_field_values, @@ -801,6 +802,74 @@ async def test_find_udm_field_values( first_value = result["fieldValues"][0] assert isinstance(first_value, dict) + @pytest.mark.asyncio + async def test_get_stats_basic(self, chronicle_config: Dict[str, str]) -> None: + """Test that get_stats returns a structured aggregation result from Chronicle. + + Runs a stats count by event type over the last 24 hours. We cannot assert + specific row values (data varies per environment), but we verify the response + contract: correct keys, typed values, no error field, and row/total_rows + consistency. + + Args: + chronicle_config: Dictionary with Chronicle configuration + """ + result = await get_stats( + query="| stats count() as event_count by metadata.event_type", + hours_back=24, + max_values=20, + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + ) + + assert "error" not in result, f"get_stats returned an error: {result.get('error')}" + assert "columns" in result + assert "rows" in result + assert "total_rows" in result + assert isinstance(result["columns"], list) + assert isinstance(result["rows"], list) + assert isinstance(result["total_rows"], int) + # Row list length must match total_rows — the secops-wrapper contract + assert result["total_rows"] == len(result["rows"]) + + if result["rows"]: + sample_row = result["rows"][0] + assert isinstance(sample_row, dict) + for col in result["columns"]: + assert col in sample_row, f"Column '{col}' missing from row: {sample_row}" + + @pytest.mark.asyncio + async def test_get_stats_count_values_are_numeric( + self, chronicle_config: Dict[str, str] + ) -> None: + """Verify count() values are returned as int/float, not strings. + + The Chronicle API returns int64Val inside a JSON string. The secops-wrapper + must cast it to int. If this casting breaks, downstream consumers (sorting, + thresholding, alerting logic) will silently produce wrong results. + + Args: + chronicle_config: Dictionary with Chronicle configuration + """ + result = await get_stats( + query="| stats count() as total by metadata.event_type", + hours_back=24, + max_values=5, + project_id=chronicle_config["CHRONICLE_PROJECT_ID"], + customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"], + region=chronicle_config["CHRONICLE_REGION"], + ) + + assert "error" not in result, f"get_stats returned an error: {result.get('error')}" + + for row in result["rows"]: + count_value = row.get("total") + if count_value is not None: + assert isinstance(count_value, (int, float)), ( + f"Expected numeric count, got {type(count_value).__name__}: {count_value!r}" + ) + @pytest.mark.asyncio async def test_service_account_authentication( self, chronicle_config: Dict[str, str] diff --git a/server/secops/tests/test_secops_tools_unit.py b/server/secops/tests/test_secops_tools_unit.py index 8add9f11..60b12f4f 100644 --- a/server/secops/tests/test_secops_tools_unit.py +++ b/server/secops/tests/test_secops_tools_unit.py @@ -45,6 +45,7 @@ def wrapper(func): from secops_mcp.tools.search import search_udm from secops_mcp.tools.udm_search import export_udm_search_csv from secops_mcp.tools.security_events import search_security_events +from secops_mcp.tools.stats import get_stats @pytest.fixture def mock_chronicle_client(): @@ -53,13 +54,22 @@ def mock_chronicle_client(): client.search_udm.return_value = {"total_events": 0, "events": []} client.fetch_udm_search_csv.return_value = {"csv": {"row": []}} client.translate_nl_to_udm.return_value = "metadata.event_type = 'USER_LOGIN'" + client.get_stats.return_value = { + "columns": ["metadata.event_type", "count"], + "rows": [ + {"metadata.event_type": "USER_LOGIN", "count": 150}, + {"metadata.event_type": "NETWORK_CONNECTION", "count": 42}, + ], + "total_rows": 2, + } return client @pytest.fixture def mock_get_client(mock_chronicle_client): with patch('secops_mcp.tools.search.get_chronicle_client', return_value=mock_chronicle_client) as m1, \ patch('secops_mcp.tools.udm_search.get_chronicle_client', return_value=mock_chronicle_client) as m2, \ - patch('secops_mcp.tools.security_events.get_chronicle_client', return_value=mock_chronicle_client) as m3: + patch('secops_mcp.tools.security_events.get_chronicle_client', return_value=mock_chronicle_client) as m3, \ + patch('secops_mcp.tools.stats.get_chronicle_client', return_value=mock_chronicle_client) as m4: yield mock_chronicle_client @pytest.mark.asyncio @@ -230,7 +240,7 @@ async def test_start_after_end(mock_get_client): async def test_export_csv_invalid_date(mock_get_client): """Test that export_udm_search_csv returns error string on invalid date.""" invalid_date = "yesterday" - + result = await export_udm_search_csv( query="test", fields=["test"], @@ -238,7 +248,103 @@ async def test_export_csv_invalid_date(mock_get_client): project_id="test", customer_id="test" ) - + assert isinstance(result, str) assert "Error parsing date format" in result assert "yesterday" in result + + +# --------------------------------------------------------------------------- +# Stats tool tests +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_get_stats_returns_structured_result(mock_get_client): + """Stats results contain typed columns, rows, and total_rows.""" + result = await get_stats( + query="| stats count() as count by metadata.event_type", + hours_back=24, + project_id="test", + customer_id="test", + ) + + assert "error" not in result, f"Unexpected error: {result.get('error')}" + assert result["total_rows"] == 2 + assert result["columns"] == ["metadata.event_type", "count"] + assert len(result["rows"]) == 2 + # Verify row values are typed correctly (int, not string) + login_row = next(r for r in result["rows"] if r["metadata.event_type"] == "USER_LOGIN") + assert login_row["count"] == 150 + + +@pytest.mark.asyncio +async def test_get_stats_passes_correct_time_range(mock_get_client): + """Explicit ISO start/end times are parsed and forwarded as datetime objects.""" + result = await get_stats( + query="| stats count() by metadata.event_type", + start_time="2024-03-01T00:00:00Z", + end_time="2024-03-02T00:00:00Z", + project_id="test", + customer_id="test", + ) + + assert "error" not in result + call_args = mock_get_client.get_stats.call_args + _, kwargs = call_args + assert isinstance(kwargs["start_time"], datetime) + assert kwargs["start_time"].year == 2024 + assert kwargs["start_time"].month == 3 + assert kwargs["start_time"].day == 1 + assert isinstance(kwargs["end_time"], datetime) + assert kwargs["end_time"].day == 2 + + +@pytest.mark.asyncio +async def test_get_stats_passes_max_values_param(mock_get_client): + """max_values is forwarded to the underlying chronicle client.""" + await get_stats( + query="| stats count() by principal.ip", + max_values=100, + project_id="test", + customer_id="test", + ) + + call_kwargs = mock_get_client.get_stats.call_args[1] + assert call_kwargs["max_values"] == 100 + + +@pytest.mark.asyncio +async def test_get_stats_invalid_date_returns_error(mock_get_client): + """Invalid ISO date returns error dict — not an exception.""" + result = await get_stats( + query="| stats count() by metadata.event_type", + start_time="not-a-date", + project_id="test", + customer_id="test", + ) + + assert "error" in result + assert "Error parsing date format" in result["error"] + assert result["columns"] == [] + assert result["rows"] == [] + assert result["total_rows"] == 0 + # Chronicle client must NOT have been called on a bad date + mock_get_client.get_stats.assert_not_called() + + +@pytest.mark.asyncio +async def test_get_stats_api_error_returns_error_dict(mock_get_client): + """A runtime exception from the Chronicle client surfaces as an error dict.""" + mock_get_client.get_stats.side_effect = Exception("Chronicle API unavailable") + + result = await get_stats( + query="| stats count() by metadata.event_type", + project_id="test", + customer_id="test", + ) + + assert "error" in result + assert "Chronicle API unavailable" in result["error"] + assert result["columns"] == [] + assert result["rows"] == [] + assert result["total_rows"] == 0 From aababf5e387eda03533e2a589dc53675d7c0abc8 Mon Sep 17 00:00:00 2001 From: Jack Langley Date: Thu, 2 Apr 2026 12:46:12 -0500 Subject: [PATCH 2/2] docs(secops): correct get_stats docstring to YARA-L 2.0 query syntax MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rewrites the get_stats tool docstring to reflect the actual Chronicle YARA-L 2.0 query structure (filtering → match → outcome → order → limit) and the full set of supported aggregate functions (array, array_distinct, avg, count, count_distinct, earliest, latest, max, min, stddev, sum). Replaces the incorrect pipe-style examples with correct YARA-L 2.0 syntax. Also ignores the local stats_doc.md reference file via .gitignore. --- .gitignore | 3 + server/secops/secops_mcp/tools/stats.py | 122 ++++++++++++++++-------- 2 files changed, 83 insertions(+), 42 deletions(-) diff --git a/.gitignore b/.gitignore index a614800b..86edf58e 100644 --- a/.gitignore +++ b/.gitignore @@ -203,6 +203,9 @@ app_data.db # Don't commit the evaluation data *.evalset.json + +# Local reference docs (not for repo) +stats_doc.md .adk/ # Gemini diff --git a/server/secops/secops_mcp/tools/stats.py b/server/secops/secops_mcp/tools/stats.py index 8381ec26..59abbb25 100644 --- a/server/secops/secops_mcp/tools/stats.py +++ b/server/secops/secops_mcp/tools/stats.py @@ -36,61 +36,99 @@ async def get_stats( customer_id: str = None, region: str = None, ) -> Dict[str, Any]: - """Run a stats/aggregation query against Chronicle UDM events. + """Run a YARA-L 2.0 stats/aggregation query against Chronicle UDM events. - Executes a Chronicle UDM query containing a `| stats` pipeline operator and - returns aggregated results as structured columns and rows. Stats queries are + Executes a Chronicle UDM query using YARA-L 2.0 match and outcome sections, + returning aggregated results as structured columns and rows. Stats queries are significantly faster than full event fetches because they return summary data only — ideal for long-tail analysis, frequency counts, top-N rankings, and time-bucketed roll-ups. + Note: Statistical query results are available two hours after ingestion. + **When to use get_stats vs search_udm:** - Use `get_stats` when you need counts, aggregations, or summaries (e.g., top source IPs, event frequency by user, distinct process names per host). - Use `search_udm` when you need the raw event details themselves. - **Stats Query Syntax:** - Stats queries use the standard UDM filter syntax followed by a `| stats` - pipeline. The stats operator supports: - - `count()` — total number of matching events - - `count_distinct(field)` — unique values of a field - - `min(field)`, `max(field)`, `sum(field)`, `avg(field)` — numeric aggregates - - `array_distinct(field)` — collect unique values into a list - - `by field1, field2` — group results by one or more fields - - `| sort column desc` — sort results - - `| head N` — limit to top N rows + **YARA-L 2.0 Query Structure:** + A stats query is composed of the following sections in order: + + 1. **Filtering statement** (required): UDM conditions to filter events. + 2. **match** (optional): Fields to group by, with optional time granularity. + Syntax: ``match:\\n field1, field2 [by|over every] [N] [time_unit]`` + 3. **outcome** (required): Aggregate expressions assigned to ``$variables``. + Syntax: ``outcome:\\n $alias = function(field)`` + 4. **order** (optional): Sort direction — ``asc`` (default) or ``desc``. + Syntax: ``order:\\n $alias desc`` + 5. **limit** (optional): Maximum rows returned. + Syntax: ``limit:\\n 20`` + + **Supported Aggregate Functions:** + - ``array(field)`` — all values as a list (truncated to 25 random elements) + - ``array_distinct(field)`` — distinct values as a list (max 25 elements) + - ``avg(numericField)`` — average, ignoring NULLs + - ``count(field)`` — number of rows in the group + - ``count_distinct(field)`` — number of distinct values in the group + - ``earliest(timestamp)`` — earliest timestamp with microsecond resolution + - ``latest(timestamp)`` — latest timestamp with microsecond resolution + - ``max(numericField)`` — maximum value in the group + - ``min(numericField)`` — minimum value in the group + - ``stddev(numericField)`` — standard deviation + - ``sum(numericField)`` — sum, ignoring NULLs + + **Time Granularity (match section):** + Group by time using ``by`` or ``over every`` with: ``minute``/``m``, + ``hour``/``h``, ``day``/``d``, ``week``/``w``, ``month``/``mo``. + Both keywords are functionally equivalent. **Example Queries:** ``` - # Top 10 source IPs by login failure count - metadata.event_type = "USER_LOGIN" AND security_result.action = "BLOCK" - | stats count() as failures by principal.ip - | sort failures desc - | head 10 - - # Unique users per hostname over the window - | stats count_distinct(principal.user.userid) as unique_users by target.hostname - - # Process execution frequency (LOLBin hunting) - metadata.event_type = "PROCESS_LAUNCH" - | stats count() as launches, array_distinct(principal.hostname) as hosts - by target.process.file.full_path - | sort launches desc - - # Event volume bucketed by type - | stats count() as total by metadata.event_type | sort total desc + # Count of successful user logins grouped by date + metadata.event_type = "USER_LOGIN" + $security_result = security_result.action + $security_result = "ALLOW" + $date = timestamp.get_date(metadata.event_timestamp.seconds, "America/Los_Angeles") + match: + $security_result, $date + outcome: + $event_count = count(metadata.id) + + # Top 20 IPs by unique user count (OKTA logs) + metadata.log_type = "OKTA" + match: + principal.ip + outcome: + $user_count_by_ip = count(principal.user.userid) + order: + $user_count_by_ip desc + limit: + 20 + + # Event volume per hostname bucketed by day + $hostname = target.hostname + match: + $hostname over every day + outcome: + $events_count = count($hostname) + + # Total bytes sent per source IP + target.ip != "" + match: + principal.ip + outcome: + $sent_bytes = sum(network.sent_bytes) ``` **Workflow Integration:** - Use to triage alerts by frequency before deep-diving into raw events. - - Identify rare/anomalous values (long-tail) — low-count rows in `count() by field`. + - Identify rare/anomalous values (long-tail) — low-count rows in ``count()``. - Scope the blast radius of an incident (how many hosts? how many users?). - - Feed results into `search_udm` with specific values to get the full event context. + - Feed results into ``search_udm`` with specific field values for full event context. Args: - query (str): UDM query with a `| stats` operator. Filter predicates before - the pipe are optional — an empty filter returns stats over all - events in the time window. + query (str): YARA-L 2.0 query with a filtering statement and outcome section. + The match section is optional; outcome is required for aggregation. hours_back (int): Hours to look back from now when start_time is not given. Defaults to 24. start_time (Optional[str]): Start of time range in ISO 8601 format @@ -110,7 +148,7 @@ async def get_stats( Returns: Dict[str, Any]: A dictionary with: - - "columns" (List[str]): Ordered list of column names from the stats query. + - "columns" (List[str]): Ordered list of column names from the outcome section. - "rows" (List[Dict]): Each row is a dict mapping column name to its value. Values are typed: int, float, str, datetime, list, or None. - "total_rows" (int): Number of rows returned. @@ -118,20 +156,20 @@ async def get_stats( Example Output: { - "columns": ["principal.ip", "failures"], + "columns": ["principal.ip", "$user_count_by_ip"], "rows": [ - {"principal.ip": "10.1.2.3", "failures": 412}, - {"principal.ip": "192.168.0.55", "failures": 87} + {"principal.ip": "10.1.2.3", "$user_count_by_ip": 412}, + {"principal.ip": "192.168.0.55", "$user_count_by_ip": 87} ], "total_rows": 2 } Next Steps (using MCP-enabled tools): - - Pivot on high-count rows by passing specific values to `search_udm` for + - Pivot on high-count rows by passing specific values to ``search_udm`` for full event context. - - Use `lookup_entity` on suspicious IPs or hostnames surfaced by stats. + - Use ``lookup_entity`` on suspicious IPs or hostnames surfaced by stats. - Build detection rules targeting aggregation patterns identified here. - - Export raw matching events with `export_udm_search_csv` for the top offenders. + - Export raw matching events with ``export_udm_search_csv`` for top offenders. """ try: try: