diff --git a/src/a2a3/platform/include/common/platform_config.h b/src/a2a3/platform/include/common/platform_config.h index c8f7b7406..8c91e0fae 100644 --- a/src/a2a3/platform/include/common/platform_config.h +++ b/src/a2a3/platform/include/common/platform_config.h @@ -59,6 +59,12 @@ constexpr int PLATFORM_MAX_AICPU_THREADS = 4; */ constexpr int PLATFORM_MAX_AICPU_THREADS_JUST_FOR_LAUNCH = 6; +/** + * Host-side stream synchronization timeout (milliseconds) + * Used to detect potential hangs during rtStreamSynchronize. + */ +constexpr int PLATFORM_STREAM_SYNC_TIMEOUT_MS = 1000; + // ============================================================================= // Derived Platform Limits // ============================================================================= diff --git a/src/a2a3/platform/onboard/host/device_runner.cpp b/src/a2a3/platform/onboard/host/device_runner.cpp index 54b5f8623..12d725ab4 100644 --- a/src/a2a3/platform/onboard/host/device_runner.cpp +++ b/src/a2a3/platform/onboard/host/device_runner.cpp @@ -20,7 +20,11 @@ #include #include +#include +#include #include +#include +#include #include #include #include "acl/acl.h" @@ -67,6 +71,24 @@ HalHostUnregisterFn get_halHostUnregister() { return reinterpret_cast(dlsym(g_hal_handle, "halHostUnregister")); } +/** Shared state between the main thread and the create_thread worker for rtStreamSynchronize timeout. */ +struct StreamSyncWorkerState { + std::mutex mu; + std::condition_variable cv; + bool completed{false}; + int rc{0}; +}; + +/** Wait up to timeout_ms for the worker to set completed=true. Returns completed state. */ +bool wait_for_completed(const std::shared_ptr &worker_state, int timeout_ms) { + std::unique_lock lock(worker_state->mu); + const auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms); + worker_state->cv.wait_until(lock, deadline, [&worker_state]() { + return worker_state->completed; + }); + return worker_state->completed; +} + } // namespace // ============================================================================= @@ -375,6 +397,94 @@ void DeviceRunner::release_run_context() { } } +int DeviceRunner::synchronize_stream_with_timeout( + rtStream_t stream, const char *stream_name, int timeout_ms, bool *timed_out +) { + if (timed_out != nullptr) { + *timed_out = false; + } + if (stream == nullptr) { + LOG_ERROR("synchronize_stream_with_timeout: stream %s is null", stream_name); + return -1; + } + + auto worker_state = std::make_shared(); + + aclrtContext ctx = nullptr; + aclrtGetCurrentContext(&ctx); + + // create_thread binds rtSetDevice(device_id_); ACL context is thread-local so + // the worker still mirrors the caller's context for this stream. + std::thread worker = create_thread([worker_state, stream, ctx]() { + if (ctx != nullptr) { + aclrtSetCurrentContext(ctx); + } + int local_rc = rtStreamSynchronize(stream); + { + std::lock_guard lock(worker_state->mu); + worker_state->rc = local_rc; + worker_state->completed = true; + } + worker_state->cv.notify_one(); + }); + + if (wait_for_completed(worker_state, timeout_ms)) { + worker.join(); + int rc = worker_state->rc; + if (rc != 0) { + LOG_ERROR("rtStreamSynchronize (%s) failed: %d", stream_name, rc); + } + return rc; + } + + LOG_ERROR( + "Stream sync timeout: stream=%s timeout_ms=%d device_id=%d block_dim=%d worker_count=%d " + "binaries_loaded=%d stream_aicpu=%p stream_aicore=%p", + stream_name, timeout_ms, device_id_, block_dim_, worker_count_, binaries_loaded_ ? 1 : 0, + static_cast(stream_aicpu_), static_cast(stream_aicore_) + ); + + bool device_reset_ok = false; + if (device_id_ >= 0) { + LOG_ERROR("Forcing aclrtResetDevice(%d) to unblock stuck stream sync", device_id_); + int reset_rc = aclrtResetDevice(device_id_); + if (reset_rc != 0) { + LOG_ERROR("aclrtResetDevice(%d) failed: %d", device_id_, reset_rc); + } else { + device_reset_ok = true; + } + } + + bool worker_joined = false; + if (wait_for_completed(worker_state, timeout_ms)) { + worker.join(); + worker_joined = true; + } else { + LOG_ERROR( + "Worker thread still blocked after aclrtResetDevice + %dms grace; " + "detaching to avoid permanent hang (possible resource leak)", + timeout_ms + ); + worker.detach(); + } + + if (device_reset_ok) { + stream_aicpu_ = nullptr; + stream_aicore_ = nullptr; + } else if (!worker_joined) { + LOG_ERROR("Stream pointers cleared to prevent UAF with detached worker (stream leak)"); + stream_aicpu_ = nullptr; + stream_aicore_ = nullptr; + } + // !device_reset_ok && worker_joined: reset failed but sync eventually completed; + // stream is still owned by this runner — leave pointers for normal teardown. + + if (timed_out != nullptr) { + *timed_out = true; + } + return -1; +} + int DeviceRunner::ensure_binaries_loaded( const std::vector &aicpu_so_binary, const std::vector &aicore_kernel_binary ) { @@ -437,6 +547,7 @@ int DeviceRunner::run( Runtime &runtime, int block_dim, int device_id, const std::vector &aicpu_so_binary, const std::vector &aicore_kernel_binary, int launch_aicpu_num ) { + last_run_timed_out_ = false; // Validate launch_aicpu_num if (launch_aicpu_num < 1 || launch_aicpu_num > PLATFORM_MAX_AICPU_THREADS) { LOG_ERROR("launch_aicpu_num (%d) must be in range [1, %d]", launch_aicpu_num, PLATFORM_MAX_AICPU_THREADS); @@ -718,18 +829,26 @@ int DeviceRunner::run( } }); + bool aicpu_timed_out = false; LOG_INFO("=== rtStreamSynchronize stream_aicpu_ ==="); - // Synchronize streams - rc = rtStreamSynchronize(stream_aicpu_); + // Synchronize streams with host-side timeout watchdog. + rc = synchronize_stream_with_timeout(stream_aicpu_, "AICPU", PLATFORM_STREAM_SYNC_TIMEOUT_MS, &aicpu_timed_out); if (rc != 0) { - LOG_ERROR("rtStreamSynchronize (AICPU) failed: %d", rc); + if (aicpu_timed_out) { + last_run_timed_out_ = true; + } return rc; } + bool aicore_timed_out = false; LOG_INFO("=== rtStreamSynchronize stream_aicore_ ==="); - rc = rtStreamSynchronize(stream_aicore_); + rc = synchronize_stream_with_timeout( + stream_aicore_, "AICore", PLATFORM_STREAM_SYNC_TIMEOUT_MS, &aicore_timed_out + ); if (rc != 0) { - LOG_ERROR("rtStreamSynchronize (AICore) failed: %d", rc); + if (aicore_timed_out) { + last_run_timed_out_ = true; + } return rc; } } diff --git a/src/a2a3/platform/onboard/host/device_runner.h b/src/a2a3/platform/onboard/host/device_runner.h index e19f61d19..dd8cea16f 100644 --- a/src/a2a3/platform/onboard/host/device_runner.h +++ b/src/a2a3/platform/onboard/host/device_runner.h @@ -286,6 +286,11 @@ class DeviceRunner { */ int finalize(); + /** + * Whether the most recent run() exited due to stream sync timeout. + */ + bool last_run_timed_out() const { return last_run_timed_out_; } + /** * Launch an AICPU kernel * @@ -417,6 +422,7 @@ class DeviceRunner { int cores_per_blockdim_{PLATFORM_CORES_PER_BLOCKDIM}; int worker_count_{0}; // Stored for print_handshake_results in destructor std::vector aicore_kernel_binary_; + bool last_run_timed_out_{false}; // Memory management MemoryAllocator mem_alloc_; @@ -502,6 +508,23 @@ class DeviceRunner { */ int prepare_orch_so(Runtime &runtime); + /** + * Synchronize stream with host-side timeout watchdog. + * + * Runs rtStreamSynchronize inside create_thread() (rtSetDevice + user fn); + * the caller thread waits on shared state with a deadline. On timeout, + * tries aclrtResetDevice and a second wait; if still blocked, detaches. + * Stream pointers are cleared when reset succeeds or on detach to avoid + * finalize-time UAF (detach path may leak stream handles). + * + * @param stream Stream to synchronize + * @param stream_name Stream label for logs + * @param timeout_ms Timeout threshold in milliseconds + * @param timed_out Output flag: true if watchdog timeout happened + * @return 0 on success, error code on failure/timeout + */ + int synchronize_stream_with_timeout(rtStream_t stream, const char *stream_name, int timeout_ms, bool *timed_out); + /** * Initialize performance profiling shared memory * diff --git a/src/a2a3/platform/onboard/host/pto_runtime_c_api.cpp b/src/a2a3/platform/onboard/host/pto_runtime_c_api.cpp index 3017b6704..5197d62ff 100644 --- a/src/a2a3/platform/onboard/host/pto_runtime_c_api.cpp +++ b/src/a2a3/platform/onboard/host/pto_runtime_c_api.cpp @@ -239,6 +239,13 @@ int run_runtime( if (rc != 0) { validate_runtime_impl(r); r->~Runtime(); + if (runner->last_run_timed_out()) { + LOG_ERROR("run_runtime: stream sync timeout detected, triggering full DeviceRunner reset"); + int reset_rc = runner->finalize(); + if (reset_rc != 0) { + LOG_ERROR("run_runtime: DeviceRunner finalize after timeout failed: %d", reset_rc); + } + } return rc; } diff --git a/src/a5/platform/include/common/platform_config.h b/src/a5/platform/include/common/platform_config.h index 870d9d960..e8bd3a921 100644 --- a/src/a5/platform/include/common/platform_config.h +++ b/src/a5/platform/include/common/platform_config.h @@ -59,6 +59,12 @@ constexpr int PLATFORM_MAX_AICPU_THREADS = 7; */ constexpr int PLATFORM_MAX_AICPU_THREADS_JUST_FOR_LAUNCH = 7; +/** + * Host-side stream synchronization timeout (milliseconds) + * Used to detect potential hangs during rtStreamSynchronize. + */ +constexpr int PLATFORM_STREAM_SYNC_TIMEOUT_MS = 1000; + // ============================================================================= // Derived Platform Limits // ============================================================================= diff --git a/src/a5/platform/onboard/host/device_runner.cpp b/src/a5/platform/onboard/host/device_runner.cpp index 9a3d1e16f..e3a093f9e 100644 --- a/src/a5/platform/onboard/host/device_runner.cpp +++ b/src/a5/platform/onboard/host/device_runner.cpp @@ -18,11 +18,16 @@ #include "device_runner.h" #include +#include +#include #include #include +#include +#include #include #include +#include "acl/acl.h" #include "callable.h" #include "utils/elf_build_id.h" #include "host/host_regs.h" // Register address retrieval @@ -138,6 +143,28 @@ int AicpuSoInfo::finalize() { return 0; } +namespace { + +/** Shared state between the main thread and the create_thread worker for rtStreamSynchronize timeout. */ +struct StreamSyncWorkerState { + std::mutex mu; + std::condition_variable cv; + bool completed{false}; + int rc{0}; +}; + +/** Wait up to timeout_ms for the worker to set completed=true. Returns completed state. */ +bool wait_for_completed(const std::shared_ptr &worker_state, int timeout_ms) { + std::unique_lock lock(worker_state->mu); + const auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms); + worker_state->cv.wait_until(lock, deadline, [&worker_state]() { + return worker_state->completed; + }); + return worker_state->completed; +} + +} // namespace + // ============================================================================= // DeviceRunner Implementation // ============================================================================= @@ -231,6 +258,94 @@ void DeviceRunner::release_run_context() { } } +int DeviceRunner::synchronize_stream_with_timeout( + rtStream_t stream, const char *stream_name, int timeout_ms, bool *timed_out +) { + if (timed_out != nullptr) { + *timed_out = false; + } + if (stream == nullptr) { + LOG_ERROR("synchronize_stream_with_timeout: stream %s is null", stream_name); + return -1; + } + + auto worker_state = std::make_shared(); + + aclrtContext ctx = nullptr; + aclrtGetCurrentContext(&ctx); + + // create_thread binds rtSetDevice(device_id_); ACL context is thread-local so + // the worker still mirrors the caller's context for this stream. + std::thread worker = create_thread([worker_state, stream, ctx]() { + if (ctx != nullptr) { + aclrtSetCurrentContext(ctx); + } + int local_rc = rtStreamSynchronize(stream); + { + std::lock_guard lock(worker_state->mu); + worker_state->rc = local_rc; + worker_state->completed = true; + } + worker_state->cv.notify_one(); + }); + + if (wait_for_completed(worker_state, timeout_ms)) { + worker.join(); + int rc = worker_state->rc; + if (rc != 0) { + LOG_ERROR("rtStreamSynchronize (%s) failed: %d", stream_name, rc); + } + return rc; + } + + LOG_ERROR( + "Stream sync timeout: stream=%s timeout_ms=%d device_id=%d block_dim=%d worker_count=%d " + "binaries_loaded=%d stream_aicpu=%p stream_aicore=%p", + stream_name, timeout_ms, device_id_, block_dim_, worker_count_, binaries_loaded_ ? 1 : 0, + static_cast(stream_aicpu_), static_cast(stream_aicore_) + ); + + bool device_reset_ok = false; + if (device_id_ >= 0) { + LOG_ERROR("Forcing aclrtResetDevice(%d) to unblock stuck stream sync", device_id_); + int reset_rc = aclrtResetDevice(device_id_); + if (reset_rc != 0) { + LOG_ERROR("aclrtResetDevice(%d) failed: %d", device_id_, reset_rc); + } else { + device_reset_ok = true; + } + } + + bool worker_joined = false; + if (wait_for_completed(worker_state, timeout_ms)) { + worker.join(); + worker_joined = true; + } else { + LOG_ERROR( + "Worker thread still blocked after aclrtResetDevice + %dms grace; " + "detaching to avoid permanent hang (possible resource leak)", + timeout_ms + ); + worker.detach(); + } + + if (device_reset_ok) { + stream_aicpu_ = nullptr; + stream_aicore_ = nullptr; + } else if (!worker_joined) { + LOG_ERROR("Stream pointers cleared to prevent UAF with detached worker (stream leak)"); + stream_aicpu_ = nullptr; + stream_aicore_ = nullptr; + } + // !device_reset_ok && worker_joined: reset failed but sync eventually completed; + // stream is still owned by this runner — leave pointers for normal teardown. + + if (timed_out != nullptr) { + *timed_out = true; + } + return -1; +} + int DeviceRunner::ensure_binaries_loaded( const std::vector &aicpu_so_binary, const std::vector &aicore_kernel_binary ) { @@ -293,6 +408,7 @@ int DeviceRunner::run( Runtime &runtime, int block_dim, int device_id, const std::vector &aicpu_so_binary, const std::vector &aicore_kernel_binary, int launch_aicpu_num ) { + last_run_timed_out_ = false; if (launch_aicpu_num < 1 || launch_aicpu_num > PLATFORM_MAX_AICPU_THREADS) { LOG_ERROR("launch_aicpu_num (%d) must be in range [1, %d]", launch_aicpu_num, PLATFORM_MAX_AICPU_THREADS); return -1; @@ -471,18 +587,26 @@ int DeviceRunner::run( } { + bool aicpu_timed_out = false; LOG_INFO("=== rtStreamSynchronize stream_aicpu_ ==="); - // Synchronize streams - rc = rtStreamSynchronize(stream_aicpu_); + // Synchronize streams with host-side timeout watchdog. + rc = synchronize_stream_with_timeout(stream_aicpu_, "AICPU", PLATFORM_STREAM_SYNC_TIMEOUT_MS, &aicpu_timed_out); if (rc != 0) { - LOG_ERROR("rtStreamSynchronize (AICPU) failed: %d", rc); + if (aicpu_timed_out) { + last_run_timed_out_ = true; + } return rc; } + bool aicore_timed_out = false; LOG_INFO("=== rtStreamSynchronize stream_aicore_ ==="); - rc = rtStreamSynchronize(stream_aicore_); + rc = synchronize_stream_with_timeout( + stream_aicore_, "AICore", PLATFORM_STREAM_SYNC_TIMEOUT_MS, &aicore_timed_out + ); if (rc != 0) { - LOG_ERROR("rtStreamSynchronize (AICore) failed: %d", rc); + if (aicore_timed_out) { + last_run_timed_out_ = true; + } return rc; } } diff --git a/src/a5/platform/onboard/host/device_runner.h b/src/a5/platform/onboard/host/device_runner.h index f6e78d8a3..03403be62 100644 --- a/src/a5/platform/onboard/host/device_runner.h +++ b/src/a5/platform/onboard/host/device_runner.h @@ -261,6 +261,11 @@ class DeviceRunner { */ int finalize(); + /** + * Whether the most recent run() exited due to stream sync timeout. + */ + bool last_run_timed_out() const { return last_run_timed_out_; } + /** * Launch an AICPU kernel * @@ -356,6 +361,7 @@ class DeviceRunner { int cores_per_blockdim_{PLATFORM_CORES_PER_BLOCKDIM}; int worker_count_{0}; // Stored for print_handshake_results in destructor std::vector aicore_kernel_binary_; + bool last_run_timed_out_{false}; // Memory management MemoryAllocator mem_alloc_; @@ -425,6 +431,23 @@ class DeviceRunner { */ int prepare_orch_so(Runtime &runtime); + /** + * Synchronize stream with host-side timeout watchdog. + * + * Runs rtStreamSynchronize inside create_thread() (rtSetDevice + user fn); + * the caller thread waits on shared state with a deadline. On timeout, + * tries aclrtResetDevice and a second wait; if still blocked, detaches. + * Stream pointers are cleared when reset succeeds or on detach to avoid + * finalize-time UAF (detach path may leak stream handles). + * + * @param stream Stream to synchronize + * @param stream_name Stream label for logs + * @param timeout_ms Timeout threshold in milliseconds + * @param timed_out Output flag: true if watchdog timeout happened + * @return 0 on success, error code on failure/timeout + */ + int synchronize_stream_with_timeout(rtStream_t stream, const char *stream_name, int timeout_ms, bool *timed_out); + /** * Initialize performance profiling device buffers * diff --git a/src/a5/platform/onboard/host/pto_runtime_c_api.cpp b/src/a5/platform/onboard/host/pto_runtime_c_api.cpp index 9a58d7fa3..d7071c2c0 100644 --- a/src/a5/platform/onboard/host/pto_runtime_c_api.cpp +++ b/src/a5/platform/onboard/host/pto_runtime_c_api.cpp @@ -206,6 +206,13 @@ int run_runtime( if (rc != 0) { validate_runtime_impl(r); r->~Runtime(); + if (runner->last_run_timed_out()) { + LOG_ERROR("run_runtime: stream sync timeout detected, triggering full DeviceRunner reset"); + int reset_rc = runner->finalize(); + if (reset_rc != 0) { + LOG_ERROR("run_runtime: DeviceRunner finalize after timeout failed: %d", reset_rc); + } + } return rc; }