#ifndef WORKLOAD_HEADER
#define WORKLOAD_HEADER

// NOTE(review): the include targets were lost in the text this header was recovered
// from; the list below is rebuilt from the names the code actually uses. The project
// types (mCore, mDevice, sem, semaphoreInfo, SEM_FLAGS, u32, u64) and the
// CHECK_VK_RESULT macro are assumed to be declared before this header is included —
// confirm against the build.
#include <cassert>
#include <format>
#include <functional>
#include <string>
#include <unordered_map>
#include <vector>

#include <vulkan/vulkan.h>

namespace M_CORE {
    struct Submission2;
}

namespace M_CORE {

    namespace cfgPipe {
        /// Number of timed stages each per-device query pool is sized for
        /// (the pool holds two timestamp queries per stage).
        inline constexpr auto MAX_QUERYS_PER_DEVICE = 32U;

        /// Factor that converts nanoseconds to milliseconds.
        inline constexpr auto NANOSECONDS_TO_MILLISECONDS = 1e-6f;

        /// Legacy name kept for existing users. Despite the name, the value is used
        /// as the nanoseconds-to-milliseconds factor above.
        inline constexpr auto ONE_MICROSECOND = NANOSECONDS_TO_MILLISECONDS;
    }

    /**
     * @brief Structured configuration defining a single pipe stage's execution environment.
     */
    struct StageConfig {
        mDevice* deviceSrc = nullptr;     ///< Device that owns the stage's timeline semaphore and query pool.
        mDevice* deviceDst = nullptr;     ///< Optional remote device; when set, the semaphore is created as a relay.
        const char* stageName = nullptr;  ///< Optional label used in the semaphore's debug name.
    };

    /**
     * @brief One producer -> consumer semaphore pair that MirrorSignals() keeps in step.
     */
    struct CrossDeviceLink {
        mDevice* srcDevice = nullptr;
        VkSemaphore srcSemaphore = VK_NULL_HANDLE;

        mDevice* dstDevice = nullptr;
        VkSemaphore dstSemaphore = VK_NULL_HANDLE;

        u64 lastMirroredValue = 0; // The tracking floor for THIS cross-device link.
    };

    /**
     * @brief Per-stage runtime state owned by the tracker.
     */
    struct StageContext {
        StageConfig config;
        sem localTimelineSem;
        u64 lastProducerValue = 0;      // Not read or written by the tracker itself.
        u64 m_gpuExecutionCounter = 0;  // Last timeline value handed out by AdvanceStageSync().
        u32 readIdx = 0;                // Buffer index recorded by the last AdvanceStageSync() call.
        float lastTime = 0;             // Maintained by UpdateTime().
        float deltaTime = 0;            // Maintained by UpdateTime().
        VkQueryPool pool = VK_NULL_HANDLE; // Borrowed: shared by every stage on the same device.
    };

    /**
     * @brief Per-device bookkeeping: how many stages use the device and its query pool.
     */
    struct StageMetrics {
        u32 stageCount = 0;
        VkQueryPool pool = VK_NULL_HANDLE;
    };

