Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions python/bindings/task_interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,7 @@ NB_MODULE(_task_interface, m) {
.def_prop_ro("device_id", &ChipWorker::device_id)
.def_prop_ro("initialized", &ChipWorker::initialized)
.def_prop_ro("device_set", &ChipWorker::device_set)
.def_prop_ro("device_unresponsive", &ChipWorker::device_unresponsive)
.def("malloc", &ChipWorker::malloc, nb::arg("size"))
.def("free", &ChipWorker::free, nb::arg("ptr"))
.def("copy_to", &ChipWorker::copy_to, nb::arg("dst"), nb::arg("src"), nb::arg("size"))
Expand Down
5 changes: 5 additions & 0 deletions python/simpler/task_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,11 @@ def finalize(self):
"""
self._impl.finalize()

@property
def device_unresponsive(self):
    """Report whether the last run() hit a stream sync timeout and device reset.

    The value is read straight from the wrapped implementation object's
    ``device_unresponsive`` attribute.
    """
    impl = self._impl
    return impl.device_unresponsive

def run(self, callable, args, config=None, **kwargs):
"""Execute a callable synchronously.
Expand Down
8 changes: 7 additions & 1 deletion simpler_setup/scene_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1350,13 +1350,19 @@ def run_module(module_name): # noqa: PLR0912, PLR0915 -- CLI parsing + dispatch
)
print("PASSED")
except Exception as e: # noqa: BLE001
print(f"FAILED: {e}")
print(f"FAILED: {e}", flush=True)
ok = False
if args.exitfirst:
raise SystemExit(1) from None
finally:
if level == 2:
force_exit = getattr(worker, "device_unresponsive", False)
worker.finalize()
if force_exit:
# CANN library fini functions block after aclrtResetDevice;
# skip Python interpreter cleanup to avoid hanging.
print("stream sync timeout recovery — force exit", flush=True)
os._exit(1)
else:
worker.close()

Expand Down
6 changes: 6 additions & 0 deletions src/a2a3/platform/include/common/platform_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ constexpr int PLATFORM_MAX_AICPU_THREADS = 4;
*/
constexpr int PLATFORM_MAX_AICPU_THREADS_JUST_FOR_LAUNCH = 6;

/**
* Host-side stream synchronization timeout (milliseconds)
* Passed to aclrtSynchronizeStreamWithTimeout to detect stream sync hangs.
*
* DeviceRunner::run() applies this same budget to both the AICPU and the
* AICore stream wait; a wait that exceeds it marks the device as unresponsive.
* NOTE(review): confirm 1000 ms leaves enough headroom for the longest
* legitimate kernel, since exceeding it triggers the timeout recovery path.
*/
constexpr int PLATFORM_STREAM_SYNC_TIMEOUT_MS = 1000;

