From aa7a016001778d75af82ca4cff3bbb84f84cf2ea Mon Sep 17 00:00:00 2001 From: jvjhfhg Date: Tue, 28 Apr 2026 17:21:42 +0800 Subject: [PATCH] Update: simplify deferred completion context - Always pass async context through dispatch payload on a2a3 and a5 - Remove submit-time deferred flag and redundant AICPU cache maintenance - Update notify demos and fatal tests for the explicit async context API --- .../kernels/aiv/kernel_notify_wait.cpp | 5 +- .../async_notify_orchestration.cpp | 2 +- .../kernels/aiv/kernel_notify_wait.cpp | 5 +- .../orchestration/deferred_notify_orch.cpp | 2 +- .../kernels/aiv/kernel_notify_wait.cpp | 5 +- .../async_notify_orchestration.cpp | 2 +- .../kernels/aiv/kernel_notify_wait.cpp | 5 +- .../orchestration/deferred_notify_orch.cpp | 2 +- .../common/intrinsic.h | 29 ++++- .../orchestration/pto_orchestration_api.h | 23 +--- .../runtime/pto_async_kernel_api.h | 83 +++++++++----- .../runtime/pto_async_wait.h | 106 +++++------------- .../runtime/pto_completion_ingress.h | 9 +- .../runtime/pto_orchestrator.cpp | 7 +- .../runtime/pto_orchestrator.h | 2 +- .../runtime/pto_runtime2.cpp | 5 +- .../runtime/pto_runtime2.h | 4 +- .../runtime/pto_runtime2_types.h | 12 +- .../runtime/scheduler/pto_scheduler.h | 8 -- .../scheduler/scheduler_completion.cpp | 6 +- .../runtime/scheduler/scheduler_context.h | 9 +- .../runtime/scheduler/scheduler_dispatch.cpp | 22 ++-- .../common/intrinsic.h | 29 ++++- .../orchestration/pto_orchestration_api.h | 23 +--- .../runtime/pto_async_kernel_api.h | 83 +++++++++----- .../runtime/pto_async_wait.h | 106 +++++------------- .../runtime/pto_completion_ingress.h | 9 +- .../runtime/pto_orchestrator.cpp | 7 +- .../runtime/pto_orchestrator.h | 2 +- .../runtime/pto_runtime2.cpp | 5 +- .../runtime/pto_runtime2.h | 4 +- .../runtime/pto_runtime2_types.h | 12 +- .../runtime/scheduler/pto_scheduler.h | 8 -- .../scheduler/scheduler_completion.cpp | 6 +- .../runtime/scheduler/scheduler_context.h | 9 +- .../runtime/scheduler/scheduler_dispatch.cpp | 22 ++-- tests/ut/cpp/a2a3/test_a2a3_fatal.cpp | 2 +- tests/ut/cpp/a5/test_a5_fatal.cpp | 2 +- 38 files changed, 296 insertions(+), 386 deletions(-) diff --git a/examples/a2a3/tensormap_and_ringbuffer/async_notify_demo/kernels/aiv/kernel_notify_wait.cpp b/examples/a2a3/tensormap_and_ringbuffer/async_notify_demo/kernels/aiv/kernel_notify_wait.cpp index 6a6cc120d..bc8f1cd86 100644 --- a/examples/a2a3/tensormap_and_ringbuffer/async_notify_demo/kernels/aiv/kernel_notify_wait.cpp +++ b/examples/a2a3/tensormap_and_ringbuffer/async_notify_demo/kernels/aiv/kernel_notify_wait.cpp @@ -24,7 +24,8 @@ extern "C" __aicore__ __attribute__((always_inline)) void kernel_entry(__gm__ int64_t *args) { uint64_t notify_counter_addr = static_cast<uint64_t>(args[1]); uint32_t expected_value = static_cast<uint32_t>(args[2]); - pto2_save_expected_notification_counter( - args, reinterpret_cast<volatile __gm__ void *>(notify_counter_addr), expected_value + AsyncCtx async_ctx = get_async_ctx(args); + save_expected_notification_counter( + async_ctx, reinterpret_cast<volatile __gm__ void *>(notify_counter_addr), expected_value ); } diff --git a/examples/a2a3/tensormap_and_ringbuffer/async_notify_demo/kernels/orchestration/async_notify_orchestration.cpp b/examples/a2a3/tensormap_and_ringbuffer/async_notify_demo/kernels/orchestration/async_notify_orchestration.cpp index d4e8a68ca..89a6baaae 100644 --- a/examples/a2a3/tensormap_and_ringbuffer/async_notify_demo/kernels/orchestration/async_notify_orchestration.cpp +++
b/examples/a2a3/tensormap_and_ringbuffer/async_notify_demo/kernels/orchestration/async_notify_orchestration.cpp @@ -52,7 +52,7 @@ __attribute__((visibility("default"))) void async_notify_orchestration(const Chi params_notify.add_output(notify_token_info); params_notify.add_scalar(notify_counter.buffer.addr); params_notify.add_scalar(static_cast<uint64_t>(1)); - TaskOutputTensors notify_outputs = rt_submit_aiv_task_deferred(2, params_notify); + TaskOutputTensors notify_outputs = rt_submit_aiv_task(2, params_notify); Tensor notify_token = notify_outputs.get_ref(0); Arg params_consumer; diff --git a/examples/a2a3/tensormap_and_ringbuffer/deferred_notify_demo/kernels/aiv/kernel_notify_wait.cpp b/examples/a2a3/tensormap_and_ringbuffer/deferred_notify_demo/kernels/aiv/kernel_notify_wait.cpp index 807fce920..2a4d5cbf2 100644 --- a/examples/a2a3/tensormap_and_ringbuffer/deferred_notify_demo/kernels/aiv/kernel_notify_wait.cpp +++ b/examples/a2a3/tensormap_and_ringbuffer/deferred_notify_demo/kernels/aiv/kernel_notify_wait.cpp @@ -25,7 +25,8 @@ extern "C" __aicore__ __attribute__((always_inline)) void kernel_entry(__gm__ int64_t *args) { uint64_t counter_addr = static_cast<uint64_t>(args[1]); uint32_t expected_value = static_cast<uint32_t>(args[2]); - pto2_save_expected_notification_counter( - args, reinterpret_cast<volatile __gm__ void *>(counter_addr), expected_value + AsyncCtx async_ctx = get_async_ctx(args); + save_expected_notification_counter( + async_ctx, reinterpret_cast<volatile __gm__ void *>(counter_addr), expected_value ); } diff --git a/examples/a2a3/tensormap_and_ringbuffer/deferred_notify_demo/kernels/orchestration/deferred_notify_orch.cpp b/examples/a2a3/tensormap_and_ringbuffer/deferred_notify_demo/kernels/orchestration/deferred_notify_orch.cpp index 28ddaf808..23a82fc4f 100644 --- a/examples/a2a3/tensormap_and_ringbuffer/deferred_notify_demo/kernels/orchestration/deferred_notify_orch.cpp +++ b/examples/a2a3/tensormap_and_ringbuffer/deferred_notify_demo/kernels/orchestration/deferred_notify_orch.cpp @@ -55,7 +55,7 @@ __attribute__((visibility("default"))) void deferred_notify_orchestration(const params_notify.add_output(notify_token_info); params_notify.add_scalar(notify_counter.buffer.addr); params_notify.add_scalar(static_cast<uint64_t>(1)); - TaskOutputTensors notify_outputs = rt_submit_aiv_task_deferred(2, params_notify); + TaskOutputTensors notify_outputs = rt_submit_aiv_task(2, params_notify); Tensor notify_token = notify_outputs.get_ref(0); Arg params_consumer; diff --git a/examples/a5/tensormap_and_ringbuffer/async_notify_demo/kernels/aiv/kernel_notify_wait.cpp b/examples/a5/tensormap_and_ringbuffer/async_notify_demo/kernels/aiv/kernel_notify_wait.cpp index 6a6cc120d..bc8f1cd86 100644 --- a/examples/a5/tensormap_and_ringbuffer/async_notify_demo/kernels/aiv/kernel_notify_wait.cpp +++ b/examples/a5/tensormap_and_ringbuffer/async_notify_demo/kernels/aiv/kernel_notify_wait.cpp @@ -24,7 +24,8 @@ extern "C" __aicore__ __attribute__((always_inline)) void kernel_entry(__gm__ int64_t *args) { uint64_t notify_counter_addr = static_cast<uint64_t>(args[1]); uint32_t expected_value = static_cast<uint32_t>(args[2]); - pto2_save_expected_notification_counter( - args, reinterpret_cast<volatile __gm__ void *>(notify_counter_addr), expected_value + AsyncCtx async_ctx = get_async_ctx(args); + save_expected_notification_counter( + async_ctx, reinterpret_cast<volatile __gm__ void *>(notify_counter_addr), expected_value ); } diff --git a/examples/a5/tensormap_and_ringbuffer/async_notify_demo/kernels/orchestration/async_notify_orchestration.cpp
b/examples/a5/tensormap_and_ringbuffer/async_notify_demo/kernels/orchestration/async_notify_orchestration.cpp index d4e8a68ca..89a6baaae 100644 --- a/examples/a5/tensormap_and_ringbuffer/async_notify_demo/kernels/orchestration/async_notify_orchestration.cpp +++ b/examples/a5/tensormap_and_ringbuffer/async_notify_demo/kernels/orchestration/async_notify_orchestration.cpp @@ -52,7 +52,7 @@ __attribute__((visibility("default"))) void async_notify_orchestration(const Chi params_notify.add_output(notify_token_info); params_notify.add_scalar(notify_counter.buffer.addr); params_notify.add_scalar(static_cast<uint64_t>(1)); - TaskOutputTensors notify_outputs = rt_submit_aiv_task_deferred(2, params_notify); + TaskOutputTensors notify_outputs = rt_submit_aiv_task(2, params_notify); Tensor notify_token = notify_outputs.get_ref(0); Arg params_consumer; diff --git a/examples/a5/tensormap_and_ringbuffer/deferred_notify_demo/kernels/aiv/kernel_notify_wait.cpp b/examples/a5/tensormap_and_ringbuffer/deferred_notify_demo/kernels/aiv/kernel_notify_wait.cpp index 807fce920..2a4d5cbf2 100644 --- a/examples/a5/tensormap_and_ringbuffer/deferred_notify_demo/kernels/aiv/kernel_notify_wait.cpp +++ b/examples/a5/tensormap_and_ringbuffer/deferred_notify_demo/kernels/aiv/kernel_notify_wait.cpp @@ -25,7 +25,8 @@ extern "C" __aicore__ __attribute__((always_inline)) void kernel_entry(__gm__ int64_t *args) { uint64_t counter_addr = static_cast<uint64_t>(args[1]); uint32_t expected_value = static_cast<uint32_t>(args[2]); - pto2_save_expected_notification_counter( - args, reinterpret_cast<volatile __gm__ void *>(counter_addr), expected_value + AsyncCtx async_ctx = get_async_ctx(args); + save_expected_notification_counter( + async_ctx, reinterpret_cast<volatile __gm__ void *>(counter_addr), expected_value ); } diff --git a/examples/a5/tensormap_and_ringbuffer/deferred_notify_demo/kernels/orchestration/deferred_notify_orch.cpp b/examples/a5/tensormap_and_ringbuffer/deferred_notify_demo/kernels/orchestration/deferred_notify_orch.cpp index 28ddaf808..23a82fc4f 100644 --- a/examples/a5/tensormap_and_ringbuffer/deferred_notify_demo/kernels/orchestration/deferred_notify_orch.cpp +++ b/examples/a5/tensormap_and_ringbuffer/deferred_notify_demo/kernels/orchestration/deferred_notify_orch.cpp @@ -55,7 +55,7 @@ __attribute__((visibility("default"))) void deferred_notify_orchestration(const params_notify.add_output(notify_token_info); params_notify.add_scalar(notify_counter.buffer.addr); params_notify.add_scalar(static_cast<uint64_t>(1)); - TaskOutputTensors notify_outputs = rt_submit_aiv_task_deferred(2, params_notify); + TaskOutputTensors notify_outputs = rt_submit_aiv_task(2, params_notify); Tensor notify_token = notify_outputs.get_ref(0); Arg params_consumer; diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/common/intrinsic.h b/src/a2a3/runtime/tensormap_and_ringbuffer/common/intrinsic.h index bf99e4754..43e360282 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/common/intrinsic.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/common/intrinsic.h @@ -52,6 +52,7 @@ #include +#include "pto_completion_ingress.h" #include "pto_task_id.h" #ifndef __gm__ @@ -65,8 +66,6 @@ /** Number of extra pointer slots appended to the args[] tail (LocalContext + GlobalContext). */ static constexpr int32_t PTO2_EXT_PARAMS_COUNT = 2; -struct PTO2DeferredCompletionIngressBuffer; /** * Args[] suffix indices for context pointers. * Derived from MAX_TENSOR_ARGS(16) + MAX_SCALAR_ARGS(32).
@@ -89,6 +88,28 @@ struct GlobalContext { int32_t sub_block_id; }; +struct AsyncCtx { + volatile __gm__ uint32_t *completion_count; + volatile __gm__ int32_t *completion_error_code; + volatile __gm__ PTO2DeferredCompletionEntry *completion_entries; + uint32_t completion_capacity; + PTO2TaskId task_token; + + static inline AsyncCtx make(PTO2TaskId task_token, volatile __gm__ PTO2DeferredCompletionIngressBuffer *buffer) { + AsyncCtx ctx{}; + ctx.task_token = task_token; + if (buffer == nullptr) { + ctx.task_token = PTO2TaskId::invalid(); + return ctx; + } + ctx.completion_count = &buffer->count; + ctx.completion_error_code = &buffer->error_code; + ctx.completion_entries = &buffer->entries[0]; + ctx.completion_capacity = PTO2_MAX_COMPLETIONS_PER_TASK; + return ctx; + } +}; + /** * Per-dispatch local context, stored in PTO2DispatchPayload. * Written by build_payload() before each dispatch. Different blocks of the @@ -101,9 +122,7 @@ struct LocalContext { // Currently fixed to 1 (block_dim > 1 not yet implemented). // NOT the same as RUNTIME_CONFIG.block_dim in kernel_config.py, // which controls how many physical cores the runtime launches. - PTO2TaskId task_token; - volatile __gm__ PTO2DeferredCompletionIngressBuffer *deferred_ingress; - uint32_t deferred_completion_capacity; + AsyncCtx async_ctx; }; /** diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/orchestration/pto_orchestration_api.h b/src/a2a3/runtime/tensormap_and_ringbuffer/orchestration/pto_orchestration_api.h index 2a2facf43..2d61dce4d 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/orchestration/pto_orchestration_api.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/orchestration/pto_orchestration_api.h @@ -115,9 +115,7 @@ void framework_bind_runtime(PTO2Runtime *rt); * Populated by the runtime; called by orchestration through inline wrappers. 
*/ typedef struct PTO2RuntimeOps { - TaskOutputTensors (*submit_task)( - PTO2Runtime *rt, const MixedKernels &mixed_kernels, const Arg &args, bool complete_in_future - ); + TaskOutputTensors (*submit_task)(PTO2Runtime *rt, const MixedKernels &mixed_kernels, const Arg &args); void (*scope_begin)(PTO2Runtime *rt); void (*scope_end)(PTO2Runtime *rt); void (*orchestration_done)(PTO2Runtime *rt); @@ -209,17 +207,12 @@ static inline TaskOutputTensors alloc_tensors(const CIs &...cis) { return alloc_tensors(args); } -static inline TaskOutputTensors -rt_submit_task_impl(const MixedKernels &mixed_kernels, const Arg &args, bool complete_in_future) { +static inline TaskOutputTensors rt_submit_task(const MixedKernels &mixed_kernels, const Arg &args) { PTO2Runtime *rt = current_runtime(); if (rt->ops->is_fatal(rt)) { return TaskOutputTensors{}; } - return rt->ops->submit_task(rt, mixed_kernels, args, complete_in_future); -} - -static inline TaskOutputTensors rt_submit_task(const MixedKernels &mixed_kernels, const Arg &args) { - return rt_submit_task_impl(mixed_kernels, args, false); + return rt->ops->submit_task(rt, mixed_kernels, args); } /** @@ -228,7 +221,7 @@ static inline TaskOutputTensors rt_submit_task(const MixedKernels &mixed_kernels static inline TaskOutputTensors rt_submit_aic_task(int32_t kernel_id, const Arg &args) { MixedKernels mk; mk.aic_kernel_id = kernel_id; - return rt_submit_task_impl(mk, args, false); + return rt_submit_task(mk, args); } /** @@ -237,13 +230,7 @@ static inline TaskOutputTensors rt_submit_aic_task(int32_t kernel_id, const Arg static inline TaskOutputTensors rt_submit_aiv_task(int32_t kernel_id, const Arg &args) { MixedKernels mk; mk.aiv0_kernel_id = kernel_id; - return rt_submit_task_impl(mk, args, false); -} - -static inline TaskOutputTensors rt_submit_aiv_task_deferred(int32_t kernel_id, const Arg &args) { - MixedKernels mk; - mk.aiv0_kernel_id = kernel_id; - return rt_submit_task_impl(mk, args, true); + return rt_submit_task(mk, args); } static inline void rt_scope_begin(PTO2ScopeMode mode = PTO2ScopeMode::AUTO) { diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_async_kernel_api.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_async_kernel_api.h index e41e52a46..b3bb01f6e 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_async_kernel_api.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_async_kernel_api.h @@ -28,49 +28,78 @@ #define __gm__ #endif -struct PTO2AsyncCtx { - volatile __gm__ PTO2DeferredCompletionIngressBuffer *ingress; - uint32_t entry_capacity; - PTO2TaskId task_token; -}; +inline __aicore__ void defer_load_ingress(AsyncCtx &ctx) { + if (ctx.completion_count == nullptr) return; +#if defined(__CCE_KT_TEST__) || defined(__CCE_AICORE__) || defined(__DAV_C220__) + uintptr_t line = reinterpret_cast<uintptr_t>(ctx.completion_count) & ~(uintptr_t(PTO2_ALIGN_SIZE) - 1u); + dcci((__gm__ int32_t *)line, SINGLE_CACHE_LINE); +#else + __asm__ __volatile__("" ::: "memory"); +#endif +} -inline __aicore__ PTO2AsyncCtx pto2_async_ctx(__gm__ int64_t *args) { +inline __aicore__ AsyncCtx get_async_ctx(__gm__ int64_t *args) { __gm__ LocalContext *lc = reinterpret_cast<__gm__ LocalContext *>(static_cast<uintptr_t>(args[PAYLOAD_LOCAL_CONTEXT_INDEX])); - PTO2AsyncCtx ctx; - ctx.ingress = lc->deferred_ingress; - ctx.entry_capacity = lc->deferred_completion_capacity; - ctx.task_token.raw = lc->task_token.raw; + AsyncCtx ctx = lc->async_ctx; + defer_load_ingress(ctx); return ctx; } -inline __aicore__ void pto2_defer_condition( - PTO2AsyncCtx &ctx, volatile __gm__ void *addr, uint32_t expected, uint32_t engine, int32_t completion_type +inline __aicore__ void defer_condition( + AsyncCtx &ctx, volatile __gm__ void *addr, uint32_t expected, uint32_t engine, int32_t completion_type ) { - if (ctx.task_token.is_invalid() || ctx.ingress == nullptr) { + if (ctx.task_token.is_invalid() || ctx.completion_count == nullptr || ctx.completion_entries == nullptr) { return; } - uint32_t idx = ctx.ingress->count; - if (idx >= ctx.entry_capacity) { - ctx.ingress->error_code = PTO2_ERROR_ASYNC_WAIT_OVERFLOW; + uint32_t idx = *ctx.completion_count; + if (idx >= ctx.completion_capacity) { + if (ctx.completion_error_code != nullptr) { + *ctx.completion_error_code = PTO2_ERROR_ASYNC_WAIT_OVERFLOW; + } return; } - volatile __gm__ PTO2DeferredCompletionEntry *slot = &ctx.ingress->entries[idx]; + volatile __gm__ PTO2DeferredCompletionEntry *slot = &ctx.completion_entries[idx]; slot->addr = reinterpret_cast<uint64_t>(addr); slot->expected_value = expected; slot->engine = engine; slot->completion_type = completion_type; slot->_pad = 0; - ctx.ingress->count = idx + 1; + *ctx.completion_count = idx + 1; +} + +inline __aicore__ void defer_flush_range(volatile __gm__ void *addr, uint32_t size_bytes) { + if (addr == nullptr || size_bytes == 0) return; +#if defined(__CCE_KT_TEST__) || defined(__CCE_AICORE__) || defined(__DAV_C220__) + uintptr_t start = reinterpret_cast<uintptr_t>(addr) & ~(uintptr_t(PTO2_ALIGN_SIZE) - 1u); + uintptr_t end = + (reinterpret_cast<uintptr_t>(addr) + size_bytes + PTO2_ALIGN_SIZE - 1u) & ~(uintptr_t(PTO2_ALIGN_SIZE) - 1u); + for (uintptr_t p = start; p < end; p += PTO2_ALIGN_SIZE) { + dcci((__gm__ int32_t *)p, SINGLE_CACHE_LINE, CACHELINE_OUT); + } +#else + (void)addr; + (void)size_bytes; +#endif } -inline __aicore__ void pto2_defer_flush(PTO2AsyncCtx &ctx) { - if (ctx.task_token.is_invalid() || ctx.ingress == nullptr) return; +inline __aicore__ void defer_flush(AsyncCtx &ctx) { + if (ctx.task_token.is_invalid() || ctx.completion_count == nullptr) return; #if defined(__CCE_KT_TEST__) || defined(__CCE_AICORE__) || defined(__DAV_C220__) - dcci((__gm__ int32_t *)ctx.ingress->entries, ENTIRE_DATA_CACHE, CACHELINE_OUT); - dcci((__gm__ int32_t *)ctx.ingress, SINGLE_CACHE_LINE, CACHELINE_OUT); + uint32_t count = *ctx.completion_count; + if (count > ctx.completion_capacity) { + count = ctx.completion_capacity; + } + uint32_t flush_bytes = static_cast<uint32_t>(sizeof(*ctx.completion_count)); + if (ctx.completion_error_code != nullptr) { + flush_bytes += static_cast<uint32_t>(sizeof(*ctx.completion_error_code)); + } + if (ctx.completion_entries != nullptr) { + flush_bytes += count * static_cast<uint32_t>(sizeof(PTO2DeferredCompletionEntry)); + } + defer_flush_range(ctx.completion_count, flush_bytes); #if defined(__CPU_SIM) dsb(0); #else @@ -90,12 +119,10 @@ pto2_send_notification(volatile __gm__ void *remote_counter_addr, int32_t value, pto::comm::TNOTIFY(signal, value, notify_op); } -inline __aicore__ void pto2_save_expected_notification_counter( - __gm__ int64_t *args, volatile __gm__ void *counter_addr, uint32_t expected_value -) { - PTO2AsyncCtx ctx = pto2_async_ctx(args); - pto2_defer_condition(ctx, counter_addr, expected_value, PTO2_COMPLETION_ENGINE_SDMA, PTO2_COMPLETION_TYPE_COUNTER); - pto2_defer_flush(ctx); +inline __aicore__ void +save_expected_notification_counter(AsyncCtx &ctx, volatile __gm__ void *counter_addr, uint32_t expected_value) { + defer_condition(ctx, counter_addr, expected_value, PTO2_COMPLETION_ENGINE_SDMA, PTO2_COMPLETION_TYPE_COUNTER); + defer_flush(ctx); } #endif //
PTO_ASYNC_KERNEL_API_H diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_async_wait.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_async_wait.h index a0406fb63..fe4ba9ef1 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_async_wait.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_async_wait.h @@ -16,8 +16,7 @@ #include #include -#include "aicpu/platform_regs.h" -#include "common/memory_barrier.h" +#include "intrinsic.h" #include "pto_completion_ingress.h" #include "pto_runtime2_types.h" @@ -28,43 +27,8 @@ struct PTO2CompletionStats; inline constexpr int32_t PTO2_MAX_ASYNC_WAITS = 64; inline constexpr int32_t PTO2_MAX_PENDING_COMPLETIONS = 128; -inline uintptr_t completion_ingress_cache_line(const volatile void *addr) { - return reinterpret_cast(addr) & ~(uintptr_t(PTO2_ALIGN_SIZE) - 1u); -} - -inline void completion_ingress_invalidate_entries( - volatile PTO2CompletionIngressQueue *completion_ingress, uint64_t tail, uint64_t head -) { - uint64_t active_count = head - tail; - if (active_count == 0) return; - - uint64_t tail_index = tail & PTO2_COMPLETION_INGRESS_MASK; - uint64_t first_span = active_count; - uint64_t contiguous_capacity = PTO2_COMPLETION_INGRESS_CAPACITY - tail_index; - if (first_span > contiguous_capacity) { - first_span = contiguous_capacity; - } - - cache_invalidate_range( - const_cast(reinterpret_cast(&completion_ingress->entries[tail_index])), - first_span * sizeof(PTO2CompletionIngressEntry) - ); - - uint64_t remaining = active_count - first_span; - if (remaining == 0) return; - - cache_invalidate_range( - const_cast(reinterpret_cast(&completion_ingress->entries[0])), - remaining * sizeof(PTO2CompletionIngressEntry) - ); -} - inline bool completion_ingress_has_pending(volatile PTO2CompletionIngressQueue *completion_ingress) { if (completion_ingress == nullptr) return false; - cache_invalidate_range( - const_cast(reinterpret_cast(&completion_ingress->head)), - sizeof(completion_ingress->head) - ); uint64_t head = __atomic_load_n(&completion_ingress->head, __ATOMIC_ACQUIRE); uint64_t tail = __atomic_load_n(&completion_ingress->tail, __ATOMIC_ACQUIRE); return tail < head; @@ -167,13 +131,8 @@ struct PTO2AsyncWaitList { int32_t drained = 0; while (true) { uint64_t tail = __atomic_load_n(&completion_ingress->tail, __ATOMIC_ACQUIRE); - cache_invalidate_range( - const_cast(reinterpret_cast(&completion_ingress->head)), - sizeof(completion_ingress->head) - ); uint64_t head_snapshot = __atomic_load_n(&completion_ingress->head, __ATOMIC_ACQUIRE); if (tail >= head_snapshot) break; - completion_ingress_invalidate_entries(completion_ingress, tail, head_snapshot); while (tail < head_snapshot) { volatile PTO2CompletionIngressEntry *slot = @@ -187,14 +146,8 @@ struct PTO2AsyncWaitList { uint32_t expected_value = slot->expected_value; PTO2AsyncEngine engine = static_cast(slot->engine); - slot->seq = 0; - completion_ingress->tail = tail + 1; - OUT_OF_ORDER_STORE_BARRIER(); - cache_flush_range(const_cast(reinterpret_cast(slot)), sizeof(*slot)); - cache_flush_range( - const_cast(reinterpret_cast(&completion_ingress->tail)), - sizeof(completion_ingress->tail) - ); + __atomic_store_n(&slot->seq, 0, __ATOMIC_RELEASE); + __atomic_store_n(&completion_ingress->tail, tail + 1, __ATOMIC_RELEASE); drained++; tail++; @@ -266,47 +219,44 @@ struct PTO2AsyncWaitList { return true; } - RegisterResult register_deferred( - PTO2TaskSlotState &slot_state, volatile PTO2DeferredCompletionIngressBuffer *ingress, bool normal_done, - int32_t &error_code - 
) { + RegisterResult + register_deferred(PTO2TaskSlotState &slot_state, const AsyncCtx &async_ctx, bool normal_done, int32_t &error_code) { error_code = PTO2_ERROR_NONE; - PTO2TaskPayload *payload = slot_state.payload; - if (payload == nullptr || !payload->complete_in_future) { + if (slot_state.payload == nullptr) { return RegisterResult::NotDeferred; } if (!try_lock()) return RegisterResult::Skipped; uint32_t deferred_count = 0; - if (ingress != nullptr) { - cache_invalidate_range( - const_cast(reinterpret_cast(ingress)), PTO2_ALIGN_SIZE - ); - if (ingress->error_code != PTO2_ERROR_NONE) { - error_code = ingress->error_code; - unlock(); - return RegisterResult::Error; + if (async_ctx.completion_count != nullptr) { + if (async_ctx.completion_error_code != nullptr) { + if (*async_ctx.completion_error_code != PTO2_ERROR_NONE) { + error_code = *async_ctx.completion_error_code; + unlock(); + return RegisterResult::Error; + } } - deferred_count = ingress->count; + deferred_count = *async_ctx.completion_count; } - if (deferred_count > PTO2_MAX_COMPLETIONS_PER_TASK) { + if (deferred_count > async_ctx.completion_capacity) { error_code = PTO2_ERROR_ASYNC_REGISTRATION_FAILED; unlock(); return RegisterResult::Error; } - if (deferred_count > 0 && ingress != nullptr) { - cache_invalidate_range( - const_cast(reinterpret_cast(ingress->entries)), - deferred_count * sizeof(PTO2DeferredCompletionEntry) - ); + if (deferred_count > 0) { + if (async_ctx.completion_entries == nullptr) { + error_code = PTO2_ERROR_ASYNC_REGISTRATION_FAILED; + unlock(); + return RegisterResult::Error; + } } PTO2AsyncWaitEntry *entry = find_entry_by_token(slot_state.task->task_id); + if (entry == nullptr && deferred_count == 0) { + unlock(); + return RegisterResult::NotDeferred; + } if (entry == nullptr) { - if (!normal_done && deferred_count == 0) { - unlock(); - return RegisterResult::Registered; - } if (count >= PTO2_MAX_ASYNC_WAITS) { error_code = PTO2_ERROR_ASYNC_WAIT_OVERFLOW; unlock(); @@ -324,11 +274,7 @@ struct PTO2AsyncWaitList { } for (uint32_t i = 0; i < deferred_count; ++i) { - volatile PTO2DeferredCompletionEntry *deferred = &ingress->entries[i]; - volatile uint32_t *counter = reinterpret_cast(static_cast(deferred->addr)); - cache_invalidate_range( - reinterpret_cast(completion_ingress_cache_line(counter)), sizeof(uint32_t) - ); + volatile PTO2DeferredCompletionEntry *deferred = &async_ctx.completion_entries[i]; if (!append_condition_locked( *entry, deferred->addr, deferred->expected_value, static_cast(deferred->engine), error_code diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_completion_ingress.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_completion_ingress.h index cb32de6c4..3f120251d 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_completion_ingress.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_completion_ingress.h @@ -56,16 +56,15 @@ struct PTO2DeferredCompletionEntry { static_assert(sizeof(PTO2DeferredCompletionEntry) == 24, "PTO2DeferredCompletionEntry layout drift"); -struct PTO2DeferredCompletionIngressBuffer { - alignas(PTO2_ALIGN_SIZE) volatile uint32_t count; +struct alignas(PTO2_ALIGN_SIZE) PTO2DeferredCompletionIngressBuffer { + volatile uint32_t count; volatile int32_t error_code; - uint8_t _header_pad[PTO2_ALIGN_SIZE - sizeof(uint32_t) - sizeof(int32_t)]; - alignas(PTO2_ALIGN_SIZE) PTO2DeferredCompletionEntry entries[PTO2_MAX_COMPLETIONS_PER_TASK]; + PTO2DeferredCompletionEntry entries[PTO2_MAX_COMPLETIONS_PER_TASK]; }; static_assert( 
sizeof(PTO2DeferredCompletionIngressBuffer) % PTO2_ALIGN_SIZE == 0, - "PTO2DeferredCompletionIngressBuffer size must be cache-line aligned" + "PTO2DeferredCompletionIngressBuffer size must preserve array element cache-line boundaries" ); struct PTO2CompletionIngressQueue { diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp index 2e9f90c69..964db3cbb 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp @@ -503,8 +503,7 @@ void PTO2OrchestratorState::end_scope() { // ============================================================================= // Task Submission // ============================================================================= -TaskOutputTensors -PTO2OrchestratorState::submit_task(const MixedKernels &mixed_kernels, const Arg &args, bool complete_in_future) { +TaskOutputTensors PTO2OrchestratorState::submit_task(const MixedKernels &mixed_kernels, const Arg &args) { auto *orch = this; CYCLE_COUNT_START(); @@ -718,7 +717,7 @@ PTO2OrchestratorState::submit_task(const MixedKernels &mixed_kernels, const Arg payload.fanin_inline_slot_states[i] = fanin_builder.inline_slots[i]; } - payload.init(args, result, prepared.alloc_result, layout, complete_in_future); + payload.init(args, result, prepared.alloc_result, layout); CYCLE_COUNT_LAP_RECORD(g_orch_args_cycle, AicpuPhaseId::ORCH_PARAMS, task_id.raw); #if PTO2_ORCH_PROFILING @@ -808,7 +807,7 @@ TaskOutputTensors PTO2OrchestratorState::alloc_tensors(const Arg &args) { TaskOutputTensors outputs; outputs.set_task_id(prepared.task_id); - payload.init(args, outputs, prepared.alloc_result, layout, false); + payload.init(args, outputs, prepared.alloc_result, layout); payload.fanin_actual_count = 0; payload.fanin_spill_start = 0; payload.fanin_spill_pool = &orch->rings[prepared.task_id.ring()].fanin_pool; diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h index 0d21ccc44..dc1ecf461 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h @@ -125,7 +125,7 @@ struct PTO2OrchestratorState { void report_fatal(int32_t error_code, const char *func, const char *fmt, ...); void begin_scope(PTO2ScopeMode mode = PTO2ScopeMode::AUTO); void end_scope(); - TaskOutputTensors submit_task(const MixedKernels &mixed_kernels, const Arg &args, bool complete_in_future); + TaskOutputTensors submit_task(const MixedKernels &mixed_kernels, const Arg &args); TaskOutputTensors alloc_tensors(const Arg &args); void mark_done(); }; diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.cpp index c2402bca2..742ced6cf 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.cpp @@ -38,9 +38,8 @@ __attribute__((weak, visibility("hidden"))) uint64_t get_sys_cnt_aicpu() { retur // Orchestration Ops Table (function-pointer dispatch for orchestration .so) // ============================================================================= -static TaskOutputTensors -submit_task_impl(PTO2Runtime *rt, const MixedKernels &mixed_kernels, const Arg &args, bool complete_in_future) { - return 
rt->orchestrator.submit_task(mixed_kernels, args, complete_in_future); +static TaskOutputTensors submit_task_impl(PTO2Runtime *rt, const MixedKernels &mixed_kernels, const Arg &args) { + return rt->orchestrator.submit_task(mixed_kernels, args); } static TaskOutputTensors alloc_tensors_impl(PTO2Runtime *rt, const Arg &args) { diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.h index ca43bd75b..917c9c5ed 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.h @@ -67,9 +67,7 @@ enum PTO2RuntimeMode { typedef struct PTO2Runtime PTO2Runtime; // forward declare for ops signatures struct PTO2RuntimeOps { - TaskOutputTensors (*submit_task)( - PTO2Runtime *rt, const MixedKernels &mixed_kernels, const Arg &args, bool complete_in_future - ); + TaskOutputTensors (*submit_task)(PTO2Runtime *rt, const MixedKernels &mixed_kernels, const Arg &args); void (*scope_begin)(PTO2Runtime *rt); void (*scope_end)(PTO2Runtime *rt); void (*orchestration_done)(PTO2Runtime *rt); diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h index bcf36efd9..1ec87cae0 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h @@ -242,7 +242,6 @@ struct PTO2TaskPayload { int32_t scalar_count{0}; int32_t fanin_actual_count{0}; // Actual fanin count (without the +1 redundance) int32_t fanin_spill_start{0}; // Linear start index in fanin spill pool (0 = no spill) - bool complete_in_future{false}; PTO2FaninPool *fanin_spill_pool{nullptr}; PTO2TaskSlotState *fanin_inline_slot_states[PTO2_FANIN_INLINE_CAP]; // === Cache lines 9-40 (2048B) — tensors (alignas(64) forces alignment) === @@ -264,10 +263,7 @@ struct PTO2TaskPayload { * @param args Task arguments (tensors + scalars) * @param result Materialized output tensors (from TensorCreateInfo path) */ - void init( - const Arg &args, TaskOutputTensors &result, PTO2TaskAllocResult &alloc_result, PTO2OutputLayout &layout, - bool complete_in_future_flag - ) { + void init(const Arg &args, TaskOutputTensors &result, PTO2TaskAllocResult &alloc_result, PTO2OutputLayout &layout) { tensor_count = args.tensor_count(); scalar_count = args.scalar_count(); @@ -289,15 +285,13 @@ struct PTO2TaskPayload { // Round up to cache line boundary. Both arrays are 1024B so no overrun. // Eliminates branches; extra bytes within the same CL have zero additional cost. memcpy(scalars, args.scalars(), PTO2_ALIGN_UP(args.scalar_count() * sizeof(uint64_t), 64)); - complete_in_future = complete_in_future_flag; } }; // PTO2TaskPayload layout verification (offsetof requires complete type). 
-static_assert(offsetof(PTO2TaskPayload, complete_in_future) == 16, "deferred flag must stay in the first cache line"); -static_assert(offsetof(PTO2TaskPayload, fanin_spill_pool) == 24, "spill pool pointer layout drift"); +static_assert(offsetof(PTO2TaskPayload, fanin_spill_pool) == 16, "spill pool pointer layout drift"); static_assert( - offsetof(PTO2TaskPayload, fanin_inline_slot_states) == 32, "inline fanin array must follow spill metadata" + offsetof(PTO2TaskPayload, fanin_inline_slot_states) == 24, "inline fanin array must follow spill metadata" ); static_assert(offsetof(PTO2TaskPayload, tensors) == 576, "tensors must start at byte 576 (cache line 9)"); static_assert( diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h index b5eb29f0b..344512b7a 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h @@ -1019,17 +1019,9 @@ inline PTO2AsyncPollResult PTO2AsyncWaitList::poll_and_complete( for (int32_t i = count - 1; i >= 0; --i) { PTO2AsyncWaitEntry &entry = entries[i]; - uintptr_t last_invalidated_counter_line = static_cast(-1); for (int32_t c = 0; c < entry.condition_count; c++) { PTO2CompletionCondition &cond = entry.conditions[c]; if (cond.satisfied) continue; - if (cond.counter_addr) { - uintptr_t counter_line = completion_ingress_cache_line(cond.counter_addr); - if (counter_line != last_invalidated_counter_line) { - cache_invalidate_range(reinterpret_cast(counter_line), sizeof(uint32_t)); - last_invalidated_counter_line = counter_line; - } - } PTO2CompletionPollResult poll = cond.test(); if (poll.state == PTO2CompletionPollState::FAILED) { result.error_code = poll.error_code; diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_completion.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_completion.cpp index 0cfbebd20..87a68b72a 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_completion.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_completion.cpp @@ -81,14 +81,14 @@ void SchedulerContext::complete_slot_task( (void)hank; #endif bool mixed_complete = sched_->on_subtask_complete(slot_state); - if (slot_state.payload != nullptr && slot_state.payload->complete_in_future) { + if (slot_state.payload != nullptr) { int32_t reg_err = PTO2_ERROR_NONE; PTO2AsyncWaitList::RegisterResult reg_result; volatile PTO2DeferredCompletionIngressBuffer *deferred_ingress = &deferred_ingress_per_core_[core_id][expected_reg_task_id & 1]; + AsyncCtx async_ctx = AsyncCtx::make(slot_state.task->task_id, deferred_ingress); do { - reg_result = - sched_->async_wait_list.register_deferred(slot_state, deferred_ingress, mixed_complete, reg_err); + reg_result = sched_->async_wait_list.register_deferred(slot_state, async_ctx, mixed_complete, reg_err); if (reg_result == PTO2AsyncWaitList::RegisterResult::Skipped) { SPIN_WAIT_HINT(); } diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_context.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_context.h index 1ee1c92cc..61d80e087 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_context.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_context.h @@ -124,9 +124,10 @@ class SchedulerContext { // buf_idx = 
reg_task_id & 1; adjacent dispatches alternate automatically. PTO2DispatchPayload payload_per_core_[RUNTIME_MAX_WORKER][2]; - // Per-core deferred-completion ingress storage. This has the same runtime - // lifetime as payload_per_core_, but is kept out of the dispatch payload so - // normal task dispatch layout and cache footprint stay unchanged. + // Per-core deferred-completion software registration storage. This has + // the same runtime lifetime as payload_per_core_, but is kept out of the + // dispatch payload so normal task dispatch layout and cache footprint stay + // unchanged. PTO2DeferredCompletionIngressBuffer deferred_ingress_per_core_[RUNTIME_MAX_WORKER][2]; // sync_start drain coordination @@ -208,7 +209,7 @@ class SchedulerContext { void build_payload( PTO2DispatchPayload &dispatch_payload, PTO2TaskSlotState &slot_state, PTO2SubtaskSlot subslot, - PTO2DeferredCompletionIngressBuffer *deferred_ingress + const AsyncCtx &async_ctx ); void dispatch_subtask_to_core( diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_dispatch.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_dispatch.cpp index 3dc49e7bb..e9fcafd90 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_dispatch.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_dispatch.cpp @@ -88,7 +88,7 @@ int SchedulerContext::pop_ready_tasks_batch( void SchedulerContext::build_payload( PTO2DispatchPayload &dispatch_payload, PTO2TaskSlotState &slot_state, PTO2SubtaskSlot subslot, - PTO2DeferredCompletionIngressBuffer *deferred_ingress + const AsyncCtx &async_ctx ) { int32_t slot_idx = static_cast(subslot); uint64_t callable_addr = get_function_bin_addr(slot_state.task->kernel_id[slot_idx]); @@ -104,11 +104,7 @@ void SchedulerContext::build_payload( } dispatch_payload.local_context.block_idx = slot_state.next_block_idx; dispatch_payload.local_context.block_num = slot_state.logical_block_num; - dispatch_payload.local_context.task_token = - payload.complete_in_future ? slot_state.task->task_id : PTO2TaskId::invalid(); - dispatch_payload.local_context.deferred_ingress = payload.complete_in_future ? deferred_ingress : nullptr; - dispatch_payload.local_context.deferred_completion_capacity = - payload.complete_in_future ? 
PTO2_MAX_COMPLETIONS_PER_TASK : 0; + dispatch_payload.local_context.async_ctx = async_ctx; dispatch_payload.args[PAYLOAD_LOCAL_CONTEXT_INDEX] = reinterpret_cast(&dispatch_payload.local_context); dispatch_payload.args[PAYLOAD_GLOBAL_CONTEXT_INDEX] = reinterpret_cast(&dispatch_payload.global_context); } @@ -137,15 +133,11 @@ void SchedulerContext::dispatch_subtask_to_core( uint32_t buf_idx = reg_task_id & 1u; PTO2DispatchPayload &payload = payload_per_core_[core_id][buf_idx]; - PTO2DeferredCompletionIngressBuffer *deferred_ingress = nullptr; - if (slot_state.payload != nullptr && slot_state.payload->complete_in_future) { - deferred_ingress = &deferred_ingress_per_core_[core_id][buf_idx]; - deferred_ingress->count = 0; - deferred_ingress->error_code = PTO2_ERROR_NONE; - OUT_OF_ORDER_STORE_BARRIER(); - cache_flush_range(deferred_ingress, PTO2_ALIGN_SIZE); - } - build_payload(payload, slot_state, subslot, deferred_ingress); + PTO2DeferredCompletionIngressBuffer *deferred_ingress = &deferred_ingress_per_core_[core_id][buf_idx]; + deferred_ingress->count = 0; + deferred_ingress->error_code = PTO2_ERROR_NONE; + AsyncCtx async_ctx = AsyncCtx::make(slot_state.task->task_id, deferred_ingress); + build_payload(payload, slot_state, subslot, async_ctx); if (to_pending) { core_exec_state.pending_subslot = subslot; diff --git a/src/a5/runtime/tensormap_and_ringbuffer/common/intrinsic.h b/src/a5/runtime/tensormap_and_ringbuffer/common/intrinsic.h index 23ff25578..759d29c63 100644 --- a/src/a5/runtime/tensormap_and_ringbuffer/common/intrinsic.h +++ b/src/a5/runtime/tensormap_and_ringbuffer/common/intrinsic.h @@ -52,6 +52,7 @@ #include +#include "pto_completion_ingress.h" #include "pto_task_id.h" #ifndef __gm__ @@ -65,8 +66,6 @@ /** Number of extra pointer slots appended to the args[] tail (LocalContext + GlobalContext). */ static constexpr int32_t PTO2_EXT_PARAMS_COUNT = 2; -struct PTO2DeferredCompletionIngressBuffer; - /** * Args[] suffix indices for context pointers. * Derived from MAX_TENSOR_ARGS(16) + MAX_SCALAR_ARGS(32). @@ -89,6 +88,28 @@ struct GlobalContext { int32_t sub_block_id; }; +struct AsyncCtx { + volatile __gm__ uint32_t *completion_count; + volatile __gm__ int32_t *completion_error_code; + volatile __gm__ PTO2DeferredCompletionEntry *completion_entries; + uint32_t completion_capacity; + PTO2TaskId task_token; + + static inline AsyncCtx make(PTO2TaskId task_token, volatile __gm__ PTO2DeferredCompletionIngressBuffer *buffer) { + AsyncCtx ctx{}; + ctx.task_token = task_token; + if (buffer == nullptr) { + ctx.task_token = PTO2TaskId::invalid(); + return ctx; + } + ctx.completion_count = &buffer->count; + ctx.completion_error_code = &buffer->error_code; + ctx.completion_entries = &buffer->entries[0]; + ctx.completion_capacity = PTO2_MAX_COMPLETIONS_PER_TASK; + return ctx; + } +}; + /** * Per-dispatch local context, stored in PTO2DispatchPayload. * Written by build_payload() before each dispatch. Different blocks of the @@ -104,9 +125,7 @@ struct LocalContext { // Currently fixed to 1 (block_dim > 1 not yet implemented). // NOT the same as RUNTIME_CONFIG.block_dim in kernel_config.py, // which controls how many physical cores the runtime launches. 
- PTO2TaskId task_token; - volatile __gm__ PTO2DeferredCompletionIngressBuffer *deferred_ingress; - uint32_t deferred_completion_capacity; + AsyncCtx async_ctx; }; /** diff --git a/src/a5/runtime/tensormap_and_ringbuffer/orchestration/pto_orchestration_api.h b/src/a5/runtime/tensormap_and_ringbuffer/orchestration/pto_orchestration_api.h index b74c238ef..59fc1a8c2 100644 --- a/src/a5/runtime/tensormap_and_ringbuffer/orchestration/pto_orchestration_api.h +++ b/src/a5/runtime/tensormap_and_ringbuffer/orchestration/pto_orchestration_api.h @@ -115,9 +115,7 @@ void framework_bind_runtime(PTO2Runtime *rt); * Populated by the runtime; called by orchestration through inline wrappers. */ typedef struct PTO2RuntimeOps { - TaskOutputTensors (*submit_task)( - PTO2Runtime *rt, const MixedKernels &mixed_kernels, const Arg &args, bool complete_in_future - ); + TaskOutputTensors (*submit_task)(PTO2Runtime *rt, const MixedKernels &mixed_kernels, const Arg &args); void (*scope_begin)(PTO2Runtime *rt); void (*scope_end)(PTO2Runtime *rt); void (*orchestration_done)(PTO2Runtime *rt); @@ -209,17 +207,12 @@ static inline TaskOutputTensors alloc_tensors(const CIs &...cis) { return alloc_tensors(args); } -static inline TaskOutputTensors -rt_submit_task_impl(const MixedKernels &mixed_kernels, const Arg &args, bool complete_in_future) { +static inline TaskOutputTensors rt_submit_task(const MixedKernels &mixed_kernels, const Arg &args) { PTO2Runtime *rt = current_runtime(); if (rt->ops->is_fatal(rt)) { return TaskOutputTensors{}; } - return rt->ops->submit_task(rt, mixed_kernels, args, complete_in_future); -} - -static inline TaskOutputTensors rt_submit_task(const MixedKernels &mixed_kernels, const Arg &args) { - return rt_submit_task_impl(mixed_kernels, args, false); + return rt->ops->submit_task(rt, mixed_kernels, args); } /** @@ -228,7 +221,7 @@ static inline TaskOutputTensors rt_submit_task(const MixedKernels &mixed_kernels static inline TaskOutputTensors rt_submit_aic_task(int32_t kernel_id, const Arg &args) { MixedKernels mk; mk.aic_kernel_id = kernel_id; - return rt_submit_task_impl(mk, args, false); + return rt_submit_task(mk, args); } /** @@ -237,13 +230,7 @@ static inline TaskOutputTensors rt_submit_aic_task(int32_t kernel_id, const Arg static inline TaskOutputTensors rt_submit_aiv_task(int32_t kernel_id, const Arg &args) { MixedKernels mk; mk.aiv0_kernel_id = kernel_id; - return rt_submit_task_impl(mk, args, false); -} - -static inline TaskOutputTensors rt_submit_aiv_task_deferred(int32_t kernel_id, const Arg &args) { - MixedKernels mk; - mk.aiv0_kernel_id = kernel_id; - return rt_submit_task_impl(mk, args, true); + return rt_submit_task(mk, args); } static inline void rt_scope_begin(PTO2ScopeMode mode = PTO2ScopeMode::AUTO) { diff --git a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_async_kernel_api.h b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_async_kernel_api.h index e41e52a46..b3bb01f6e 100644 --- a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_async_kernel_api.h +++ b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_async_kernel_api.h @@ -28,49 +28,78 @@ #define __gm__ #endif -struct PTO2AsyncCtx { - volatile __gm__ PTO2DeferredCompletionIngressBuffer *ingress; - uint32_t entry_capacity; - PTO2TaskId task_token; -}; +inline __aicore__ void defer_load_ingress(AsyncCtx &ctx) { + if (ctx.completion_count == nullptr) return; +#if defined(__CCE_KT_TEST__) || defined(__CCE_AICORE__) || defined(__DAV_C220__) + uintptr_t line = reinterpret_cast<uintptr_t>(ctx.completion_count) & ~(uintptr_t(PTO2_ALIGN_SIZE) - 1u); + dcci((__gm__ int32_t *)line, SINGLE_CACHE_LINE); +#else + __asm__ __volatile__("" ::: "memory"); +#endif +} -inline __aicore__ PTO2AsyncCtx pto2_async_ctx(__gm__ int64_t *args) { +inline __aicore__ AsyncCtx get_async_ctx(__gm__ int64_t *args) { __gm__ LocalContext *lc = reinterpret_cast<__gm__ LocalContext *>(static_cast<uintptr_t>(args[PAYLOAD_LOCAL_CONTEXT_INDEX])); - PTO2AsyncCtx ctx; - ctx.ingress = lc->deferred_ingress; - ctx.entry_capacity = lc->deferred_completion_capacity; - ctx.task_token.raw = lc->task_token.raw; + AsyncCtx ctx = lc->async_ctx; + defer_load_ingress(ctx); return ctx; } -inline __aicore__ void pto2_defer_condition( - PTO2AsyncCtx &ctx, volatile __gm__ void *addr, uint32_t expected, uint32_t engine, int32_t completion_type +inline __aicore__ void defer_condition( + AsyncCtx &ctx, volatile __gm__ void *addr, uint32_t expected, uint32_t engine, int32_t completion_type ) { - if (ctx.task_token.is_invalid() || ctx.ingress == nullptr) { + if (ctx.task_token.is_invalid() || ctx.completion_count == nullptr || ctx.completion_entries == nullptr) { return; } - uint32_t idx = ctx.ingress->count; - if (idx >= ctx.entry_capacity) { - ctx.ingress->error_code = PTO2_ERROR_ASYNC_WAIT_OVERFLOW; + uint32_t idx = *ctx.completion_count; + if (idx >= ctx.completion_capacity) { + if (ctx.completion_error_code != nullptr) { + *ctx.completion_error_code = PTO2_ERROR_ASYNC_WAIT_OVERFLOW; + } return; } - volatile __gm__ PTO2DeferredCompletionEntry *slot = &ctx.ingress->entries[idx]; + volatile __gm__ PTO2DeferredCompletionEntry *slot = &ctx.completion_entries[idx]; slot->addr = reinterpret_cast<uint64_t>(addr); slot->expected_value = expected; slot->engine = engine; slot->completion_type = completion_type; slot->_pad = 0; - ctx.ingress->count = idx + 1; + *ctx.completion_count = idx + 1; +} + +inline __aicore__ void defer_flush_range(volatile __gm__ void *addr, uint32_t size_bytes) { + if (addr == nullptr || size_bytes == 0) return; +#if defined(__CCE_KT_TEST__) || defined(__CCE_AICORE__) || defined(__DAV_C220__) + uintptr_t start = reinterpret_cast<uintptr_t>(addr) & ~(uintptr_t(PTO2_ALIGN_SIZE) - 1u); + uintptr_t end = + (reinterpret_cast<uintptr_t>(addr) + size_bytes + PTO2_ALIGN_SIZE - 1u) & ~(uintptr_t(PTO2_ALIGN_SIZE) - 1u); + for (uintptr_t p = start; p < end; p += PTO2_ALIGN_SIZE) { + dcci((__gm__ int32_t *)p, SINGLE_CACHE_LINE, CACHELINE_OUT); + } +#else + (void)addr; + (void)size_bytes; +#endif } -inline __aicore__ void pto2_defer_flush(PTO2AsyncCtx &ctx) { - if (ctx.task_token.is_invalid() || ctx.ingress == nullptr) return; +inline __aicore__ void defer_flush(AsyncCtx &ctx) { + if (ctx.task_token.is_invalid() || ctx.completion_count == nullptr) return; #if defined(__CCE_KT_TEST__) || defined(__CCE_AICORE__) || defined(__DAV_C220__) - dcci((__gm__ int32_t *)ctx.ingress->entries, ENTIRE_DATA_CACHE, CACHELINE_OUT); - dcci((__gm__ int32_t *)ctx.ingress, SINGLE_CACHE_LINE, CACHELINE_OUT); + uint32_t count = *ctx.completion_count; + if (count > ctx.completion_capacity) { + count = ctx.completion_capacity; + } + uint32_t flush_bytes = static_cast<uint32_t>(sizeof(*ctx.completion_count)); + if (ctx.completion_error_code != nullptr) { + flush_bytes += static_cast<uint32_t>(sizeof(*ctx.completion_error_code)); + } + if (ctx.completion_entries != nullptr) { + flush_bytes += count * static_cast<uint32_t>(sizeof(PTO2DeferredCompletionEntry)); + } + defer_flush_range(ctx.completion_count, flush_bytes); #if defined(__CPU_SIM) dsb(0); #else @@ -90,12 +119,10 @@ pto2_send_notification(volatile __gm__ void *remote_counter_addr,
int32_t value, pto::comm::TNOTIFY(signal, value, notify_op); } -inline __aicore__ void pto2_save_expected_notification_counter( - __gm__ int64_t *args, volatile __gm__ void *counter_addr, uint32_t expected_value -) { - PTO2AsyncCtx ctx = pto2_async_ctx(args); - pto2_defer_condition(ctx, counter_addr, expected_value, PTO2_COMPLETION_ENGINE_SDMA, PTO2_COMPLETION_TYPE_COUNTER); - pto2_defer_flush(ctx); +inline __aicore__ void +save_expected_notification_counter(AsyncCtx &ctx, volatile __gm__ void *counter_addr, uint32_t expected_value) { + defer_condition(ctx, counter_addr, expected_value, PTO2_COMPLETION_ENGINE_SDMA, PTO2_COMPLETION_TYPE_COUNTER); + defer_flush(ctx); } #endif // PTO_ASYNC_KERNEL_API_H diff --git a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_async_wait.h b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_async_wait.h index a0406fb63..fe4ba9ef1 100644 --- a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_async_wait.h +++ b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_async_wait.h @@ -16,8 +16,7 @@ #include #include -#include "aicpu/platform_regs.h" -#include "common/memory_barrier.h" +#include "intrinsic.h" #include "pto_completion_ingress.h" #include "pto_runtime2_types.h" @@ -28,43 +27,8 @@ struct PTO2CompletionStats; inline constexpr int32_t PTO2_MAX_ASYNC_WAITS = 64; inline constexpr int32_t PTO2_MAX_PENDING_COMPLETIONS = 128; -inline uintptr_t completion_ingress_cache_line(const volatile void *addr) { - return reinterpret_cast(addr) & ~(uintptr_t(PTO2_ALIGN_SIZE) - 1u); -} - -inline void completion_ingress_invalidate_entries( - volatile PTO2CompletionIngressQueue *completion_ingress, uint64_t tail, uint64_t head -) { - uint64_t active_count = head - tail; - if (active_count == 0) return; - - uint64_t tail_index = tail & PTO2_COMPLETION_INGRESS_MASK; - uint64_t first_span = active_count; - uint64_t contiguous_capacity = PTO2_COMPLETION_INGRESS_CAPACITY - tail_index; - if (first_span > contiguous_capacity) { - first_span = contiguous_capacity; - } - - cache_invalidate_range( - const_cast(reinterpret_cast(&completion_ingress->entries[tail_index])), - first_span * sizeof(PTO2CompletionIngressEntry) - ); - - uint64_t remaining = active_count - first_span; - if (remaining == 0) return; - - cache_invalidate_range( - const_cast(reinterpret_cast(&completion_ingress->entries[0])), - remaining * sizeof(PTO2CompletionIngressEntry) - ); -} - inline bool completion_ingress_has_pending(volatile PTO2CompletionIngressQueue *completion_ingress) { if (completion_ingress == nullptr) return false; - cache_invalidate_range( - const_cast(reinterpret_cast(&completion_ingress->head)), - sizeof(completion_ingress->head) - ); uint64_t head = __atomic_load_n(&completion_ingress->head, __ATOMIC_ACQUIRE); uint64_t tail = __atomic_load_n(&completion_ingress->tail, __ATOMIC_ACQUIRE); return tail < head; @@ -167,13 +131,8 @@ struct PTO2AsyncWaitList { int32_t drained = 0; while (true) { uint64_t tail = __atomic_load_n(&completion_ingress->tail, __ATOMIC_ACQUIRE); - cache_invalidate_range( - const_cast(reinterpret_cast(&completion_ingress->head)), - sizeof(completion_ingress->head) - ); uint64_t head_snapshot = __atomic_load_n(&completion_ingress->head, __ATOMIC_ACQUIRE); if (tail >= head_snapshot) break; - completion_ingress_invalidate_entries(completion_ingress, tail, head_snapshot); while (tail < head_snapshot) { volatile PTO2CompletionIngressEntry *slot = @@ -187,14 +146,8 @@ struct PTO2AsyncWaitList { uint32_t expected_value = slot->expected_value; PTO2AsyncEngine engine = 
static_cast(slot->engine); - slot->seq = 0; - completion_ingress->tail = tail + 1; - OUT_OF_ORDER_STORE_BARRIER(); - cache_flush_range(const_cast(reinterpret_cast(slot)), sizeof(*slot)); - cache_flush_range( - const_cast(reinterpret_cast(&completion_ingress->tail)), - sizeof(completion_ingress->tail) - ); + __atomic_store_n(&slot->seq, 0, __ATOMIC_RELEASE); + __atomic_store_n(&completion_ingress->tail, tail + 1, __ATOMIC_RELEASE); drained++; tail++; @@ -266,47 +219,44 @@ struct PTO2AsyncWaitList { return true; } - RegisterResult register_deferred( - PTO2TaskSlotState &slot_state, volatile PTO2DeferredCompletionIngressBuffer *ingress, bool normal_done, - int32_t &error_code - ) { + RegisterResult + register_deferred(PTO2TaskSlotState &slot_state, const AsyncCtx &async_ctx, bool normal_done, int32_t &error_code) { error_code = PTO2_ERROR_NONE; - PTO2TaskPayload *payload = slot_state.payload; - if (payload == nullptr || !payload->complete_in_future) { + if (slot_state.payload == nullptr) { return RegisterResult::NotDeferred; } if (!try_lock()) return RegisterResult::Skipped; uint32_t deferred_count = 0; - if (ingress != nullptr) { - cache_invalidate_range( - const_cast(reinterpret_cast(ingress)), PTO2_ALIGN_SIZE - ); - if (ingress->error_code != PTO2_ERROR_NONE) { - error_code = ingress->error_code; - unlock(); - return RegisterResult::Error; + if (async_ctx.completion_count != nullptr) { + if (async_ctx.completion_error_code != nullptr) { + if (*async_ctx.completion_error_code != PTO2_ERROR_NONE) { + error_code = *async_ctx.completion_error_code; + unlock(); + return RegisterResult::Error; + } } - deferred_count = ingress->count; + deferred_count = *async_ctx.completion_count; } - if (deferred_count > PTO2_MAX_COMPLETIONS_PER_TASK) { + if (deferred_count > async_ctx.completion_capacity) { error_code = PTO2_ERROR_ASYNC_REGISTRATION_FAILED; unlock(); return RegisterResult::Error; } - if (deferred_count > 0 && ingress != nullptr) { - cache_invalidate_range( - const_cast(reinterpret_cast(ingress->entries)), - deferred_count * sizeof(PTO2DeferredCompletionEntry) - ); + if (deferred_count > 0) { + if (async_ctx.completion_entries == nullptr) { + error_code = PTO2_ERROR_ASYNC_REGISTRATION_FAILED; + unlock(); + return RegisterResult::Error; + } } PTO2AsyncWaitEntry *entry = find_entry_by_token(slot_state.task->task_id); + if (entry == nullptr && deferred_count == 0) { + unlock(); + return RegisterResult::NotDeferred; + } if (entry == nullptr) { - if (!normal_done && deferred_count == 0) { - unlock(); - return RegisterResult::Registered; - } if (count >= PTO2_MAX_ASYNC_WAITS) { error_code = PTO2_ERROR_ASYNC_WAIT_OVERFLOW; unlock(); @@ -324,11 +274,7 @@ struct PTO2AsyncWaitList { } for (uint32_t i = 0; i < deferred_count; ++i) { - volatile PTO2DeferredCompletionEntry *deferred = &ingress->entries[i]; - volatile uint32_t *counter = reinterpret_cast(static_cast(deferred->addr)); - cache_invalidate_range( - reinterpret_cast(completion_ingress_cache_line(counter)), sizeof(uint32_t) - ); + volatile PTO2DeferredCompletionEntry *deferred = &async_ctx.completion_entries[i]; if (!append_condition_locked( *entry, deferred->addr, deferred->expected_value, static_cast(deferred->engine), error_code diff --git a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_completion_ingress.h b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_completion_ingress.h index d3e9bfd70..292ff12c9 100644 --- a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_completion_ingress.h +++ 
b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_completion_ingress.h @@ -56,16 +56,15 @@ struct PTO2DeferredCompletionEntry { static_assert(sizeof(PTO2DeferredCompletionEntry) == 24, "PTO2DeferredCompletionEntry layout drift"); -struct PTO2DeferredCompletionIngressBuffer { - alignas(PTO2_ALIGN_SIZE) volatile uint32_t count; +struct alignas(PTO2_ALIGN_SIZE) PTO2DeferredCompletionIngressBuffer { + volatile uint32_t count; volatile int32_t error_code; - uint8_t _header_pad[PTO2_ALIGN_SIZE - sizeof(uint32_t) - sizeof(int32_t)]; - alignas(PTO2_ALIGN_SIZE) PTO2DeferredCompletionEntry entries[PTO2_MAX_COMPLETIONS_PER_TASK]; + PTO2DeferredCompletionEntry entries[PTO2_MAX_COMPLETIONS_PER_TASK]; }; static_assert( sizeof(PTO2DeferredCompletionIngressBuffer) % PTO2_ALIGN_SIZE == 0, - "PTO2DeferredCompletionIngressBuffer size must be cache-line aligned" + "PTO2DeferredCompletionIngressBuffer size must preserve array element cache-line boundaries" ); struct PTO2CompletionIngressQueue { diff --git a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp index 8abc7829d..f02ea82ee 100644 --- a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp +++ b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp @@ -503,8 +503,7 @@ void PTO2OrchestratorState::end_scope() { // ============================================================================= // Task Submission // ============================================================================= -TaskOutputTensors -PTO2OrchestratorState::submit_task(const MixedKernels &mixed_kernels, const Arg &args, bool complete_in_future) { +TaskOutputTensors PTO2OrchestratorState::submit_task(const MixedKernels &mixed_kernels, const Arg &args) { auto *orch = this; CYCLE_COUNT_START(); @@ -718,7 +717,7 @@ PTO2OrchestratorState::submit_task(const MixedKernels &mixed_kernels, const Arg payload.fanin_inline_slot_states[i] = fanin_builder.inline_slots[i]; } - payload.init(args, result, prepared.alloc_result, layout, complete_in_future); + payload.init(args, result, prepared.alloc_result, layout); CYCLE_COUNT_LAP_RECORD(g_orch_args_cycle, AicpuPhaseId::ORCH_PARAMS, task_id.raw); #if PTO2_ORCH_PROFILING @@ -808,7 +807,7 @@ TaskOutputTensors PTO2OrchestratorState::alloc_tensors(const Arg &args) { TaskOutputTensors outputs; outputs.set_task_id(prepared.task_id); - payload.init(args, outputs, prepared.alloc_result, layout, false); + payload.init(args, outputs, prepared.alloc_result, layout); payload.fanin_actual_count = 0; payload.fanin_spill_start = 0; payload.fanin_spill_pool = &orch->rings[prepared.task_id.ring()].fanin_pool; diff --git a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h index 0d21ccc44..dc1ecf461 100644 --- a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h +++ b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h @@ -125,7 +125,7 @@ struct PTO2OrchestratorState { void report_fatal(int32_t error_code, const char *func, const char *fmt, ...); void begin_scope(PTO2ScopeMode mode = PTO2ScopeMode::AUTO); void end_scope(); - TaskOutputTensors submit_task(const MixedKernels &mixed_kernels, const Arg &args, bool complete_in_future); + TaskOutputTensors submit_task(const MixedKernels &mixed_kernels, const Arg &args); TaskOutputTensors alloc_tensors(const Arg &args); void mark_done(); }; diff --git 
diff --git a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.cpp b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.cpp
index c2402bca2..742ced6cf 100644
--- a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.cpp
+++ b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.cpp
@@ -38,9 +38,8 @@ __attribute__((weak, visibility("hidden"))) uint64_t get_sys_cnt_aicpu() { retur
 // Orchestration Ops Table (function-pointer dispatch for orchestration .so)
 // =============================================================================
 
-static TaskOutputTensors
-submit_task_impl(PTO2Runtime *rt, const MixedKernels &mixed_kernels, const Arg &args, bool complete_in_future) {
-    return rt->orchestrator.submit_task(mixed_kernels, args, complete_in_future);
+static TaskOutputTensors submit_task_impl(PTO2Runtime *rt, const MixedKernels &mixed_kernels, const Arg &args) {
+    return rt->orchestrator.submit_task(mixed_kernels, args);
 }
 
 static TaskOutputTensors alloc_tensors_impl(PTO2Runtime *rt, const Arg &args) {
diff --git a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.h b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.h
index ca43bd75b..917c9c5ed 100644
--- a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.h
+++ b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.h
@@ -67,9 +67,7 @@ enum PTO2RuntimeMode {
 typedef struct PTO2Runtime PTO2Runtime; // forward declare for ops signatures
 
 struct PTO2RuntimeOps {
-    TaskOutputTensors (*submit_task)(
-        PTO2Runtime *rt, const MixedKernels &mixed_kernels, const Arg &args, bool complete_in_future
-    );
+    TaskOutputTensors (*submit_task)(PTO2Runtime *rt, const MixedKernels &mixed_kernels, const Arg &args);
     void (*scope_begin)(PTO2Runtime *rt);
     void (*scope_end)(PTO2Runtime *rt);
     void (*orchestration_done)(PTO2Runtime *rt);
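Note: with the submit-time flag gone, the ops-table entry narrows to three parameters. A stubbed sketch of the function-pointer plumbing (all type names are stand-ins; only the shape of the table is the point):

    struct Rt;       // opaque runtime handle
    struct Kernels {};
    struct Args {};
    struct Out {};

    struct Ops {
        // Narrowed signature: deferred-ness is no longer a caller decision.
        Out (*submit_task)(Rt *rt, const Kernels &kernels, const Args &args);
    };

    static Out submit_impl(Rt *, const Kernels &, const Args &) { return Out{}; }
    static constexpr Ops k_ops{&submit_impl};  // table wired at init time

Since orchestration .so binaries call through this table, the signature change is ABI-breaking for them, which is why the fake submit hooks in the fatal tests are updated in the same patch.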
diff --git a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h
index 0345b631f..d10c91012 100644
--- a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h
+++ b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h
@@ -233,7 +233,6 @@ struct PTO2TaskPayload {
     int32_t scalar_count{0};
     int32_t fanin_actual_count{0};  // Actual fanin count (without the +1 redundance)
     int32_t fanin_spill_start{0};   // Linear start index in fanin spill pool (0 = no spill)
-    bool complete_in_future{false};
     PTO2FaninPool *fanin_spill_pool{nullptr};
     PTO2TaskSlotState *fanin_inline_slot_states[PTO2_FANIN_INLINE_CAP];
     // === Cache lines 9-40 (2048B) — tensors (alignas(64) forces alignment) ===
@@ -255,10 +254,7 @@ struct PTO2TaskPayload {
      * @param args Task arguments (tensors + scalars)
      * @param result Materialized output tensors (from TensorCreateInfo path)
      */
-    void init(
-        const Arg &args, TaskOutputTensors &result, PTO2TaskAllocResult &alloc_result, PTO2OutputLayout &layout,
-        bool complete_in_future_flag
-    ) {
+    void init(const Arg &args, TaskOutputTensors &result, PTO2TaskAllocResult &alloc_result, PTO2OutputLayout &layout) {
         tensor_count = args.tensor_count();
         scalar_count = args.scalar_count();
 
@@ -280,15 +276,13 @@ struct PTO2TaskPayload {
         // Round up to cache line boundary. Both arrays are 1024B so no overrun.
         // Eliminates branches; extra bytes within the same CL have zero additional cost.
         memcpy(scalars, args.scalars(), PTO2_ALIGN_UP(args.scalar_count() * sizeof(uint64_t), 64));
-        complete_in_future = complete_in_future_flag;
     }
 };
 
 // PTO2TaskPayload layout verification (offsetof requires complete type).
-static_assert(offsetof(PTO2TaskPayload, complete_in_future) == 16, "deferred flag must stay in the first cache line");
-static_assert(offsetof(PTO2TaskPayload, fanin_spill_pool) == 24, "spill pool pointer layout drift");
+static_assert(offsetof(PTO2TaskPayload, fanin_spill_pool) == 16, "spill pool pointer layout drift");
 static_assert(
-    offsetof(PTO2TaskPayload, fanin_inline_slot_states) == 32, "inline fanin array must follow spill metadata"
+    offsetof(PTO2TaskPayload, fanin_inline_slot_states) == 24, "inline fanin array must follow spill metadata"
 );
 static_assert(offsetof(PTO2TaskPayload, tensors) == 576, "tensors must start at byte 576 (cache line 9)");
 static_assert(
diff --git a/src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h b/src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h
index ec4e2d1b3..3bf2fff88 100644
--- a/src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h
+++ b/src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h
@@ -1019,17 +1019,9 @@ inline PTO2AsyncPollResult PTO2AsyncWaitList::poll_and_complete(
 
     for (int32_t i = count - 1; i >= 0; --i) {
         PTO2AsyncWaitEntry &entry = entries[i];
-        uintptr_t last_invalidated_counter_line = static_cast<uintptr_t>(-1);
         for (int32_t c = 0; c < entry.condition_count; c++) {
             PTO2CompletionCondition &cond = entry.conditions[c];
             if (cond.satisfied) continue;
-            if (cond.counter_addr) {
-                uintptr_t counter_line = completion_ingress_cache_line(cond.counter_addr);
-                if (counter_line != last_invalidated_counter_line) {
-                    cache_invalidate_range(reinterpret_cast(counter_line), sizeof(uint32_t));
-                    last_invalidated_counter_line = counter_line;
-                }
-            }
             PTO2CompletionPollResult poll = cond.test();
             if (poll.state == PTO2CompletionPollState::FAILED) {
                 result.error_code = poll.error_code;
diff --git a/src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_completion.cpp b/src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_completion.cpp
index f1eb78654..f7bd06781 100644
--- a/src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_completion.cpp
+++ b/src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_completion.cpp
@@ -81,14 +81,14 @@ void SchedulerContext::complete_slot_task(
     (void)hank;
 #endif
     bool mixed_complete = sched_->on_subtask_complete(slot_state);
-    if (slot_state.payload != nullptr && slot_state.payload->complete_in_future) {
+    if (slot_state.payload != nullptr) {
         int32_t reg_err = PTO2_ERROR_NONE;
         PTO2AsyncWaitList::RegisterResult reg_result;
         volatile PTO2DeferredCompletionIngressBuffer *deferred_ingress =
             &deferred_ingress_per_core_[core_id][expected_reg_task_id & 1];
+        AsyncCtx async_ctx = AsyncCtx::make(slot_state.task->task_id, deferred_ingress);
         do {
-            reg_result =
-                sched_->async_wait_list.register_deferred(slot_state, deferred_ingress, mixed_complete, reg_err);
+            reg_result = sched_->async_wait_list.register_deferred(slot_state, async_ctx, mixed_complete, reg_err);
             if (reg_result == PTO2AsyncWaitList::RegisterResult::Skipped) {
                 SPIN_WAIT_HINT();
             }
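Note: AsyncCtx::make packs the per-core ingress buffer into the context that now travels with the dispatch payload. A hedged reconstruction of the shape implied by the register_deferred and complete_slot_task hunks; the real definition lives in the runtime headers and may differ in field order and token type:

    #include <cstdint>

    // Stand-in types so the sketch is self-contained.
    struct Entry { uint64_t addr; uint32_t expected_value; uint32_t engine; uint64_t reserved; };
    struct Ingress { volatile uint32_t count; volatile int32_t error_code; Entry entries[8]; };

    struct AsyncCtxSketch {
        uint64_t task_token;  // assumed carrier for the task id
        volatile uint32_t *completion_count;
        volatile int32_t *completion_error_code;
        volatile Entry *completion_entries;
        uint32_t completion_capacity;

        static AsyncCtxSketch make(uint64_t token, volatile Ingress *ingress) {
            return {token, &ingress->count, &ingress->error_code, ingress->entries, 8};
        }
    };

Handing register_deferred pointers instead of the raw buffer is what lets it drop cache_invalidate_range: the scheduler now reads the count, error code, and entries as ordinary coherent memory.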
diff --git a/src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_context.h b/src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_context.h
index 9586f3086..508021afa 100644
--- a/src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_context.h
+++ b/src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_context.h
@@ -123,9 +123,10 @@ class SchedulerContext {
     // buf_idx = reg_task_id & 1; adjacent dispatches alternate automatically.
     PTO2DispatchPayload payload_per_core_[RUNTIME_MAX_WORKER][2];
 
-    // Per-core deferred-completion ingress storage. This has the same runtime
-    // lifetime as payload_per_core_, but is kept out of the dispatch payload so
-    // normal task dispatch layout and cache footprint stay unchanged.
+    // Per-core deferred-completion software registration storage. This has
+    // the same runtime lifetime as payload_per_core_, but is kept out of the
+    // dispatch payload so normal task dispatch layout and cache footprint stay
+    // unchanged.
     PTO2DeferredCompletionIngressBuffer deferred_ingress_per_core_[RUNTIME_MAX_WORKER][2];
 
     // sync_start drain coordination
@@ -209,7 +210,7 @@ class SchedulerContext {
 
     void build_payload(
         PTO2DispatchPayload &dispatch_payload, PTO2TaskSlotState &slot_state, PTO2SubtaskSlot subslot,
-        PTO2DeferredCompletionIngressBuffer *deferred_ingress
+        const AsyncCtx &async_ctx
     );
 
     void dispatch_subtask_to_core(
diff --git a/src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_dispatch.cpp b/src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_dispatch.cpp
index 81e0baf14..9436942c7 100644
--- a/src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_dispatch.cpp
+++ b/src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_dispatch.cpp
@@ -88,7 +88,7 @@ int SchedulerContext::pop_ready_tasks_batch(
 
 void SchedulerContext::build_payload(
     PTO2DispatchPayload &dispatch_payload, PTO2TaskSlotState &slot_state, PTO2SubtaskSlot subslot,
-    PTO2DeferredCompletionIngressBuffer *deferred_ingress
+    const AsyncCtx &async_ctx
 ) {
     int32_t slot_idx = static_cast<int32_t>(subslot);
     uint64_t callable_addr = get_function_bin_addr(slot_state.task->kernel_id[slot_idx]);
@@ -104,11 +104,7 @@ void SchedulerContext::build_payload(
     }
     dispatch_payload.local_context.s_block_idx = slot_state.next_block_idx;
     dispatch_payload.local_context.s_block_num = slot_state.logical_block_num;
-    dispatch_payload.local_context.task_token =
-        payload.complete_in_future ? slot_state.task->task_id : PTO2TaskId::invalid();
-    dispatch_payload.local_context.deferred_ingress = payload.complete_in_future ? deferred_ingress : nullptr;
-    dispatch_payload.local_context.deferred_completion_capacity =
-        payload.complete_in_future ? PTO2_MAX_COMPLETIONS_PER_TASK : 0;
+    dispatch_payload.local_context.async_ctx = async_ctx;
     dispatch_payload.args[PAYLOAD_LOCAL_CONTEXT_INDEX] = reinterpret_cast(&dispatch_payload.local_context);
     dispatch_payload.args[PAYLOAD_GLOBAL_CONTEXT_INDEX] = reinterpret_cast(&dispatch_payload.global_context);
 }
@@ -138,15 +134,11 @@ void SchedulerContext::dispatch_subtask_to_core(
     uint32_t buf_idx = reg_task_id & 1u;
     PTO2DispatchPayload &payload = payload_per_core_[core_id][buf_idx];
 
-    PTO2DeferredCompletionIngressBuffer *deferred_ingress = nullptr;
-    if (slot_state.payload != nullptr && slot_state.payload->complete_in_future) {
-        deferred_ingress = &deferred_ingress_per_core_[core_id][buf_idx];
-        deferred_ingress->count = 0;
-        deferred_ingress->error_code = PTO2_ERROR_NONE;
-        OUT_OF_ORDER_STORE_BARRIER();
-        cache_flush_range(deferred_ingress, PTO2_ALIGN_SIZE);
-    }
-    build_payload(payload, slot_state, subslot, deferred_ingress);
+    PTO2DeferredCompletionIngressBuffer *deferred_ingress = &deferred_ingress_per_core_[core_id][buf_idx];
+    deferred_ingress->count = 0;
+    deferred_ingress->error_code = PTO2_ERROR_NONE;
+    AsyncCtx async_ctx = AsyncCtx::make(slot_state.task->task_id, deferred_ingress);
+    build_payload(payload, slot_state, subslot, async_ctx);
 
     if (to_pending) {
         core_exec_state.pending_subslot = subslot;
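Note: dispatch now resets the ingress header unconditionally and always passes an AsyncCtx, so whether a task completes in the future is decided by what the kernel writes, not by a submit-time flag. The two-deep buffering keyed on reg_task_id & 1 is what keeps that safe; a minimal sketch (worker count and header shape assumed):

    #include <cstdint>

    constexpr int kMaxWorker = 4;  // stand-in for RUNTIME_MAX_WORKER
    struct Header { volatile uint32_t count; volatile int32_t error_code; };

    Header g_ingress[kMaxWorker][2];

    // Adjacent dispatches to the same core alternate buffers, so resetting the
    // header for task N+1 cannot race completion processing of task N.
    inline Header &select_and_reset(int core_id, uint32_t reg_task_id) {
        Header &h = g_ingress[core_id][reg_task_id & 1u];
        h.count = 0;
        h.error_code = 0;
        return h;
    }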
diff --git a/tests/ut/cpp/a2a3/test_a2a3_fatal.cpp b/tests/ut/cpp/a2a3/test_a2a3_fatal.cpp
index 7032a8151..790daa613 100644
--- a/tests/ut/cpp/a2a3/test_a2a3_fatal.cpp
+++ b/tests/ut/cpp/a2a3/test_a2a3_fatal.cpp
@@ -51,7 +51,7 @@ static_assert(offsetof(FakeRuntime, ops) == 0); // Guard: reinterpret_cast belo
 
 FakeRuntime *as_fake(PTO2Runtime *rt) { return reinterpret_cast<FakeRuntime *>(rt); }
 
-TaskOutputTensors fake_submit(PTO2Runtime *rt, const MixedKernels &, const Arg &, bool) {
+TaskOutputTensors fake_submit(PTO2Runtime *rt, const MixedKernels &, const Arg &) {
     as_fake(rt)->submit_calls++;
     return TaskOutputTensors{};
 }
diff --git a/tests/ut/cpp/a5/test_a5_fatal.cpp b/tests/ut/cpp/a5/test_a5_fatal.cpp
index e9659dc87..2096df6ad 100644
--- a/tests/ut/cpp/a5/test_a5_fatal.cpp
+++ b/tests/ut/cpp/a5/test_a5_fatal.cpp
@@ -45,7 +45,7 @@ static_assert(offsetof(FakeRuntime, ops) == 0); // Guard: reinterpret_cast belo
 
 FakeRuntime *as_fake(PTO2Runtime *rt) { return reinterpret_cast<FakeRuntime *>(rt); }
 
-TaskOutputTensors fake_submit(PTO2Runtime *rt, const MixedKernels &, const Arg &, bool) {
+TaskOutputTensors fake_submit(PTO2Runtime *rt, const MixedKernels &, const Arg &) {
     as_fake(rt)->submit_calls++;
     return TaskOutputTensors{};
 }
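Note: the test fakes only need to track call counts through the narrowed pointer type. A self-contained miniature of the same pattern (the type names are stand-ins for FakeRuntime and PTO2RuntimeOps, not the real test scaffolding):

    #include <cassert>

    struct Out {};
    struct Rt { int submit_calls = 0; };
    struct Ops { Out (*submit_task)(Rt *); };

    static Out fake_submit(Rt *rt) {
        rt->submit_calls++;  // observe the call, return an empty result
        return Out{};
    }

    int main() {
        Rt rt;
        Ops ops{&fake_submit};
        ops.submit_task(&rt);
        assert(rt.submit_calls == 1);
        return 0;
    }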