/*
 * Copyright (c) 2026 Mykhailo Mamedov. All rights reserved.
 *
 * RESEARCH PREVIEW / REFERENCE ONLY:
 * This source code is provided solely for the purpose of reviewing
 * the author's research methods and implementation.
 *
 * NO LICENSE GRANTED:
 * This code is NOT for distribution, modification, or use in any
 * project (commercial or otherwise). Unauthorized copying or
 * use of this code is strictly prohibited.
 *
 * For inquiries regarding use or licensing, contact: ua.modin@gmail.com
 */

#ifndef WORKLOAD_HEADER
#define WORKLOAD_HEADER

// NOTE(review): the original #include targets were lost when this file was
// mangled (angle-bracket contents stripped). The standard headers below are the
// ones this file visibly needs. Vulkan and the project types (mCore, mDevice,
// sem, semaphoreInfo, SEM_FLAGS, CHECK_VK_RESULT, u32, u64) are not declared
// here and are assumed to be visible before inclusion — confirm and restore any
// project header that was part of the original list.
#include <cassert>
#include <format>
#include <functional>
#include <string>
#include <unordered_map>
#include <vector>

namespace M_CORE
{
    struct Submission2;
}

namespace M_CORE
{
    namespace cfgPipe
    {
        /// Upper bound on stages whose timestamps fit in one per-device query pool
        /// (each stage uses two queries, addressed by its global stage id).
        inline constexpr auto MAX_QUERYS_PER_DEVICE = 32U;

        /// Scale factor applied to (ticks * timestampPeriod) in getStageDurationMs;
        /// timestampPeriod is nanoseconds per tick, so this converts ns -> ms.
        inline constexpr auto ONE_MICROSECOND = 1e-6f;
    }

    /**
     * @brief Structured configuration defining a single pipe stage's execution environment.
     */
    struct StageConfig
    {
        mDevice* deviceSrc = nullptr;   // Device that executes the stage and owns its timeline semaphore.
        mDevice* deviceDst = nullptr;   // Optional consumer device; enables a relay semaphore when set.
        const char* stageName = nullptr; // Used to build the semaphore debug name; may be null.
    };

    /**
     * @brief One producer -> consumer semaphore pair advanced by MirrorSignals().
     */
    struct CrossDeviceLink
    {
        mDevice* srcDevice = nullptr;
        VkSemaphore srcSemaphore = nullptr;
        mDevice* dstDevice = nullptr;
        VkSemaphore dstSemaphore = nullptr;
        u64 lastMirroredValue = 0; // The tracking floor for THIS cross-device link.
    };

    /**
     * @brief Per-stage runtime state owned by VulkanPipelineTracker.
     */
    struct StageContext
    {
        StageConfig config;
        sem localTimelineSem;
        u64 lastProducerValue = 0;
        u64 m_gpuExecutionCounter = 0; // Last timeline value handed out by AdvanceStageSync (0 = never scheduled).
        u32 readIdx = 0;               // Buffer index recorded by the latest AdvanceStageSync call.
        float lastTime = 0;            // Used only by the deprecated UpdateTime().
        float deltaTime = 0;           // Used only by the deprecated UpdateTime().
        VkQueryPool pool = nullptr;    // Shared timestamp pool of config.deviceSrc (not owned by the context).
    };

    /**
     * @brief Per-device bookkeeping: how many stages run on the device and its query pool.
     */
    struct StageMetrics
    {
        u32 stageCount = 0;
        VkQueryPool pool = nullptr;
    };