    /**
     * @brief Coordinates multi-stage workload pipelining and data flow across
     * independent Vulkan devices using single-threaded host synchronization.
     *
     * @details This class decouples multi-GPU synchronization from driver-dependent
     * external semaphore extensions. Instead of relying on native hardware casting—
     * which frequently fails or deadlocks across different GPU architectures—this
     * tracker isolates execution onto device-local timeline semaphores.
     *
     * Cross-device data boundaries are linked safely via a synchronous,
     * once-per-frame MirrorSignals() call executed on the main CPU loop. This function
     * peeks at the raw execution states of producer semaphores and monotonically
     * advances consumer landing pads by exactly one step per frame.
     *
     * The tracker owns the query pools it creates, so it is neither copyable nor
     * movable. It must be destroyed before the devices it was configured with.
     */
    class VulkanPipelineTracker {
    public:
        /**
         * @brief Creates one timeline semaphore per stage and one timestamp query pool per device.
         * @param m_core      Core object used to create the semaphores; must not be null.
         * @param stages      One configuration per stage; the vector index is the stage ID.
         *                    Every entry needs a non-null deviceSrc.
         * @param bufferCount Number of ring-buffer slots; must be non-zero (it is used as a modulus).
         */
        VulkanPipelineTracker(mCore* m_core, const std::vector<StageConfig>& stages, u32 bufferCount = 3)
            : m_stageCount(static_cast<u32>(stages.size())), m_bufferCount(bufferCount)
        {
            assert(m_core && "A core instance is required to create semaphores.");
            assert(m_bufferCount > 0 && "bufferCount is used as a modulus and must be non-zero.");

            m_stageContexts.resize(m_stageCount);

            for (u32 s = 0; s < m_stageCount; ++s) {
                StageContext& stageContext = m_stageContexts[s];
                stageContext.config = stages[s];
                assert(stageContext.config.deviceSrc && "Every stage needs a source device.");

                // Formatting a null C string is undefined, so unnamed stages get a placeholder.
                const char* stageName = stageContext.config.stageName ? stageContext.config.stageName : "unnamed";
                const std::string semName = std::format("pipeLocal_S{}_{}", s, stageName);

                semaphoreInfo info = {
                    stageContext.config.deviceSrc,
                    VkSemaphoreType::VK_SEMAPHORE_TYPE_TIMELINE,
                    0, // Initial value.
                    semName.c_str()
                };

                // A usable destination device turns the semaphore into a relay pair.
                if (stageContext.config.deviceDst && stageContext.config.deviceDst->virtualDevice) {
                    info.flags = SEM_FLAGS::RELAY;
                    info.remoteDevice = stageContext.config.deviceDst;
                }
                m_core->createSemaphore(info, stageContext.localTimelineSem);

                // Register the pair for mirroring only if a remote handle was actually produced.
                if (stageContext.localTimelineSem.semaphoreRemote) {
                    AddCrossDeviceLink(
                        stageContext.config.deviceSrc,
                        stageContext.localTimelineSem.semaphore,
                        stageContext.config.deviceDst,
                        stageContext.localTimelineSem.semaphoreRemote, 0);
                }

                m_deviceMetrics[stageContext.config.deviceSrc->virtualDevice].stageCount++;
            }

            // One timestamp query pool per device; required for per-stage performance metrics.
            for (auto& [device, metrics] : m_deviceMetrics) {
                CreateQueryStagePool(device, cfgPipe::MAX_QUERYS_PER_DEVICE, metrics.pool);
                assert(metrics.pool);
            }

            // Link query pool to stages.
            for (StageContext& stageContext : m_stageContexts) {
                stageContext.pool = m_deviceMetrics.at(stageContext.config.deviceSrc->virtualDevice).pool;
            }
        }

        /**
         * @brief Destroys the per-device query pools created by the constructor.
         * @note The caller must make sure no submitted work still uses the pools and that
         *       the devices are still alive.
         */
        ~VulkanPipelineTracker() {
            for (auto& [device, metrics] : m_deviceMetrics) {
                if (metrics.pool != VK_NULL_HANDLE) {
                    vkDestroyQueryPool(device, metrics.pool, nullptr);
                    metrics.pool = VK_NULL_HANDLE;
                }
            }
            // The stage contexts are members of m_stageContexts and are destroyed with it;
            // they were never allocated with new, so they must not be passed to delete.
            // NOTE(review): presumably `sem` releases its Vulkan handles in its own
            // destructor; if it does not, an explicit release call is still needed here.
        }

        // Owns Vulkan handles: copying or moving would lead to a double destroy.
        VulkanPipelineTracker(const VulkanPipelineTracker&) = delete;
        VulkanPipelineTracker& operator=(const VulkanPipelineTracker&) = delete;
        VulkanPipelineTracker(VulkanPipelineTracker&&) = delete;
        VulkanPipelineTracker& operator=(VulkanPipelineTracker&&) = delete;

