From 3a82330837f25acb4ae621221f530871b94d7fa4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20=C5=9Alusarczyk?= Date: Tue, 13 Jan 2026 10:43:24 +0100 Subject: [PATCH] feature: worker thread in immediate command list MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Related-To: NEO-13856 Signed-off-by: Łukasz Ślusarczyk ---
/*
 * Review notes (annotation only; no patch content below is changed).
 *
 * What the patch does, as far as this copy shows:
 *  - cmdlist_hw_immediate.h gains WorkItemType (Barrier, LaunchKernel), MaxWaitEvents (8),
 *    WaitEventList, BarrierPayload, LaunchKernelPayload, the alignas(64) WorkItem holding a
 *    union of the two payloads, the makeBarrier and makeLaunchKernel factory helpers, and the
 *    SpscRing ring buffer (enqueue advances head, dequeue advances tail, Capacity is
 *    static_assert-ed to be a power of two).
 *  - CommandListCoreFamilyImmediate gains a destructor, the members queue, running and
 *    workerThread, and workerThreadRun(). appendLaunchKernel becomes an inline wrapper that
 *    retries queue.enqueue(makeLaunchKernel(...)) with std::this_thread::yield() until the
 *    enqueue succeeds and then returns ZE_RESULT_SUCCESS.
 *  - cmdlist_hw_immediate.inl renames the existing appendLaunchKernel and appendBarrier
 *    definitions to appendLaunchKernel_inworker and appendBarrier_inworker. workerThreadRun()
 *    dequeues items and dispatches to those two functions by item->type. The constructor
 *    starts the thread; the destructor stores false to running and joins it.
 *
 * NOTE(review): points to confirm before merging.
 *  1. SpscRing::dequeue() stores the advanced tail before it returns the in-place pointer,
 *     and enqueue() decides that a slot is free from tail alone. Once the producer has
 *     wrapped around, it can placement-new over the slot that workerThreadRun() is still
 *     reading.
 *  2. dequeue() never runs the element destructor; only ~SpscRing() destroys what is left
 *     between tail and head. That is harmless only if T is trivially destructible - TODO
 *     confirm for WorkItem (it embeds CmdListKernelLaunchParams, which is not visible here).
 *  3. WaitEventList::set() assigns count before the bounds check and calls abort() when num
 *     exceeds MaxWaitEvents.
 *  4. BarrierPayload::relaxedOrderingDispatch has no default initializer, and WorkItem()
 *     initializes only type.
 *  5. workerThreadRun() discards the ze_result_t returned by both _inworker calls, while
 *     appendLaunchKernel returns ZE_RESULT_SUCCESS as soon as the item is queued. launchParams
 *     is copied into the work item and the worker passes the copy on; presumably a caller
 *     that reads launchParams back would miss any update - verify against callers.
 *  6. The worker loop exits as soon as running reads false, so items still queued when the
 *     destructor runs are not executed.
 *  7. workerThread is started from the constructor's member initializer list, that is before
 *     the constructor body assigns computeFlushMethod.
 *  8. makeLaunchKernel ends with a stray semicolon after its closing brace.
 *
 * NOTE(review): this copy has lost its line breaks and what look like all angle-bracketed
 * spans (include names, template parameter and argument lists, the sign-off address). The text
 * between the header hunk at -77,7 and the .inl constructor hunk is missing as well, including
 * the .inl file header that the diffstat implies. Re-export the patch from the original commit
 * before applying it.
 */