    /**
     * @brief Coordinates multi-stage workload pipelining and data flow across
     *        independent Vulkan devices using single-threaded host synchronization.
     *
     * @details This class decouples multi-GPU synchronization from driver-dependent
     * external semaphore extensions. Instead of relying on native hardware casting—
     * which frequently fails or deadlocks across different GPU architectures—this
     * tracker isolates execution onto device-local timeline semaphores.
     *
     * Cross-device data boundaries are linked safely via a synchronous,
     * once-per-frame MirrorSignals() call executed on the main CPU loop. That
     * function peeks at the current values of producer semaphores and monotonically
     * advances the consumer "landing pad" semaphores by at most one step per call.
     *
     * Ownership: the tracker destroys the per-device timestamp query pools it
     * creates, so the devices must outlive the tracker. The timeline semaphores are
     * created through mCore and are not released here (see the destructor TODO).
     */
    class VulkanPipelineTracker
    {
    public:
        /**
         * @brief Creates one device-local timeline semaphore per stage, registers
         *        relay links for stages with a virtual destination device, and
         *        creates one timestamp query pool per source device.
         * @param m_core       Core used to create the stage semaphores.
         * @param stages       Stage descriptions; the stage id is the index in this vector.
         * @param bufferCount  Number of buffers cycled through by the pipeline (must be > 0).
         */
        VulkanPipelineTracker(mCore* m_core, const std::vector<StageConfig>& stages, u32 bufferCount = 3)
            : m_stageCount(static_cast<u32>(stages.size())), m_bufferCount(bufferCount)
        {
            // bufferCount is used as a modulus when mapping timeline values to buffer indices.
            assert(m_bufferCount > 0 && "bufferCount must be non-zero.");

            m_stageContexts.resize(m_stageCount);

            for (u32 s = 0; s < m_stageCount; ++s)
            {
                StageContext& stageContext = m_stageContexts[s];
                stageContext.config = stages[s];

                // std::format with a null const char* is undefined behavior.
                const char* stageName = stages[s].stageName ? stages[s].stageName : "unnamed";
                std::string semName = std::format("pipeLocal_S{}_{}", s, stageName);

                semaphoreInfo info = {
                    stageContext.config.deviceSrc,
                    VkSemaphoreType::VK_SEMAPHORE_TYPE_TIMELINE,
                    0, // Initial value.
                    semName.c_str()
                };

                if (stageContext.config.deviceDst && stageContext.config.deviceDst->virtualDevice)
                {
                    info.flags = SEM_FLAGS::RELAY;
                    info.remoteDevice = stageContext.config.deviceDst;
                }

                m_core->createSemaphore(info, stageContext.localTimelineSem);

                if (stageContext.localTimelineSem.semaphoreRemote)
                {
                    AddCrossDeviceLink(
                        stageContext.config.deviceSrc, stageContext.localTimelineSem.semaphore,
                        stageContext.config.deviceDst, stageContext.localTimelineSem.semaphoreRemote,
                        0);
                }

                m_deviceMetrics[stageContext.config.deviceSrc->virtualDevice].stageCount++;
            }

            // One timestamp query pool per device, required for per-stage GPU timings.
            for (auto& [device, metrics] : m_deviceMetrics)
            {
                CreateQueryStagePool(device, cfgPipe::MAX_QUERYS_PER_DEVICE, metrics.pool);
                assert(metrics.pool);
            }

            // Link each stage to its device's query pool.
            for (StageContext& stageContext : m_stageContexts)
            {
                stageContext.pool = m_deviceMetrics.at(stageContext.config.deviceSrc->virtualDevice).pool;
            }
        }

        // The tracker owns Vulkan query pools; copying would destroy them twice.
        VulkanPipelineTracker(const VulkanPipelineTracker&) = delete;
        VulkanPipelineTracker& operator=(const VulkanPipelineTracker&) = delete;

        /**
         * @brief Destroys the per-device query pools created in the constructor.
         *        The devices must still be alive when this runs.
         */
        ~VulkanPipelineTracker()
        {
            for (auto& [device, metrics] : m_deviceMetrics)
            {
                if (metrics.pool)
                {
                    vkDestroyQueryPool(device, metrics.pool, nullptr);
                    metrics.pool = nullptr;
                }
            }

            // TODO: release each localTimelineSem through the mCore API that created it.
            // The former `delete &m_stageContexts[s].localTimelineSem;` was undefined
            // behavior: these objects live inside m_stageContexts and were never
            // allocated with `new`.
        }

        /**
         * @brief Registers a cross-device pipeline handoff link.
         * @param initialFloorValue Starting value of the link's mirrored counter.
         */
        void AddCrossDeviceLink(mDevice* srcDev, VkSemaphore srcSem, mDevice* dstDev, VkSemaphore dstSem, u64 initialFloorValue = 0)
        {
            CrossDeviceLink link{};
            link.srcDevice = srcDev;
            link.srcSemaphore = srcSem;
            link.dstDevice = dstDev;
            link.dstSemaphore = dstSem;
            link.lastMirroredValue = initialFloorValue;
            m_links.push_back(link);
        }