        /**
         * @brief Registers a cross-device pipeline handoff link.
         * @param srcDev            Device owning the producer semaphore.
         * @param srcSem            Producer timeline semaphore that is polled.
         * @param dstDev            Device owning the consumer semaphore.
         * @param dstSem            Consumer timeline semaphore that is signaled from the host.
         * @param initialFloorValue Value the consumer semaphore is considered to hold already.
         */
        void AddCrossDeviceLink(mDevice* srcDev, VkSemaphore srcSem, mDevice* dstDev, VkSemaphore dstSem, u64 initialFloorValue = 0) {
            CrossDeviceLink link{};
            link.srcDevice = srcDev;
            link.srcSemaphore = srcSem;
            link.dstDevice = dstDev;
            link.dstSemaphore = dstSem;
            link.lastMirroredValue = initialFloorValue;
            m_links.push_back(link);
        }

        /**
         * @brief Processes all registered cross-device signals.
         * @details Call this exactly ONCE per CPU frame loop tick (for now).
         * For every link whose source is ahead of the mirrored value, the destination
         * timeline is advanced by exactly 1 step per call.
         */
        void MirrorSignals() {
            for (CrossDeviceLink& link : m_links) {
                // Peek at the live source GPU execution state (non-blocking query).
                u64 currentSrcValue = 0;
                VkResult srcRes = vkGetSemaphoreCounterValue(link.srcDevice->virtualDevice, link.srcSemaphore, &currentSrcValue);
                CHECK_VK_RESULT(srcRes, "vkGetSemaphoreCounterValue\n");

                // If the source has jumped ahead, advance the target by exactly 1 single step.
                if (currentSrcValue > link.lastMirroredValue) {
                    link.lastMirroredValue++; // (e.g., 1 -> 2)

                    VkSemaphoreSignalInfo signalInfo{ VK_STRUCTURE_TYPE_SEMAPHORE_SIGNAL_INFO };
                    signalInfo.semaphore = link.dstSemaphore;
                    signalInfo.value = link.lastMirroredValue;

                    // Host-side signal, at most one per link per call (original author's note:
                    // firing once per frame is meant to avoid thrashing the WDDM driver kernel).
                    VkResult res = vkSignalSemaphore(link.dstDevice->virtualDevice, &signalInfo);
                    CHECK_VK_RESULT(res, "vkSignalSemaphore\n");
                }
            }
        }

        /**
         * @brief Says True if scheduled workload has been completed.
         * Also True if no workload was ever scheduled.
         * @param stageId       Stage to inspect.
         * @param timeLineValue Receives the semaphore's current value. It is left untouched
         *                      when nothing was ever scheduled for the stage.
         */
        bool IsStageComplete(u32 stageId, u64& timeLineValue) {
            assert(stageId < m_stageContexts.size() && "Wrong ID requested.");
            StageContext& stage = m_stageContexts[stageId];

            const u64 scheduledValue = stage.m_gpuExecutionCounter;
            if (scheduledValue == 0) {
                return true;
            }

            u64 currentGPUValue = 0;
            VkResult res = vkGetSemaphoreCounterValue(
                stage.config.deviceSrc->virtualDevice,
                stage.localTimelineSem.semaphore, &currentGPUValue);
            CHECK_VK_RESULT(res, "vkGetSemaphoreCounterValue\n");

            timeLineValue = currentGPUValue;
            return scheduledValue == currentGPUValue;
        }

        /**
         * @brief Temporary time counter. To be replaced with VkQueryPool.
         * DEPRECATED
         * @details The first call only records the time; later calls also update deltaTime.
         */
        void UpdateTime(u32 stageId, float currentTime) {
            StageContext* context = GetStageContext(stageId);
            if (!context->lastTime) {
                context->lastTime = currentTime;
            }
            else {
                context->deltaTime = currentTime - context->lastTime;
                context->lastTime = currentTime;
            }
        }

