- Add deterministic execution mode for atomic operations via `wp.config.deterministic = "run_to_run"`.
@@ -25,6 +25,8 @@ import warp as wp +wp.set_module_options({"enable_backward": False}) + # Map string parameter names to warp dtypes DTYPE_MAP = { "float32": wp.float32, @@ -32,6 +34,9 @@ } NUM_ELEMENTS = 32 * 1024 * 1024 +DETERMINISTIC_NUM_ELEMENTS = 1 * 1024 * 1024 +COUNTER_NUM_ELEMENTS = 4 * 1024 * 1024 +DETERMINISTIC_BENCHMARK_SIZES = [64 * 1024, 256 * 1024, 1024 * 1024] @wp.kernel @@ -54,6 +59,60 @@ def min_kernel( wp.atomic_min(out, 0, val) # All threads contend on out[0] +@wp.kernel +def scatter_add_kernel( + vals: wp.array(dtype=wp.float32), + indices: wp.array(dtype=wp.int32), + out: wp.array(dtype=wp.float32), +): + tid = wp.tid() + wp.atomic_add(out, indices[tid], vals[tid]) + + +@wp.kernel(deterministic=True, deterministic_max_records=1) +def scatter_add_kernel_deterministic( + vals: wp.array(dtype=wp.float32), + indices: wp.array(dtype=wp.int32), + out: wp.array(dtype=wp.float32), +): + tid = wp.tid() + wp.atomic_add(out, indices[tid], vals[tid]) + + +@wp.kernel +def counter_kernel( + vals: wp.array(dtype=wp.float32), + counter: wp.array(dtype=wp.int32), + out: wp.array(dtype=wp.float32), +): + tid = wp.tid() + slot = wp.atomic_add(counter, 0, 1) + out[slot] = vals[tid] + + +@wp.kernel(deterministic=True, deterministic_max_records=1) +def counter_kernel_deterministic( + vals: wp.array(dtype=wp.float32), + counter: wp.array(dtype=wp.int32), + out: wp.array(dtype=wp.float32), +): + tid = wp.tid() + slot = wp.atomic_add(counter, 0, 1) + out[slot] = vals[tid] + + +@wp.kernel +def zero_float_array_kernel(out: wp.array(dtype=wp.float32)): + tid = wp.tid() + out[tid] = 0.0 + + +@wp.kernel +def zero_int_array_kernel(out: wp.array(dtype=wp.int32)): + tid = wp.tid() + out[tid] = 0 + + class AtomicMax: """Benchmark wp.atomic_max() with high thread contention. 
@@ -166,3 +225,162 @@ def time_cuda(self, vals_np_dict, dtype_str): self.out.zero_() self.cmd.launch() wp.synchronize_device(self.device) + + +class AtomicAddDeterminismOverhead: + """Benchmark the overhead of deterministic accumulation atomics. + + The benchmark compares the normal atomic-add path against deterministic + scatter-sort-reduce for the same kernel using CUDA graph replay. A small + size sweep exposes where deterministic execution crosses over. Two + destination counts are used: + + - ``1``: worst-case contention, where every thread targets the same output. + - ``65536``: lower contention, closer to a scatter workload. + """ + + params = (["normal", "deterministic"], [1, 65536], DETERMINISTIC_BENCHMARK_SIZES) + param_names = ["mode", "num_outputs", "num_elements"] + + repeat = 10 + number = 5 + + def setup_cache(self): + rng = np.random.default_rng(123) + vals_np = {n: rng.random(n, dtype=np.float32) for n in DETERMINISTIC_BENCHMARK_SIZES} + indices_np = {} + for n in DETERMINISTIC_BENCHMARK_SIZES: + indices_np[n] = { + 1: np.zeros(n, dtype=np.int32), + 65536: rng.integers(0, 65536, size=n, dtype=np.int32), + } + return vals_np, indices_np + + def setup(self, cache, mode, num_outputs, num_elements): + wp.init() + self.device = wp.get_device("cuda:0") + + vals_np, indices_np = cache + self.vals = wp.array(vals_np[num_elements], dtype=wp.float32, device=self.device) + self.indices = wp.array(indices_np[num_elements][num_outputs], dtype=wp.int32, device=self.device) + self.out = wp.zeros(shape=(num_outputs,), dtype=wp.float32, device=self.device) + + self.kernel = scatter_add_kernel_deterministic if mode == "deterministic" else scatter_add_kernel + wp.launch( + zero_float_array_kernel, + dim=num_outputs, + inputs=[self.out], + device=self.device, + ) + wp.launch( + self.kernel, + (num_elements,), + inputs=[self.vals, self.indices], + outputs=[self.out], + device=self.device, + ) + wp.synchronize_device(self.device) + + with 
wp.ScopedCapture(device=self.device, force_module_load=False) as capture: + wp.launch( + zero_float_array_kernel, + dim=num_outputs, + inputs=[self.out], + device=self.device, + ) + wp.launch( + self.kernel, + (num_elements,), + inputs=[self.vals, self.indices], + outputs=[self.out], + device=self.device, + ) + + self.graph = capture.graph + + for _ in range(5): + wp.capture_launch(self.graph) + wp.synchronize_device(self.device) + + def time_cuda(self, cache, mode, num_outputs, num_elements): + wp.capture_launch(self.graph) + wp.synchronize_device(self.device) + + +class AtomicCounterDeterminismOverhead: + """Benchmark the overhead of deterministic counter/allocator atomics. + + The timed path uses CUDA graph replay and includes resetting the output + state inside the captured graph so the benchmark isolates device work. + """ + + params = (["normal", "deterministic"], DETERMINISTIC_BENCHMARK_SIZES) + param_names = ["mode", "num_elements"] + + repeat = 10 + number = 5 + + def setup_cache(self): + rng = np.random.default_rng(321) + return {n: rng.random(n, dtype=np.float32) for n in DETERMINISTIC_BENCHMARK_SIZES} + + def setup(self, vals_np, mode, num_elements): + wp.init() + self.device = wp.get_device("cuda:0") + + self.vals = wp.array(vals_np[num_elements], dtype=wp.float32, device=self.device) + self.counter = wp.zeros(shape=(1,), dtype=wp.int32, device=self.device) + self.out = wp.zeros(shape=(num_elements,), dtype=wp.float32, device=self.device) + + self.kernel = counter_kernel_deterministic if mode == "deterministic" else counter_kernel + wp.launch( + zero_int_array_kernel, + dim=1, + inputs=[self.counter], + device=self.device, + ) + wp.launch( + zero_float_array_kernel, + dim=num_elements, + inputs=[self.out], + device=self.device, + ) + wp.launch( + self.kernel, + (num_elements,), + inputs=[self.vals, self.counter], + outputs=[self.out], + device=self.device, + ) + wp.synchronize_device(self.device) + + with wp.ScopedCapture(device=self.device, 
force_module_load=False) as capture: + wp.launch( + zero_int_array_kernel, + dim=1, + inputs=[self.counter], + device=self.device, + ) + wp.launch( + zero_float_array_kernel, + dim=num_elements, + inputs=[self.out], + device=self.device, + ) + wp.launch( + self.kernel, + (num_elements,), + inputs=[self.vals, self.counter], + outputs=[self.out], + device=self.device, + ) + + self.graph = capture.graph + + for _ in range(5): + wp.capture_launch(self.graph) + wp.synchronize_device(self.device) + + def time_cuda(self, vals_np, mode, num_elements): + wp.capture_launch(self.graph) + wp.synchronize_device(self.device) diff --git a/build_lib.py b/build_lib.py index 4e9c252022..c6c7973f10 100644 --- a/build_lib.py +++ b/build_lib.py @@ -522,6 +522,7 @@ def main(argv: list[str] | None = None) -> int: "native/texture.cpp", "native/mathdx.cpp", "native/coloring.cpp", + "native/deterministic.cpp", "native/fastcall.cpp", ] warp_cpp_paths = [os.path.join(build_path, cpp) for cpp in cpp_sources] @@ -533,6 +534,7 @@ def main(argv: list[str] | None = None) -> int: else: cuda_sources = [ "native/bvh.cu", + "native/deterministic.cu", "native/mesh.cu", "native/sort.cu", "native/hashgrid.cu", diff --git a/design/deterministic-execution.md b/design/deterministic-execution.md new file mode 100644 index 0000000000..07eea04c20 --- /dev/null +++ b/design/deterministic-execution.md @@ -0,0 +1,249 @@ +# Deterministic Execution Mode + +**Status**: Implemented + +## Motivation + +GPU atomic operations on floating-point arrays are inherently non-deterministic: +threads execute in unpredictable order, and since float addition is +non-associative, different execution orderings produce different rounding, +yielding different results each run. This also applies to counter/slot-allocation +patterns (``slot = wp.atomic_add(counter, 0, 1)``) where the thread-to-slot +assignment varies across runs, causing downstream writes to differ. 
+ +Customers need bit-exact reproducibility for debugging, regression testing, and +certification workflows. The manual workaround --- rewriting algorithms to use +two-pass count-scan-write patterns or sorted reductions --- is painful and +error-prone. Users want a simple toggle that makes their existing code +deterministic without algorithm rewrites. + +## Requirements + +| ID | Requirement | Priority | Notes | +| --- | --- | --- | --- | +| R1 | ``wp.config.deterministic = "run_to_run"`` makes float atomic accumulations bit-exact reproducible across runs | Must | Core value proposition | +| R2 | Counter/allocator pattern (``slot = wp.atomic_add(counter, 0, 1)``) produces deterministic slot assignments | Must | Common in compaction, particle emission | +| R3 | Both patterns work in the same kernel simultaneously | Must | Real workloads mix accumulation and allocation | +| R4 | Integer atomics with unused return values incur no overhead | Must | Already associative+commutative | +| R5 | CPU execution unaffected (already sequential/deterministic) | Must | Zero overhead on CPU | +| R6 | Per-module and per-kernel granularity via ``module_options`` | Should | Allows selective opt-in | +| R7 | Backward pass (autodiff) gradient accumulation is also deterministic | Should | Adjoint atomics are Pattern A | +| R8 | Multiple target arrays in one kernel each get independent buffers | Must | Real kernels write to N arrays | + +**Non-goals**: +- ``atomic_cas``/``atomic_exch`` determinism (inherently order-dependent). +- Tile-level atomic operations (``tile_atomic_add``). +- Kernels where counter contributions depend on scratch array writes within the + same kernel (Phase 0 suppresses all side effects; documented limitation). + +## User Configuration + +Deterministic mode can be enabled at three scopes: + +- **Global**: set ``wp.config.deterministic`` to one of: + - ``"not_guaranteed"``: default behavior, no deterministic transform. 
+ - ``"run_to_run"``: bit-exact reproducibility across repeated runs on the + same GPU architecture. + - ``"gpu_to_gpu"``: stronger reduction path intended to preserve identical + results across GPU architectures as well. +- **Global diagnostics**: set ``wp.config.deterministic_debug = True`` to emit + debug diagnostics for deterministic scatter overflow. +- **Per shared module**: call ``wp.set_module_options({"deterministic": "run_to_run"})`` + in the Python module that defines the kernels, just like other module-level + options such as ``enable_backward``. +- **Per kernel**: use ``@wp.kernel(deterministic="run_to_run")`` and optionally set a + per-target, per-thread scatter record limit with + ``deterministic_max_records=...``. + +For backward compatibility, ``True`` and ``False`` are still accepted at the +module and kernel level as aliases for ``"run_to_run"`` and +``"not_guaranteed"``, respectively. + +Like ``enable_backward``, the setting participates in module compilation and +hashing. A kernel defined in a shared module inherits that module's +``deterministic`` option; a unique-module kernel can override it independently. + +## Supported Operators + +Deterministic mode currently handles these atomic builtins: + +- ``atomic_add`` +- ``atomic_sub`` +- ``atomic_min`` +- ``atomic_max`` + +Handling depends on how the atomic is used: + +- **Pattern A: accumulation, return value unused** + Examples: ``wp.atomic_add(arr, i, value)``, ``arr[i] += value``. + Floating-point ``add/sub/min/max`` on scalar and composite leaf types are + redirected through scatter-sort-reduce. This includes vectors, matrices, + quaternions, and transforms with floating-point components. + Integer ``add/sub/min/max`` with unused return values are left on the normal + atomic path because the final value is already deterministic. +- **Pattern B: counter / allocator, return value consumed** + Example: ``slot = wp.atomic_add(counter, 0, 1)``. 
+ This is handled with the two-pass count-scan-execute path. + +Operators that are not supported by deterministic mode: + +- ``atomic_cas`` +- ``atomic_exch`` +- Tile atomics such as ``tile_atomic_add`` + +Bitwise integer atomics (``atomic_and``, ``atomic_or``, ``atomic_xor``) are not +transformed because their final results are already deterministic for the +unused-return case. + +## Design + +### Approach + +Two distinct atomic usage patterns require two strategies, both transparent to +the user: + +**Pattern A --- Scatter-Sort-Reduce** (accumulation, return value unused): + +Instead of performing ``atomic_add`` in-place during kernel execution, each +thread writes a ``(sort_key, value)`` record to a temporary scatter buffer. The +sort key packs ``(dest_index << 32 | thread_id)``. After the kernel completes, +a CUB radix sort orders the fixed-capacity scatter buffer by key and record +index. The post-sort reduction depends on the selected determinism guarantee: + +- ``"run_to_run"``: scalar reductions use CUB ``DeviceReduce::ReduceByKey`` on + destination indices for better performance. +- ``"gpu_to_gpu"``: reductions use the simple sequential segmented kernel so + the left-to-right accumulation order is explicit in Warp's own code. +- Composite leaf types always use the segmented kernel. + +Unused buffer slots are initialized with an invalid sentinel key and sort to +the end. This avoids host-side scatter count readbacks and keeps the path +compatible with CUDA graph capture. + +**Pattern B --- Two-Pass Execution** (counter/allocator, return value used): + +The kernel runs twice: +1. *Phase 0 (counting)*: The kernel executes with all side effects suppressed. + Counter atomics record per-thread contributions to a scratch array instead + of performing the actual atomic. +2. *Prefix sum*: ``wp.utils.array_scan(contrib, prefix, inclusive=False)`` + computes deterministic per-thread offsets. +3. *Phase 1 (execution)*: The kernel re-executes. 
Counter atomics return the + deterministic offset from the prefix sum. All other operations (including + Pattern A scatters) execute normally. + +This mirrors the well-established count-scan-write pattern already used by +``warp/_src/marching_cubes.py`` and the FEM geometry code, but applied +automatically. + +### Alternatives Considered + +| Alternative | Why rejected | +| --- | --- | +| Fixed-point integer accumulation | Loses precision/range; not general for all float types | +| Per-thread output arrays | ``O(threads * output_size)`` memory; doesn't scale | +| Serialized atomics (mutex) | Prohibitively slow on GPU | +| Kahan compensated summation | Reduces error but does not guarantee determinism | +| Taint-based selective side-effect suppression for Phase 0 | Significant codegen complexity for an edge case; deferred to a future version | + +### Key Implementation Details + +**Atomic classification** happens in ``Adjoint._emit_deterministic_atomic()`` +during the codegen build phase. The ``_det_in_assign`` flag on the ``Adjoint`` +(set by ``emit_Assign``, cleared after) distinguishes Pattern B (return consumed +in an assignment like ``slot = wp.atomic_add(...)``) from Pattern A (return +discarded, as in ``arr[i] += val`` or bare ``wp.atomic_add(...)``). Only +``atomic_add``, ``atomic_sub``, ``atomic_min``, and ``atomic_max`` are +intercepted. Integer atomics with unused return values are skipped entirely +(already deterministic). + +**CPU/CUDA dual compilation** is handled with ``#ifdef __CUDA_ARCH__`` guards +in the generated function body. On CUDA the scatter or phase-branching code +executes; on CPU the normal ``wp::atomic_add(...)`` call is emitted in the +``#else`` branch. This is necessary because Warp generates a single function +body that compiles for both targets. + +**Hidden kernel parameters** are appended to the CUDA kernel signature by +``codegen_kernel()`` after the user arguments. 
copies the total count (the last exclusive-scan element plus the final thread's contribution) back to the actual
counter array so user code that reads it post-launch sees the correct value.
+ +**Files added/modified**: + +| File | Role | +| --- | --- | +| ``warp/config.py`` | ``deterministic`` global flag | +| ``warp/_src/deterministic.py`` | Dataclasses, buffer allocation, sort-reduce orchestration | +| ``warp/_src/codegen.py`` | Atomic classification, scatter/phase codegen, hidden kernel params | +| ``warp/_src/context.py`` | Module option, ``_launch_deterministic()`` orchestrator, ctypes bindings | +| ``warp/native/deterministic.h`` | Device-side ``wp::deterministic::scatter()`` template | +| ``warp/native/deterministic.cu`` | CUB radix sort + ``ReduceByKey`` / segmented reduce kernels | +| ``warp/native/deterministic.cpp`` | CPU stubs (linker satisfaction when CUDA unavailable) | + +## Testing Strategy + +31 tests in ``warp/tests/test_deterministic.py`` cover: + +- **Bit-exact reproducibility** (Pattern A): launch the same kernel 10 times + with ``deterministic="run_to_run"``, assert ``np.array_equal`` across all + runs for + float32 scatter-add, ``+=`` syntax, float64, and atomic-sub. +- **Composite leaf types**: deterministic ``wp.vec3`` atomic-add and + component-wise ``atomic_min``/``atomic_max``, plus deterministic ``wp.mat33`` + atomic-add. +- **Correctness** (Pattern A): compare GPU deterministic output against a + sequential CPU reference within ``rtol=1e-4``. +- **Multiple arrays**: kernel that atomically adds to three different output + arrays simultaneously. +- **Mixed reduce ops on one array**: ``atomic_add`` and ``atomic_max`` targeting + the same destination array are reduced independently. +- **Multi-dimensional indexing**: 2D array ``atomic_add`` with row/col indices. +- **Scatter capacity accounting**: kernels with more than two deterministic + scatters per thread to the same target do not overflow a fixed heuristic + buffer. +- **Counter reproducibility** (Pattern B): ``slot = atomic_add(counter, 0, 1); + output[slot] = data[tid]`` produces identical output arrays across 10 runs. 
+- **Phase 0 side-effect suppression**: non-counter array writes are skipped in + the counting pass. +- **Counter correctness**: verifies counter value equals N and output is a + permutation of input. +- **Conditional counter**: stream compaction (only elements above threshold), + verifying correct count and reproducible output. +- **Mixed pattern**: both counter and accumulation in one kernel. +- **Integer passthrough**: integer ``atomic_add`` with unused return incurs no + transformation; result matches ``np.bincount``. +- **Per-module override**: ``@wp.kernel(module_options={"deterministic": "gpu_to_gpu"}, + module="unique")`` works with global config off. +- **Kernel decorator override**: ``@wp.kernel(deterministic="gpu_to_gpu")`` + works with + global config off. +- **Recorded launch support**: ``wp.launch(..., record_cmd=True)`` works for + deterministic CUDA kernels. +- **Graph capture support**: deterministic scatter launches can be captured and + replayed with CUDA graphs, including composite ``wp.vec3`` reductions. +- All tests run on both CPU and CUDA where applicable. Existing + ``test_atomic.py`` (158 tests) passes with zero regressions. diff --git a/warp/_src/codegen.py b/warp/_src/codegen.py index ab225a2263..9077ea64c1 100644 --- a/warp/_src/codegen.py +++ b/warp/_src/codegen.py @@ -29,6 +29,19 @@ # of current compile options (block_dim) etc options = {} +# Atomic builtins that can be intercepted by deterministic mode. +_DET_INTERCEPTABLE_ATOMICS = frozenset( + { + "atomic_add", + "atomic_sub", + "atomic_min", + "atomic_max", + } +) + +# Atomics that are inherently order-dependent (warn but don't intercept). +_DET_ORDER_DEPENDENT_ATOMICS = frozenset({"atomic_cas", "atomic_exch"}) + def get_node_name_safe(node): """Safely get a string representation of an AST node for error messages. 
@@ -1104,6 +1117,15 @@ def build(adj, builder, default_builder_options=None): global options options = adj.builder_options + # Initialize deterministic mode metadata if enabled + from warp._src.deterministic import DeterministicMeta, is_deterministic_mode_enabled # noqa: PLC0415 + + if is_deterministic_mode_enabled(adj.builder_options.get("deterministic")): + adj.det_meta = DeterministicMeta() + else: + adj.det_meta = None + adj._det_in_assign = False + adj.symbols = {} # map from symbols to adjoint variables adj.variables = [] # list of local variables (in order) adj.deferred_static_expressions = [] @@ -1620,6 +1642,13 @@ def add_call(adj, func, args, kwargs, type_args, min_outputs=None): output = [adj.add_var(v) for v in return_type] output_list = output + # Deterministic mode: intercept atomic builtins and emit scatter or + # two-pass code instead of the normal atomic call. + if adj.det_meta is not None and func.is_builtin() and func.key in _DET_INTERCEPTABLE_ATOMICS: + det_output = adj._emit_deterministic_atomic(func, bound_args, return_type, output, output_list) + if det_output is not None: + return det_output + # If we have a built-in that requires special handling to dispatch # the arguments to the underlying C++ function, then we can resolve # these using the `dispatch_func`. Since this is only called from @@ -1720,6 +1749,173 @@ def add_builtin_call(adj, func_name, args, min_outputs=None): func = warp._src.context.builtin_functions[func_name] return adj.add_call(func, args, {}, {}, min_outputs=min_outputs) + def _emit_deterministic_atomic(adj, func, bound_args, return_type, output, output_list): + """Emit deterministic scatter or two-pass code for an atomic builtin. + + Returns the output Var if the atomic was handled, or None to fall + through to normal codegen. 
+ """ + from warp._src.deterministic import ( # noqa: PLC0415 + REDUCE_OP_ADD, + REDUCE_OP_MAX, + REDUCE_OP_MIN, + get_or_create_counter_target, + get_or_create_scatter_target, + is_float_type, + warp_type_to_ctype, + ) + + args_list = list(bound_args.values()) + arr_var = args_list[0] # the target array + index_vars = args_list[1:-1] # the index arguments (1-4 depending on ndim) + + arr_type = arr_var.type + value_dtype = arr_type.dtype + scalar_dtype = value_dtype + if hasattr(scalar_dtype, "_wp_scalar_type_"): + scalar_dtype = scalar_dtype._wp_scalar_type_ + + # Determine if the return value is actually consumed by the caller. + # When called from emit_AugAssign (arr[i] += val) or a bare expression, + # the return is discarded → Pattern A. + # When called from emit_Assign (slot = wp.atomic_add(...)), + # the return is captured → Pattern B. + return_is_consumed = getattr(adj, "_det_in_assign", False) + + # Integer atomics with associative+commutative ops (add/sub/min/max) + # that don't use the return value are already deterministic — skip. + if not is_float_type(scalar_dtype) and not return_is_consumed: + return None # fall through to normal codegen + + value_ctype = Var.dtype_to_ctype(value_dtype) + scalar_ctype = warp_type_to_ctype(scalar_dtype) + + # C++ zero literal for the value type. + # Cannot use "float(0)" because Warp defines a macro #define float(x) cast_float(x). 
+ _ZERO_LITERALS = { + "float": "0.0f", + "double": "0.0", + "wp::half": "wp::half(0)", + "int": "0", + "unsigned int": "0u", + "int64_t": "int64_t(0)", + "uint64_t": "uint64_t(0)", + } + zero_literal = _ZERO_LITERALS.get(scalar_ctype, f"{scalar_ctype}(0)") + + # Map from builtin name to reduction op + op_map = { + "atomic_add": REDUCE_OP_ADD, + "atomic_sub": REDUCE_OP_ADD, # value is negated before scatter + "atomic_min": REDUCE_OP_MIN, + "atomic_max": REDUCE_OP_MAX, + } + reduce_op = op_map.get(func.key, REDUCE_OP_ADD) + + # Classify: if the return value is consumed, this is Pattern B (counter). + # Otherwise, it's Pattern A (accumulation). + # All deterministic codegen is wrapped in #ifdef __CUDA_ARCH__ so that + # CPU compilation falls through to normal atomic calls (CPU execution + # is already sequential/deterministic). + # We return None for the #else branch, telling add_call to emit the + # normal atomic call below. However, since we can only return once, + # we handle both CUDA and CPU paths here: emit the CUDA path as + # raw C++ with #ifdef, then return output to skip the normal codegen. + + # Build the CPU fallback: a normal atomic call string. + # This is used in the #else branch for CPU compilation. 
+ loaded_args = [adj.load(a) for a in args_list] + cpu_args_str = ", ".join(f"var_{a}" for a in loaded_args) + if output is not None: + cpu_call = f"var_{output} = wp::{func.native_func}({cpu_args_str});" + else: + cpu_call = f"wp::{func.native_func}({cpu_args_str});" + + if return_is_consumed: + # Pattern B: Counter/Allocator + target = get_or_create_counter_target(adj.det_meta, arr_var.label, scalar_ctype) + N = target.index + + val_loaded = loaded_args[-1] # already loaded above + + adj.add_forward("#ifdef __CUDA_ARCH__", skip_replay=True) + adj.add_forward( + f"if (_wp_det_phase == 0) {{ " + f"_wp_det_contrib_{N}[_idx] += var_{val_loaded}; " + f"var_{output} = {zero_literal}; " + f"}} else {{ " + f"var_{output} = static_cast<{scalar_ctype}>(_wp_det_prefix_{N}[_idx]); " + f"_wp_det_prefix_{N}[_idx] += var_{val_loaded}; " + f"}}", + replay="// deterministic counter replay (skipped)", + ) + adj.add_forward("#else", skip_replay=True) + adj.add_forward(cpu_call, replay="// " + cpu_call) + adj.add_forward("#endif", skip_replay=True) + return output + + # Pattern A: Accumulation (return value unused) + target = get_or_create_scatter_target( + adj.det_meta, + arr_var.label, + value_dtype, + value_ctype, + scalar_dtype, + reduce_op, + ) + N = target.index + + arr_loaded = loaded_args[0] + val_loaded = loaded_args[-1] + idx_loaded_list = loaded_args[1:-1] + + ndim = len(index_vars) + if ndim == 1: + flat_idx_expr = f"var_{idx_loaded_list[0]}" + elif ndim == 2: + flat_idx_expr = f"(var_{idx_loaded_list[0]} * var_{arr_loaded}.shape[1] + var_{idx_loaded_list[1]})" + elif ndim == 3: + flat_idx_expr = ( + f"(var_{idx_loaded_list[0]} * var_{arr_loaded}.shape[1] * var_{arr_loaded}.shape[2] " + f"+ var_{idx_loaded_list[1]} * var_{arr_loaded}.shape[2] + var_{idx_loaded_list[2]})" + ) + elif ndim == 4: + flat_idx_expr = ( + f"(var_{idx_loaded_list[0]} * var_{arr_loaded}.shape[1] * var_{arr_loaded}.shape[2] * var_{arr_loaded}.shape[3] " + f"+ var_{idx_loaded_list[1]} * 
f"static_cast<int>({flat_idx_expr}), _idx, {val_expr});"
+ adj._det_in_assign = True + try: + if isinstance(lhs, ast.Tuple) and isinstance(node.value, ast.Tuple): + rhs = [adj.eval(v) for v in node.value.elts] + else: + rhs = adj.eval(node.value) + finally: + adj._det_in_assign = False # handle the case where we are assigning multiple output variables if isinstance(lhs, ast.Tuple): @@ -3243,7 +3445,20 @@ def _store_subscript(adj, lhs, target, indices, rhs): target_type = strip_reference(target.type) if is_array(target_type): - adj.add_builtin_call("array_store", [target, *indices, rhs]) + # Deterministic two-pass mode must suppress normal array writes in + # phase 0 so the counting pass does not introduce side effects. + if adj.det_meta is not None and adj.det_meta.has_counter: + loaded_store_args = [adj.load(x) for x in (target, *indices, rhs)] + cpu_store_args = ", ".join(f"var_{x}" for x in loaded_store_args) + adj.add_forward("#ifdef __CUDA_ARCH__", skip_replay=True) + adj.add_forward("if (_wp_det_phase != 0) {", skip_replay=True) + adj.add_builtin_call("array_store", [target, *indices, rhs]) + adj.add_forward("}", skip_replay=True) + adj.add_forward("#else", skip_replay=True) + adj.add_forward(f"wp::array_store({cpu_store_args});") + adj.add_forward("#endif", skip_replay=True) + else: + adj.add_builtin_call("array_store", [target, *indices, rhs]) if adj.builder_options.get("verify_autograd_array_access", False): kernel_name = adj.fun_name @@ -4144,6 +4359,7 @@ def get_references(adj) -> tuple[dict[str, Any], dict[Any, Any], dict[warp._src. 
#define WP_TILE_BLOCK_DIM {block_dim} #define WP_NO_CRT #include "builtin.h" +#include "deterministic.h" // Map wp.breakpoint() to a device brkpt at the call site so cuda-gdb attributes the stop to the generated .cu line #if defined(__CUDACC__) && !defined(_MSC_VER) @@ -4934,6 +5150,23 @@ def codegen_kernel(kernel, device, options): for arg in adj.args: forward_args.append(arg.ctype() + " var_" + arg.label) + # Deterministic mode: add hidden kernel parameters for scatter/counter buffers. + if device == "cuda" and adj.det_meta is not None and adj.det_meta.needs_deterministic: + det = adj.det_meta + if det.has_counter: + forward_args.append("int _wp_det_phase") + for ct in det.counter_targets: + forward_args.append(f"int* _wp_det_contrib_{ct.index}") + forward_args.append(f"int* _wp_det_prefix_{ct.index}") + for st in det.scatter_targets: + forward_args.append(f"int64_t* _wp_scatter_keys_{st.index}") + forward_args.append(f"{st.value_ctype}* _wp_scatter_vals_{st.index}") + forward_args.append(f"int* _wp_scatter_ctr_{st.index}") + forward_args.append(f"int* _wp_scatter_overflow_{st.index}") + forward_args.append(f"int _wp_scatter_cap_{st.index}") + if det.has_scatter: + forward_args.append("int _wp_det_debug") + forward_body = codegen_func_forward(adj, func_type="kernel", device=device) template_fmt_args.update( { @@ -4961,6 +5194,24 @@ def codegen_kernel(kernel, device, options): else: reverse_args.append(arg.ctype() + " adj_" + arg.label) + # Deterministic mode: backward kernel also gets scatter buffer params + # for deterministic gradient accumulation. 
+ if device == "cuda" and adj.det_meta is not None and adj.det_meta.needs_deterministic: + det = adj.det_meta + if det.has_counter: + reverse_args.append("int _wp_det_phase") + for ct in det.counter_targets: + reverse_args.append(f"int* _wp_det_contrib_{ct.index}") + reverse_args.append(f"int* _wp_det_prefix_{ct.index}") + for st in det.scatter_targets: + reverse_args.append(f"int64_t* _wp_scatter_keys_{st.index}") + reverse_args.append(f"{st.value_ctype}* _wp_scatter_vals_{st.index}") + reverse_args.append(f"int* _wp_scatter_ctr_{st.index}") + reverse_args.append(f"int* _wp_scatter_overflow_{st.index}") + reverse_args.append(f"int _wp_scatter_cap_{st.index}") + if det.has_scatter: + reverse_args.append("int _wp_det_debug") + reverse_body = codegen_func_reverse(adj, func_type="kernel", device=device) template_fmt_args.update( { diff --git a/warp/_src/context.py b/warp/_src/context.py index ffcb771c26..fb327dcf06 100644 --- a/warp/_src/context.py +++ b/warp/_src/context.py @@ -1228,6 +1228,8 @@ def kernel( f: Callable | None = None, *, enable_backward: bool | None = None, + deterministic: str | bool | None = None, + deterministic_max_records: int | None = None, launch_bounds: tuple[int, ...] | int | None = None, module: Module | Literal["unique"] | str | None = None, module_options: dict[str, Any] | None = None, @@ -1273,10 +1275,29 @@ def my_kernel_fast(a: wp.array(dtype=float), b: wp.array(dtype=float)): tid = wp.tid() b[tid] = a[tid] + 1.0 + + @wp.kernel(deterministic="run_to_run", deterministic_max_records=8) + def my_kernel_deterministic(a: wp.array(dtype=float), b: wp.array(dtype=float)): + # deterministic scatter buffers will assume each thread emits at + # most 8 records per target, unless codegen proves a larger lower bound + tid = wp.tid() + wp.atomic_add(b, tid % 16, a[tid]) + Args: f: The function to be registered as a kernel. enable_backward: If False, the backward pass will not be generated. 
+ deterministic: Determinism guarantee for supported atomic operations in + this kernel. Accepted values are ``"not_guaranteed"`` (disable the + transform), ``"run_to_run"``, and ``"gpu_to_gpu"``. ``True`` and + ``False`` are accepted for backward compatibility and map to + ``"run_to_run"`` and ``"not_guaranteed"``, respectively. + deterministic_max_records: Optional per-target, per-thread upper + bound for the number of deterministic scatter records a thread may + emit. Use this when a thread can execute the same atomic site + multiple times, for example inside a dynamic loop. Warp still uses + the code-generated static record count as a lower bound, so the + larger of the two values is used. launch_bounds: CUDA ``__launch_bounds__`` attribute for the kernel. Can be an int (``maxThreadsPerBlock``) or a tuple of 1-2 ints ``(maxThreadsPerBlock, @@ -1290,8 +1311,9 @@ def my_kernel_fast(a: wp.array(dtype=float), b: wp.array(dtype=float)): after the kernel name and hash. If ``None``, the module is inferred from the function's module. module_options: A dict of module-level compilation options - (e.g. ``fast_math``, ``mode``, ``max_unroll``) that are - applied to the kernel's module. Requires + (e.g. ``fast_math``, ``mode``, ``max_unroll``, + ``deterministic``, ``deterministic_max_records``) that are applied to the kernel's + module. Requires ``module="unique"``; raises ``ValueError`` otherwise. For shared modules, use :func:`warp.set_module_options` instead. 
See :func:`warp.set_module_options` for the full @@ -1304,9 +1326,19 @@ def my_kernel_fast(a: wp.array(dtype=float), b: wp.array(dtype=float)): def wrapper(f, *args, **kwargs): kernel_options = {} + from warp._src.deterministic import normalize_determinism_mode # noqa: PLC0415 + if enable_backward is not None: kernel_options["enable_backward"] = enable_backward + if deterministic is not None: + kernel_options["deterministic"] = normalize_determinism_mode( + deterministic, option_name="deterministic", allow_none=True + ) + + if deterministic_max_records is not None: + kernel_options["deterministic_max_records"] = deterministic_max_records + if launch_bounds is not None: kernel_options["launch_bounds"] = launch_bounds @@ -1931,6 +1963,7 @@ class ModuleHasher: def __init__(self, kernels, options): # cache function hashes to avoid hashing multiple times self.function_hashes = {} # (function: hash) + self.options = options # avoid recursive spiral of doom (e.g., function calling an overload of itself) self.functions_in_progress = set() @@ -1989,7 +2022,22 @@ def hash_kernel(self, kernel: Kernel) -> bytes: ch = hashlib.sha256() + resolved_options = self.options | kernel.options + + # Deterministic launches need ``adj.det_meta`` at runtime even when the + # module is loaded from a cache hit and code generation is skipped. + # Build the adjoint once during hashing so the launch path has the same + # metadata it would have received on a fresh compile. 
+ from warp._src.deterministic import is_deterministic_mode_enabled # noqa: PLC0415 + + if is_deterministic_mode_enabled(resolved_options.get("deterministic")) and not hasattr(kernel.adj, "det_meta"): + kernel.adj.build(None, resolved_options) + ch.update(bytes(kernel.key, "utf-8")) + if kernel.options: + for key in sorted(kernel.options): + ch.update(bytes(key, "utf-8")) + ch.update(bytes(repr(kernel.options[key]), "utf-8")) ch.update(self.hash_adjoint(kernel.adj)) h = ch.digest() @@ -2170,10 +2218,15 @@ def build_struct(self, struct): self.structs[struct] = None def build_kernel(self, kernel): - if kernel.options.get("enable_backward", True): - kernel.adj.used_by_backward_kernel = True + prev_options = self.options + self.options = self.options | kernel.options + try: + if kernel.options.get("enable_backward", True): + kernel.adj.used_by_backward_kernel = True - kernel.adj.build(self) + kernel.adj.build(self) + finally: + self.options = prev_options if kernel.adj.return_var is not None: raise WarpCodegenTypeError(f"'{kernel.key}': Error, kernels can't have return values") @@ -2480,6 +2533,8 @@ def __init__(self, name: str | None, loader=None): "block_dim": 256, "compile_time_trace": warp.config.compile_time_trace, "strip_hash": False, + "deterministic": None, + "deterministic_max_records": None, } # Module dependencies are determined by scanning each function @@ -2518,6 +2573,16 @@ def resolve_options(self, config) -> dict: if options["enable_mathdx_gemm"] is None: options["enable_mathdx_gemm"] = config.enable_mathdx_gemm + from warp._src.deterministic import normalize_determinism_mode # noqa: PLC0415 + + # Resolve None-means-inherit for deterministic + if options["deterministic"] is None: + options["deterministic"] = config.deterministic + options["deterministic"] = normalize_determinism_mode(options["deterministic"], option_name="deterministic") + + if options["deterministic_max_records"] is None: + options["deterministic_max_records"] = 0 + # Fold in global 
config flags that affect compilation options["verify_fp"] = config.verify_fp options["line_directives"] = config.line_directives @@ -4429,6 +4494,20 @@ def __init__(self): ctypes.c_int, ] + # Deterministic mode: sort scatter records and apply + # component-wise segmented reduction for scalar/composite values. + self.core.wp_deterministic_sort_reduce_device.argtypes = [ + ctypes.c_uint64, + ctypes.c_uint64, + ctypes.c_int, + ctypes.c_uint64, + ctypes.c_int, + ctypes.c_int, + ctypes.c_int, + ctypes.c_int, + ctypes.c_int, + ] + self.core.wp_bvh_create_host.restype = ctypes.c_uint64 self.core.wp_bvh_create_host.argtypes = [ ctypes.c_void_p, @@ -7421,6 +7500,211 @@ def launch(self, stream: Stream | None = None) -> None: ) +class DeterministicLaunch(Launch): + """Recorded launch wrapper for deterministic forward CUDA kernels.""" + + def __init__( + self, + kernel, + device: Device, + det_meta, + fwd_args: Sequence[Any], + hooks: KernelHooks | None = None, + params: Sequence[Any] | None = None, + bounds: launch_bounds_t | None = None, + max_blocks: int = 0, + block_dim: int = 256, + ): + super().__init__( + kernel=kernel, + device=device, + hooks=hooks, + params=params, + params_addr=None, + bounds=bounds, + max_blocks=max_blocks, + block_dim=block_dim, + adjoint=False, + ) + self.det_meta = det_meta + self.fwd_args = list(fwd_args) + + def set_param_at_index(self, index: int, value: Any, adjoint: bool = False): + super().set_param_at_index(index, value, adjoint) + if not adjoint and index < len(self.fwd_args): + self.fwd_args[index] = value + + def launch(self, stream: Stream | None = None) -> None: + if stream is None: + stream = self.device.stream + + _launch_deterministic( + self.kernel, + self.hooks, + self.params, + self.bounds, + self.device, + stream, + self.max_blocks, + self.block_dim, + self.det_meta, + self.fwd_args, + module_exec=self.module_exec, + ) + + +def _launch_deterministic( + kernel, hooks, user_params, bounds, device, stream, max_blocks, block_dim, 
det_meta, fwd_args, module_exec=None +): + """Orchestrate a deterministic kernel launch with scatter-sort-reduce and/or two-pass execution. + + This is called from launch() when deterministic mode is active and the + kernel has atomic operations that require deterministic treatment. + """ + from warp._src.deterministic import ( # noqa: PLC0415 + allocate_counter_buffers, + allocate_scatter_buffers, + normalize_determinism_mode, + run_sort_reduce, + ) + + dim_size = bounds.size + options = kernel.module.resolve_options(warp.config) | kernel.options + determinism_mode = normalize_determinism_mode(options.get("deterministic"), option_name="deterministic") + max_scatter_records = max(0, int(options.get("deterministic_max_records", 0) or 0)) + det_debug = int(warp.config.deterministic_debug) + + # Allocate buffers. + scatter_bufs = ( + allocate_scatter_buffers(det_meta.scatter_targets, dim_size, device, max_records=max_scatter_records) + if det_meta.has_scatter + else [] + ) + counter_bufs = allocate_counter_buffers(det_meta.counter_targets, dim_size, device) if det_meta.has_counter else [] + + # Build the extra deterministic parameters (must match codegen_kernel order). 
+ def build_det_params(phase, scatter_bufs, counter_bufs, use_scatter): + det_params = [] + if det_meta.has_counter: + det_params.append(ctypes.c_int(phase)) + for i, _ct in enumerate(det_meta.counter_targets): + contrib, prefix = counter_bufs[i] + if phase == 0: + det_params.append(ctypes.c_void_p(contrib.ptr)) + det_params.append(ctypes.c_void_p(0)) # prefix not used in phase 0 + else: + det_params.append(ctypes.c_void_p(0)) # contrib not used in phase 1 + det_params.append(ctypes.c_void_p(prefix.ptr)) + for i, _st in enumerate(det_meta.scatter_targets): + if use_scatter and i < len(scatter_bufs): + keys, values, counter, overflow, capacity = scatter_bufs[i] + det_params.append(ctypes.c_void_p(keys.ptr)) + det_params.append(ctypes.c_void_p(values.ptr)) + det_params.append(ctypes.c_void_p(counter.ptr)) + det_params.append(ctypes.c_void_p(overflow.ptr)) + det_params.append(ctypes.c_int(capacity)) + else: + # Null scatter buffers (phase 0 doesn't scatter). + det_params.append(ctypes.c_void_p(0)) + det_params.append(ctypes.c_void_p(0)) + det_params.append(ctypes.c_void_p(0)) + det_params.append(ctypes.c_void_p(0)) + det_params.append(ctypes.c_int(0)) + if det_meta.has_scatter: + det_params.append(ctypes.c_int(det_debug)) + return det_params + + def do_cuda_launch(hook, params_list): + kernel_args = [ctypes.c_void_p(ctypes.addressof(x)) for x in params_list] + kernel_params = (ctypes.c_void_p * len(kernel_args))(*kernel_args) + + if ( + module_exec is not None + and len(runtime.captures) > 0 + and runtime.core.wp_cuda_stream_is_capturing(stream.cuda_stream) + ): + capture_id = runtime.core.wp_cuda_stream_get_capture_id(stream.cuda_stream) + graph = runtime.captures.get(capture_id) + if graph is not None: + graph.retain_module_exec(module_exec) + + runtime.core.wp_cuda_launch_kernel( + device.context, + hook, + bounds.size, + max_blocks, + block_dim, + hooks.forward_smem_bytes, + kernel_params, + stream.cuda_stream, + ) + + if det_meta.has_counter: + # === Two-pass 
execution === + + # Phase 0: counting pass (side effects suppressed, scatter disabled). + det_params_p0 = build_det_params( + phase=0, scatter_bufs=scatter_bufs, counter_bufs=counter_bufs, use_scatter=False + ) + # Append det params after all user args (matches codegen_kernel order: + # dim, user_args..., det_params...). + params_p0 = [*user_params, *det_params_p0] + do_cuda_launch(hooks.forward, params_p0) + + # Prefix sum on each counter's contributions, then update the real counter. + for i, ct in enumerate(det_meta.counter_targets): + contrib, prefix = counter_bufs[i] + warp._src.utils.array_scan(contrib, prefix, inclusive=False) + + # Write the total count to the actual counter array so user code + # that reads it after the launch sees the correct value. + # Total = exclusive_prefix[-1] + contrib[-1]. + # Use inclusive scan's last element = total + inclusive_out = warp.empty(shape=(dim_size,), dtype=warp.int32, device=device) + warp._src.utils.array_scan(contrib, inclusive_out, inclusive=True) + # Find the counter array in fwd_args and add the total. + for j, arg in enumerate(kernel.adj.args): + if arg.label == ct.array_var_label: + counter_arr = fwd_args[j] + # Copy the total (last element of inclusive scan) to the counter. + warp.copy(counter_arr, inclusive_out, dest_offset=0, src_offset=dim_size - 1, count=1) + break + + # Phase 1: execution pass with deterministic slots. + det_params_p1 = build_det_params( + phase=1, scatter_bufs=scatter_bufs, counter_bufs=counter_bufs, use_scatter=True + ) + params_p1 = [*user_params, *det_params_p1] + do_cuda_launch(hooks.forward, params_p1) + + else: + # === Single-pass (scatter only) === + det_params = build_det_params(phase=1, scatter_bufs=scatter_bufs, counter_bufs=counter_bufs, use_scatter=True) + params_all = [*user_params, *det_params] + do_cuda_launch(hooks.forward, params_all) + + # Post-kernel: sort-reduce for Pattern A scatter targets. 
+ if det_meta.has_scatter: + # Identify the destination arrays from fwd_args. + dest_arrays = [] + for st in det_meta.scatter_targets: + # Find the array argument whose label matches the scatter target. + dest_arr = None + for j, arg in enumerate(kernel.adj.args): + if arg.label == st.array_var_label: + dest_arr = fwd_args[j] + break + dest_arrays.append(dest_arr) + + run_sort_reduce(runtime, det_meta.scatter_targets, scatter_bufs, dest_arrays, device, determinism_mode) + + try: + runtime.verify_cuda_device(device) + except Exception as e: + print(f"Error in deterministic kernel launch: {kernel.key} on device {device}") + raise e + + def launch( kernel, dim: int | Sequence[int], @@ -7538,6 +7822,47 @@ def pack_args(args, params, adjoint=False): pack_args(fwd_args, params, adjoint=False) pack_args(adj_args, params, adjoint=True) + # Deterministic mode: redirect to multi-pass launcher for CUDA forward pass. + det_meta = getattr(kernel.adj, "det_meta", None) + if det_meta is not None and det_meta.needs_deterministic and device.is_cuda and not adjoint: + if stream is None: + stream = device.stream + if record_cmd: + return DeterministicLaunch( + kernel=kernel, + device=device, + det_meta=det_meta, + fwd_args=fwd_args, + hooks=hooks, + params=params, + bounds=bounds, + max_blocks=max_blocks, + block_dim=block_dim, + ) + _launch_deterministic( + kernel, + hooks, + params, + bounds, + device, + stream, + max_blocks, + block_dim, + det_meta, + fwd_args, + module_exec=module_exec, + ) + # Record on tape if one is active (same logic as below). 
+ if runtime.tape and record_tape: + frame = inspect.currentframe().f_back + caller = {"file": frame.f_code.co_filename, "lineno": frame.f_lineno, "func": frame.f_code.co_name} + runtime.tape.record_launch( + kernel, dim, max_blocks, inputs, outputs, device, block_dim, metadata={"caller": caller} + ) + if warp.config.verify_autograd_array_access: + runtime.tape._check_kernel_array_access(kernel, fwd_args) + return + # run kernel if device.is_cpu: if adjoint: @@ -8365,6 +8690,8 @@ def set_module_options(options: dict[str, Any], module: Any = None): * **mode**: The compilation mode to use, can be ``"debug"`` or ``"release"``, defaults to the value of ``warp.config.mode``. * **optimization_level**: Compiler optimization level (0-3). When ``None``, falls back to ``warp.config.optimization_level``; if that is also ``None``, uses target-specific defaults (``-O2`` for CPU, ``-O3`` for CUDA). * **cpu_compiler_flags**: CPU compiler flags (see ``warp.config.cpu_compiler_flags``), defaults to the global config value when ``None``. + * **deterministic**: Determinism guarantee for supported atomic operations. Accepted values are ``"not_guaranteed"``, ``"run_to_run"``, ``"gpu_to_gpu"``, plus ``True``/``False`` for backward compatibility. If ``None`` (the default), defers to ``warp.config.deterministic`` at compile time. + * **deterministic_max_records**: Per-target, per-thread upper bound for deterministic scatter records. Defaults to ``0``, which means use the code-generated lower bound only. This is useful when dynamic loops or repeated visits to the same atomic site can emit more records than static analysis can prove. * **block_dim**: The default number of threads to assign to each block, defaults to ``256``. * **compile_time_trace**: Enable compile-time tracing, defaults to the value of ``warp.config.compile_time_trace``. * **strip_hash**: Omit the content hash from compiled kernel file names, defaults to ``False``. 
@@ -8373,6 +8700,13 @@ def set_module_options(options: dict[str, Any], module: Any = None): options: Set of key-value option pairs """ + if "deterministic" in options: + from warp._src.deterministic import normalize_determinism_mode # noqa: PLC0415 + + options = dict(options) + options["deterministic"] = normalize_determinism_mode( + options["deterministic"], option_name="deterministic", allow_none=True + ) if module is None: module_name = _get_caller_module_name(stack_level=2) diff --git a/warp/_src/deterministic.py b/warp/_src/deterministic.py new file mode 100644 index 0000000000..9af9508be0 --- /dev/null +++ b/warp/_src/deterministic.py @@ -0,0 +1,327 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Deterministic execution mode for Warp. + +This module implements the scatter-sort-reduce and two-pass strategies for +making atomic operations produce bit-exact reproducible results. + +Two patterns of atomic usage are supported: + +Pattern A — Accumulation (return value unused): + ``wp.atomic_add(arr, idx, value)`` or ``arr[idx] += value`` + Strategy: Scatter records to a temporary buffer during the kernel, then + sort by (dest_index, thread_id) and reduce in fixed order post-kernel. + +Pattern B — Counter/Allocator (return value used): + ``slot = wp.atomic_add(counter, 0, 1)`` + Strategy: Two-pass execution. Phase 0 records each thread's contribution + with all side effects suppressed. Prefix sum computes deterministic + offsets. Phase 1 re-executes with deterministic slot assignments. + +See ``warp.config.deterministic`` for the user-facing configuration modes. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field + +import warp +from warp._src import utils as warp_utils + +# Reduction operation constants (must match C++ ReduceOp enum in deterministic.cu). 
+REDUCE_OP_ADD = 0 +REDUCE_OP_MIN = 1 +REDUCE_OP_MAX = 2 + +DETERMINISM_NOT_GUARANTEED = "not_guaranteed" +DETERMINISM_RUN_TO_RUN = "run_to_run" +DETERMINISM_GPU_TO_GPU = "gpu_to_gpu" + +_VALID_DETERMINISM_MODES = { + DETERMINISM_NOT_GUARANTEED, + DETERMINISM_RUN_TO_RUN, + DETERMINISM_GPU_TO_GPU, +} + +_DETERMINISM_MODE_IDS = { + DETERMINISM_NOT_GUARANTEED: 0, + DETERMINISM_RUN_TO_RUN: 1, + DETERMINISM_GPU_TO_GPU: 2, +} + +# Map from Warp builtin names to (is_accumulation, reduce_op). +# Atomics whose return value is consumed are always Pattern B (counter); +# this table is used only for Pattern A classification. +_ATOMIC_OP_INFO = { + "atomic_add": (True, REDUCE_OP_ADD), + "atomic_sub": (True, REDUCE_OP_ADD), # codegen negates value before scattering + "atomic_min": (True, REDUCE_OP_MIN), + "atomic_max": (True, REDUCE_OP_MAX), +} + +# Atomics that are associative+commutative on integers (no transform needed). +_ALREADY_DETERMINISTIC_OPS = {"atomic_and", "atomic_or", "atomic_xor"} + +# Atomics that are inherently order-dependent (cannot be made deterministic). +_ORDER_DEPENDENT_OPS = {"atomic_cas", "atomic_exch"} + + +def normalize_determinism_mode(value, option_name="deterministic", allow_none=False): + """Normalize user-facing deterministic mode values. + + The public API accepts the explicit mode strings plus ``True``/``False`` + for backward compatibility: + + - ``False`` -> ``"not_guaranteed"`` + - ``True`` -> ``"run_to_run"`` + + Args: + value: User-provided config or option value. + option_name: Option name to use in error messages. + allow_none: Whether ``None`` is permitted. + + Returns: + The normalized deterministic mode string, or ``None`` when + ``allow_none`` is ``True`` and the input is ``None``. 
+ """ + if value is None: + if allow_none: + return None + return DETERMINISM_NOT_GUARANTEED + + if isinstance(value, bool): + return DETERMINISM_RUN_TO_RUN if value else DETERMINISM_NOT_GUARANTEED + + if isinstance(value, str): + if value in _VALID_DETERMINISM_MODES: + return value + valid_modes = ", ".join(repr(mode) for mode in sorted(_VALID_DETERMINISM_MODES)) + raise ValueError(f"{option_name} must be one of {valid_modes}, got {value!r}") + + raise TypeError(f"{option_name} must be a bool or string, got {type(value).__name__}") + + +def is_deterministic_mode_enabled(value) -> bool: + """Return ``True`` if a deterministic mode stronger than default is enabled.""" + return normalize_determinism_mode(value) != DETERMINISM_NOT_GUARANTEED + + +def determinism_mode_to_id(value) -> int: + """Map a normalized deterministic mode to the native enum id.""" + return _DETERMINISM_MODE_IDS[normalize_determinism_mode(value)] + + +@dataclass +class ScatterTarget: + """Tracks a Pattern A (accumulation) atomic target array during codegen.""" + + array_var_label: str # label of the target array Var + value_dtype: type # Warp dtype of the accumulated value (e.g., wp.float32, wp.vec3) + value_ctype: str # C type of the value (e.g., "float", "wp::vec_t<3, float>") + scalar_dtype: type # scalar component dtype (e.g., wp.float32) + reduce_op: int # REDUCE_OP_ADD, REDUCE_OP_MIN, or REDUCE_OP_MAX + index: int = 0 # scatter buffer index (assigned during codegen) + records_per_thread: int = 1 # static estimate of emitted records per thread + + +@dataclass +class CounterTarget: + """Tracks a Pattern B (counter/allocator) atomic target array during codegen.""" + + array_var_label: str # label of the target array Var + value_ctype: str # C type of the counter value (e.g., "int") + index: int = 0 # counter buffer index (assigned during codegen) + + +@dataclass +class DeterministicMeta: + """Metadata attached to a kernel's Adjoint after codegen in deterministic mode. 
+ + Used by the launch system to allocate scatter/counter buffers and + orchestrate the multi-pass execution. + """ + + scatter_targets: list[ScatterTarget] = field(default_factory=list) + counter_targets: list[CounterTarget] = field(default_factory=list) + + @property + def has_scatter(self): + return len(self.scatter_targets) > 0 + + @property + def has_counter(self): + return len(self.counter_targets) > 0 + + @property + def needs_deterministic(self): + return self.has_scatter or self.has_counter + + +def get_or_create_scatter_target(meta, array_var_label, value_dtype, value_ctype, scalar_dtype, reduce_op): + """Get existing scatter target for an array, or create a new one. + + Multiple atomic call sites targeting the same array and reduction op share + one scatter buffer. + """ + for target in meta.scatter_targets: + if ( + target.array_var_label == array_var_label + and target.value_dtype == value_dtype + and target.value_ctype == value_ctype + and target.scalar_dtype == scalar_dtype + and target.reduce_op == reduce_op + ): + target.records_per_thread += 1 + return target + target = ScatterTarget( + array_var_label=array_var_label, + value_dtype=value_dtype, + value_ctype=value_ctype, + scalar_dtype=scalar_dtype, + reduce_op=reduce_op, + index=len(meta.scatter_targets), + ) + meta.scatter_targets.append(target) + return target + + +def get_or_create_counter_target(meta, array_var_label, value_ctype): + """Get existing counter target for an array, or create a new one.""" + for target in meta.counter_targets: + if target.array_var_label == array_var_label: + return target + target = CounterTarget( + array_var_label=array_var_label, + value_ctype=value_ctype, + index=len(meta.counter_targets), + ) + meta.counter_targets.append(target) + return target + + +# --------------------------------------------------------------------------- +# Warp type → C++ type string mapping for scatter buffer value types +# 
--------------------------------------------------------------------------- + +_WARP_TO_CTYPE = { + warp.float16: "wp::half", + warp.float32: "float", + warp.float64: "double", + warp.int32: "int", + warp.uint32: "unsigned int", + warp.int64: "int64_t", + warp.uint64: "uint64_t", +} + +_SCALAR_TYPE_IDS = { + warp.float16: 0, + warp.float32: 1, + warp.float64: 2, + warp.int32: 3, + warp.uint32: 4, + warp.int64: 5, + warp.uint64: 6, +} + + +def warp_type_to_ctype(dtype) -> str: + """Map a Warp scalar type to its C++ type string.""" + ctype = _WARP_TO_CTYPE.get(dtype) + if ctype is None: + raise ValueError(f"Unsupported scalar type for deterministic atomic: {dtype}") + return ctype + + +def is_float_type(dtype) -> bool: + """Return True if dtype is a Warp floating-point type.""" + return dtype in (warp.float16, warp.float32, warp.float64) + + +def warp_scalar_type_to_id(dtype) -> int: + """Map a Warp scalar type to the native deterministic reducer enum.""" + type_id = _SCALAR_TYPE_IDS.get(dtype) + if type_id is None: + raise ValueError(f"Unsupported scalar type for deterministic atomic: {dtype}") + return type_id + + +# --------------------------------------------------------------------------- +# Launch-time helpers +# --------------------------------------------------------------------------- + + +def allocate_scatter_buffers(scatter_targets, dim_size, device, max_records=0): + """Allocate scatter buffers for Pattern A targets. + + Args: + scatter_targets: Deterministic scatter target metadata collected during + code generation. + dim_size: Launch dimension size. This corresponds to the number of + threads that may emit scatter records. + device: Target device for the temporary buffers. + max_records: Optional per-target, per-thread override for the maximum + number of scatter records a thread may emit. The final buffer size + uses ``max(codegen_lower_bound, max_records)`` records per thread. 
+
+    Returns:
+        A list of ``(keys, values, counter, overflow, capacity)`` tuples, one
+        per scatter target.
+    """
+    buffers = []
+    for target in scatter_targets:
+        records_per_thread = max(target.records_per_thread, max_records)
+        capacity = max(dim_size * records_per_thread, 1024)
+        keys = warp.full(shape=(capacity,), value=-1, dtype=warp.int64, device=device)
+        values = warp.zeros(shape=(capacity,), dtype=target.value_dtype, device=device)
+        counter = warp.zeros(shape=(1,), dtype=warp.int32, device=device)
+        overflow = warp.zeros(shape=(1,), dtype=warp.int32, device=device)
+        buffers.append((keys, values, counter, overflow, capacity))
+    return buffers
+
+
+def allocate_counter_buffers(counter_targets, dim_size, device):
+    """Allocate counter buffers for Pattern B targets.
+
+    Returns a list of (contrib, prefix) tuples.
+    """
+    buffers = []
+    for _target in counter_targets:
+        contrib = warp.zeros(shape=(dim_size,), dtype=warp.int32, device=device)
+        prefix = warp.empty(shape=(dim_size,), dtype=warp.int32, device=device)
+        buffers.append((contrib, prefix))
+    return buffers
+
+
+def run_sort_reduce(runtime, scatter_targets, scatter_buffers, dest_arrays, device, determinism_mode):
+    """Execute post-kernel sort-reduce for all Pattern A scatter targets.
+
+    Args:
+        runtime: The Warp Runtime object with native function bindings.
+        scatter_targets: List of ScatterTarget metadata.
+        scatter_buffers: List of (keys, values, counter, overflow, capacity) tuples.
+        dest_arrays: List of destination warp.array objects (parallel to scatter_targets).
+        device: The target device.
+        determinism_mode: One of the user-facing deterministic mode strings.
+ """ + for i, target in enumerate(scatter_targets): + keys, values, _counter, _overflow, capacity = scatter_buffers[i] + dest_arr = dest_arrays[i] + + try: + scalar_type_id = warp_scalar_type_to_id(target.scalar_dtype) + except ValueError: + warp_utils.warn(f"Unsupported value type '{target.value_ctype}' for deterministic sort-reduce.") + continue + + runtime.core.wp_deterministic_sort_reduce_device( + keys.ptr, + values.ptr, + capacity, + dest_arr.ptr, + dest_arr.size, + target.reduce_op, + scalar_type_id, + getattr(target.value_dtype, "_length_", 1), + determinism_mode_to_id(determinism_mode), + ) diff --git a/warp/config.py b/warp/config.py index b79ef92aad..e1d975bbaa 100644 --- a/warp/config.py +++ b/warp/config.py @@ -257,6 +257,52 @@ If ``None``, Warp determines the behavior (currently equal to ``min(os.cpu_count(), 4)``). """ +deterministic: str = "not_guaranteed" +"""Determinism guarantee for supported atomic operations. + +Accepted values are: + +- ``"not_guaranteed"``: Default behavior. Atomic ordering is not constrained. +- ``"run_to_run"``: Produce bit-exact reproducible results across repeated runs + on the same GPU architecture. +- ``"gpu_to_gpu"``: Use a stronger reduction path intended to preserve the same + result across GPU architectures as well. + +For backward compatibility, module and kernel options may still pass ``True`` +or ``False``; they are interpreted as ``"run_to_run"`` and +``"not_guaranteed"``, respectively. + +When this setting is stronger than ``"not_guaranteed"``, floating-point atomic +operations (``atomic_add``, ``atomic_sub``, ``atomic_min``, ``atomic_max``) and +counter-pattern atomics (where the return value is used for slot allocation) +produce bit-exact reproducible results according to the selected guarantee. + +Accumulation atomics are deferred to a post-kernel sort-reduce step that processes +values in a fixed order. 
Counter atomics use a two-pass execution scheme (counting +pass + prefix sum + execution pass) to assign deterministic slot indices. + +Note: Enabling this flag impacts performance. Accumulation atomics incur ~2-5x +overhead from sorting. Counter atomics incur ~2-3x overhead from the extra kernel +pass. + +Known limitation: In the two-pass counter mode, the counting pass (Phase 0) +suppresses all side effects (array writes, non-counter atomics, ``printf``). +Kernels where counter contributions depend on earlier scratch array writes within +the same kernel may produce incorrect results. Use local variables or read directly +from input arrays for control flow that determines counter contributions. + +This setting can be overridden at the module level by setting the +``"deterministic"`` module option. +""" + +deterministic_debug: bool = False +"""Enable debug diagnostics for deterministic execution mode. + +When enabled, deterministic scatter overflows may emit device-side diagnostics. +This setting is intended for debugging capacity issues and should remain disabled +for normal execution, especially when CUDA graph capture is performance-critical. +""" + _git_commit_hash: str | None = None """Git commit hash associated with the Warp installation. diff --git a/warp/native/deterministic.cpp b/warp/native/deterministic.cpp new file mode 100644 index 0000000000..2fc469d69e --- /dev/null +++ b/warp/native/deterministic.cpp @@ -0,0 +1,26 @@ +// SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +// CPU stubs for deterministic mode. +// CPU kernels execute sequentially so atomics are already deterministic. +// These stubs satisfy the linker when building without CUDA. 
+
+#include "warp.h"
+
+#if !WP_ENABLE_CUDA
+
+void wp_deterministic_sort_reduce_device(
+    uint64_t keys,
+    uint64_t values,
+    int count,
+    uint64_t dest_array,
+    int dest_size,
+    int op,
+    int scalar_type,
+    int components,
+    int determinism_level
+)
+{
+}
+
+#endif // !WP_ENABLE_CUDA
diff --git a/warp/native/deterministic.cu b/warp/native/deterministic.cu
new file mode 100644
index 0000000000..6bf9ed3ff2
--- /dev/null
+++ b/warp/native/deterministic.cu
@@ -0,0 +1,367 @@
+// SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+// Deterministic mode post-kernel sort-reduce.
+//
+// After a kernel scatters (key, value) records into a temporary buffer, this
+// module sorts them by key (CUB radix sort) and then applies a deterministic
+// segmented reduction: records with the same destination index are accumulated
+// left-to-right in thread-id order, and the result is applied to the target
+// array.
+
+#include "warp.h"
+
+#include "cuda_util.h"
+#include "temp_buffer.h"
+
+#define THRUST_IGNORE_CUB_VERSION_CHECK
+
+#include <cub/cub.cuh>
+
+namespace {
+
+// Extract the destination index (upper 32 bits) from a sort key.
+__host__ __device__ __forceinline__ int dest_index_from_key(int64_t key)
+{
+    return static_cast<int>(static_cast<uint64_t>(key) >> 32);
+}
+
+// Reduction op identifiers (must match the Python-side constants).
+enum ReduceOp {
+    REDUCE_ADD = 0,
+    REDUCE_MIN = 1,
+    REDUCE_MAX = 2,
+};
+
+enum ScalarType {
+    SCALAR_HALF = 0,
+    SCALAR_FLOAT = 1,
+    SCALAR_DOUBLE = 2,
+    SCALAR_INT = 3,
+    SCALAR_UINT = 4,
+    SCALAR_INT64 = 5,
+    SCALAR_UINT64 = 6,
+};
+
+enum DeterminismLevel {
+    DETERMINISM_NOT_GUARANTEED = 0,
+    DETERMINISM_RUN_TO_RUN = 1,
+    DETERMINISM_GPU_TO_GPU = 2,
+};
+
+__global__ void init_record_indices_kernel(int* indices, int count)
+{
+    int tid = blockIdx.x * blockDim.x + threadIdx.x;
+    if (tid < count) {
+        indices[tid] = tid;
+    }
+}
+
+template <typename T>
+struct ReduceByKeyOp {
+    int op;
+
+    __host__ __device__ T operator()(const T& a, const T& b) const
+    {
+        switch (op) {
+        case REDUCE_ADD:
+            return a + b;
+        case REDUCE_MIN:
+            return wp::min(a, b);
+        case REDUCE_MAX:
+            return wp::max(a, b);
+        default:
+            return a;
+        }
+    }
+};
+
+struct DestIndexTransform {
+    __host__ __device__ int operator()(const int64_t& key) const { return dest_index_from_key(key); }
+};
+
+template <typename T>
+__global__ void apply_reduced_runs_kernel(
+    const int* __restrict__ unique_dests,
+    const T* __restrict__ aggregates,
+    const int* __restrict__ num_runs,
+    T* __restrict__ dest_array,
+    int dest_size,
+    int op
+)
+{
+    int tid = blockIdx.x * blockDim.x + threadIdx.x;
+    if (tid >= *num_runs)
+        return;
+
+    int dest = unique_dests[tid];
+    if (dest < 0 || dest >= dest_size)
+        return;
+
+    switch (op) {
+    case REDUCE_ADD:
+        dest_array[dest] = dest_array[dest] + aggregates[tid];
+        break;
+    case REDUCE_MIN:
+        dest_array[dest] = wp::min(dest_array[dest], aggregates[tid]);
+        break;
+    case REDUCE_MAX:
+        dest_array[dest] = wp::max(dest_array[dest], aggregates[tid]);
+        break;
+    }
+}
+
+// Kernel that walks the sorted scatter records and applies a deterministic
+// segmented reduction. Each thread handles one contiguous run of records that
+// target the same destination index.
+//
+// We identify segment boundaries by comparing adjacent keys' upper 32 bits.
+// A prefix "segment-head" scan finds the start of each segment, and each
+// segment-head thread accumulates its segment sequentially.
+template <typename T>
+__global__ void deterministic_reduce_kernel(
+    const int64_t* __restrict__ sorted_keys,
+    const int* __restrict__ sorted_indices,
+    const T* __restrict__ values,
+    int num_records,
+    int components,
+    T* __restrict__ dest_array,
+    int dest_size,
+    int op
+)
+{
+    int tid = blockIdx.x * blockDim.x + threadIdx.x;
+    if (tid >= num_records)
+        return;
+
+    // A thread is a segment head if it is the first record, or its dest index
+    // differs from the previous record's.
+    int my_dest = dest_index_from_key(sorted_keys[tid]);
+    bool is_head = (tid == 0) || (dest_index_from_key(sorted_keys[tid - 1]) != my_dest);
+
+    if (!is_head)
+        return;
+
+    if (my_dest < 0 || my_dest >= dest_size)
+        return;
+
+    int base = sorted_indices[tid] * components;
+    int dest_base = my_dest * components;
+
+    // Accumulate the segment sequentially (deterministic left-to-right order).
+    for (int c = 0; c < components; ++c) {
+        T accum = values[base + c];
+        for (int i = tid + 1; i < num_records; ++i) {
+            if (dest_index_from_key(sorted_keys[i]) != my_dest)
+                break;
+            T val = values[sorted_indices[i] * components + c];
+            switch (op) {
+            case REDUCE_ADD:
+                accum = accum + val;
+                break;
+            case REDUCE_MIN:
+                accum = wp::min(accum, val);
+                break;
+            case REDUCE_MAX:
+                accum = wp::max(accum, val);
+                break;
+            }
+        }
+
+        switch (op) {
+        case REDUCE_ADD:
+            dest_array[dest_base + c] = dest_array[dest_base + c] + accum;
+            break;
+        case REDUCE_MIN:
+            dest_array[dest_base + c] = wp::min(dest_array[dest_base + c], accum);
+            break;
+        case REDUCE_MAX:
+            dest_array[dest_base + c] = wp::max(dest_array[dest_base + c], accum);
+            break;
+        }
+    }
+}
+
+template <typename T>
+void deterministic_sort_reduce_device_scalar_run_to_run(
+    int64_t* keys, T* values, int count, T* dest_array, int dest_size, int op
+)
+{
+    if (count <= 0)
+        return;
+
+    ContextGuard guard(wp_cuda_context_get_current());
+    cudaStream_t stream = static_cast<cudaStream_t>(wp_cuda_stream_get_current());
+
+    ScopedTemporary<int64_t> alt_keys(WP_CURRENT_CONTEXT, count);
+    ScopedTemporary<T> alt_values(WP_CURRENT_CONTEXT, count);
+
+    cub::DoubleBuffer<int64_t> d_keys(keys, alt_keys.buffer());
+    cub::DoubleBuffer<T> d_values(values, alt_values.buffer());
+
+    size_t sort_temp_size = 0;
+    check_cuda(
+        cub::DeviceRadixSort::SortPairs(
+            nullptr, sort_temp_size, d_keys, d_values, count, 0, sizeof(int64_t) * 8, stream
+        )
+    );
+
+    void* sort_temp = wp_alloc_device(WP_CURRENT_CONTEXT, sort_temp_size);
+    check_cuda(
+        cub::DeviceRadixSort::SortPairs(
+            sort_temp, sort_temp_size, d_keys, d_values, count, 0, sizeof(int64_t) * 8, stream
+        )
+    );
+    wp_free_device(WP_CURRENT_CONTEXT, sort_temp);
+
+    ScopedTemporary<int> unique_dests(WP_CURRENT_CONTEXT, count);
+    ScopedTemporary<T> aggregates(WP_CURRENT_CONTEXT, count);
+    ScopedTemporary<int> num_runs(WP_CURRENT_CONTEXT, 1);
+    auto dest_keys
+        = cub::TransformInputIterator<int, DestIndexTransform, const int64_t*>(d_keys.Current(), DestIndexTransform {});
+
+    size_t reduce_temp_size = 0;
+    ReduceByKeyOp<T> reduce_op { op };
+    check_cuda(
+        cub::DeviceReduce::ReduceByKey(
+            nullptr, reduce_temp_size, dest_keys, unique_dests.buffer(), d_values.Current(), aggregates.buffer(),
+            num_runs.buffer(), reduce_op, count, stream
+        )
+    );
+
+    void* reduce_temp = wp_alloc_device(WP_CURRENT_CONTEXT, reduce_temp_size);
+    check_cuda(
+        cub::DeviceReduce::ReduceByKey(
+            reduce_temp, reduce_temp_size, dest_keys, unique_dests.buffer(), d_values.Current(), aggregates.buffer(),
+            num_runs.buffer(), reduce_op, count, stream
+        )
+    );
+    wp_free_device(WP_CURRENT_CONTEXT, reduce_temp);
+
+    const int block_size = 256;
+    const int num_blocks = (count + block_size - 1) / block_size;
+    apply_reduced_runs_kernel<<<num_blocks, block_size, 0, stream>>>(
+        unique_dests.buffer(), aggregates.buffer(), num_runs.buffer(), dest_array, dest_size, op
+    );
+}
+
+// Sort the scatter buffer by key using CUB radix sort, then launch the
+// reduce kernel.
+template <typename T>
+void deterministic_sort_reduce_device(
+    int64_t* keys, T* values, int count, T* dest_array, int dest_size, int op, int components, int determinism_level
+)
+{
+    if (count <= 0)
+        return;
+
+    if (components == 1 && determinism_level == DETERMINISM_RUN_TO_RUN) {
+        deterministic_sort_reduce_device_scalar_run_to_run(keys, values, count, dest_array, dest_size, op);
+        return;
+    }
+
+    ContextGuard guard(wp_cuda_context_get_current());
+    cudaStream_t stream = static_cast<cudaStream_t>(wp_cuda_stream_get_current());
+
+    // --- Sort by key ---
+    // The input buffers have a fixed capacity. Unused slots are initialized
+    // with key == -1, which sorts to the end and is ignored by the reduce
+    // kernel.
+    // Sort keys together with record indices, then reduce from the original
+    // value buffer component-wise.
+    ScopedTemporary<int64_t> alt_keys(WP_CURRENT_CONTEXT, count);
+    ScopedTemporary<int> record_indices(WP_CURRENT_CONTEXT, count);
+    ScopedTemporary<int> alt_record_indices(WP_CURRENT_CONTEXT, count);
+
+    const int block_size = 256;
+    const int num_blocks = (count + block_size - 1) / block_size;
+    init_record_indices_kernel<<<num_blocks, block_size, 0, stream>>>(record_indices.buffer(), count);
+
+    cub::DoubleBuffer<int64_t> d_keys(keys, alt_keys.buffer());
+    cub::DoubleBuffer<int> d_indices(record_indices.buffer(), alt_record_indices.buffer());
+
+    size_t sort_temp_size = 0;
+    check_cuda(
+        cub::DeviceRadixSort::SortPairs(
+            nullptr, sort_temp_size, d_keys, d_indices, count, 0, sizeof(int64_t) * 8, stream
+        )
+    );
+
+    void* sort_temp = wp_alloc_device(WP_CURRENT_CONTEXT, sort_temp_size);
+    check_cuda(
+        cub::DeviceRadixSort::SortPairs(
+            sort_temp, sort_temp_size, d_keys, d_indices, count, 0, sizeof(int64_t) * 8, stream
+        )
+    );
+    wp_free_device(WP_CURRENT_CONTEXT, sort_temp);
+
+    // --- Segmented reduce ---
+    deterministic_reduce_kernel<<<num_blocks, block_size, 0, stream>>>(
+        d_keys.Current(), d_indices.Current(), values, count, components, dest_array, dest_size, op
+    );
+}
+
+} // anonymous namespace
+
+
+// Public API entry points called from the Python runtime via ctypes.
+// Arguments are passed as uint64_t pointers (matching the Warp convention).
+
+void wp_deterministic_sort_reduce_device(
+    uint64_t keys,
+    uint64_t values,
+    int count,
+    uint64_t dest_array,
+    int dest_size,
+    int op,
+    int scalar_type,
+    int components,
+    int determinism_level
+)
+{
+    switch (scalar_type) {
+    case SCALAR_HALF:
+        deterministic_sort_reduce_device(
+            reinterpret_cast<int64_t*>(keys), reinterpret_cast<wp::half*>(values), count,
+            reinterpret_cast<wp::half*>(dest_array), dest_size, op, components, determinism_level
+        );
+        break;
+    case SCALAR_FLOAT:
+        deterministic_sort_reduce_device(
+            reinterpret_cast<int64_t*>(keys), reinterpret_cast<float*>(values), count,
+            reinterpret_cast<float*>(dest_array), dest_size, op, components, determinism_level
+        );
+        break;
+    case SCALAR_DOUBLE:
+        deterministic_sort_reduce_device(
+            reinterpret_cast<int64_t*>(keys), reinterpret_cast<double*>(values), count,
+            reinterpret_cast<double*>(dest_array), dest_size, op, components, determinism_level
+        );
+        break;
+    case SCALAR_INT:
+        deterministic_sort_reduce_device(
+            reinterpret_cast<int64_t*>(keys), reinterpret_cast<int*>(values), count, reinterpret_cast<int*>(dest_array),
+            dest_size, op, components, determinism_level
+        );
+        break;
+    case SCALAR_UINT:
+        deterministic_sort_reduce_device(
+            reinterpret_cast<int64_t*>(keys), reinterpret_cast<unsigned int*>(values), count,
+            reinterpret_cast<unsigned int*>(dest_array), dest_size, op, components, determinism_level
+        );
+        break;
+    case SCALAR_INT64:
+        deterministic_sort_reduce_device(
+            reinterpret_cast<int64_t*>(keys), reinterpret_cast<int64_t*>(values), count,
+            reinterpret_cast<int64_t*>(dest_array), dest_size, op, components, determinism_level
+        );
+        break;
+    case SCALAR_UINT64:
+        deterministic_sort_reduce_device(
+            reinterpret_cast<int64_t*>(keys), reinterpret_cast<uint64_t*>(values), count,
+            reinterpret_cast<uint64_t*>(dest_array), dest_size, op, components, determinism_level
+        );
+        break;
+    default:
+        break;
+    }
+}
diff --git a/warp/native/deterministic.h b/warp/native/deterministic.h
new file mode 100644
index 0000000000..65ce8febf5
--- /dev/null
+++ b/warp/native/deterministic.h
@@ -0,0 +1,67 @@
+// SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+#pragma once
+
+// Note: this header is included in NVRTC-compiled code, so we cannot use
+// standard library headers like <cstdint>. The int64_t type is provided
+// by the Warp builtin headers.
+
+// Deterministic mode scatter buffer support.
+//
+// When deterministic mode is enabled, floating-point atomic operations are
+// redirected to scatter buffers during kernel execution. After the kernel
+// completes, the runtime sorts the buffer by (dest_index, thread_id) and
+// reduces values in that fixed order, guaranteeing bit-exact reproducibility.
+
+namespace wp {
+namespace deterministic {
+
+// Device-side function called from generated kernel code in deterministic mode.
+// Writes a (key, value) record to the scatter buffer.
+//
+// The sort key packs the destination flat index in the upper 32 bits and the
+// thread_id (_idx from the grid-stride loop) in the lower 32 bits. After a
+// 64-bit radix sort, records targeting the same destination are grouped
+// together and ordered by thread ID, giving a deterministic reduction order.
+template <typename T>
+inline CUDA_CALLABLE void scatter(
+    int64_t* keys,
+    T* values,
+    int* counter,
+    int* overflow,
+    int capacity,
+    int debug_enabled,
+    int dest_flat_idx,
+    size_t thread_id,
+    T value
+)
+{
+#ifdef __CUDA_ARCH__
+    int slot = atomicAdd(counter, 1);
+    if (slot < capacity) {
+        keys[slot] = (static_cast<int64_t>(dest_flat_idx) << 32)
+            | static_cast<int64_t>(static_cast<uint32_t>(thread_id & 0xFFFFFFFFu));
+        values[slot] = value;
+    } else if (overflow != nullptr) {
+        int prev = atomicCAS(overflow, 0, 1);
+        if (debug_enabled && prev == 0) {
+            printf("Warp deterministic scatter overflow: capacity=%d dest=%d\n", capacity, dest_flat_idx);
+        }
+    }
+#else
+    // CPU path: no-op; CPU kernels run sequentially so plain atomics are already deterministic (casts below silence unused warnings).
+ (void)keys; + (void)values; + (void)counter; + (void)overflow; + (void)capacity; + (void)debug_enabled; + (void)dest_flat_idx; + (void)thread_id; + (void)value; +#endif +} + +} // namespace deterministic +} // namespace wp diff --git a/warp/native/warp.h b/warp/native/warp.h index 2b1ac0abdc..c306c4ebbb 100644 --- a/warp/native/warp.h +++ b/warp/native/warp.h @@ -324,6 +324,19 @@ wp_runlength_encode_int_host(uint64_t values, uint64_t run_values, uint64_t run_ WP_API void wp_runlength_encode_int_device(uint64_t values, uint64_t run_values, uint64_t run_lengths, uint64_t run_count, int n); +// Deterministic mode: sort scatter buffer and apply component-wise segmented reduction. +WP_API void wp_deterministic_sort_reduce_device( + uint64_t keys, + uint64_t values, + int count, + uint64_t dest_array, + int dest_size, + int op, + int scalar_type, + int components, + int determinism_level +); + WP_API void wp_bsr_matrix_from_triplets_host( int block_size, int scalar_size_in_bytes, diff --git a/warp/tests/test_deterministic.py b/warp/tests/test_deterministic.py new file mode 100644 index 0000000000..7910ee1145 --- /dev/null +++ b/warp/tests/test_deterministic.py @@ -0,0 +1,1284 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Tests for deterministic execution mode. + +Validates that deterministic modes produce bit-exact reproducible results for +atomic operations across multiple runs. 
+""" + +import unittest + +import numpy as np + +import warp as wp +from warp.tests.unittest_utils import * + + +def _reference_scatter_add_float32(data_np, indices_np, out_size): + """Compute the canonical left-to-right float32 scatter reduction on CPU.""" + expected = np.zeros(out_size, dtype=np.float32) + for value, index in zip(data_np, indices_np, strict=True): + expected[index] = np.float32(expected[index] + value) + return expected + + +# --------------------------------------------------------------------------- +# Pattern A kernels: accumulation (return value unused) +# --------------------------------------------------------------------------- + + +@wp.kernel +def scatter_add_kernel( + data: wp.array(dtype=wp.float32), + dest_indices: wp.array(dtype=wp.int32), + output: wp.array(dtype=wp.float32), +): + """Each thread atomically adds to output[dest_indices[tid]].""" + tid = wp.tid() + idx = dest_indices[tid] + wp.atomic_add(output, idx, data[tid]) + + +@wp.kernel +def augassign_add_kernel( + data: wp.array(dtype=wp.float32), + dest_indices: wp.array(dtype=wp.int32), + output: wp.array(dtype=wp.float32), +): + """Same as scatter_add_kernel but using += syntax.""" + tid = wp.tid() + idx = dest_indices[tid] + output[idx] += data[tid] + + +@wp.kernel +def multi_array_atomic_kernel( + data: wp.array(dtype=wp.float32), + dest_indices: wp.array(dtype=wp.int32), + out_a: wp.array(dtype=wp.float32), + out_b: wp.array(dtype=wp.float32), + out_c: wp.array(dtype=wp.float32), +): + """Atomic add to three different output arrays from the same kernel.""" + tid = wp.tid() + idx = dest_indices[tid] + val = data[tid] + wp.atomic_add(out_a, idx, val) + wp.atomic_add(out_b, idx, val * 2.0) + wp.atomic_add(out_c, idx, val * 3.0) + + +@wp.kernel +def atomic_sub_kernel( + data: wp.array(dtype=wp.float32), + dest_indices: wp.array(dtype=wp.int32), + output: wp.array(dtype=wp.float32), +): + """Atomic sub test.""" + tid = wp.tid() + idx = dest_indices[tid] + wp.atomic_sub(output, 
idx, data[tid]) + + +@wp.kernel +def atomic_add_2d_kernel( + data: wp.array(dtype=wp.float32), + row_indices: wp.array(dtype=wp.int32), + col_indices: wp.array(dtype=wp.int32), + output: wp.array2d(dtype=wp.float32), +): + """Atomic add to a 2D array.""" + tid = wp.tid() + r = row_indices[tid] + c = col_indices[tid] + wp.atomic_add(output, r, c, data[tid]) + + +@wp.kernel +def atomic_double_kernel( + data: wp.array(dtype=wp.float64), + dest_indices: wp.array(dtype=wp.int32), + output: wp.array(dtype=wp.float64), +): + """Atomic add with float64.""" + tid = wp.tid() + idx = dest_indices[tid] + wp.atomic_add(output, idx, data[tid]) + + +@wp.kernel +def vec3_scatter_add_kernel( + data: wp.array(dtype=wp.vec3), + dest_indices: wp.array(dtype=wp.int32), + output: wp.array(dtype=wp.vec3), +): + """Atomic add with ``wp.vec3`` values.""" + tid = wp.tid() + wp.atomic_add(output, dest_indices[tid], data[tid]) + + +@wp.kernel +def vec3_atomic_minmax_kernel( + points: wp.array(dtype=wp.vec3), + out_min: wp.array(dtype=wp.vec3), + out_max: wp.array(dtype=wp.vec3), +): + """Component-wise deterministic min/max for bounding-box style reductions.""" + tid = wp.tid() + p = points[tid] + wp.atomic_min(out_min, 0, p) + wp.atomic_max(out_max, 0, p) + + +@wp.kernel +def mat33_scatter_add_kernel( + data: wp.array(dtype=wp.mat33), + dest_indices: wp.array(dtype=wp.int32), + output: wp.array(dtype=wp.mat33), +): + """Atomic add with ``wp.mat33`` values.""" + tid = wp.tid() + wp.atomic_add(output, dest_indices[tid], data[tid]) + + +@wp.kernel(deterministic="gpu_to_gpu", deterministic_max_records=4096) +def decorator_deterministic_kernel( + data: wp.array(dtype=wp.float32), + output: wp.array(dtype=wp.float32), +): + """Kernel-level GPU-to-GPU deterministic flag without module options.""" + tid = wp.tid() + wp.atomic_add(output, tid % 8, data[tid]) + + +@wp.func +def _det_closure_transform_a(x: wp.float32) -> wp.float32: + return x + wp.float32(1.0) + + +@wp.func +def 
_det_closure_transform_b(x: wp.float32) -> wp.float32: + return x + wp.float32(2.0) + + +def _make_deterministic_closure_kernel(transform_func): + @wp.kernel(deterministic=True, module="unique") + def _deterministic_closure_kernel( + data: wp.array(dtype=wp.float32), + output: wp.array(dtype=wp.float32), + ): + tid = wp.tid() + wp.atomic_add(output, tid % 8, transform_func(data[tid])) + + return _deterministic_closure_kernel + + +@wp.kernel +def triple_scatter_add_kernel( + data: wp.array(dtype=wp.float32), + output: wp.array(dtype=wp.float32), +): + """Emit three deterministic scatter records per thread to the same target.""" + tid = wp.tid() + val = data[tid] + wp.atomic_add(output, 0, val) + wp.atomic_add(output, 0, val * 2.0) + wp.atomic_add(output, 0, val * 3.0) + + +@wp.kernel(deterministic=True, deterministic_max_records=4) +def loop_scatter_add_kernel( + data: wp.array(dtype=wp.float32), + counts: wp.array(dtype=wp.int32), + output: wp.array(dtype=wp.float32), +): + """Emit a data-dependent number of scatter records to the same target.""" + tid = wp.tid() + val = data[tid] + count = counts[tid] + for _ in range(count): + wp.atomic_add(output, 0, val) + + +@wp.kernel +def mixed_reduce_op_same_array_kernel( + data: wp.array(dtype=wp.float32), + output: wp.array(dtype=wp.float32), +): + """Apply different atomic reductions to the same destination array.""" + tid = wp.tid() + wp.atomic_add(output, 0, data[tid]) + wp.atomic_max(output, 0, 1.0) + + +# --------------------------------------------------------------------------- +# Pattern B kernels: counter/allocator (return value used) +# --------------------------------------------------------------------------- + + +@wp.kernel +def counter_kernel( + data: wp.array(dtype=wp.float32), + counter: wp.array(dtype=wp.int32), + output: wp.array(dtype=wp.float32), +): + """Allocate a slot and write data to it.""" + tid = wp.tid() + slot = wp.atomic_add(counter, 0, 1) + output[slot] = data[tid] + + +@wp.kernel +def 
conditional_counter_kernel( + data: wp.array(dtype=wp.float32), + threshold: wp.float32, + counter: wp.array(dtype=wp.int32), + output: wp.array(dtype=wp.float32), +): + """Stream compaction: only emit elements above threshold.""" + tid = wp.tid() + val = data[tid] + if val > threshold: + slot = wp.atomic_add(counter, 0, 1) + output[slot] = val + + +@wp.kernel +def counter_side_effect_kernel( + counter: wp.array(dtype=wp.int32), + output: wp.array(dtype=wp.float32), + scratch: wp.array(dtype=wp.float32), +): + """Counter kernel with a normal array write that must not execute in phase 0.""" + tid = wp.tid() + slot = wp.atomic_add(counter, 0, 1) + scratch[tid] = scratch[tid] + 1.0 + output[slot] = float(tid) + + +# --------------------------------------------------------------------------- +# Mixed kernels: both patterns in one kernel +# --------------------------------------------------------------------------- + + +@wp.kernel +def mixed_pattern_kernel( + data: wp.array(dtype=wp.float32), + counter: wp.array(dtype=wp.int32), + output: wp.array(dtype=wp.float32), + accum: wp.array(dtype=wp.float32), +): + """Counter allocation + accumulation in the same kernel.""" + tid = wp.tid() + val = data[tid] + + # Pattern B: allocate slot + slot = wp.atomic_add(counter, 0, 1) + output[slot] = val + + # Pattern A: accumulate + wp.atomic_add(accum, tid % 8, val) + + +# --------------------------------------------------------------------------- +# Integer atomic kernels (should pass through unchanged when return unused) +# --------------------------------------------------------------------------- + + +@wp.kernel +def int_atomic_add_kernel( + dest_indices: wp.array(dtype=wp.int32), + output: wp.array(dtype=wp.int32), +): + """Integer atomic add (should be deterministic without transformation).""" + tid = wp.tid() + idx = dest_indices[tid] + wp.atomic_add(output, idx, 1) + + +# --------------------------------------------------------------------------- +# Test functions +# 
--------------------------------------------------------------------------- + + +def test_scatter_add_reproducibility(test, device): + """Verify that float atomic_add produces bit-exact identical results across runs.""" + if device.is_cpu: + test.skipTest("CPU execution is already deterministic") + + n = 4096 + out_size = 64 + rng = np.random.default_rng(42) + + data_np = rng.random(n, dtype=np.float32) + indices_np = rng.integers(0, out_size, size=n, dtype=np.int32) + + data = wp.array(data_np, dtype=wp.float32, device=device) + indices = wp.array(indices_np, dtype=wp.int32, device=device) + + results = [] + for _ in range(10): + output = wp.zeros(out_size, dtype=wp.float32, device=device) + wp.launch( + scatter_add_kernel, + dim=n, + inputs=[data, indices], + outputs=[output], + device=device, + ) + results.append(output.numpy().copy()) + + # All runs must produce bit-exact identical results. + for i in range(1, len(results)): + np.testing.assert_array_equal( + results[0], + results[i], + err_msg=f"Run 0 vs run {i} differ (deterministic mode should be bit-exact)", + ) + + +def test_gpu_to_gpu_mode_reproducibility(test, device): + """Verify the global ``gpu_to_gpu`` mode produces reproducible results.""" + if device.is_cpu: + test.skipTest("CPU execution is already deterministic") + + n = 1024 + out_size = 16 + rng = np.random.default_rng(44) + + data_np = rng.random(n, dtype=np.float32) + indices_np = rng.integers(0, out_size, size=n, dtype=np.int32) + + data = wp.array(data_np, dtype=wp.float32, device=device) + indices = wp.array(indices_np, dtype=wp.int32, device=device) + + old_det = wp.config.deterministic + try: + wp.config.deterministic = "gpu_to_gpu" + results = [] + for _ in range(3): + output = wp.zeros(out_size, dtype=wp.float32, device=device) + wp.launch(scatter_add_kernel, dim=n, inputs=[data, indices], outputs=[output], device=device) + results.append(output.numpy().copy()) + for i in range(1, len(results)): + 
np.testing.assert_array_equal(results[0], results[i]) + finally: + wp.config.deterministic = old_det + + +def test_gpu_to_gpu_matches_canonical_float32_reference(test, device): + """Verify ``gpu_to_gpu`` matches the canonical float32 CPU reduction order.""" + if device.is_cpu: + test.skipTest("CPU execution is already deterministic") + + data_np = np.array( + [ + 1.0e20, + 1.0, + -1.0e20, + 3.5, + -2.25, + 2.0**-20, + 1.0e10, + -1.0e10, + 7.0, + -7.0, + 9.0, + 1.0e-7, + ], + dtype=np.float32, + ) + indices_np = np.array([0, 0, 0, 1, 1, 1, 0, 0, 1, 1, 2, 2], dtype=np.int32) + out_size = 3 + + expected = _reference_scatter_add_float32(data_np, indices_np, out_size) + + data = wp.array(data_np, dtype=wp.float32, device=device) + indices = wp.array(indices_np, dtype=wp.int32, device=device) + + old_det = wp.config.deterministic + try: + wp.config.deterministic = "gpu_to_gpu" + output = wp.zeros(out_size, dtype=wp.float32, device=device) + wp.launch(scatter_add_kernel, dim=data_np.shape[0], inputs=[data, indices], outputs=[output], device=device) + result = output.numpy() + finally: + wp.config.deterministic = old_det + + np.testing.assert_array_equal(result.view(np.uint32), expected.view(np.uint32)) + + +def test_augassign_add_reproducibility(test, device): + """Verify += syntax (desugars to atomic_add) is also deterministic.""" + if device.is_cpu: + test.skipTest("CPU execution is already deterministic") + + n = 2048 + out_size = 32 + rng = np.random.default_rng(123) + + data_np = rng.random(n, dtype=np.float32) + indices_np = rng.integers(0, out_size, size=n, dtype=np.int32) + + data = wp.array(data_np, dtype=wp.float32, device=device) + indices = wp.array(indices_np, dtype=wp.int32, device=device) + + results = [] + for _ in range(10): + output = wp.zeros(out_size, dtype=wp.float32, device=device) + wp.launch( + augassign_add_kernel, + dim=n, + inputs=[data, indices], + outputs=[output], + device=device, + ) + results.append(output.numpy().copy()) + + for i in 
range(1, len(results)): + np.testing.assert_array_equal(results[0], results[i]) + + +def test_scatter_add_correctness(test, device): + """Compare deterministic GPU results against CPU sequential execution.""" + n = 2048 + out_size = 32 + rng = np.random.default_rng(99) + + data_np = rng.random(n, dtype=np.float32) + indices_np = rng.integers(0, out_size, size=n, dtype=np.int32) + + # CPU sequential reference (guaranteed deterministic). + expected = np.zeros(out_size, dtype=np.float32) + for i in range(n): + expected[indices_np[i]] += data_np[i] + + data = wp.array(data_np, dtype=wp.float32, device=device) + indices = wp.array(indices_np, dtype=wp.int32, device=device) + output = wp.zeros(out_size, dtype=wp.float32, device=device) + + wp.launch( + scatter_add_kernel, + dim=n, + inputs=[data, indices], + outputs=[output], + device=device, + ) + + result = output.numpy() + # Deterministic sum order may differ from Python loop order, so exact + # match is not guaranteed. Check within reasonable tolerance. 
+ np.testing.assert_allclose(result, expected, rtol=1e-4, atol=1e-5) + + +def test_multi_array_atomic(test, device): + """Verify deterministic mode works with multiple target arrays.""" + if device.is_cpu: + test.skipTest("CPU execution is already deterministic") + + n = 1024 + out_size = 16 + rng = np.random.default_rng(77) + + data_np = rng.random(n, dtype=np.float32) + indices_np = rng.integers(0, out_size, size=n, dtype=np.int32) + + data = wp.array(data_np, dtype=wp.float32, device=device) + indices = wp.array(indices_np, dtype=wp.int32, device=device) + + results_a, results_b, results_c = [], [], [] + for _ in range(5): + out_a = wp.zeros(out_size, dtype=wp.float32, device=device) + out_b = wp.zeros(out_size, dtype=wp.float32, device=device) + out_c = wp.zeros(out_size, dtype=wp.float32, device=device) + wp.launch( + multi_array_atomic_kernel, + dim=n, + inputs=[data, indices], + outputs=[out_a, out_b, out_c], + device=device, + ) + results_a.append(out_a.numpy().copy()) + results_b.append(out_b.numpy().copy()) + results_c.append(out_c.numpy().copy()) + + for i in range(1, len(results_a)): + np.testing.assert_array_equal(results_a[0], results_a[i]) + np.testing.assert_array_equal(results_b[0], results_b[i]) + np.testing.assert_array_equal(results_c[0], results_c[i]) + + +def test_atomic_sub_deterministic(test, device): + """Verify atomic_sub is deterministic.""" + if device.is_cpu: + test.skipTest("CPU execution is already deterministic") + + n = 2048 + out_size = 32 + rng = np.random.default_rng(55) + + data_np = rng.random(n, dtype=np.float32) + indices_np = rng.integers(0, out_size, size=n, dtype=np.int32) + + data = wp.array(data_np, dtype=wp.float32, device=device) + indices = wp.array(indices_np, dtype=wp.int32, device=device) + + results = [] + for _ in range(5): + output = wp.zeros(out_size, dtype=wp.float32, device=device) + wp.launch( + atomic_sub_kernel, + dim=n, + inputs=[data, indices], + outputs=[output], + device=device, + ) + 
results.append(output.numpy().copy()) + + for i in range(1, len(results)): + np.testing.assert_array_equal(results[0], results[i]) + + +def test_atomic_add_2d(test, device): + """Verify deterministic mode with 2D array indexing.""" + if device.is_cpu: + test.skipTest("CPU execution is already deterministic") + + n = 1024 + rows, cols = 8, 8 + rng = np.random.default_rng(88) + + data_np = rng.random(n, dtype=np.float32) + row_np = rng.integers(0, rows, size=n, dtype=np.int32) + col_np = rng.integers(0, cols, size=n, dtype=np.int32) + + data = wp.array(data_np, dtype=wp.float32, device=device) + row_idx = wp.array(row_np, dtype=wp.int32, device=device) + col_idx = wp.array(col_np, dtype=wp.int32, device=device) + + results = [] + for _ in range(5): + output = wp.zeros(shape=(rows, cols), dtype=wp.float32, device=device) + wp.launch( + atomic_add_2d_kernel, + dim=n, + inputs=[data, row_idx, col_idx], + outputs=[output], + device=device, + ) + results.append(output.numpy().copy()) + + for i in range(1, len(results)): + np.testing.assert_array_equal(results[0], results[i]) + + +def test_atomic_double_deterministic(test, device): + """Verify deterministic mode with float64 atomics.""" + if device.is_cpu: + test.skipTest("CPU execution is already deterministic") + + n = 2048 + out_size = 32 + rng = np.random.default_rng(66) + + data_np = rng.random(n).astype(np.float64) + indices_np = rng.integers(0, out_size, size=n, dtype=np.int32) + + data = wp.array(data_np, dtype=wp.float64, device=device) + indices = wp.array(indices_np, dtype=wp.int32, device=device) + + results = [] + for _ in range(5): + output = wp.zeros(out_size, dtype=wp.float64, device=device) + wp.launch( + atomic_double_kernel, + dim=n, + inputs=[data, indices], + outputs=[output], + device=device, + ) + results.append(output.numpy().copy()) + + for i in range(1, len(results)): + np.testing.assert_array_equal(results[0], results[i]) + + +def test_vec3_atomic_add_deterministic(test, device): + """Verify 
deterministic mode for composite ``wp.vec3`` atomic adds.""" + if device.is_cpu: + test.skipTest("CPU execution is already deterministic") + + n = 1024 + out_size = 16 + rng = np.random.default_rng(67) + + data_np = rng.standard_normal((n, 3), dtype=np.float32) + indices_np = rng.integers(0, out_size, size=n, dtype=np.int32) + + data = wp.array(data_np, dtype=wp.vec3, device=device) + indices = wp.array(indices_np, dtype=wp.int32, device=device) + + results = [] + for _ in range(5): + output = wp.zeros(out_size, dtype=wp.vec3, device=device) + wp.launch(vec3_scatter_add_kernel, dim=n, inputs=[data, indices], outputs=[output], device=device) + results.append(output.numpy().copy()) + + expected = np.zeros((out_size, 3), dtype=np.float32) + for i in range(n): + expected[indices_np[i]] += data_np[i] + + for result in results: + np.testing.assert_allclose(result, expected, rtol=1e-5, atol=1e-5) + for i in range(1, len(results)): + np.testing.assert_array_equal(results[0], results[i]) + + +def test_vec3_atomic_minmax_deterministic(test, device): + """Verify deterministic component-wise ``wp.vec3`` min/max reductions.""" + if device.is_cpu: + test.skipTest("CPU execution is already deterministic") + + n = 2048 + rng = np.random.default_rng(68) + points_np = rng.standard_normal((n, 3), dtype=np.float32) + points = wp.array(points_np, dtype=wp.vec3, device=device) + + mins = [] + maxs = [] + for _ in range(5): + out_min = wp.empty(1, dtype=wp.vec3, device=device) + out_max = wp.empty(1, dtype=wp.vec3, device=device) + out_min.fill_(wp.vec3(np.inf, np.inf, np.inf)) + out_max.fill_(wp.vec3(-np.inf, -np.inf, -np.inf)) + wp.launch(vec3_atomic_minmax_kernel, dim=n, inputs=[points], outputs=[out_min, out_max], device=device) + mins.append(out_min.numpy().copy()) + maxs.append(out_max.numpy().copy()) + + expected_min = np.min(points_np, axis=0, keepdims=True) + expected_max = np.max(points_np, axis=0, keepdims=True) + + for result in mins: + np.testing.assert_allclose(result, 
expected_min, rtol=0.0, atol=0.0) + for result in maxs: + np.testing.assert_allclose(result, expected_max, rtol=0.0, atol=0.0) + for i in range(1, len(mins)): + np.testing.assert_array_equal(mins[0], mins[i]) + np.testing.assert_array_equal(maxs[0], maxs[i]) + + +def test_mat33_atomic_add_deterministic(test, device): + """Verify deterministic mode for composite ``wp.mat33`` atomic adds.""" + if device.is_cpu: + test.skipTest("CPU execution is already deterministic") + + n = 512 + out_size = 8 + rng = np.random.default_rng(69) + + data_np = rng.standard_normal((n, 3, 3), dtype=np.float32) + indices_np = rng.integers(0, out_size, size=n, dtype=np.int32) + + data = wp.array(data_np, dtype=wp.mat33, device=device) + indices = wp.array(indices_np, dtype=wp.int32, device=device) + + results = [] + for _ in range(5): + output = wp.zeros(out_size, dtype=wp.mat33, device=device) + wp.launch(mat33_scatter_add_kernel, dim=n, inputs=[data, indices], outputs=[output], device=device) + results.append(output.numpy().copy()) + + expected = np.zeros((out_size, 3, 3), dtype=np.float32) + for i in range(n): + expected[indices_np[i]] += data_np[i] + + for result in results: + np.testing.assert_allclose(result, expected, rtol=1e-5, atol=1e-5) + for i in range(1, len(results)): + np.testing.assert_array_equal(results[0], results[i]) + + +def test_triple_scatter_capacity_estimate(test, device): + """Verify kernels with >2 scatters per thread do not overflow the buffer.""" + if device.is_cpu: + test.skipTest("CPU execution is already deterministic") + + n = 512 + rng = np.random.default_rng(12) + data_np = rng.random(n, dtype=np.float32) + data = wp.array(data_np, dtype=wp.float32, device=device) + + results = [] + for _ in range(3): + output = wp.zeros(1, dtype=wp.float32, device=device) + wp.launch(triple_scatter_add_kernel, dim=n, inputs=[data], outputs=[output], device=device) + results.append(output.numpy().copy()) + + expected = np.array([6.0 * data_np.sum()], dtype=np.float32) + 
for result in results: + np.testing.assert_allclose(result, expected, rtol=1e-5, atol=1e-5) + for i in range(1, len(results)): + np.testing.assert_array_equal(results[0], results[i]) + + +def test_loop_scatter_max_records_override(test, device): + """Verify ``deterministic_max_records`` handles dynamic loop emission counts.""" + if device.is_cpu: + test.skipTest("CPU execution is already deterministic") + + n = 256 + rng = np.random.default_rng(71) + data_np = rng.random(n, dtype=np.float32) + counts_np = np.full(n, 4, dtype=np.int32) + + data = wp.array(data_np, dtype=wp.float32, device=device) + counts = wp.array(counts_np, dtype=wp.int32, device=device) + + expected = np.array([np.dot(data_np, counts_np).astype(np.float32)], dtype=np.float32) + + results = [] + for _ in range(3): + output = wp.zeros(1, dtype=wp.float32, device=device) + wp.launch(loop_scatter_add_kernel, dim=n, inputs=[data, counts], outputs=[output], device=device) + results.append(output.numpy().copy()) + + for result in results: + np.testing.assert_allclose(result, expected, rtol=1e-5, atol=1e-5) + for i in range(1, len(results)): + np.testing.assert_array_equal(results[0], results[i]) + + +def test_mixed_reduce_ops_same_array(test, device): + """Verify add/max atomics targeting one array are reduced independently.""" + if device.is_cpu: + test.skipTest("CPU execution is already deterministic") + + data_np = np.full(4, 0.05, dtype=np.float32) + data = wp.array(data_np, dtype=wp.float32, device=device) + output = wp.zeros(1, dtype=wp.float32, device=device) + + wp.launch(mixed_reduce_op_same_array_kernel, dim=data_np.shape[0], inputs=[data], outputs=[output], device=device) + + np.testing.assert_allclose(output.numpy(), np.array([1.0], dtype=np.float32), rtol=0.0, atol=0.0) + + +def test_counter_reproducibility(test, device): + """Verify counter/allocator pattern produces deterministic slot assignments.""" + if device.is_cpu: + test.skipTest("CPU execution is already deterministic") + + n = 
1024 + rng = np.random.default_rng(33) + data_np = rng.random(n, dtype=np.float32) + data = wp.array(data_np, dtype=wp.float32, device=device) + + results = [] + for _ in range(10): + counter = wp.zeros(1, dtype=wp.int32, device=device) + output = wp.zeros(n, dtype=wp.float32, device=device) + wp.launch( + counter_kernel, + dim=n, + inputs=[data, counter], + outputs=[output], + device=device, + ) + results.append(output.numpy().copy()) + + for i in range(1, len(results)): + np.testing.assert_array_equal( + results[0], + results[i], + err_msg=f"Counter run 0 vs run {i} differ", + ) + + +def test_counter_phase0_suppresses_array_writes(test, device): + """Verify non-counter array stores are skipped during the counting pass.""" + if device.is_cpu: + test.skipTest("CPU execution is already deterministic") + + n = 128 + counter = wp.zeros(1, dtype=wp.int32, device=device) + output = wp.zeros(n, dtype=wp.float32, device=device) + scratch = wp.zeros(n, dtype=wp.float32, device=device) + + wp.launch(counter_side_effect_kernel, dim=n, inputs=[counter], outputs=[output, scratch], device=device) + + np.testing.assert_array_equal(scratch.numpy(), np.ones(n, dtype=np.float32)) + test.assertEqual(int(counter.numpy()[0]), n) + + +def test_counter_correctness(test, device): + """Verify counter pattern writes all data (no lost elements).""" + n = 512 + rng = np.random.default_rng(44) + data_np = rng.random(n, dtype=np.float32) + data = wp.array(data_np, dtype=wp.float32, device=device) + + counter = wp.zeros(1, dtype=wp.int32, device=device) + output = wp.zeros(n, dtype=wp.float32, device=device) + wp.launch( + counter_kernel, + dim=n, + inputs=[data, counter], + outputs=[output], + device=device, + ) + + # Counter should equal n. + count = int(counter.numpy()[0]) + test.assertEqual(count, n) + + # Output should contain a permutation of data. 
+ result = sorted(output.numpy().tolist()) + expected = sorted(data_np.tolist()) + np.testing.assert_allclose(result, expected, rtol=1e-6) + + +def test_conditional_counter(test, device): + """Verify stream compaction pattern with conditional counter.""" + if device.is_cpu: + test.skipTest("CPU execution is already deterministic") + + n = 2048 + threshold = 0.5 + rng = np.random.default_rng(55) + data_np = rng.random(n, dtype=np.float32) + expected_count = int(np.sum(data_np > threshold)) + + data = wp.array(data_np, dtype=wp.float32, device=device) + + results = [] + counts = [] + for _ in range(5): + counter = wp.zeros(1, dtype=wp.int32, device=device) + output = wp.zeros(n, dtype=wp.float32, device=device) + wp.launch( + conditional_counter_kernel, + dim=n, + inputs=[data, threshold, counter], + outputs=[output], + device=device, + ) + counts.append(int(counter.numpy()[0])) + results.append(output.numpy()[:expected_count].copy()) + + # Count should be correct. + for c in counts: + test.assertEqual(c, expected_count) + + # Results should be identical across runs. 
+ for i in range(1, len(results)): + np.testing.assert_array_equal(results[0], results[i]) + + +def test_mixed_pattern(test, device): + """Verify kernel with both counter and accumulation atomics.""" + if device.is_cpu: + test.skipTest("CPU execution is already deterministic") + + n = 512 + rng = np.random.default_rng(77) + data_np = rng.random(n, dtype=np.float32) + data = wp.array(data_np, dtype=wp.float32, device=device) + + results_out, results_accum = [], [] + for _ in range(5): + counter = wp.zeros(1, dtype=wp.int32, device=device) + output = wp.zeros(n, dtype=wp.float32, device=device) + accum = wp.zeros(8, dtype=wp.float32, device=device) + wp.launch( + mixed_pattern_kernel, + dim=n, + inputs=[data, counter], + outputs=[output, accum], + device=device, + ) + results_out.append(output.numpy().copy()) + results_accum.append(accum.numpy().copy()) + + for i in range(1, len(results_out)): + np.testing.assert_array_equal(results_out[0], results_out[i]) + np.testing.assert_array_equal(results_accum[0], results_accum[i]) + + +def test_int_atomic_passthrough(test, device): + """Verify integer atomics (return unused) work without overhead.""" + n = 1024 + out_size = 16 + rng = np.random.default_rng(11) + indices_np = rng.integers(0, out_size, size=n, dtype=np.int32) + indices = wp.array(indices_np, dtype=wp.int32, device=device) + + output = wp.zeros(out_size, dtype=wp.int32, device=device) + wp.launch( + int_atomic_add_kernel, + dim=n, + inputs=[indices], + outputs=[output], + device=device, + ) + + result = output.numpy() + # Verify correctness: count of each index. + expected = np.bincount(indices_np, minlength=out_size).astype(np.int32) + np.testing.assert_array_equal(result, expected) + + +def test_module_option_override(test, device): + """Verify per-module deterministic option works.""" + + # Create a kernel with a per-module deterministic override. 
+ @wp.kernel(module_options={"deterministic": "gpu_to_gpu"}, module="unique") + def per_kernel_det( + data: wp.array(dtype=wp.float32), + output: wp.array(dtype=wp.float32), + ): + tid = wp.tid() + wp.atomic_add(output, tid % 4, data[tid]) + + n = 256 + rng = np.random.default_rng(22) + data_np = rng.random(n, dtype=np.float32) + data = wp.array(data_np, dtype=wp.float32, device=device) + + # Ensure global config is disabled but per-kernel override still works. + old_det = wp.config.deterministic + try: + wp.config.deterministic = "not_guaranteed" + output = wp.zeros(4, dtype=wp.float32, device=device) + wp.launch(per_kernel_det, dim=n, inputs=[data], outputs=[output], device=device) + result = output.numpy() + # Basic sanity: sum should be approximately correct. + for bin_idx in range(4): + mask = np.arange(n) % 4 == bin_idx + expected_sum = data_np[mask].sum() + np.testing.assert_allclose(result[bin_idx], expected_sum, rtol=1e-4) + finally: + wp.config.deterministic = old_det + + +def test_kernel_decorator_override(test, device): + """Verify ``@wp.kernel(deterministic="gpu_to_gpu")`` works with global config off.""" + if device.is_cpu: + test.skipTest("CPU execution is already deterministic") + + n = 512 + rng = np.random.default_rng(28) + data_np = rng.random(n, dtype=np.float32) + data = wp.array(data_np, dtype=wp.float32, device=device) + + old_det = wp.config.deterministic + try: + wp.config.deterministic = "not_guaranteed" + results = [] + for _ in range(3): + output = wp.zeros(8, dtype=wp.float32, device=device) + wp.launch(decorator_deterministic_kernel, dim=n, inputs=[data], outputs=[output], device=device) + results.append(output.numpy().copy()) + for i in range(1, len(results)): + np.testing.assert_array_equal(results[0], results[i]) + finally: + wp.config.deterministic = old_det + + +def test_deterministic_closure_kernel(test, device): + """Verify deterministic closure kernels remain reproducible and distinct.""" + if device.is_cpu: + 
test.skipTest("CPU execution is already deterministic") + + kernel_a = _make_deterministic_closure_kernel(_det_closure_transform_a) + kernel_b = _make_deterministic_closure_kernel(_det_closure_transform_b) + + test.assertIsNot(kernel_a, kernel_b) + test.assertNotEqual(kernel_a.module.name, kernel_b.module.name) + + n = 512 + rng = np.random.default_rng(30) + data_np = rng.random(n, dtype=np.float32) + data = wp.array(data_np, dtype=wp.float32, device=device) + + results_a = [] + results_b = [] + for _ in range(3): + out_a = wp.zeros(8, dtype=wp.float32, device=device) + out_b = wp.zeros(8, dtype=wp.float32, device=device) + wp.launch(kernel_a, dim=n, inputs=[data], outputs=[out_a], device=device) + wp.launch(kernel_b, dim=n, inputs=[data], outputs=[out_b], device=device) + results_a.append(out_a.numpy().copy()) + results_b.append(out_b.numpy().copy()) + + for i in range(1, len(results_a)): + np.testing.assert_array_equal(results_a[0], results_a[i]) + np.testing.assert_array_equal(results_b[0], results_b[i]) + + test.assertFalse(np.array_equal(results_a[0], results_b[0])) + + +def test_record_cmd_deterministic_launch(test, device): + """Verify ``record_cmd=True`` works for deterministic CUDA launches.""" + if device.is_cpu: + test.skipTest("CPU execution is already deterministic") + + n = 128 + out_size = 8 + rng = np.random.default_rng(19) + + data_np = rng.random(n, dtype=np.float32) + indices_np = rng.integers(0, out_size, size=n, dtype=np.int32) + + data = wp.array(data_np, dtype=wp.float32, device=device) + indices = wp.array(indices_np, dtype=wp.int32, device=device) + output = wp.zeros(out_size, dtype=wp.float32, device=device) + + cmd = wp.launch( + scatter_add_kernel, + dim=n, + inputs=[data, indices], + outputs=[output], + device=device, + record_cmd=True, + ) + + np.testing.assert_array_equal(output.numpy(), np.zeros(out_size, dtype=np.float32)) + + cmd.launch() + expected = output.numpy().copy() + + output_2 = wp.zeros(out_size, dtype=wp.float32, 
device=device) + cmd.set_param_by_name("output", output_2) + cmd.launch() + + np.testing.assert_array_equal(output_2.numpy(), expected) + + +def test_graph_capture_deterministic_launch(test, device): + """Verify deterministic scatter launches can be captured and replayed.""" + if device.is_cpu: + test.skipTest("Graph capture requires CUDA") + + n = 256 + rng = np.random.default_rng(29) + data_np = rng.random(n, dtype=np.float32) + indices_np = rng.integers(0, 8, size=n, dtype=np.int32) + + data = wp.array(data_np, dtype=wp.float32, device=device) + indices = wp.array(indices_np, dtype=wp.int32, device=device) + output = wp.zeros(8, dtype=wp.float32, device=device) + + wp.launch(scatter_add_kernel, dim=n, inputs=[data, indices], outputs=[output], device=device) + output.zero_() + + with wp.ScopedCapture(device, force_module_load=False) as capture: + wp.launch(scatter_add_kernel, dim=n, inputs=[data, indices], outputs=[output], device=device) + + wp.capture_launch(capture.graph) + first = output.numpy().copy() + + output.zero_() + wp.capture_launch(capture.graph) + second = output.numpy().copy() + + np.testing.assert_array_equal(first, second) + + +def test_graph_capture_deterministic_closure_kernel(test, device): + """Verify deterministic closure kernels can be captured and replayed.""" + if device.is_cpu: + test.skipTest("Graph capture requires CUDA") + + kernel = _make_deterministic_closure_kernel(_det_closure_transform_a) + + n = 256 + rng = np.random.default_rng(31) + data_np = rng.random(n, dtype=np.float32) + data = wp.array(data_np, dtype=wp.float32, device=device) + output = wp.zeros(8, dtype=wp.float32, device=device) + + wp.launch(kernel, dim=n, inputs=[data], outputs=[output], device=device) + output.zero_() + + with wp.ScopedCapture(device, force_module_load=False) as capture: + wp.launch(kernel, dim=n, inputs=[data], outputs=[output], device=device) + + wp.capture_launch(capture.graph) + first = output.numpy().copy() + + output.zero_() + 
wp.capture_launch(capture.graph) + second = output.numpy().copy() + + np.testing.assert_array_equal(first, second) + + +def test_graph_capture_vec3_atomic_minmax(test, device): + """Verify composite deterministic reductions remain capture-safe.""" + if device.is_cpu: + test.skipTest("Graph capture requires CUDA") + + n = 512 + rng = np.random.default_rng(70) + points_np = rng.standard_normal((n, 3), dtype=np.float32) + points = wp.array(points_np, dtype=wp.vec3, device=device) + + out_min = wp.empty(1, dtype=wp.vec3, device=device) + out_max = wp.empty(1, dtype=wp.vec3, device=device) + min_init = wp.vec3(np.inf, np.inf, np.inf) + max_init = wp.vec3(-np.inf, -np.inf, -np.inf) + + out_min.fill_(min_init) + out_max.fill_(max_init) + wp.launch(vec3_atomic_minmax_kernel, dim=n, inputs=[points], outputs=[out_min, out_max], device=device) + out_min.fill_(min_init) + out_max.fill_(max_init) + + with wp.ScopedCapture(device, force_module_load=False) as capture: + wp.launch(vec3_atomic_minmax_kernel, dim=n, inputs=[points], outputs=[out_min, out_max], device=device) + + wp.capture_launch(capture.graph) + first_min = out_min.numpy().copy() + first_max = out_max.numpy().copy() + + out_min.fill_(min_init) + out_max.fill_(max_init) + wp.capture_launch(capture.graph) + second_min = out_min.numpy().copy() + second_max = out_max.numpy().copy() + + np.testing.assert_array_equal(first_min, second_min) + np.testing.assert_array_equal(first_max, second_max) + np.testing.assert_allclose(first_min, np.min(points_np, axis=0, keepdims=True), rtol=0.0, atol=0.0) + np.testing.assert_allclose(first_max, np.max(points_np, axis=0, keepdims=True), rtol=0.0, atol=0.0) + + +# --------------------------------------------------------------------------- +# Test class registration +# --------------------------------------------------------------------------- + +cuda_devices = get_selected_cuda_test_devices() +all_devices = get_test_devices() + + +class TestDeterministic(unittest.TestCase): + """Test 
suite for deterministic execution mode.""" + + @classmethod + def setUpClass(cls): + cls._old_deterministic = wp.config.deterministic + wp.config.deterministic = "run_to_run" + + @classmethod + def tearDownClass(cls): + wp.config.deterministic = cls._old_deterministic + + +# Pattern A tests (accumulation). +add_function_test( + TestDeterministic, "test_scatter_add_reproducibility", test_scatter_add_reproducibility, devices=cuda_devices +) +add_function_test( + TestDeterministic, + "test_gpu_to_gpu_mode_reproducibility", + test_gpu_to_gpu_mode_reproducibility, + devices=cuda_devices, +) +add_function_test( + TestDeterministic, + "test_gpu_to_gpu_matches_canonical_float32_reference", + test_gpu_to_gpu_matches_canonical_float32_reference, + devices=cuda_devices, +) +add_function_test( + TestDeterministic, "test_augassign_add_reproducibility", test_augassign_add_reproducibility, devices=cuda_devices +) +add_function_test(TestDeterministic, "test_scatter_add_correctness", test_scatter_add_correctness, devices=all_devices) +add_function_test(TestDeterministic, "test_multi_array_atomic", test_multi_array_atomic, devices=cuda_devices) +add_function_test( + TestDeterministic, "test_atomic_sub_deterministic", test_atomic_sub_deterministic, devices=cuda_devices +) +add_function_test(TestDeterministic, "test_atomic_add_2d", test_atomic_add_2d, devices=cuda_devices) +add_function_test( + TestDeterministic, "test_atomic_double_deterministic", test_atomic_double_deterministic, devices=cuda_devices +) +add_function_test( + TestDeterministic, "test_vec3_atomic_add_deterministic", test_vec3_atomic_add_deterministic, devices=cuda_devices +) +add_function_test( + TestDeterministic, + "test_vec3_atomic_minmax_deterministic", + test_vec3_atomic_minmax_deterministic, + devices=cuda_devices, +) +add_function_test( + TestDeterministic, "test_mat33_atomic_add_deterministic", test_mat33_atomic_add_deterministic, devices=cuda_devices +) +add_function_test( + TestDeterministic, + 
"test_triple_scatter_capacity_estimate", + test_triple_scatter_capacity_estimate, + devices=cuda_devices, +) +add_function_test( + TestDeterministic, + "test_loop_scatter_max_records_override", + test_loop_scatter_max_records_override, + devices=cuda_devices, +) +add_function_test( + TestDeterministic, "test_mixed_reduce_ops_same_array", test_mixed_reduce_ops_same_array, devices=cuda_devices +) + +# Pattern B tests (counter). +add_function_test(TestDeterministic, "test_counter_reproducibility", test_counter_reproducibility, devices=cuda_devices) +add_function_test( + TestDeterministic, + "test_counter_phase0_suppresses_array_writes", + test_counter_phase0_suppresses_array_writes, + devices=cuda_devices, +) +add_function_test(TestDeterministic, "test_counter_correctness", test_counter_correctness, devices=all_devices) +add_function_test(TestDeterministic, "test_conditional_counter", test_conditional_counter, devices=cuda_devices) + +# Mixed pattern tests. +add_function_test(TestDeterministic, "test_mixed_pattern", test_mixed_pattern, devices=cuda_devices) + +# Passthrough / override tests. 
+add_function_test(TestDeterministic, "test_int_atomic_passthrough", test_int_atomic_passthrough, devices=all_devices) +add_function_test(TestDeterministic, "test_module_option_override", test_module_option_override, devices=all_devices) +add_function_test( + TestDeterministic, "test_kernel_decorator_override", test_kernel_decorator_override, devices=cuda_devices +) +add_function_test( + TestDeterministic, "test_deterministic_closure_kernel", test_deterministic_closure_kernel, devices=cuda_devices +) +add_function_test( + TestDeterministic, + "test_record_cmd_deterministic_launch", + test_record_cmd_deterministic_launch, + devices=cuda_devices, +) +add_function_test( + TestDeterministic, + "test_graph_capture_deterministic_launch", + test_graph_capture_deterministic_launch, + devices=cuda_devices, +) +add_function_test( + TestDeterministic, + "test_graph_capture_deterministic_closure_kernel", + test_graph_capture_deterministic_closure_kernel, + devices=cuda_devices, +) +add_function_test( + TestDeterministic, + "test_graph_capture_vec3_atomic_minmax", + test_graph_capture_vec3_atomic_minmax, + devices=cuda_devices, +) + + +if __name__ == "__main__": + wp.clear_kernel_cache() + unittest.main(verbosity=2) diff --git a/warp/tests/test_unique_module.py b/warp/tests/test_unique_module.py index 0c5a92d244..185201e95b 100644 --- a/warp/tests/test_unique_module.py +++ b/warp/tests/test_unique_module.py @@ -18,6 +18,7 @@ import numpy as np import warp as wp +from warp._src.context import ModuleHasher from warp.tests.unittest_utils import * @@ -166,6 +167,63 @@ def _kernel_no_fast_math(a: wp.array(dtype=float), b: wp.array(dtype=float)): wp.launch(_kernel_fast_math, dim=3, inputs=[a, b], device="cpu") np.testing.assert_allclose(b.numpy(), [2.0, 3.0, 4.0]) + def test_kernel_options_affect_unique_module_identity(self): + """Kernel decorator options must contribute to unique module hashing.""" + if not wp.is_cuda_available(): + self.skipTest("CUDA required for deterministic 
kernel launch test") + + @wp.kernel(module="unique") + def _scatter_normal( + values: wp.array(dtype=wp.float32), indices: wp.array(dtype=wp.int32), out: wp.array(dtype=float) + ): + tid = wp.tid() + wp.atomic_add(out, indices[tid], values[tid]) + + @wp.kernel(deterministic=True, deterministic_max_records=1, module="unique") + def _scatter_deterministic( + values: wp.array(dtype=wp.float32), indices: wp.array(dtype=wp.int32), out: wp.array(dtype=float) + ): + tid = wp.tid() + wp.atomic_add(out, indices[tid], values[tid]) + + self.assertNotEqual( + _scatter_normal.module.name, + _scatter_deterministic.module.name, + "Different kernel options must produce different unique module names", + ) + + values = wp.array([1.0, 2.0, 3.0, 4.0], dtype=wp.float32, device="cuda:0") + indices = wp.array([0, 0, 0, 0], dtype=wp.int32, device="cuda:0") + + out_normal = wp.zeros(1, dtype=float, device="cuda:0") + out_deterministic = wp.zeros(1, dtype=float, device="cuda:0") + + wp.launch(_scatter_normal, dim=4, inputs=[values, indices], outputs=[out_normal], device="cuda:0") + wp.launch(_scatter_deterministic, dim=4, inputs=[values, indices], outputs=[out_deterministic], device="cuda:0") + + np.testing.assert_allclose(out_normal.numpy(), [10.0]) + np.testing.assert_allclose(out_deterministic.numpy(), [10.0]) + + def test_deterministic_hashing_populates_launch_metadata(self): + """Hashing deterministic kernels must populate metadata used on cache hits.""" + + @wp.kernel(deterministic=True, deterministic_max_records=1, module="unique") + def _scatter_deterministic( + values: wp.array(dtype=wp.float32), indices: wp.array(dtype=wp.int32), out: wp.array(dtype=float) + ): + tid = wp.tid() + wp.atomic_add(out, indices[tid], values[tid]) + + self.assertTrue(hasattr(_scatter_deterministic.adj, "det_meta")) + delattr(_scatter_deterministic.adj, "det_meta") + self.assertFalse(hasattr(_scatter_deterministic.adj, "det_meta")) + + resolved_options = 
_scatter_deterministic.module.resolve_options(wp.config) | _scatter_deterministic.options + ModuleHasher([_scatter_deterministic], resolved_options) + + self.assertTrue(hasattr(_scatter_deterministic.adj, "det_meta")) + self.assertTrue(_scatter_deterministic.adj.det_meta.needs_deterministic) + def test_module_options_error_without_unique(self): """ValueError raised when module_options are used without ``module="unique"``.""" with self.assertRaises(ValueError) as cm: diff --git a/warp/tests/unittest_suites.py b/warp/tests/unittest_suites.py index 0d95e3cc6d..cd452b1f72 100644 --- a/warp/tests/unittest_suites.py +++ b/warp/tests/unittest_suites.py @@ -139,6 +139,7 @@ def default_suite(test_loader: unittest.TestLoader = unittest.defaultTestLoader) from warp.tests.test_conditional import TestConditional from warp.tests.test_constant_precision import TestConstantPrecision from warp.tests.test_context import TestContext + from warp.tests.test_deterministic import TestDeterministic from warp.tests.test_copy import TestCopy from warp.tests.test_cpu_precompiled_headers import TestCpuPrecompiledHeaders from warp.tests.test_ctypes import TestCTypes