From 543ceed91f975ea502ed993bf51047d2cad662f2 Mon Sep 17 00:00:00 2001 From: Krishna Vishal Date: Fri, 10 Apr 2026 21:55:57 +0530 Subject: [PATCH 1/2] feat(ci): add dependency-DAG-based test scoping for Rust CI --- .github/actions/rust/pre-merge/action.yml | 83 +++- .github/config/components.yml | 21 + scripts/ci/affected-crates.py | 499 ++++++++++++++++++++++ 3 files changed, 599 insertions(+), 4 deletions(-) create mode 100755 scripts/ci/affected-crates.py diff --git a/.github/actions/rust/pre-merge/action.yml b/.github/actions/rust/pre-merge/action.yml index 0daa91b5da..26da79646f 100644 --- a/.github/actions/rust/pre-merge/action.yml +++ b/.github/actions/rust/pre-merge/action.yml @@ -59,6 +59,42 @@ runs: esac shell: bash + # DAG-based test scoping: compute affected crates from cargo metadata + git diff + # to avoid running the full test suite when only a subset of crates changed. + # Safety: cargo check/clippy run on the full workspace separately, catching all + # compilation errors. This only scopes test BUILD and EXECUTION. 
+ - name: Fetch base branch for DAG analysis + if: startsWith(inputs.task, 'test-') + run: git fetch origin master --depth=1 2>/dev/null || true + shell: bash + + - name: Compute affected crates (DAG-based) + if: startsWith(inputs.task, 'test-') + id: affected + run: | + NEXTEST_FILTER="" + PACKAGES="" + + if python3 scripts/ci/affected-crates.py --base-ref origin/master --format nextest-filter > /tmp/nextest-filter.txt 2>/tmp/affected-stderr.txt; then + NEXTEST_FILTER=$(cat /tmp/nextest-filter.txt) + fi + if python3 scripts/ci/affected-crates.py --base-ref origin/master --format packages > /tmp/packages.txt 2>/tmp/affected-stderr.txt; then + PACKAGES=$(cat /tmp/packages.txt) + fi + + if [[ -n "$NEXTEST_FILTER" ]]; then + CRATE_COUNT=$(echo "$NEXTEST_FILTER" | grep -o 'package(' | wc -l) + echo "::notice::DAG analysis: ${CRATE_COUNT} affected crates (of 43 total)" + echo "scoped=true" >> $GITHUB_OUTPUT + else + STDERR=$(cat /tmp/affected-stderr.txt 2>/dev/null || echo "") + echo "::warning::Could not compute affected crates, running full test suite. 
${STDERR}" + echo "scoped=false" >> $GITHUB_OUTPUT + # Clear filter files so test step runs everything + rm -f /tmp/nextest-filter.txt /tmp/packages.txt + fi + shell: bash + # Individual lint tasks for parallel execution - name: Cargo check if: inputs.task == 'check' @@ -117,16 +153,41 @@ runs: echo "::notice::Running test partition ${PARTITION_INDEX}/2" fi + # Read DAG-based affected crate filter (computed in earlier step) + NEXTEST_FILTER="" + PACKAGE_FLAGS="" + if [[ -f /tmp/nextest-filter.txt ]]; then + NEXTEST_FILTER=$(cat /tmp/nextest-filter.txt) + fi + if [[ -f /tmp/packages.txt ]]; then + PACKAGE_FLAGS=$(cat /tmp/packages.txt) + fi + + if [[ -n "$PACKAGE_FLAGS" ]]; then + CRATE_COUNT=$(echo "$NEXTEST_FILTER" | grep -o 'package(' | wc -l) + echo "::notice::DAG-scoped build: ${CRATE_COUNT} crates (cargo check/clippy cover full workspace separately)" + else + echo "::notice::Full workspace build (no DAG filter available)" + fi + source <(cargo llvm-cov show-env --export-prefix) bins_start=$(date +%s) - cargo build --locked + if [[ -n "$PACKAGE_FLAGS" ]]; then + cargo build --locked $PACKAGE_FLAGS + else + cargo build --locked + fi bins_end=$(date +%s) bins_duration=$((bins_end - bins_start)) echo "::notice::Binaries and libraries built in ${bins_duration}s ($(date -ud @${bins_duration} +'%M:%S'))" compile_start=$(date +%s) - cargo test --locked --no-run + if [[ -n "$PACKAGE_FLAGS" ]]; then + cargo test --locked --no-run $PACKAGE_FLAGS + else + cargo test --locked --no-run + fi compile_end=$(date +%s) compile_duration=$((compile_end - compile_start)) echo "::notice::Tests compiled in ${compile_duration}s ($(date -ud @${compile_duration} +'%M:%S'))" @@ -144,12 +205,20 @@ runs: test_start=$(date +%s) if command -v cargo-nextest &> /dev/null; then - cargo nextest run --locked --no-fail-fast --profile ci $PARTITION_FLAG + if [[ -n "$NEXTEST_FILTER" ]]; then + cargo nextest run --locked --no-fail-fast --profile ci $PARTITION_FLAG -E "$NEXTEST_FILTER" + else + cargo 
nextest run --locked --no-fail-fast --profile ci $PARTITION_FLAG + fi else if [[ -n "$PARTITION_FLAG" ]]; then echo "::error::cargo-nextest not found, falling back to cargo test without partitioning (all tests will run on every partition)" fi - cargo test --locked --no-fail-fast + if [[ -n "$PACKAGE_FLAGS" ]]; then + cargo test --locked --no-fail-fast $PACKAGE_FLAGS + else + cargo test --locked --no-fail-fast + fi fi test_end=$(date +%s) test_duration=$((test_end - test_start)) @@ -159,6 +228,12 @@ runs: total_duration=$((build_duration + test_duration)) echo "" echo "=========================================" + if [[ -n "$PACKAGE_FLAGS" ]]; then + CRATE_COUNT=$(echo "$NEXTEST_FILTER" | grep -o 'package(' | wc -l) + echo "DAG scope: ${CRATE_COUNT}/43 crates" + else + echo "DAG scope: full workspace (43 crates)" + fi echo "All targets build: ${bins_duration}s ($(date -ud @${bins_duration} +'%M:%S'))" echo "Tests compile: ${compile_duration}s ($(date -ud @${compile_duration} +'%M:%S'))" echo "Tests execute: ${test_duration}s ($(date -ud @${test_duration} +'%M:%S'))" diff --git a/.github/config/components.yml b/.github/config/components.yml index 7585a91b9b..a898f30dbb 100644 --- a/.github/config/components.yml +++ b/.github/config/components.yml @@ -51,9 +51,17 @@ components: paths: - "core/common/**" + # Leaf crate: zero-copy I/O buffer, depended on by binary_protocol and cluster + rust-iobuf: + depends_on: + - "rust-workspace" + paths: + - "core/iobuf/**" + rust-binary-protocol: depends_on: - "rust-workspace" # Protocol is affected by workspace changes + - "rust-iobuf" # binary_protocol depends on iobuf - "ci-infrastructure" # CI changes trigger full regression paths: - "core/binary_protocol/**" @@ -67,10 +75,13 @@ components: - "rust-cluster" paths: - "core/server/**" + - "core/server-ng/**" rust-cluster: depends_on: - "rust-workspace" + - "rust-iobuf" # cluster crates depend on iobuf + - "rust-binary-protocol" # cluster crates depend on binary_protocol paths: - 
"core/clock/**" - "core/consensus/**" @@ -79,18 +90,28 @@ components: - "core/metadata/**" - "core/message_bus/**" - "core/partitions/**" + + # Standalone simulation tool, does NOT affect server binary or foreign SDKs. + # Split from rust-cluster to avoid triggering SDK tests on simulator-only changes. + rust-simulator: + depends_on: + - "rust-workspace" + - "rust-cluster" + paths: - "core/simulator/**" # Main Rust workspace testing rust: depends_on: - "rust-workspace" + - "rust-iobuf" - "rust-configs" - "rust-sdk" - "rust-common" - "rust-binary-protocol" - "rust-server" - "rust-cluster" + - "rust-simulator" - "rust-tools" - "rust-cli" - "rust-bench" diff --git a/scripts/ci/affected-crates.py b/scripts/ci/affected-crates.py new file mode 100755 index 0000000000..ae65cc3f5d --- /dev/null +++ b/scripts/ci/affected-crates.py @@ -0,0 +1,499 @@ +#!/usr/bin/env python3 +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Dependency-DAG-based affected crate analyzer for CI optimization. + +Uses `cargo metadata` to build the workspace dependency graph, then +computes which crates are affected by a set of changed files (from +git diff). 
Outputs affected crates grouped by test tier: + + Tier 0 (leaf): No internal deps -> unit tests only + Tier 1 (mid): Has internal deps but few dependents -> unit + doc tests + Tier 2 (core): Many dependents (>=3) -> full test suite + Tier 3 (integration): The `integration` and `simulator` crates -> integration tests + +Usage: + # Affected crates for current PR (vs origin/master) + python3 scripts/ci/affected-crates.py + + # Affected crates for specific base ref + python3 scripts/ci/affected-crates.py --base-ref origin/master + + # Output as JSON (for CI consumption) + python3 scripts/ci/affected-crates.py --format json + + # Show full DAG without change filtering + python3 scripts/ci/affected-crates.py --all + + # Output nextest filter expression + python3 scripts/ci/affected-crates.py --format nextest-filter +""" + +import argparse +import json +import os +import subprocess +import sys +from collections import defaultdict +from pathlib import Path + + +def run(cmd, **kwargs): + """Run a command and return stdout, or exit on failure.""" + result = subprocess.run(cmd, capture_output=True, text=True, **kwargs) + if result.returncode != 0: + print(f"Error running: {' '.join(cmd)}", file=sys.stderr) + print(result.stderr, file=sys.stderr) + sys.exit(1) + return result.stdout.strip() + + +def load_cargo_metadata(): + """Load and parse cargo metadata.""" + raw = run(["cargo", "metadata", "--format-version", "1", "--no-deps"]) + meta_no_deps = json.loads(raw) + + raw_full = run(["cargo", "metadata", "--format-version", "1"]) + meta_full = json.loads(raw_full) + + return meta_no_deps, meta_full + + +def build_workspace_graph(meta_no_deps, meta_full): + """ + Build the internal workspace dependency graph. 
+ + Returns: + workspace: dict of pkg_id -> package info + deps: dict of pkg_id -> set of internal dependency pkg_ids + rev_deps: dict of pkg_id -> set of internal reverse dependency pkg_ids + name_to_id: dict of crate_name -> pkg_id + id_to_name: dict of pkg_id -> crate_name + crate_paths: dict of crate_name -> manifest directory (relative to workspace root) + """ + workspace_root = Path(meta_no_deps["workspace_root"]) + workspace_member_ids = set(meta_no_deps["workspace_members"]) + + # Map package IDs to names and paths + id_to_name = {} + name_to_id = {} + crate_paths = {} # crate_name -> relative path from workspace root + + for pkg in meta_no_deps["packages"]: + if pkg["id"] in workspace_member_ids: + id_to_name[pkg["id"]] = pkg["name"] + name_to_id[pkg["name"]] = pkg["id"] + manifest_dir = Path(pkg["manifest_path"]).parent + rel_path = manifest_dir.relative_to(workspace_root) + crate_paths[pkg["name"]] = str(rel_path) + + # Build dependency graph from resolve section + deps = defaultdict(set) + rev_deps = defaultdict(set) + + for node in meta_full["resolve"]["nodes"]: + crate_id = node["id"] + if crate_id not in workspace_member_ids: + continue + for dep in node["deps"]: + dep_id = dep["pkg"] + if dep_id in workspace_member_ids: + deps[crate_id].add(dep_id) + rev_deps[dep_id].add(crate_id) + + return workspace_member_ids, deps, rev_deps, name_to_id, id_to_name, crate_paths + + +def get_changed_files(base_ref): + """Get list of changed files relative to base_ref.""" + # Try merge-base first for accurate PR diff + try: + merge_base = run(["git", "merge-base", base_ref, "HEAD"]) + files = run(["git", "diff", "--name-only", merge_base]) + except SystemExit: + # Fallback to direct diff + files = run(["git", "diff", "--name-only", base_ref]) + + if not files: + return [] + return [f for f in files.split("\n") if f] + + +def map_files_to_crates(changed_files, crate_paths): + """ + Map changed files to the crates they belong to. 
+ + Returns: + dict of crate_name -> list of changed files in that crate + """ + affected = defaultdict(list) + + # Sort crate paths longest-first for most-specific match + sorted_crates = sorted(crate_paths.items(), key=lambda x: len(x[1]), reverse=True) + + for filepath in changed_files: + for crate_name, crate_dir in sorted_crates: + if filepath.startswith(crate_dir + "/") or filepath == crate_dir: + affected[crate_name].append(filepath) + break + + return affected + + +def compute_affected_set(directly_changed, rev_deps, name_to_id, id_to_name): + """ + Compute the full affected set: directly changed crates + all transitive + reverse dependencies (crates that depend on changed crates). + + Returns: + dict of crate_name -> { "reason": "direct"|"transitive", "via": crate_name|None } + """ + affected = {} + + # Add directly changed crates + for name in directly_changed: + affected[name] = {"reason": "direct", "via": None} + + # BFS through reverse dependencies + queue = list(directly_changed) + while queue: + current_name = queue.pop(0) + current_id = name_to_id.get(current_name) + if not current_id: + continue + + for rev_dep_id in rev_deps.get(current_id, set()): + rev_dep_name = id_to_name[rev_dep_id] + if rev_dep_name not in affected: + affected[rev_dep_name] = {"reason": "transitive", "via": current_name} + queue.append(rev_dep_name) + + return affected + + +def compute_layers(workspace_ids, deps, id_to_name): + """Compute topological layers of the dependency DAG.""" + assigned = {} + remaining = set(workspace_ids) + layer = 0 + + while remaining: + current_layer = [] + for crate_id in remaining: + if all(d in assigned for d in deps.get(crate_id, set())): + current_layer.append(crate_id) + if not current_layer: + # Cycle - assign remaining to current layer + current_layer = list(remaining) + for crate_id in current_layer: + assigned[crate_id] = layer + remaining.discard(crate_id) + layer += 1 + + # Convert to name-based + layers = defaultdict(list) + for 
crate_id, l in assigned.items(): + layers[l].append(id_to_name[crate_id]) + return dict(layers) + + +def classify_tiers(workspace_ids, deps, rev_deps, id_to_name): + """ + Classify crates into test tiers based on their position in the DAG. + + Tier 0 (leaf): No internal deps + Tier 1 (mid): Has internal deps, <3 reverse deps + Tier 2 (core): >=3 reverse deps (high-impact crates) + Tier 3 (integration): integration/simulator crates (end-to-end tests) + """ + INTEGRATION_CRATES = {"integration", "simulator"} + CORE_THRESHOLD = 3 # number of reverse deps to qualify as "core" + + tiers = {} + for crate_id in workspace_ids: + name = id_to_name[crate_id] + internal_deps = deps.get(crate_id, set()) + reverse_dep_count = len(rev_deps.get(crate_id, set())) + + if name in INTEGRATION_CRATES: + tiers[name] = 3 + elif not internal_deps: + tiers[name] = 0 + elif reverse_dep_count >= CORE_THRESHOLD: + tiers[name] = 2 + else: + tiers[name] = 1 + + return tiers + + +def format_human(affected, tiers, layers, rev_deps, id_to_name, name_to_id, crate_paths, show_all): + """Format output for human reading.""" + tier_names = {0: "leaf", 1: "mid", 2: "core", 3: "integration"} + tier_descriptions = { + 0: "No internal deps - unit tests only", + 1: "Has internal deps - unit + doc tests", + 2: "Many dependents - full test suite", + 3: "End-to-end integration tests", + } + + lines = [] + + # DAG layers + lines.append("=== Dependency DAG Layers ===") + for layer_num in sorted(layers): + crates = sorted(layers[layer_num]) + tier_info = ", ".join(f"{c} (T{tiers.get(c, '?')})" for c in crates) + lines.append(f" Layer {layer_num}: {tier_info}") + + lines.append("") + lines.append("=== Tier Classification ===") + for tier_num in sorted(tier_descriptions): + crates = sorted(n for n, t in tiers.items() if t == tier_num) + lines.append(f" Tier {tier_num} ({tier_names[tier_num]}): {tier_descriptions[tier_num]}") + for c in crates: + rev_count = len(rev_deps.get(name_to_id.get(c, ""), set())) + 
lines.append(f" - {c} ({rev_count} dependents)") + + if not show_all: + lines.append("") + lines.append("=== Affected Crates ===") + if not affected: + lines.append(" No Rust crates affected by the current changes.") + else: + by_tier = defaultdict(list) + for name, info in affected.items(): + by_tier[tiers.get(name, 1)].append((name, info)) + + for tier_num in sorted(by_tier): + lines.append(f" Tier {tier_num} ({tier_names[tier_num]}):") + for name, info in sorted(by_tier[tier_num]): + reason = info["reason"] + if reason == "direct": + lines.append(f" - {name} (directly changed)") + else: + lines.append(f" - {name} (transitive, via {info['via']})") + + lines.append("") + lines.append("=== Recommended CI Strategy ===") + affected_tiers = set(tiers.get(n, 1) for n in affected) + max_tier = max(affected_tiers) + + if max_tier == 0: + lines.append(" Only leaf crates changed -> run unit tests for affected crates only") + lines.append(" Skip: integration tests, full workspace build") + elif max_tier == 1: + lines.append(" Mid-tier crates changed -> run unit + doc tests for affected crates") + lines.append(" Skip: full integration suite") + elif max_tier == 2: + lines.append(" Core crates changed -> run full test suite for affected crates") + lines.append(" Include: integration tests for affected dependency chains") + else: + lines.append(" Integration crates changed -> run full integration suite") + + # Nextest filter + affected_names = sorted(affected.keys()) + if affected_names: + filter_expr = " | ".join(f"package({n})" for n in affected_names) + lines.append("") + lines.append(f" nextest filter: {filter_expr}") + + return "\n".join(lines) + + +def format_json(affected, tiers, layers, rev_deps, id_to_name, name_to_id, show_all): + """Format output as JSON for CI consumption.""" + tier_names = {0: "leaf", 1: "mid", 2: "core", 3: "integration"} + + if show_all: + crate_list = {name: tiers.get(name, 1) for name in sorted(tiers)} + else: + crate_list = {name: 
tiers.get(name, 1) for name in sorted(affected)} + + result = { + "affected_crates": [], + "tiers": {}, + "max_tier": 0, + "nextest_filter": "", + "nextest_packages": [], + } + + for name in sorted(crate_list): + tier = crate_list[name] + info = affected.get(name, {"reason": "all", "via": None}) if not show_all else {"reason": "all", "via": None} + rev_count = len(rev_deps.get(name_to_id.get(name, ""), set())) + + result["affected_crates"].append({ + "name": name, + "tier": tier, + "tier_name": tier_names.get(tier, "unknown"), + "reason": info["reason"], + "via": info["via"], + "reverse_dep_count": rev_count, + }) + + # Group by tier + for tier_num in sorted(tier_names): + crates = [c["name"] for c in result["affected_crates"] if c["tier"] == tier_num] + result["tiers"][tier_names[tier_num]] = crates + + if result["affected_crates"]: + result["max_tier"] = max(c["tier"] for c in result["affected_crates"]) + + affected_names = [c["name"] for c in result["affected_crates"]] + result["nextest_packages"] = affected_names + result["nextest_filter"] = " | ".join(f"package({n})" for n in affected_names) + + return json.dumps(result, indent=2) + + +def format_nextest_filter(affected, show_all, tiers): + """Output just the nextest filter expression.""" + if show_all: + names = sorted(tiers.keys()) + else: + names = sorted(affected.keys()) + if not names: + return "" + return " | ".join(f"package({n})" for n in names) + + +def format_packages(affected, show_all, tiers): + """Output -p flags for cargo test / cargo nextest.""" + if show_all: + names = sorted(tiers.keys()) + else: + names = sorted(affected.keys()) + if not names: + return "" + return " ".join(f"-p {n}" for n in names) + + +def main(): + parser = argparse.ArgumentParser( + description="Compute affected crates from the workspace dependency DAG" + ) + parser.add_argument( + "--base-ref", + default=os.environ.get("BASE_REF", "origin/master"), + help="Git ref to diff against (default: origin/master or $BASE_REF)", + 
) + parser.add_argument( + "--format", + choices=["human", "json", "nextest-filter", "packages"], + default="human", + help="Output format", + ) + parser.add_argument( + "--all", + action="store_true", + help="Show full DAG without change filtering", + ) + parser.add_argument( + "--changed-files", + help="Comma-separated list of changed files (overrides git diff)", + ) + args = parser.parse_args() + + # Load cargo metadata + meta_no_deps, meta_full = load_cargo_metadata() + + # Build workspace graph + workspace_ids, deps, rev_deps, name_to_id, id_to_name, crate_paths = \ + build_workspace_graph(meta_no_deps, meta_full) + + # Classify tiers + tiers = classify_tiers(workspace_ids, deps, rev_deps, id_to_name) + + # Compute layers + layers = compute_layers(workspace_ids, deps, id_to_name) + + if args.all: + affected = {id_to_name[cid]: {"reason": "all", "via": None} for cid in workspace_ids} + else: + # Get changed files + if args.changed_files: + changed_files = [f.strip() for f in args.changed_files.split(",") if f.strip()] + else: + changed_files = get_changed_files(args.base_ref) + + if not changed_files: + if args.format == "json": + print(json.dumps({"affected_crates": [], "tiers": {}, "max_tier": -1, + "nextest_filter": "", "nextest_packages": []})) + elif args.format in ("nextest-filter", "packages"): + pass # empty output + else: + print("No changed files detected.") + return + + # Map files to crates + directly_changed = map_files_to_crates(changed_files, crate_paths) + + # Workspace-level files (Cargo.toml, Cargo.lock, etc.) 
affect everything + workspace_files = [] + for f in changed_files: + matched = False + for crate_dir in crate_paths.values(): + if f.startswith(crate_dir + "/"): + matched = True + break + if not matched and ( + f.endswith("Cargo.toml") + or f.endswith("Cargo.lock") + or f.endswith("rust-toolchain.toml") + or f.startswith(".cargo/") + ): + workspace_files.append(f) + + if workspace_files: + # Workspace-level change: all crates affected + if args.format not in ("nextest-filter", "packages"): + print( + f"Workspace-level files changed ({', '.join(workspace_files)}), " + "all crates affected.", + file=sys.stderr, + ) + affected = {id_to_name[cid]: {"reason": "workspace", "via": None} + for cid in workspace_ids} + else: + # Compute affected set with transitive reverse deps + affected = compute_affected_set( + directly_changed.keys(), rev_deps, name_to_id, id_to_name + ) + + # Format output + if args.format == "human": + print(format_human(affected, tiers, layers, rev_deps, id_to_name, + name_to_id, crate_paths, args.all)) + elif args.format == "json": + print(format_json(affected, tiers, layers, rev_deps, id_to_name, + name_to_id, args.all)) + elif args.format == "nextest-filter": + result = format_nextest_filter(affected, args.all, tiers) + if result: + print(result) + elif args.format == "packages": + result = format_packages(affected, args.all, tiers) + if result: + print(result) + + +if __name__ == "__main__": + main() From a1a740766d388b78900134a2d1f725e7e5b87a14 Mon Sep 17 00:00:00 2001 From: Krishna Vishal Date: Mon, 13 Apr 2026 14:18:18 +0530 Subject: [PATCH 2/2] refactor(ci): replace custom affected-crates.py with cargo-rail - address review comments --- .config/rail.toml | 28 ++ .github/actions/rust/pre-merge/action.yml | 56 ++- scripts/ci/affected-crates.py | 499 ---------------------- 3 files changed, 62 insertions(+), 521 deletions(-) create mode 100644 .config/rail.toml delete mode 100755 scripts/ci/affected-crates.py diff --git a/.config/rail.toml 
b/.config/rail.toml new file mode 100644 index 0000000000..14114221da --- /dev/null +++ b/.config/rail.toml @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# cargo-rail change detection configuration +# See: https://github.com/loadingalias/cargo-rail + +[change-detection] +# Changes to these paths trigger a full workspace rebuild/retest +infrastructure = [ + "Cargo.lock", + "rust-toolchain.toml", + ".cargo/**", + ".github/**", +] diff --git a/.github/actions/rust/pre-merge/action.yml b/.github/actions/rust/pre-merge/action.yml index 26da79646f..742d489f9d 100644 --- a/.github/actions/rust/pre-merge/action.yml +++ b/.github/actions/rust/pre-merge/action.yml @@ -59,8 +59,9 @@ runs: esac shell: bash - # DAG-based test scoping: compute affected crates from cargo metadata + git diff - # to avoid running the full test suite when only a subset of crates changed. + # DAG-based test scoping: use cargo-rail to compute affected crates from the + # workspace dependency graph + git diff, avoiding the full test suite when only + # a subset of crates changed. # Safety: cargo check/clippy run on the full workspace separately, catching all # compilation errors. This only scopes test BUILD and EXECUTION. 
- name: Fetch base branch for DAG analysis @@ -68,29 +69,36 @@ run: git fetch origin master --depth=1 2>/dev/null || true shell: bash - - name: Compute affected crates (DAG-based) + - name: Install cargo-rail if: startsWith(inputs.task, 'test-') - id: affected - run: | - NEXTEST_FILTER="" - PACKAGES="" - - if python3 scripts/ci/affected-crates.py --base-ref origin/master --format nextest-filter > /tmp/nextest-filter.txt 2>/tmp/affected-stderr.txt; then - NEXTEST_FILTER=$(cat /tmp/nextest-filter.txt) - fi - if python3 scripts/ci/affected-crates.py --base-ref origin/master --format packages > /tmp/packages.txt 2>/tmp/affected-stderr.txt; then - PACKAGES=$(cat /tmp/packages.txt) - fi + uses: taiki-e/install-action@v2 + with: + tool: cargo-rail - if [[ -n "$NEXTEST_FILTER" ]]; then - CRATE_COUNT=$(echo "$NEXTEST_FILTER" | grep -o 'package(' | wc -l) - echo "::notice::DAG analysis: ${CRATE_COUNT} affected crates (of 43 total)" - echo "scoped=true" >> $GITHUB_OUTPUT + - name: Compute affected crates (cargo-rail) + if: startsWith(inputs.task, 'test-') + run: | + TOTAL_CRATES=$(cargo metadata --format-version 1 --no-deps 2>/dev/null | jq '.workspace_members | length' 2>/dev/null || echo "?") + echo "$TOTAL_CRATES" > /tmp/total-crates.txt + + PLAN_JSON=$(cargo rail plan --since origin/master -f json 2>/tmp/affected-stderr.txt || echo "") + + if [[ -n "$PLAN_JSON" ]]; then + MODE=$(echo "$PLAN_JSON" | jq -r '.scope.mode') + if [[ "$MODE" == "crates" ]]; then + CRATES=$(echo "$PLAN_JSON" | jq -r '.scope.crates[]') + CRATE_COUNT=$(echo "$CRATES" | wc -l) + # Build -p flags for cargo build/test + echo "$CRATES" | sed 's/^/-p /' | tr '\n' ' ' > /tmp/packages.txt + # Build nextest filter expression for cargo nextest run + echo "$CRATES" | sed 's/^/package(/; s/$/)/' | paste -sd '|' - > /tmp/nextest-filter.txt + echo "::notice::DAG analysis: ${CRATE_COUNT} affected crates (of ${TOTAL_CRATES} total)" + else + echo "::notice::Full workspace affected (${TOTAL_CRATES} crates)" + 
fi else STDERR=$(cat /tmp/affected-stderr.txt 2>/dev/null || echo "") echo "::warning::Could not compute affected crates, running full test suite. ${STDERR}" - echo "scoped=false" >> $GITHUB_OUTPUT - # Clear filter files so test step runs everything rm -f /tmp/nextest-filter.txt /tmp/packages.txt fi shell: bash @@ -156,12 +164,16 @@ runs: # Read DAG-based affected crate filter (computed in earlier step) NEXTEST_FILTER="" PACKAGE_FLAGS="" + TOTAL_CRATES="?" if [[ -f /tmp/nextest-filter.txt ]]; then NEXTEST_FILTER=$(cat /tmp/nextest-filter.txt) fi if [[ -f /tmp/packages.txt ]]; then PACKAGE_FLAGS=$(cat /tmp/packages.txt) fi + if [[ -f /tmp/total-crates.txt ]]; then + TOTAL_CRATES=$(cat /tmp/total-crates.txt) + fi if [[ -n "$PACKAGE_FLAGS" ]]; then CRATE_COUNT=$(echo "$NEXTEST_FILTER" | grep -o 'package(' | wc -l) @@ -230,9 +242,9 @@ runs: echo "=========================================" if [[ -n "$PACKAGE_FLAGS" ]]; then CRATE_COUNT=$(echo "$NEXTEST_FILTER" | grep -o 'package(' | wc -l) - echo "DAG scope: ${CRATE_COUNT}/43 crates" + echo "DAG scope: ${CRATE_COUNT}/${TOTAL_CRATES} crates" else - echo "DAG scope: full workspace (43 crates)" + echo "DAG scope: full workspace (${TOTAL_CRATES} crates)" fi echo "All targets build: ${bins_duration}s ($(date -ud @${bins_duration} +'%M:%S'))" echo "Tests compile: ${compile_duration}s ($(date -ud @${compile_duration} +'%M:%S'))" diff --git a/scripts/ci/affected-crates.py b/scripts/ci/affected-crates.py deleted file mode 100755 index ae65cc3f5d..0000000000 --- a/scripts/ci/affected-crates.py +++ /dev/null @@ -1,499 +0,0 @@ -#!/usr/bin/env python3 -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -""" -Dependency-DAG-based affected crate analyzer for CI optimization. - -Uses `cargo metadata` to build the workspace dependency graph, then -computes which crates are affected by a set of changed files (from -git diff). Outputs affected crates grouped by test tier: - - Tier 0 (leaf): No internal deps -> unit tests only - Tier 1 (mid): Has internal deps but few dependents -> unit + doc tests - Tier 2 (core): Many dependents (>=3) -> full test suite - Tier 3 (integration): The `integration` and `simulator` crates -> integration tests - -Usage: - # Affected crates for current PR (vs origin/master) - python3 scripts/ci/affected-crates.py - - # Affected crates for specific base ref - python3 scripts/ci/affected-crates.py --base-ref origin/master - - # Output as JSON (for CI consumption) - python3 scripts/ci/affected-crates.py --format json - - # Show full DAG without change filtering - python3 scripts/ci/affected-crates.py --all - - # Output nextest filter expression - python3 scripts/ci/affected-crates.py --format nextest-filter -""" - -import argparse -import json -import os -import subprocess -import sys -from collections import defaultdict -from pathlib import Path - - -def run(cmd, **kwargs): - """Run a command and return stdout, or exit on failure.""" - result = subprocess.run(cmd, capture_output=True, text=True, **kwargs) - if result.returncode != 0: - print(f"Error running: {' '.join(cmd)}", file=sys.stderr) - print(result.stderr, file=sys.stderr) - sys.exit(1) - return result.stdout.strip() - - -def load_cargo_metadata(): - """Load and parse cargo 
metadata.""" - raw = run(["cargo", "metadata", "--format-version", "1", "--no-deps"]) - meta_no_deps = json.loads(raw) - - raw_full = run(["cargo", "metadata", "--format-version", "1"]) - meta_full = json.loads(raw_full) - - return meta_no_deps, meta_full - - -def build_workspace_graph(meta_no_deps, meta_full): - """ - Build the internal workspace dependency graph. - - Returns: - workspace: dict of pkg_id -> package info - deps: dict of pkg_id -> set of internal dependency pkg_ids - rev_deps: dict of pkg_id -> set of internal reverse dependency pkg_ids - name_to_id: dict of crate_name -> pkg_id - id_to_name: dict of pkg_id -> crate_name - crate_paths: dict of crate_name -> manifest directory (relative to workspace root) - """ - workspace_root = Path(meta_no_deps["workspace_root"]) - workspace_member_ids = set(meta_no_deps["workspace_members"]) - - # Map package IDs to names and paths - id_to_name = {} - name_to_id = {} - crate_paths = {} # crate_name -> relative path from workspace root - - for pkg in meta_no_deps["packages"]: - if pkg["id"] in workspace_member_ids: - id_to_name[pkg["id"]] = pkg["name"] - name_to_id[pkg["name"]] = pkg["id"] - manifest_dir = Path(pkg["manifest_path"]).parent - rel_path = manifest_dir.relative_to(workspace_root) - crate_paths[pkg["name"]] = str(rel_path) - - # Build dependency graph from resolve section - deps = defaultdict(set) - rev_deps = defaultdict(set) - - for node in meta_full["resolve"]["nodes"]: - crate_id = node["id"] - if crate_id not in workspace_member_ids: - continue - for dep in node["deps"]: - dep_id = dep["pkg"] - if dep_id in workspace_member_ids: - deps[crate_id].add(dep_id) - rev_deps[dep_id].add(crate_id) - - return workspace_member_ids, deps, rev_deps, name_to_id, id_to_name, crate_paths - - -def get_changed_files(base_ref): - """Get list of changed files relative to base_ref.""" - # Try merge-base first for accurate PR diff - try: - merge_base = run(["git", "merge-base", base_ref, "HEAD"]) - files = run(["git", 
"diff", "--name-only", merge_base]) - except SystemExit: - # Fallback to direct diff - files = run(["git", "diff", "--name-only", base_ref]) - - if not files: - return [] - return [f for f in files.split("\n") if f] - - -def map_files_to_crates(changed_files, crate_paths): - """ - Map changed files to the crates they belong to. - - Returns: - dict of crate_name -> list of changed files in that crate - """ - affected = defaultdict(list) - - # Sort crate paths longest-first for most-specific match - sorted_crates = sorted(crate_paths.items(), key=lambda x: len(x[1]), reverse=True) - - for filepath in changed_files: - for crate_name, crate_dir in sorted_crates: - if filepath.startswith(crate_dir + "/") or filepath == crate_dir: - affected[crate_name].append(filepath) - break - - return affected - - -def compute_affected_set(directly_changed, rev_deps, name_to_id, id_to_name): - """ - Compute the full affected set: directly changed crates + all transitive - reverse dependencies (crates that depend on changed crates). 
- - Returns: - dict of crate_name -> { "reason": "direct"|"transitive", "via": crate_name|None } - """ - affected = {} - - # Add directly changed crates - for name in directly_changed: - affected[name] = {"reason": "direct", "via": None} - - # BFS through reverse dependencies - queue = list(directly_changed) - while queue: - current_name = queue.pop(0) - current_id = name_to_id.get(current_name) - if not current_id: - continue - - for rev_dep_id in rev_deps.get(current_id, set()): - rev_dep_name = id_to_name[rev_dep_id] - if rev_dep_name not in affected: - affected[rev_dep_name] = {"reason": "transitive", "via": current_name} - queue.append(rev_dep_name) - - return affected - - -def compute_layers(workspace_ids, deps, id_to_name): - """Compute topological layers of the dependency DAG.""" - assigned = {} - remaining = set(workspace_ids) - layer = 0 - - while remaining: - current_layer = [] - for crate_id in remaining: - if all(d in assigned for d in deps.get(crate_id, set())): - current_layer.append(crate_id) - if not current_layer: - # Cycle - assign remaining to current layer - current_layer = list(remaining) - for crate_id in current_layer: - assigned[crate_id] = layer - remaining.discard(crate_id) - layer += 1 - - # Convert to name-based - layers = defaultdict(list) - for crate_id, l in assigned.items(): - layers[l].append(id_to_name[crate_id]) - return dict(layers) - - -def classify_tiers(workspace_ids, deps, rev_deps, id_to_name): - """ - Classify crates into test tiers based on their position in the DAG. 
- - Tier 0 (leaf): No internal deps - Tier 1 (mid): Has internal deps, <3 reverse deps - Tier 2 (core): >=3 reverse deps (high-impact crates) - Tier 3 (integration): integration/simulator crates (end-to-end tests) - """ - INTEGRATION_CRATES = {"integration", "simulator"} - CORE_THRESHOLD = 3 # number of reverse deps to qualify as "core" - - tiers = {} - for crate_id in workspace_ids: - name = id_to_name[crate_id] - internal_deps = deps.get(crate_id, set()) - reverse_dep_count = len(rev_deps.get(crate_id, set())) - - if name in INTEGRATION_CRATES: - tiers[name] = 3 - elif not internal_deps: - tiers[name] = 0 - elif reverse_dep_count >= CORE_THRESHOLD: - tiers[name] = 2 - else: - tiers[name] = 1 - - return tiers - - -def format_human(affected, tiers, layers, rev_deps, id_to_name, name_to_id, crate_paths, show_all): - """Format output for human reading.""" - tier_names = {0: "leaf", 1: "mid", 2: "core", 3: "integration"} - tier_descriptions = { - 0: "No internal deps - unit tests only", - 1: "Has internal deps - unit + doc tests", - 2: "Many dependents - full test suite", - 3: "End-to-end integration tests", - } - - lines = [] - - # DAG layers - lines.append("=== Dependency DAG Layers ===") - for layer_num in sorted(layers): - crates = sorted(layers[layer_num]) - tier_info = ", ".join(f"{c} (T{tiers.get(c, '?')})" for c in crates) - lines.append(f" Layer {layer_num}: {tier_info}") - - lines.append("") - lines.append("=== Tier Classification ===") - for tier_num in sorted(tier_descriptions): - crates = sorted(n for n, t in tiers.items() if t == tier_num) - lines.append(f" Tier {tier_num} ({tier_names[tier_num]}): {tier_descriptions[tier_num]}") - for c in crates: - rev_count = len(rev_deps.get(name_to_id.get(c, ""), set())) - lines.append(f" - {c} ({rev_count} dependents)") - - if not show_all: - lines.append("") - lines.append("=== Affected Crates ===") - if not affected: - lines.append(" No Rust crates affected by the current changes.") - else: - by_tier = 
defaultdict(list) - for name, info in affected.items(): - by_tier[tiers.get(name, 1)].append((name, info)) - - for tier_num in sorted(by_tier): - lines.append(f" Tier {tier_num} ({tier_names[tier_num]}):") - for name, info in sorted(by_tier[tier_num]): - reason = info["reason"] - if reason == "direct": - lines.append(f" - {name} (directly changed)") - else: - lines.append(f" - {name} (transitive, via {info['via']})") - - lines.append("") - lines.append("=== Recommended CI Strategy ===") - affected_tiers = set(tiers.get(n, 1) for n in affected) - max_tier = max(affected_tiers) - - if max_tier == 0: - lines.append(" Only leaf crates changed -> run unit tests for affected crates only") - lines.append(" Skip: integration tests, full workspace build") - elif max_tier == 1: - lines.append(" Mid-tier crates changed -> run unit + doc tests for affected crates") - lines.append(" Skip: full integration suite") - elif max_tier == 2: - lines.append(" Core crates changed -> run full test suite for affected crates") - lines.append(" Include: integration tests for affected dependency chains") - else: - lines.append(" Integration crates changed -> run full integration suite") - - # Nextest filter - affected_names = sorted(affected.keys()) - if affected_names: - filter_expr = " | ".join(f"package({n})" for n in affected_names) - lines.append("") - lines.append(f" nextest filter: {filter_expr}") - - return "\n".join(lines) - - -def format_json(affected, tiers, layers, rev_deps, id_to_name, name_to_id, show_all): - """Format output as JSON for CI consumption.""" - tier_names = {0: "leaf", 1: "mid", 2: "core", 3: "integration"} - - if show_all: - crate_list = {name: tiers.get(name, 1) for name in sorted(tiers)} - else: - crate_list = {name: tiers.get(name, 1) for name in sorted(affected)} - - result = { - "affected_crates": [], - "tiers": {}, - "max_tier": 0, - "nextest_filter": "", - "nextest_packages": [], - } - - for name in sorted(crate_list): - tier = crate_list[name] - info = 
affected.get(name, {"reason": "all", "via": None}) if not show_all else {"reason": "all", "via": None} - rev_count = len(rev_deps.get(name_to_id.get(name, ""), set())) - - result["affected_crates"].append({ - "name": name, - "tier": tier, - "tier_name": tier_names.get(tier, "unknown"), - "reason": info["reason"], - "via": info["via"], - "reverse_dep_count": rev_count, - }) - - # Group by tier - for tier_num in sorted(tier_names): - crates = [c["name"] for c in result["affected_crates"] if c["tier"] == tier_num] - result["tiers"][tier_names[tier_num]] = crates - - if result["affected_crates"]: - result["max_tier"] = max(c["tier"] for c in result["affected_crates"]) - - affected_names = [c["name"] for c in result["affected_crates"]] - result["nextest_packages"] = affected_names - result["nextest_filter"] = " | ".join(f"package({n})" for n in affected_names) - - return json.dumps(result, indent=2) - - -def format_nextest_filter(affected, show_all, tiers): - """Output just the nextest filter expression.""" - if show_all: - names = sorted(tiers.keys()) - else: - names = sorted(affected.keys()) - if not names: - return "" - return " | ".join(f"package({n})" for n in names) - - -def format_packages(affected, show_all, tiers): - """Output -p flags for cargo test / cargo nextest.""" - if show_all: - names = sorted(tiers.keys()) - else: - names = sorted(affected.keys()) - if not names: - return "" - return " ".join(f"-p {n}" for n in names) - - -def main(): - parser = argparse.ArgumentParser( - description="Compute affected crates from the workspace dependency DAG" - ) - parser.add_argument( - "--base-ref", - default=os.environ.get("BASE_REF", "origin/master"), - help="Git ref to diff against (default: origin/master or $BASE_REF)", - ) - parser.add_argument( - "--format", - choices=["human", "json", "nextest-filter", "packages"], - default="human", - help="Output format", - ) - parser.add_argument( - "--all", - action="store_true", - help="Show full DAG without change 
filtering", - ) - parser.add_argument( - "--changed-files", - help="Comma-separated list of changed files (overrides git diff)", - ) - args = parser.parse_args() - - # Load cargo metadata - meta_no_deps, meta_full = load_cargo_metadata() - - # Build workspace graph - workspace_ids, deps, rev_deps, name_to_id, id_to_name, crate_paths = \ - build_workspace_graph(meta_no_deps, meta_full) - - # Classify tiers - tiers = classify_tiers(workspace_ids, deps, rev_deps, id_to_name) - - # Compute layers - layers = compute_layers(workspace_ids, deps, id_to_name) - - if args.all: - affected = {id_to_name[cid]: {"reason": "all", "via": None} for cid in workspace_ids} - else: - # Get changed files - if args.changed_files: - changed_files = [f.strip() for f in args.changed_files.split(",") if f.strip()] - else: - changed_files = get_changed_files(args.base_ref) - - if not changed_files: - if args.format == "json": - print(json.dumps({"affected_crates": [], "tiers": {}, "max_tier": -1, - "nextest_filter": "", "nextest_packages": []})) - elif args.format in ("nextest-filter", "packages"): - pass # empty output - else: - print("No changed files detected.") - return - - # Map files to crates - directly_changed = map_files_to_crates(changed_files, crate_paths) - - # Workspace-level files (Cargo.toml, Cargo.lock, etc.) 
affect everything - workspace_files = [] - for f in changed_files: - matched = False - for crate_dir in crate_paths.values(): - if f.startswith(crate_dir + "/"): - matched = True - break - if not matched and ( - f.endswith("Cargo.toml") - or f.endswith("Cargo.lock") - or f.endswith("rust-toolchain.toml") - or f.startswith(".cargo/") - ): - workspace_files.append(f) - - if workspace_files: - # Workspace-level change: all crates affected - if args.format not in ("nextest-filter", "packages"): - print( - f"Workspace-level files changed ({', '.join(workspace_files)}), " - "all crates affected.", - file=sys.stderr, - ) - affected = {id_to_name[cid]: {"reason": "workspace", "via": None} - for cid in workspace_ids} - else: - # Compute affected set with transitive reverse deps - affected = compute_affected_set( - directly_changed.keys(), rev_deps, name_to_id, id_to_name - ) - - # Format output - if args.format == "human": - print(format_human(affected, tiers, layers, rev_deps, id_to_name, - name_to_id, crate_paths, args.all)) - elif args.format == "json": - print(format_json(affected, tiers, layers, rev_deps, id_to_name, - name_to_id, args.all)) - elif args.format == "nextest-filter": - result = format_nextest_filter(affected, args.all, tiers) - if result: - print(result) - elif args.format == "packages": - result = format_packages(affected, args.all, tiers) - if result: - print(result) - - -if __name__ == "__main__": - main()