        /**
         * @brief Create timestamp query pool for specific device.
         * Each device needs a pool.
         * @param device     Device that will own the pool.
         * @param stageCount Number of stages to reserve queries for.
         * @param resultPool Receives the created pool handle.
         */
        void CreateQueryStagePool(VkDevice device, u32 stageCount, VkQueryPool& resultPool) {
            VkQueryPoolCreateInfo poolInfo = {};
            poolInfo.sType = VK_STRUCTURE_TYPE_QUERY_POOL_CREATE_INFO;
            poolInfo.queryType = VK_QUERY_TYPE_TIMESTAMP;
            poolInfo.queryCount = stageCount * 2; // One for start, one for end
            VkResult res = vkCreateQueryPool(device, &poolInfo, nullptr, &resultPool);
            CHECK_VK_RESULT(res, "vkCreateQueryPool\n");
        }

        /**
         * @brief A wrapper function designed to encapsulate the boilerplate
         * of executing Vulkan commands while automatically measuring their actual duration on GPU.
         * @param cmdBuffer      Command buffer being recorded.
         * @param context        Stage context providing the query pool.
         * @param stageId        Stage ID; selects queries 2*stageId and 2*stageId+1.
         * @param recordCommands Callback that records the commands to be timed.
         */
        void recordTimedCommands(
            VkCommandBuffer cmdBuffer,
            StageContext* context,
            u32 stageId,
            std::function<void()> recordCommands)
        {
            assert(context && "A stage context is required.");
            assert(stageId < cfgPipe::MAX_QUERYS_PER_DEVICE && "Stage ID exceeds the query pool size.");

            const u32 queryStart = stageId * 2;
            const u32 queryEnd = queryStart + 1;

            // Reset start and end queries
            vkCmdResetQueryPool(cmdBuffer, context->pool, queryStart, 2);

            // Write Start Timestamp
            // BOTTOM_OF_PIPE ?
            vkCmdWriteTimestamp(cmdBuffer, VK_PIPELINE_STAGE_ALL_COMMANDS_BIT, context->pool, queryStart);

            // User-provided command recording
            recordCommands();

            // Write End Timestamp
            vkCmdWriteTimestamp(cmdBuffer, VK_PIPELINE_STAGE_ALL_COMMANDS_BIT, context->pool, queryEnd);
        }

        /**
         * @brief Calculates the duration of a GPU operation stage in milliseconds
         * by computing the difference between its recorded start and end timestamps.
         * TODO: Move away.
         * @return Duration in milliseconds, or 0 when the timestamps are not available yet.
         */
        float getStageDurationMs(StageContext* context, u32 stageId) {
            assert(context && context->config.deviceSrc && "A configured stage context is required.");
            assert(stageId < cfgPipe::MAX_QUERYS_PER_DEVICE && "Stage ID exceeds the query pool size.");

            u64 results[2] = { 0, 0 };

            // Retrieve timestamp ticks (64-bit). No WAIT flag is passed, so the call
            // does not block and may report that the results are not ready.
            VkResult res = vkGetQueryPoolResults(
                context->config.deviceSrc->virtualDevice, context->pool,
                stageId * 2, 2,
                sizeof(results), results,
                sizeof(u64), VK_QUERY_RESULT_64_BIT
            );
            if (res == VK_NOT_READY) {
                // Nothing was written to `results`; report "no measurement" instead of garbage.
                return 0.0f;
            }
            CHECK_VK_RESULT(res, "vkGetQueryPoolResults\n");

            // ticks * timestampPeriod gives nanoseconds; scale to milliseconds.
            const u64 ticksDelta = results[1] - results[0];
            return static_cast<float>(ticksDelta)
                * context->config.deviceSrc->phyDeviceData.m_devProps.limits.timestampPeriod
                * cfgPipe::NANOSECONDS_TO_MILLISECONDS;
        }