.../source/cmdlist/cmdlist_hw_immediate.h | 183 +++++++++++++++++- .../source/cmdlist/cmdlist_hw_immediate.inl | 50 ++++- 2 files changed, 227 insertions(+), 6 deletions(-) diff --git a/level_zero/core/source/cmdlist/cmdlist_hw_immediate.h b/level_zero/core/source/cmdlist/cmdlist_hw_immediate.h index e5a3c9d3554cf..188c06e41b1bc 100644 --- a/level_zero/core/source/cmdlist/cmdlist_hw_immediate.h +++ b/level_zero/core/source/cmdlist/cmdlist_hw_immediate.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2020-2025 Intel Corporation + * Copyright (C) 2020-2026 Intel Corporation * * SPDX-License-Identifier: MIT * @@ -12,9 +12,12 @@ #include "level_zero/core/source/cmdlist/cmdlist_hw.h" +#include "cmdlist_launch_params.h" + #include #include #include +#include namespace NEO { struct SvmAllocationData; @@ -43,6 +46,144 @@ struct CpuMemCopyInfo { CpuMemCopyInfo(void *dstPtr, void *srcPtr, size_t size) : dstPtr(dstPtr), srcPtr(srcPtr), size(size) {} }; +enum class WorkItemType : uint8_t { + Barrier, + LaunchKernel +}; + +static constexpr uint32_t MaxWaitEvents = 8; + +struct WaitEventList { + uint32_t count = 0; + ze_event_handle_t events[MaxWaitEvents]; + + void set(uint32_t num, const ze_event_handle_t *src) { + count = num; + if (count > MaxWaitEvents) { + abort(); + } + for (uint32_t i = 0; i < num; ++i) { + events[i] = src[i]; + } + } +}; + +struct BarrierPayload { + ze_event_handle_t signalEvent = nullptr; + WaitEventList waitEvents; + bool relaxedOrderingDispatch; +}; + +struct LaunchKernelPayload { + ze_kernel_handle_t kernel = nullptr; + ze_group_count_t groupCount{}; + ze_event_handle_t signalEvent = nullptr; + 
WaitEventList waitEvents; + CmdListKernelLaunchParams launchParams; +}; + +struct alignas(64) WorkItem { + WorkItemType type; + + union { + BarrierPayload barrier; + LaunchKernelPayload launch; + }; + + WorkItem() : type(WorkItemType::Barrier) {} +}; + +// ---- Factory helpers ---- +inline WorkItem makeBarrier(ze_event_handle_t signal, + uint32_t waitCount, + const ze_event_handle_t *waitList, + bool relaxedOrderingDispatch) { + WorkItem item; + item.type = WorkItemType::Barrier; + item.barrier.signalEvent = signal; + item.barrier.waitEvents.set(waitCount, waitList); + item.barrier.relaxedOrderingDispatch = relaxedOrderingDispatch; + return item; +} + +inline WorkItem makeLaunchKernel(ze_kernel_handle_t kernel, + const ze_group_count_t &groupCount, + ze_event_handle_t signal, + uint32_t waitCount, + const ze_event_handle_t *waitList, + CmdListKernelLaunchParams &launchParams) { + WorkItem item; + item.type = WorkItemType::LaunchKernel; + item.launch.kernel = kernel; + item.launch.groupCount = groupCount; + item.launch.signalEvent = signal; + item.launch.waitEvents.set(waitCount, waitList); + item.launch.launchParams = launchParams; + return item; +}; + +template +class SpscRing { + public: + static_assert((Capacity & (Capacity - 1)) == 0, + "Capacity must be power of two"); + + SpscRing() = default; + ~SpscRing() { + // destroy any remaining elements + size_t t = tail.load(std::memory_order_relaxed); + size_t h = head.load(std::memory_order_relaxed); + while (t != h) { + T *ptr = ptr_at(t); + ptr->~T(); + t = (t + 1) & (Capacity - 1); + } + } + + // Enqueue by value; moved into place + bool enqueue(T item) { + const size_t h = head.load(std::memory_order_relaxed); + const size_t next = (h + 1) & (Capacity - 1); + + if (next == tail.load(std::memory_order_acquire)) { + return false; // full + } + + // Placement-new into pre-allocated aligned storage + new (ptr_at(h)) T(std::move(item)); + + head.store(next, std::memory_order_release); + return true; + } + + // 
Dequeue: returns pointer to item in-place; advances tail + T *dequeue() { + const size_t t = tail.load(std::memory_order_relaxed); + if (t == head.load(std::memory_order_acquire)) { + return nullptr; // empty + } + + T *item = ptr_at(t); + + const size_t next = (t + 1) & (Capacity - 1); + tail.store(next, std::memory_order_release); + + return item; // caller may use in-place + } + + private: + // Helpers + T *ptr_at(size_t index) { + return reinterpret_cast(&buffer[index]); + } + + alignas(64) std::atomic head{0}; + alignas(64) std::atomic tail{0}; + + // Use aligned storage to avoid false positives + typename std::aligned_storage::type buffer[Capacity]; +}; + template struct CommandListCoreFamilyImmediate : public CommandListCoreFamily { using GfxFamily = typename NEO::GfxFamilyMapper::GfxFamily; @@ -63,12 +204,36 @@ struct CommandListCoreFamilyImmediate : public CommandListCoreFamily::*)(NEO::LinearStream &, size_t, bool, bool, NEO::AppendOperations, bool); CommandListCoreFamilyImmediate(uint32_t numIddsPerBlock); + ~CommandListCoreFamilyImmediate() override; + + SpscRing queue; + std::atomic running{true}; + std::thread workerThread; + + void workerThreadRun(); ze_result_t appendLaunchKernel(ze_kernel_handle_t kernelHandle, const ze_group_count_t &threadGroupDimensions, ze_event_handle_t hEvent, uint32_t numWaitEvents, ze_event_handle_t *phWaitEvents, - CmdListKernelLaunchParams &launchParams) override; + CmdListKernelLaunchParams &launchParams) override { + + while (!queue.enqueue(makeLaunchKernel(kernelHandle, + threadGroupDimensions, + hEvent, + numWaitEvents, + phWaitEvents, + launchParams))) { + std::this_thread::yield(); + } + return ZE_RESULT_SUCCESS; + } + + ze_result_t appendLaunchKernel_inworker(ze_kernel_handle_t kernelHandle, + const ze_group_count_t &threadGroupDimensions, + ze_event_handle_t hEvent, uint32_t numWaitEvents, + ze_event_handle_t *phWaitEvents, + CmdListKernelLaunchParams &launchParams); ze_result_t 
appendLaunchKernelIndirect(ze_kernel_handle_t kernelHandle, const ze_group_count_t &pDispatchArgumentsBuffer, @@ -77,7 +242,19 @@ struct CommandListCoreFamilyImmediate : public CommandListCoreFamily -CommandListCoreFamilyImmediate::CommandListCoreFamilyImmediate(uint32_t numIddsPerBlock) : BaseClass(numIddsPerBlock) { +void CommandListCoreFamilyImmediate::workerThreadRun() { + + while (running.load(std::memory_order_acquire)) { + WorkItem *item = queue.dequeue(); + if (!item) { + std::this_thread::yield(); + continue; + } + + switch (item->type) { + case WorkItemType::Barrier: { + BarrierPayload &b = item->barrier; + + appendBarrier_inworker( + b.signalEvent, + b.waitEvents.count, + b.waitEvents.events, + b.relaxedOrderingDispatch); + break; + } + + case WorkItemType::LaunchKernel: { + LaunchKernelPayload &k = item->launch; + + appendLaunchKernel_inworker( + k.kernel, + k.groupCount, + k.signalEvent, + k.waitEvents.count, + k.waitEvents.events, + k.launchParams); + break; + } + } + } +} + +template +CommandListCoreFamilyImmediate::CommandListCoreFamilyImmediate(uint32_t numIddsPerBlock) : BaseClass(numIddsPerBlock), workerThread(&CommandListCoreFamilyImmediate::workerThreadRun, this) { computeFlushMethod = &CommandListCoreFamilyImmediate::flushRegularTask; } +template +CommandListCoreFamilyImmediate::~CommandListCoreFamilyImmediate() { + running.store(false, std::memory_order_release); + workerThread.join(); +} + template void CommandListCoreFamilyImmediate::checkAvailableSpace(uint32_t numEvents, bool hasRelaxedOrderingDependencies, size_t commandSize, bool requestCommandBufferInLocalMem) { this->commandContainer.fillReusableAllocationLists(); @@ -568,7 +612,7 @@ void CommandListCoreFamilyImmediate::tryResetKernelWithAssertFlag } template -ze_result_t CommandListCoreFamilyImmediate::appendLaunchKernel( +ze_result_t CommandListCoreFamilyImmediate::appendLaunchKernel_inworker( ze_kernel_handle_t kernelHandle, const ze_group_count_t &threadGroupDimensions, 
ze_event_handle_t hSignalEvent, uint32_t numWaitEvents, ze_event_handle_t *phWaitEvents, CmdListKernelLaunchParams &launchParams) { @@ -621,7 +665,7 @@ ze_result_t CommandListCoreFamilyImmediate::appendLaunchKernelInd } template -ze_result_t CommandListCoreFamilyImmediate::appendBarrier(ze_event_handle_t hSignalEvent, uint32_t numWaitEvents, ze_event_handle_t *phWaitEvents, bool relaxedOrderingDispatch) { +ze_result_t CommandListCoreFamilyImmediate::appendBarrier_inworker(ze_event_handle_t hSignalEvent, uint32_t numWaitEvents, ze_event_handle_t *phWaitEvents, bool relaxedOrderingDispatch) { ze_result_t ret = ZE_RESULT_SUCCESS; bool isStallingOperation = true;