// =============================================================================
// Derived Platform Limits
// =============================================================================
Expand Down
8 changes: 6 additions & 2 deletions src/a2a3/platform/include/host/l2_perf_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,10 @@ class ProfMemoryManager {
/**
* Stop the memory management thread
* Blocks until the thread exits.
*
* @param skip_device_free If true, skip freeing device buffers (use after device reset).
*/
void stop();
void stop(bool skip_device_free = false);

/**
* Try to pop a ready buffer info (non-blocking)
Expand Down Expand Up @@ -315,8 +317,10 @@ class L2PerfCollector {
* Stop the memory management thread and clean up remaining data
*
* Must be called after device execution completes.
*
* @param skip_device_free If true, skip freeing device buffers (use after device reset).
*/
void stop_memory_manager();
void stop_memory_manager(bool skip_device_free = false);

/**
* Cleanup all resources
Expand Down
8 changes: 8 additions & 0 deletions src/a2a3/platform/include/host/memory_allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ class MemoryAllocator {
*/
int finalize();

/**
* Drop all tracked pointers without calling rtFree.
*
* Use after a device reset (aclrtResetDevice) that reclaims all device
* memory, so individual rtFree calls are unnecessary and may block.
*/
void abandon();

/**
* Get number of tracked allocations
*
Expand Down
84 changes: 75 additions & 9 deletions src/a2a3/platform/onboard/host/device_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -433,10 +433,29 @@ int DeviceRunner::copy_from_device(void *host_ptr, const void *dev_ptr, size_t b
return rtMemcpy(host_ptr, bytes, dev_ptr, bytes, RT_MEMCPY_DEVICE_TO_HOST);
}

/**
 * Wait on `stream` through aclrtSynchronizeStreamWithTimeout, giving up after `timeout_ms`.
 *
 * ACL_ERROR_RT_STREAM_SYNC_TIMEOUT is logged together with the runner's current
 * launch state and sets device_unresponsive_; any other non-zero result is only
 * logged. The ACL return code is handed back unchanged in every case.
 */
int DeviceRunner::synchronize_stream_with_timeout(rtStream_t stream, const char *stream_name, int timeout_ms) {
    // No null check on `stream`: callers invoke this after rtStreamCreate succeeded.
    const int sync_rc = aclrtSynchronizeStreamWithTimeout(stream, timeout_ms);
    const bool timed_out = (sync_rc == ACL_ERROR_RT_STREAM_SYNC_TIMEOUT);

    if (timed_out) {
        LOG_ERROR(
            "Stream sync timeout: stream=%s timeout_ms=%d device_id=%d block_dim=%d "
            "worker_count=%d binaries_loaded=%d",
            stream_name, timeout_ms, device_id_, block_dim_, worker_count_, binaries_loaded_ ? 1 : 0
        );
        device_unresponsive_ = true;
    } else if (sync_rc != 0) {
        LOG_ERROR("aclrtSynchronizeStreamWithTimeout (%s) failed: %d", stream_name, sync_rc);
    }
    return sync_rc;
}

int DeviceRunner::run(
Runtime &runtime, int block_dim, int device_id, const std::vector<uint8_t> &aicpu_so_binary,
const std::vector<uint8_t> &aicore_kernel_binary, int launch_aicpu_num
) {
device_unresponsive_ = false;
// Validate launch_aicpu_num
if (launch_aicpu_num < 1 || launch_aicpu_num > PLATFORM_MAX_AICPU_THREADS) {
LOG_ERROR("launch_aicpu_num (%d) must be in range [1, %d]", launch_aicpu_num, PLATFORM_MAX_AICPU_THREADS);
Expand Down Expand Up @@ -554,20 +573,23 @@ int DeviceRunner::run(

// Scope guards for cleanup on all exit paths
auto regs_cleanup = RAIIScopeGuard([this]() {
if (device_unresponsive_) return;
if (kernel_args_.args.regs != 0) {
mem_alloc_.free(reinterpret_cast<void *>(kernel_args_.args.regs));
kernel_args_.args.regs = 0;
}
});

auto pmu_regs_cleanup = RAIIScopeGuard([this]() {
if (device_unresponsive_) return;
if (kernel_args_.args.pmu_reg_addrs != 0) {
mem_alloc_.free(reinterpret_cast<void *>(kernel_args_.args.pmu_reg_addrs));
kernel_args_.args.pmu_reg_addrs = 0;
}
});

auto runtime_args_cleanup = RAIIScopeGuard([this]() {
if (device_unresponsive_) return;
kernel_args_.finalize_device_kernel_args();
kernel_args_.finalize_runtime_args();
});
Expand Down Expand Up @@ -607,9 +629,8 @@ int DeviceRunner::run(
}

auto perf_cleanup = RAIIScopeGuard([this]() {
bool was_initialized = l2_perf_collector_.is_initialized();
if (was_initialized) {
l2_perf_collector_.stop_memory_manager();
if (l2_perf_collector_.is_initialized()) {
l2_perf_collector_.stop_memory_manager(/*skip_device_free=*/device_unresponsive_);
}
});

Expand Down Expand Up @@ -721,17 +742,14 @@ int DeviceRunner::run(
});

LOG_INFO("=== rtStreamSynchronize stream_aicpu_ ===");
// Synchronize streams
rc = rtStreamSynchronize(stream_aicpu_);
rc = synchronize_stream_with_timeout(stream_aicpu_, "AICPU", PLATFORM_STREAM_SYNC_TIMEOUT_MS);
if (rc != 0) {
LOG_ERROR("rtStreamSynchronize (AICPU) failed: %d", rc);
return rc;
}

LOG_INFO("=== rtStreamSynchronize stream_aicore_ ===");
rc = rtStreamSynchronize(stream_aicore_);
rc = synchronize_stream_with_timeout(stream_aicore_, "AICore", PLATFORM_STREAM_SYNC_TIMEOUT_MS);
if (rc != 0) {
LOG_ERROR("rtStreamSynchronize (AICore) failed: %d", rc);
return rc;
}
}
Expand Down Expand Up @@ -851,6 +869,50 @@ int DeviceRunner::finalize() {
return rc;
}

if (device_unresponsive_) {
// Device is unresponsive — skip all rt* cleanup (rtFree, rtStreamDestroy,
// halHostUnregister) which would block. Go directly to aclrtResetDevice.
// Device reset reclaims all device memory, so skipped frees are safe.
// Every cached device pointer must be nulled to prevent stale reuse.
LOG_ERROR("finalize: timeout recovery — skipping rt* cleanup, resetting device directly");
stream_aicpu_ = nullptr;
stream_aicore_ = nullptr;
kernel_args_.abandon();
so_info_.abandon();
func_id_to_addr_.clear();
binaries_loaded_ = false;
dev_orch_so_buffer_ = nullptr;
dev_orch_so_capacity_ = 0;
cached_orch_so_hash_ = 0;
host_orch_so_copy_.clear();
host_orch_so_copy_.shrink_to_fit();
mem_alloc_.abandon();

// aclrtResetDevice must execute unconditionally here — the run() path
// uses rtSetDevice (not aclInit), so acl_ready_ is false, but the
// device still needs resetting. reset_device_and_acl() gates on
// acl_ready_ and would skip the reset entirely.
if (device_id_ >= 0) {
int reset_rc = aclrtResetDevice(device_id_);
if (reset_rc != 0) {
LOG_ERROR("aclrtResetDevice(%d) failed during timeout recovery: %d", device_id_, reset_rc);
}
}
if (acl_ready_) {
int fin_rc = aclFinalize();
if (fin_rc != 0) {
LOG_ERROR("aclFinalize failed during timeout recovery: %d", fin_rc);
}
acl_ready_ = false;
}
device_id_ = -1;
block_dim_ = 0;
worker_count_ = 0;
aicore_kernel_binary_.clear();
LOG_INFO("DeviceRunner finalized (timeout recovery)");
return 0;
}
Comment thread
indigo1973 marked this conversation as resolved.

release_run_context();

// Cleanup kernel args (deviceArgs)
Expand Down Expand Up @@ -938,7 +1000,11 @@ int DeviceRunner::finalize() {
// Free all remaining allocations (including handshake buffer and binGmAddr)
mem_alloc_.finalize();

// Reset device and finalize ACL AFTER all device memory is freed.
return reset_device_and_acl();
}

int DeviceRunner::reset_device_and_acl() {
int rc = 0;
// Gated on acl_ready_ so rt-only runtimes that never called
// ensure_acl_ready() do not try to aclFinalize an un-init'd ACL state.
if (acl_ready_ && device_id_ >= 0) {
Expand Down
39 changes: 39 additions & 0 deletions src/a2a3/platform/onboard/host/device_runner.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,19 @@ struct KernelArgsHelper {
*/
int finalize_device_kernel_args();

/**
 * Forget every device address held by this helper without releasing it.
 * No rtFree is issued; intended for use after a device reset.
 */
void abandon() {
    // Members reset to nullptr.
    device_k_args_ = nullptr;
    args.device_args = nullptr;
    args.runtime_args = nullptr;

    // Remaining address/base fields are zeroed.
    args.pmu_reg_addrs = 0;
    args.pmu_data_base = 0;
    args.l2_perf_data_base = 0;
    args.dump_data_base = 0;
    args.ffts_base_addr = 0;
    args.regs = 0;
}

/**
* Implicit conversion operators for seamless use with runtime APIs
*
Expand Down Expand Up @@ -169,6 +182,12 @@ struct AicpuSoInfo {
* @return 0 on success, error code on failure
*/
int finalize();

/**
 * Zero the recorded device pointer and its length without calling rtFree
 * (for use after device reset).
 */
void abandon() {
    aicpu_so_len = 0;
    aicpu_so_bin = 0;
}
};

/**
Expand Down Expand Up @@ -291,6 +310,11 @@ class DeviceRunner {
*/
int finalize();

/**
 * Report whether the latest run() ended with a stream sync timeout.
 *
 * @return current value of device_unresponsive_
 */
bool device_unresponsive() const {
    return device_unresponsive_;
}

/**
* Launch an AICPU kernel
*
Expand Down Expand Up @@ -422,6 +446,7 @@ class DeviceRunner {
int cores_per_blockdim_{PLATFORM_CORES_PER_BLOCKDIM};
int worker_count_{0}; // Stored for print_handshake_results in destructor
std::vector<uint8_t> aicore_kernel_binary_;
bool device_unresponsive_{false};

// Memory management
MemoryAllocator mem_alloc_;
Expand Down Expand Up @@ -507,6 +532,20 @@ class DeviceRunner {
*/
int prepare_orch_so(Runtime &runtime);

/**
* Synchronize stream with a host-side timeout via aclrtSynchronizeStreamWithTimeout.
* Sets device_unresponsive_ on ACL_ERROR_RT_STREAM_SYNC_TIMEOUT.
*
* @param stream Stream to synchronize
* @param stream_name Stream label for logs
* @param timeout_ms Timeout threshold in milliseconds
* @return 0 on success, error code on failure/timeout
*/
int synchronize_stream_with_timeout(rtStream_t stream, const char *stream_name, int timeout_ms);

/** Execute aclrtResetDevice + aclFinalize and clear runner state. */
int reset_device_and_acl();

/**
* Initialize performance profiling shared memory
*
Expand Down
5 changes: 5 additions & 0 deletions src/a2a3/platform/onboard/host/memory_allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,8 @@ int MemoryAllocator::finalize() {
ptr_set_.clear();
return last_error;
}

// Forget every tracked pointer while holding the allocator mutex; nothing is freed here.
void MemoryAllocator::abandon() {
    const std::lock_guard<std::mutex> guard(mu_);
    ptr_set_.clear();
}
21 changes: 19 additions & 2 deletions src/a2a3/platform/onboard/host/pto_runtime_c_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,20 @@ int run_runtime(
std::vector<uint8_t> aicore_vec(aicore_binary, aicore_binary + aicore_size);
rc = runner->run(*r, block_dim, device_id, aicpu_vec, aicore_vec, aicpu_thread_num);
if (rc != 0) {
validate_runtime_impl(r);
r->~Runtime();
if (runner->device_unresponsive()) {
// Skip validate_runtime_impl: device state is unreliable after
// stream sync timeout — copy_from_device / device_free may block
// or fault on already-reset resources.
LOG_ERROR("run_runtime: stream sync timeout detected, skipping validation, triggering full reset");
r->~Runtime();
int reset_rc = runner->finalize();
if (reset_rc != 0) {
LOG_ERROR("run_runtime: DeviceRunner finalize after timeout failed: %d", reset_rc);
}
} else {
validate_runtime_impl(r);
r->~Runtime();
}
return rc;
}

Expand All @@ -260,6 +272,11 @@ int finalize_device(DeviceContextHandle ctx) {
}
}

/**
 * Query whether the DeviceRunner behind `ctx` has flagged the device as unresponsive.
 *
 * @param ctx  Device context handle; a NULL handle reports 0.
 * @return 1 if the runner's device_unresponsive() is true, otherwise 0.
 */
int device_unresponsive(DeviceContextHandle ctx) {
    if (ctx == NULL) {
        return 0;
    }
    DeviceRunner *runner = static_cast<DeviceRunner *>(ctx);
    return runner->device_unresponsive() ? 1 : 0;
}

/* ===========================================================================
* Internal helpers called from runtime_maker.cpp via Runtime.host_api
* =========================================================================== */
Expand Down
5 changes: 5 additions & 0 deletions src/a2a3/platform/sim/host/pto_runtime_c_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,11 @@ int finalize_device(DeviceContextHandle ctx) {
}
}

/**
 * Sim counterpart of the device_unresponsive query: the handle is ignored
 * and 0 is always returned.
 */
int device_unresponsive(DeviceContextHandle ctx) {
    static_cast<void>(ctx);
    return 0;
}

/* ===========================================================================
* ACL lifecycle stubs. Sim has no ACL / aclrtStream concept, so these
* no-op to satisfy the uniform host_runtime.so ABI (ChipWorker dlsym's the
Expand Down
Loading
Loading