        /**
         * @brief Processes all registered cross-device signals.
         * @details Call this exactly ONCE per CPU frame loop tick (for now).
         * For each link whose source value is ahead of the mirrored value, the
         * destination timeline is host-signaled with the mirrored value + 1.
         */
        void MirrorSignals()
        {
            for (CrossDeviceLink& link : m_links)
            {
                // Non-blocking peek at the producer's current timeline value.
                u64 currentSrcValue = 0;
                VkResult srcRes = vkGetSemaphoreCounterValue(link.srcDevice->virtualDevice, link.srcSemaphore, &currentSrcValue);
                CHECK_VK_RESULT(srcRes, "vkGetSemaphoreCounterValue\n");

                if (currentSrcValue <= link.lastMirroredValue)
                    continue; // Consumer already caught up with the producer.

                // Advance by exactly one step even if the producer is further ahead (e.g., 1 -> 2).
                ++link.lastMirroredValue;

                VkSemaphoreSignalInfo signalInfo{ VK_STRUCTURE_TYPE_SEMAPHORE_SIGNAL_INFO };
                signalInfo.semaphore = link.dstSemaphore;
                signalInfo.value = link.lastMirroredValue;

                // Host-side signal, issued at most once per link per call.
                VkResult res = vkSignalSemaphore(link.dstDevice->virtualDevice, &signalInfo);
                CHECK_VK_RESULT(res, "vkSignalSemaphore\n");
            }
        }

        /**
         * @brief Returns true if the stage's latest scheduled workload has completed,
         *        or if no workload was ever scheduled.
         * @param timeLineValue Receives the current GPU timeline value when the stage
         *        has scheduled work; left untouched otherwise.
         */
        bool IsStageComplete(u32 stageId, u64& timeLineValue)
        {
            const StageContext& context = m_stageContexts[stageId];
            const u64 scheduledValue = context.m_gpuExecutionCounter;
            if (scheduledValue == 0)
                return true;

            u64 currentGPUValue = 0;
            VkResult res = vkGetSemaphoreCounterValue(
                context.config.deviceSrc->virtualDevice,
                context.localTimelineSem.semaphore,
                &currentGPUValue);
            CHECK_VK_RESULT(res, "vkGetSemaphoreCounterValue\n");

            timeLineValue = currentGPUValue;
            // Timeline values only increase, so "reached" means >=, not ==.
            return currentGPUValue >= scheduledValue;
        }

        /**
         * @brief Temporary CPU-side time counter. To be replaced with VkQueryPool.
         * DEPRECATED
         */
        void UpdateTime(u32 stageId, float currentTime)
        {
            StageContext* context = GetStageContext(stageId);
            if (!context->lastTime)
            {
                context->lastTime = currentTime;
            }
            else
            {
                context->deltaTime = currentTime - context->lastTime;
                context->lastTime = currentTime;
            }
        }

        /**
         * @brief Creates a timestamp query pool for a specific device.
         *        Each device needs its own pool.
         * @param stageCount Number of stages to size for; two queries (start/end) per stage.
         */
        void CreateQueryStagePool(VkDevice device, u32 stageCount, VkQueryPool& resultPool)
        {
            VkQueryPoolCreateInfo poolInfo = {};
            poolInfo.sType = VK_STRUCTURE_TYPE_QUERY_POOL_CREATE_INFO;
            poolInfo.queryType = VK_QUERY_TYPE_TIMESTAMP;
            poolInfo.queryCount = stageCount * 2; // One for start, one for end.

            VkResult res = vkCreateQueryPool(device, &poolInfo, nullptr, &resultPool);
            CHECK_VK_RESULT(res, "vkCreateQueryPool\n");
        }

        /**
         * @brief Records recordCommands() into cmdBuffer bracketed by two timestamp
         *        writes (queries stageId*2 and stageId*2+1 of context->pool).
         */
        void recordTimedCommands(
            VkCommandBuffer cmdBuffer,
            StageContext* context,
            u32 stageId,
            std::function<void()> recordCommands)
        {
            // Query slots are addressed by the global stage id; the pool holds
            // MAX_QUERYS_PER_DEVICE pairs.
            assert(stageId < cfgPipe::MAX_QUERYS_PER_DEVICE && "Stage id exceeds query pool capacity.");

            const u32 queryStart = stageId * 2;
            const u32 queryEnd = queryStart + 1;

            // Reset start and end queries before reuse.
            vkCmdResetQueryPool(cmdBuffer, context->pool, queryStart, 2);

            // Start timestamp. NOTE(review): TOP_OF_PIPE / BOTTOM_OF_PIPE may be a better fit.
            vkCmdWriteTimestamp(cmdBuffer, VK_PIPELINE_STAGE_ALL_COMMANDS_BIT, context->pool, queryStart);

            // User-provided command recording.
            recordCommands();

            // End timestamp.
            vkCmdWriteTimestamp(cmdBuffer, VK_PIPELINE_STAGE_ALL_COMMANDS_BIT, context->pool, queryEnd);
        }

