diff --git a/python/bindings/task_interface.cpp b/python/bindings/task_interface.cpp index 5d70524be..1ffcc9051 100644 --- a/python/bindings/task_interface.cpp +++ b/python/bindings/task_interface.cpp @@ -645,6 +645,7 @@ NB_MODULE(_task_interface, m) { .def_prop_ro("device_id", &ChipWorker::device_id) .def_prop_ro("initialized", &ChipWorker::initialized) .def_prop_ro("device_set", &ChipWorker::device_set) + .def_prop_ro("device_unresponsive", &ChipWorker::device_unresponsive) .def("malloc", &ChipWorker::malloc, nb::arg("size")) .def("free", &ChipWorker::free, nb::arg("ptr")) .def("copy_to", &ChipWorker::copy_to, nb::arg("dst"), nb::arg("src"), nb::arg("size")) diff --git a/python/simpler/task_interface.py b/python/simpler/task_interface.py index 716c380e9..356e4a338 100644 --- a/python/simpler/task_interface.py +++ b/python/simpler/task_interface.py @@ -282,6 +282,11 @@ def finalize(self): """ self._impl.finalize() + @property + def device_unresponsive(self): + """True after a run() that triggered stream sync timeout + device reset.""" + return self._impl.device_unresponsive + def run(self, callable, args, config=None, **kwargs): """Execute a callable synchronously. diff --git a/simpler_setup/scene_test.py b/simpler_setup/scene_test.py index 59042854e..3616df5a1 100644 --- a/simpler_setup/scene_test.py +++ b/simpler_setup/scene_test.py @@ -1350,13 +1350,19 @@ def run_module(module_name): # noqa: PLR0912, PLR0915 -- CLI parsing + dispatch ) print("PASSED") except Exception as e: # noqa: BLE001 - print(f"FAILED: {e}") + print(f"FAILED: {e}", flush=True) ok = False if args.exitfirst: raise SystemExit(1) from None finally: if level == 2: + force_exit = getattr(worker, "device_unresponsive", False) worker.finalize() + if force_exit: + # CANN library fini functions block after aclrtResetDevice; + # skip Python interpreter cleanup to avoid hanging. + print("stream sync timeout recovery — force exit", flush=True) + os._exit(1) else: worker.close() diff --git a/src/a2a3/platform/include/common/platform_config.h b/src/a2a3/platform/include/common/platform_config.h index c8f7b7406..8c29a39a3 100644 --- a/src/a2a3/platform/include/common/platform_config.h +++ b/src/a2a3/platform/include/common/platform_config.h @@ -59,6 +59,12 @@ constexpr int PLATFORM_MAX_AICPU_THREADS = 4; */ constexpr int PLATFORM_MAX_AICPU_THREADS_JUST_FOR_LAUNCH = 6; +/** + * Host-side stream synchronization timeout (milliseconds) + * Passed to aclrtSynchronizeStreamWithTimeout to detect stream sync hangs. + */ +constexpr int PLATFORM_STREAM_SYNC_TIMEOUT_MS = 1000; + // ============================================================================= // Derived Platform Limits // ============================================================================= diff --git a/src/a2a3/platform/include/host/l2_perf_collector.h b/src/a2a3/platform/include/host/l2_perf_collector.h index 8e3275542..1c0397d66 100644 --- a/src/a2a3/platform/include/host/l2_perf_collector.h +++ b/src/a2a3/platform/include/host/l2_perf_collector.h @@ -158,8 +158,10 @@ class ProfMemoryManager { /** * Stop the memory management thread * Blocks until the thread exits. + * + * @param skip_device_free If true, skip freeing device buffers (use after device reset). */ - void stop(); + void stop(bool skip_device_free = false); /** * Try to pop a ready buffer info (non-blocking) @@ -315,8 +317,10 @@ class L2PerfCollector { * Stop the memory management thread and clean up remaining data * * Must be called after device execution completes. + * + * @param skip_device_free If true, skip freeing device buffers (use after device reset). */ - void stop_memory_manager(); + void stop_memory_manager(bool skip_device_free = false); /** * Cleanup all resources diff --git a/src/a2a3/platform/include/host/memory_allocator.h b/src/a2a3/platform/include/host/memory_allocator.h index 217fc37f2..5b069ddbc 100644 --- a/src/a2a3/platform/include/host/memory_allocator.h +++ b/src/a2a3/platform/include/host/memory_allocator.h @@ -91,6 +91,14 @@ class MemoryAllocator { */ int finalize(); + /** + * Drop all tracked pointers without calling rtFree. + * + * Use after a device reset (aclrtResetDevice) that reclaims all device + * memory, so individual rtFree calls are unnecessary and may block. + */ + void abandon(); + /** * Get number of tracked allocations * diff --git a/src/a2a3/platform/onboard/host/device_runner.cpp b/src/a2a3/platform/onboard/host/device_runner.cpp index 8b91d4196..eaa077f85 100644 --- a/src/a2a3/platform/onboard/host/device_runner.cpp +++ b/src/a2a3/platform/onboard/host/device_runner.cpp @@ -433,10 +433,29 @@ int DeviceRunner::copy_from_device(void *host_ptr, const void *dev_ptr, size_t b return rtMemcpy(host_ptr, bytes, dev_ptr, bytes, RT_MEMCPY_DEVICE_TO_HOST); } +int DeviceRunner::synchronize_stream_with_timeout(rtStream_t stream, const char *stream_name, int timeout_ms) { + // stream is guaranteed non-null: callers invoke this after rtStreamCreate succeeded. + int rc = aclrtSynchronizeStreamWithTimeout(stream, timeout_ms); + if (rc == ACL_ERROR_RT_STREAM_SYNC_TIMEOUT) { + LOG_ERROR( + "Stream sync timeout: stream=%s timeout_ms=%d device_id=%d block_dim=%d " + "worker_count=%d binaries_loaded=%d", + stream_name, timeout_ms, device_id_, block_dim_, worker_count_, binaries_loaded_ ? 1 : 0 + ); + device_unresponsive_ = true; + return rc; + } + if (rc != 0) { + LOG_ERROR("aclrtSynchronizeStreamWithTimeout (%s) failed: %d", stream_name, rc); + } + return rc; +} + int DeviceRunner::run( Runtime &runtime, int block_dim, int device_id, const std::vector &aicpu_so_binary, const std::vector &aicore_kernel_binary, int launch_aicpu_num ) { + device_unresponsive_ = false; // Validate launch_aicpu_num if (launch_aicpu_num < 1 || launch_aicpu_num > PLATFORM_MAX_AICPU_THREADS) { LOG_ERROR("launch_aicpu_num (%d) must be in range [1, %d]", launch_aicpu_num, PLATFORM_MAX_AICPU_THREADS); @@ -554,6 +573,7 @@ int DeviceRunner::run( // Scope guards for cleanup on all exit paths auto regs_cleanup = RAIIScopeGuard([this]() { + if (device_unresponsive_) return; if (kernel_args_.args.regs != 0) { mem_alloc_.free(reinterpret_cast(kernel_args_.args.regs)); kernel_args_.args.regs = 0; @@ -561,6 +581,7 @@ int DeviceRunner::run( }); auto pmu_regs_cleanup = RAIIScopeGuard([this]() { + if (device_unresponsive_) return; if (kernel_args_.args.pmu_reg_addrs != 0) { mem_alloc_.free(reinterpret_cast(kernel_args_.args.pmu_reg_addrs)); kernel_args_.args.pmu_reg_addrs = 0; @@ -568,6 +589,7 @@ int DeviceRunner::run( }); auto runtime_args_cleanup = RAIIScopeGuard([this]() { + if (device_unresponsive_) return; kernel_args_.finalize_device_kernel_args(); kernel_args_.finalize_runtime_args(); }); @@ -607,9 +629,8 @@ int DeviceRunner::run( } auto perf_cleanup = RAIIScopeGuard([this]() { - bool was_initialized = l2_perf_collector_.is_initialized(); - if (was_initialized) { - l2_perf_collector_.stop_memory_manager(); + if (l2_perf_collector_.is_initialized()) { + l2_perf_collector_.stop_memory_manager(/*skip_device_free=*/device_unresponsive_); } }); @@ -721,17 +742,14 @@ int DeviceRunner::run( }); LOG_INFO("=== rtStreamSynchronize stream_aicpu_ ==="); - // Synchronize streams - rc = rtStreamSynchronize(stream_aicpu_); + rc = synchronize_stream_with_timeout(stream_aicpu_, "AICPU", PLATFORM_STREAM_SYNC_TIMEOUT_MS); if (rc != 0) { - LOG_ERROR("rtStreamSynchronize (AICPU) failed: %d", rc); return rc; } LOG_INFO("=== rtStreamSynchronize stream_aicore_ ==="); - rc = rtStreamSynchronize(stream_aicore_); + rc = synchronize_stream_with_timeout(stream_aicore_, "AICore", PLATFORM_STREAM_SYNC_TIMEOUT_MS); if (rc != 0) { - LOG_ERROR("rtStreamSynchronize (AICore) failed: %d", rc); return rc; } } @@ -851,6 +869,50 @@ int DeviceRunner::finalize() { return rc; } + if (device_unresponsive_) { + // Device is unresponsive — skip all rt* cleanup (rtFree, rtStreamDestroy, + // halHostUnregister) which would block. Go directly to aclrtResetDevice. + // Device reset reclaims all device memory, so skipped frees are safe. + // Every cached device pointer must be nulled to prevent stale reuse. + LOG_ERROR("finalize: timeout recovery — skipping rt* cleanup, resetting device directly"); + stream_aicpu_ = nullptr; + stream_aicore_ = nullptr; + kernel_args_.abandon(); + so_info_.abandon(); + func_id_to_addr_.clear(); + binaries_loaded_ = false; + dev_orch_so_buffer_ = nullptr; + dev_orch_so_capacity_ = 0; + cached_orch_so_hash_ = 0; + host_orch_so_copy_.clear(); + host_orch_so_copy_.shrink_to_fit(); + mem_alloc_.abandon(); + + // aclrtResetDevice must execute unconditionally here — the run() path + // uses rtSetDevice (not aclInit), so acl_ready_ is false, but the + // device still needs resetting. reset_device_and_acl() gates on + // acl_ready_ and would skip the reset entirely. + if (device_id_ >= 0) { + int reset_rc = aclrtResetDevice(device_id_); + if (reset_rc != 0) { + LOG_ERROR("aclrtResetDevice(%d) failed during timeout recovery: %d", device_id_, reset_rc); + } + } + if (acl_ready_) { + int fin_rc = aclFinalize(); + if (fin_rc != 0) { + LOG_ERROR("aclFinalize failed during timeout recovery: %d", fin_rc); + } + acl_ready_ = false; + } + device_id_ = -1; + block_dim_ = 0; + worker_count_ = 0; + aicore_kernel_binary_.clear(); + LOG_INFO("DeviceRunner finalized (timeout recovery)"); + return 0; + } + release_run_context(); // Cleanup kernel args (deviceArgs) @@ -938,7 +1000,11 @@ int DeviceRunner::finalize() { // Free all remaining allocations (including handshake buffer and binGmAddr) mem_alloc_.finalize(); - // Reset device and finalize ACL AFTER all device memory is freed. + return reset_device_and_acl(); +} + +int DeviceRunner::reset_device_and_acl() { + int rc = 0; // Gated on acl_ready_ so rt-only runtimes that never called // ensure_acl_ready() do not try to aclFinalize an un-init'd ACL state. if (acl_ready_ && device_id_ >= 0) { diff --git a/src/a2a3/platform/onboard/host/device_runner.h b/src/a2a3/platform/onboard/host/device_runner.h index e763efceb..4955de0f0 100644 --- a/src/a2a3/platform/onboard/host/device_runner.h +++ b/src/a2a3/platform/onboard/host/device_runner.h @@ -132,6 +132,19 @@ struct KernelArgsHelper { */ int finalize_device_kernel_args(); + /** Null out all device pointers without calling rtFree (for use after device reset). */ + void abandon() { + args.device_args = nullptr; + args.runtime_args = nullptr; + args.regs = 0; + args.ffts_base_addr = 0; + args.dump_data_base = 0; + args.l2_perf_data_base = 0; + args.pmu_data_base = 0; + args.pmu_reg_addrs = 0; + device_k_args_ = nullptr; + } + /** * Implicit conversion operators for seamless use with runtime APIs * @@ -169,6 +182,12 @@ struct AicpuSoInfo { * @return 0 on success, error code on failure */ int finalize(); + + /** Null out device pointer without calling rtFree (for use after device reset). */ + void abandon() { + aicpu_so_bin = 0; + aicpu_so_len = 0; + } }; /** @@ -291,6 +310,11 @@ class DeviceRunner { */ int finalize(); + /** + * Whether the most recent run() exited due to stream sync timeout. + */ + bool device_unresponsive() const { return device_unresponsive_; } + /** * Launch an AICPU kernel * @@ -422,6 +446,7 @@ class DeviceRunner { int cores_per_blockdim_{PLATFORM_CORES_PER_BLOCKDIM}; int worker_count_{0}; // Stored for print_handshake_results in destructor std::vector aicore_kernel_binary_; + bool device_unresponsive_{false}; // Memory management MemoryAllocator mem_alloc_; @@ -507,6 +532,20 @@ class DeviceRunner { */ int prepare_orch_so(Runtime &runtime); + /** + * Synchronize stream with a host-side timeout via aclrtSynchronizeStreamWithTimeout. + * Sets device_unresponsive_ on ACL_ERROR_RT_STREAM_SYNC_TIMEOUT. + * + * @param stream Stream to synchronize + * @param stream_name Stream label for logs + * @param timeout_ms Timeout threshold in milliseconds + * @return 0 on success, error code on failure/timeout + */ + int synchronize_stream_with_timeout(rtStream_t stream, const char *stream_name, int timeout_ms); + + /** Execute aclrtResetDevice + aclFinalize and clear runner state. */ + int reset_device_and_acl(); + /** * Initialize performance profiling shared memory * diff --git a/src/a2a3/platform/onboard/host/memory_allocator.cpp b/src/a2a3/platform/onboard/host/memory_allocator.cpp index 144252940..b203357a0 100644 --- a/src/a2a3/platform/onboard/host/memory_allocator.cpp +++ b/src/a2a3/platform/onboard/host/memory_allocator.cpp @@ -70,3 +70,8 @@ int MemoryAllocator::finalize() { ptr_set_.clear(); return last_error; } + +void MemoryAllocator::abandon() { + std::lock_guard lk(mu_); + ptr_set_.clear(); +} diff --git a/src/a2a3/platform/onboard/host/pto_runtime_c_api.cpp b/src/a2a3/platform/onboard/host/pto_runtime_c_api.cpp index 6f73749d8..e5191689b 100644 --- a/src/a2a3/platform/onboard/host/pto_runtime_c_api.cpp +++ b/src/a2a3/platform/onboard/host/pto_runtime_c_api.cpp @@ -238,8 +238,20 @@ int run_runtime( std::vector aicore_vec(aicore_binary, aicore_binary + aicore_size); rc = runner->run(*r, block_dim, device_id, aicpu_vec, aicore_vec, aicpu_thread_num); if (rc != 0) { - validate_runtime_impl(r); - r->~Runtime(); + if (runner->device_unresponsive()) { + // Skip validate_runtime_impl: device state is unreliable after + // stream sync timeout — copy_from_device / device_free may block + // or fault on already-reset resources. + LOG_ERROR("run_runtime: stream sync timeout detected, skipping validation, triggering full reset"); + r->~Runtime(); + int reset_rc = runner->finalize(); + if (reset_rc != 0) { + LOG_ERROR("run_runtime: DeviceRunner finalize after timeout failed: %d", reset_rc); + } + } else { + validate_runtime_impl(r); + r->~Runtime(); + } return rc; } @@ -260,6 +272,11 @@ int finalize_device(DeviceContextHandle ctx) { } } +int device_unresponsive(DeviceContextHandle ctx) { + if (ctx == NULL) return 0; + return static_cast(ctx)->device_unresponsive() ? 1 : 0; +} + /* =========================================================================== * Internal helpers called from runtime_maker.cpp via Runtime.host_api * =========================================================================== */ diff --git a/src/a2a3/platform/sim/host/pto_runtime_c_api.cpp b/src/a2a3/platform/sim/host/pto_runtime_c_api.cpp index f17ca0025..b562f53b1 100644 --- a/src/a2a3/platform/sim/host/pto_runtime_c_api.cpp +++ b/src/a2a3/platform/sim/host/pto_runtime_c_api.cpp @@ -237,6 +237,11 @@ int finalize_device(DeviceContextHandle ctx) { } } +int device_unresponsive(DeviceContextHandle ctx) { + (void)ctx; + return 0; +} + /* =========================================================================== * ACL lifecycle stubs. Sim has no ACL / aclrtStream concept, so these * no-op to satisfy the uniform host_runtime.so ABI (ChipWorker dlsym's the diff --git a/src/a2a3/platform/src/host/l2_perf_collector.cpp b/src/a2a3/platform/src/host/l2_perf_collector.cpp index 720829e58..84005bd4c 100644 --- a/src/a2a3/platform/src/host/l2_perf_collector.cpp +++ b/src/a2a3/platform/src/host/l2_perf_collector.cpp @@ -68,12 +68,23 @@ void ProfMemoryManager::start( LOG_INFO("ProfMemoryManager started: %d cores, %d phase threads", num_cores, num_phase_threads); } -void ProfMemoryManager::stop() { +void ProfMemoryManager::stop(bool skip_device_free) { running_.store(false); if (mgmt_thread_.joinable()) { mgmt_thread_.join(); } + if (skip_device_free) { + // After device reset, device memory is already reclaimed. + // Just clear host-side tracking without calling rtFree. + std::scoped_lock lock(done_mutex_); + std::queue().swap(done_queue_); + recycled_perf_buffers_.clear(); + recycled_phase_buffers_.clear(); + LOG_INFO("ProfMemoryManager stopped (buffers abandoned after device reset)"); + return; + } + // Drain remaining done_queue and free buffers { std::scoped_lock lock(done_mutex_); @@ -710,9 +721,9 @@ void L2PerfCollector::start_memory_manager(const ThreadFactory &thread_factory) ); } -void L2PerfCollector::stop_memory_manager() { +void L2PerfCollector::stop_memory_manager(bool skip_device_free) { if (memory_manager_.is_running()) { - memory_manager_.stop(); + memory_manager_.stop(skip_device_free); } } diff --git a/src/a5/platform/onboard/host/pto_runtime_c_api.cpp b/src/a5/platform/onboard/host/pto_runtime_c_api.cpp index f2723bc63..c6981e226 100644 --- a/src/a5/platform/onboard/host/pto_runtime_c_api.cpp +++ b/src/a5/platform/onboard/host/pto_runtime_c_api.cpp @@ -227,6 +227,11 @@ int finalize_device(DeviceContextHandle ctx) { } } +int device_unresponsive(DeviceContextHandle ctx) { + (void)ctx; + return 0; +} + /* =========================================================================== * ACL + comm_* placeholders (distributed runtime not yet implemented on a5) * diff --git a/src/a5/platform/sim/host/pto_runtime_c_api.cpp b/src/a5/platform/sim/host/pto_runtime_c_api.cpp index a6b396460..db3801e46 100644 --- a/src/a5/platform/sim/host/pto_runtime_c_api.cpp +++ b/src/a5/platform/sim/host/pto_runtime_c_api.cpp @@ -237,6 +237,11 @@ int finalize_device(DeviceContextHandle ctx) { } } +int device_unresponsive(DeviceContextHandle ctx) { + (void)ctx; + return 0; +} + /* =========================================================================== * ACL lifecycle stubs. Sim has no ACL / aclrtStream concept, so these no-op * to satisfy the uniform host_runtime.so ABI that ChipWorker dlsym's. The diff --git a/src/common/worker/chip_worker.cpp b/src/common/worker/chip_worker.cpp index 43daa5219..0f1726ca4 100644 --- a/src/common/worker/chip_worker.cpp +++ b/src/common/worker/chip_worker.cpp @@ -121,6 +121,7 @@ void ChipWorker::init( get_runtime_size_fn_ = load_symbol(handle, "get_runtime_size"); run_runtime_fn_ = load_symbol(handle, "run_runtime"); finalize_device_fn_ = load_symbol(handle, "finalize_device"); + device_unresponsive_fn_ = load_symbol(handle, "device_unresponsive"); // ACL lifecycle + comm_* are part of the uniform host_runtime.so ABI. // Every platform runtime exports all of them — runtimes that do not // have a real backend (today: a5) ship not-supported stubs rather @@ -210,6 +211,7 @@ void ChipWorker::finalize() { get_runtime_size_fn_ = nullptr; run_runtime_fn_ = nullptr; finalize_device_fn_ = nullptr; + device_unresponsive_fn_ = nullptr; ensure_acl_ready_fn_ = nullptr; create_comm_stream_fn_ = nullptr; destroy_comm_stream_fn_ = nullptr; @@ -249,6 +251,9 @@ void ChipWorker::run(const void *callable, const void *args, const CallConfig &c config.enable_dump_tensor, config.enable_pmu, config.output_prefix ); if (rc != 0) { + if (device_unresponsive_fn_ && device_unresponsive_fn_(device_ctx_)) { + device_unresponsive_ = true; + } throw std::runtime_error("run_runtime failed with code " + std::to_string(rc)); } } diff --git a/src/common/worker/chip_worker.h b/src/common/worker/chip_worker.h index 5617456d6..924dfcc67 100644 --- a/src/common/worker/chip_worker.h +++ b/src/common/worker/chip_worker.h @@ -88,6 +88,11 @@ class ChipWorker : public IWorker { bool initialized() const { return initialized_; } bool device_set() const { return device_set_; } + /// True after a run() that triggered stream sync timeout + device reset. + /// When set, the caller should use os._exit() to avoid CANN library fini + /// functions blocking on the reset device during normal process exit. + bool device_unresponsive() const { return device_unresponsive_; } + private: using CreateDeviceContextFn = void *(*)(); using DestroyDeviceContextFn = void (*)(void *); @@ -102,6 +107,7 @@ class ChipWorker : public IWorker { int, int, int, const char * ); using FinalizeDeviceFn = int (*)(void *); + using DeviceUnresponsiveFn = int (*)(void *); using EnsureAclReadyFn = int (*)(void *, int); using CreateCommStreamFn = void *(*)(void *); using DestroyCommStreamFn = int (*)(void *, void *); @@ -123,6 +129,7 @@ class ChipWorker : public IWorker { GetRuntimeSizeFn get_runtime_size_fn_ = nullptr; RunRuntimeFn run_runtime_fn_ = nullptr; FinalizeDeviceFn finalize_device_fn_ = nullptr; + DeviceUnresponsiveFn device_unresponsive_fn_ = nullptr; EnsureAclReadyFn ensure_acl_ready_fn_ = nullptr; CreateCommStreamFn create_comm_stream_fn_ = nullptr; DestroyCommStreamFn destroy_comm_stream_fn_ = nullptr; @@ -146,6 +153,7 @@ class ChipWorker : public IWorker { bool initialized_ = false; bool device_set_ = false; bool finalized_ = false; + bool device_unresponsive_ = false; }; #endif // SRC_COMMON_WORKER_CHIP_WORKER_H_ diff --git a/src/common/worker/pto_runtime_c_api.h b/src/common/worker/pto_runtime_c_api.h index 970995ed4..66a7487c2 100644 --- a/src/common/worker/pto_runtime_c_api.h +++ b/src/common/worker/pto_runtime_c_api.h @@ -107,6 +107,13 @@ int run_runtime( */ int finalize_device(DeviceContextHandle ctx); +/** + * Return 1 if the device is unresponsive (stream sync timeout triggered + * aclrtResetDevice), 0 otherwise. Used by ChipWorker to decide whether + * os._exit is needed (CANN library fini functions block after device reset). + */ +int device_unresponsive(DeviceContextHandle ctx); + #ifdef __cplusplus } #endif