        /**
         * @brief Says True if target index is not in progress by stage specified.
         * Says True if that index is finished also.
         * @details Returns false only when indexTarget is the index recorded by the last
         * AdvanceStageSync() call and the semaphore has not reached the scheduled value yet.
         */
        bool IsIndexSafeToWrite(u32 stageId, u32 indexTarget) {
            assert(stageId < m_stageContexts.size() && "Wrong ID requested.");
            StageContext& stage = m_stageContexts[stageId];

            const u64 stageScheduledValue = stage.m_gpuExecutionCounter;
            if (stageScheduledValue == 0) {
                return true;
            }

            u64 currentGPUValue = 0;
            VkResult res = vkGetSemaphoreCounterValue(
                stage.config.deviceSrc->virtualDevice,
                stage.localTimelineSem.semaphore,
                &currentGPUValue);
            CHECK_VK_RESULT(res, "vkGetSemaphoreCounterValue\n");

            // Compare the raw timeline values: comparing them modulo the buffer count
            // would always report "finished" when bufferCount is 1.
            const bool stillRunning = (stageScheduledValue != currentGPUValue);
            return !(indexTarget == stage.readIdx && stillRunning);
        }

        /**
         * @brief Returns the buffer index of the last workload the stage has finished.
         * @param stageId       Stage to inspect.
         * @param timeLineValue Receives the semaphore's current value (0 when nothing finished yet).
         * @return Finished value modulo bufferCount, or 0 when the stage has not finished anything.
         */
        u32 getSafeIndex(u32 stageId, u64& timeLineValue) {
            assert(stageId < m_stageContexts.size() && "Wrong ID requested.");
            StageContext& stage = m_stageContexts[stageId];

            u64 currentGPUValue = 0;
            VkResult res = vkGetSemaphoreCounterValue(
                stage.config.deviceSrc->virtualDevice,
                stage.localTimelineSem.semaphore,
                &currentGPUValue);
            CHECK_VK_RESULT(res, "vkGetSemaphoreCounterValue\n");

            timeLineValue = currentGPUValue;
            if (currentGPUValue == 0) {
                // Nothing has completed; every path must still return a value.
                return 0;
            }
            return static_cast<u32>(currentGPUValue % m_bufferCount);
        }

        /**
         * @brief Provides semaphore object for the device configured
         * and a new timeline value to signal on submission.
         * Stage is supposed to call this when the previous workload was confirmed finished
         * in IsStageComplete().
         * @param stageId       Stage being advanced.
         * @param targetValue   Receives the new timeline value to signal.
         * @param semObject     Receives a pointer to the stage's semaphore object.
         * @param readIdxTarget Buffer index stored as the stage's readIdx.
         */
        void AdvanceStageSync(u32 stageId, u64& targetValue, sem*& semObject, u32 readIdxTarget) {
            assert(stageId < m_stageContexts.size() && "Wrong ID requested.");
            StageContext& stage = m_stageContexts[stageId];

            stage.m_gpuExecutionCounter++;
            stage.readIdx = readIdxTarget;
            targetValue = stage.m_gpuExecutionCounter;
            semObject = &stage.localTimelineSem;
        }

        /**
         * @brief Returns the mutable context of a stage; the pointer stays owned by the tracker.
         */
        StageContext* GetStageContext(u32 stageId) {
            assert(stageId < m_stageContexts.size() && "Wrong ID requested.");
            return &m_stageContexts[stageId];
        }

        /**
         * @brief Returns the Vulkan device handle of the stage's source device.
         */
        VkDevice GetStageDevice(u32 stageId) {
            assert(stageId < m_stageContexts.size() && "Wrong ID requested.");
            return m_stageContexts[stageId].config.deviceSrc->virtualDevice;
        }

    private:
        u32 m_stageCount;
        u32 m_bufferCount;
        std::vector<CrossDeviceLink> m_links;
        std::vector<StageContext> m_stageContexts;
        std::unordered_map<VkDevice, StageMetrics> m_deviceMetrics; // Owns one query pool per device.
    };

} // namespace

#endif // WORKLOAD_HEADER

#ifdef WORKLOAD_COMPONENT

namespace M_CORE {

} // namespace

#endif