        /**
         * @brief Returns a stage's GPU duration in milliseconds from its start/end timestamps.
         * @return 0 when the queries are not yet available (VK_NOT_READY).
         * TODO: Move away.
         */
        float getStageDurationMs(StageContext* context, u32 stageId)
        {
            assert(stageId < cfgPipe::MAX_QUERYS_PER_DEVICE && "Stage id exceeds query pool capacity.");

            // Vulkan writes nothing for unavailable queries (no WAIT/PARTIAL flags),
            // so start from zero instead of reading uninitialized memory.
            u64 results[2] = { 0, 0 };
            VkResult res = vkGetQueryPoolResults(
                context->config.deviceSrc->virtualDevice,
                context->pool,
                stageId * 2,
                2,
                sizeof(results),
                results,
                sizeof(u64),
                VK_QUERY_RESULT_64_BIT);

            if (res == VK_NOT_READY)
                return 0.0f;
            CHECK_VK_RESULT(res, "vkGetQueryPoolResults\n");

            // ticks * timestampPeriod = nanoseconds; * 1e-6 = milliseconds.
            const u64 ticksDelta = results[1] - results[0];
            return static_cast<float>(ticksDelta) *
                context->config.deviceSrc->phyDeviceData.m_devProps.limits.timestampPeriod *
                cfgPipe::ONE_MICROSECOND;
        }

        /**
         * @brief Returns false only while the stage's latest scheduled submission is
         *        still pending on the GPU AND indexTarget is the index that submission
         *        reads (set via AdvanceStageSync). Returns true otherwise, including
         *        when nothing was ever scheduled.
         */
        bool IsIndexSafeToWrite(u32 stageId, u32 indexTarget)
        {
            const StageContext& context = m_stageContexts[stageId];
            const u64 scheduledValue = context.m_gpuExecutionCounter;
            if (scheduledValue == 0)
                return true;

            u64 currentGPUValue = 0;
            VkResult res = vkGetSemaphoreCounterValue(
                context.config.deviceSrc->virtualDevice,
                context.localTimelineSem.semaphore,
                &currentGPUValue);
            CHECK_VK_RESULT(res, "vkGetSemaphoreCounterValue\n");

            // Compare timeline values, not indices: comparing values modulo
            // bufferCount misreports a lag that is a multiple of bufferCount.
            const bool stillPending = currentGPUValue < scheduledValue;
            return !(stillPending && indexTarget == context.readIdx);
        }

        /**
         * @brief Returns the buffer index of the stage's latest completed timeline
         *        value (value % bufferCount) and writes that value to timeLineValue.
         *        If nothing has completed yet, both are 0.
         */
        u32 getSafeIndex(u32 stageId, u64& timeLineValue)
        {
            u64 currentGPUValue = 0;
            VkResult res = vkGetSemaphoreCounterValue(
                m_stageContexts[stageId].config.deviceSrc->virtualDevice,
                m_stageContexts[stageId].localTimelineSem.semaphore,
                &currentGPUValue);
            CHECK_VK_RESULT(res, "vkGetSemaphoreCounterValue\n");

            // Previously this fell off the end of the function when the value was 0.
            timeLineValue = currentGPUValue;
            return static_cast<u32>(currentGPUValue % m_bufferCount);
        }

        /**
         * @brief Increments the stage's schedule counter and returns it as the new
         *        timeline value to signal on submission, together with the stage's
         *        semaphore object; records readIdxTarget as the stage's read index.
         *        Intended to be called once IsStageComplete() confirms the previous
         *        workload finished.
         */
        void AdvanceStageSync(u32 stageId, u64& targetValue, sem*& semObject, u32 readIdxTarget)
        {
            m_stageContexts[stageId].m_gpuExecutionCounter++;
            m_stageContexts[stageId].readIdx = readIdxTarget;
            targetValue = m_stageContexts[stageId].m_gpuExecutionCounter;
            semObject = &m_stageContexts[stageId].localTimelineSem;
        }

        StageContext* GetStageContext(u32 stageId)
        {
            assert(stageId < m_stageContexts.size() && "Wrong ID requested.");
            return &m_stageContexts[stageId];
        }

        VkDevice GetStageDevice(u32 stageId)
        {
            assert(stageId < m_stageContexts.size() && "Wrong ID requested.");
            return m_stageContexts[stageId].config.deviceSrc->virtualDevice;
        }

    private:
        u32 m_stageCount;
        u32 m_bufferCount;
        std::vector<CrossDeviceLink> m_links;
        std::vector<StageContext> m_stageContexts;
        // Per-device stage counts and owned timestamp query pools (destroyed in the destructor).
        std::unordered_map<VkDevice, StageMetrics> m_deviceMetrics;
    };
} // namespace

#endif // WORKLOAD_HEADER

#ifdef WORKLOAD_COMPONENT
namespace M_CORE
{
} // namespace
#endif