diff --git a/pipelineTracker.hpp b/pipelineTracker.hpp new file mode 100644 index 0000000..7395c28 --- /dev/null +++ b/pipelineTracker.hpp @@ -0,0 +1,205 @@ +/* + * Copyright (c) 2026 Mykhailo Mamedov. All rights reserved. + * + * RESEARCH PREVIEW / REFERENCE ONLY: + * This source code is provided solely for the purpose of reviewing + * the author's research methods and implementation. + * + * NO LICENSE GRANTED: + * This code is NOT for distribution, modification, or use in any + * project (commercial or otherwise). Unauthorized copying or + * use of this code is strictly prohibited. + * + * For inquiries regarding use or licensing, contact: ua.modin@gmail.com + */ + + +#ifndef WORKLOAD_HEADER +#define WORKLOAD_HEADER + +#include +#include +#include +#include + +namespace M_CORE { + + /** + * @brief Container for a timeline semaphore handle and its target value. + */ + struct SubmissionSync { + VkSemaphore semaphore = VK_NULL_HANDLE; + u64 value = 0; + }; + + /** + * @brief Holds the input (wait) and output (signal) information for a queue submission. + */ + struct WorkloadSyncPack { + SubmissionSync wait; + SubmissionSync signal; + }; + + /** + * @brief Manages non-blocking CPU coordination and timeline semaphore tracking + * for dynamic ring-buffered workloads operating across multiple Vulkan devices. + */ + class VulkanPipelineTracker { + public: + /** + * @brief Allocates and initializes timeline semaphores for all stages and buffers. + * @param stageCount The total number of sequential workloads/submissions in the pipeline. + * @param stageDevices Array mapping each logical stage ID to its owning VkDevice handle. + * @param bufferCount Total slots in the ring buffer. + */ + VulkanPipelineTracker(u32 stageCount, const std::vector& stageDevices, u32 bufferCount = 3) + : m_devices(stageDevices), m_stageCount(stageCount), m_bufferCount(bufferCount) + { + VkResult res = VK_SUCCESS; + m_semaphores.resize(m_stageCount, std::vector(bufferCount, VK_NULL_HANDLE)); + m_currentValues.resize(m_stageCount, std::vector(bufferCount, 0)); + m_lastWrittenIndexPerStage.resize(m_stageCount, 0); + m_isDataNewPerStage.resize(m_stageCount, false); + + VkSemaphoreTypeCreateInfo typeInfo{ VK_STRUCTURE_TYPE_SEMAPHORE_TYPE_CREATE_INFO }; + typeInfo.semaphoreType = VK_SEMAPHORE_TYPE_TIMELINE; + typeInfo.initialValue = 0; + + VkSemaphoreCreateInfo createInfo{ VK_STRUCTURE_TYPE_SEMAPHORE_CREATE_INFO }; + createInfo.pNext = &typeInfo; + + // Allocate a dedicated timeline semaphore for every stage-buffer intersection + for (u32 s = 0; s < m_stageCount; ++s) { + for (u32 b = 0; b < bufferCount; ++b) { + res = vkCreateSemaphore(m_devices[s], &createInfo, nullptr, &m_semaphores[s][b]); + CHECK_VK_RESULT(res, "vkCreateSemaphore\n"); + } + } + + // Prime Stage 1 (Index 1) to value 1 so Stage 0 is clear to write immediately on frame 0 + for (u32 b = 0; b < bufferCount; ++b) { + m_currentValues[1][b] = 1; + VkSemaphoreSignalInfo signalInfo{ VK_STRUCTURE_TYPE_SEMAPHORE_SIGNAL_INFO }; + signalInfo.semaphore = m_semaphores[1][b]; + signalInfo.value = 1; + res = vkSignalSemaphore(m_devices[1], &signalInfo); + CHECK_VK_RESULT(res, "vkSignalSemaphore\n"); + } + } + + /** + * @brief Destructor. + */ + ~VulkanPipelineTracker() { + for (u32 s = 0; s < m_stageCount; ++s) { + for (u32 b = 0; b < m_bufferCount; ++b) { + if (m_semaphores[s][b] != VK_NULL_HANDLE) { + vkDestroySemaphore(m_devices[s], m_semaphores[s][b], nullptr); + } + } + } + } + + /** + * @brief Instantly queries the GPU without blocking to see if a stage is clear to submit. + * @details Stage 0 checks if Stage 1 is done reading. Downstream stages check their own history. + */ + bool IsStageReady(u32 stageId, u32 bufferIdx) { + VkResult res = VK_SUCCESS; + if (stageId == 0) { + u64 requiredValue = m_currentValues[1][bufferIdx]; + u64 currentGPUValue = 0; + res = vkGetSemaphoreCounterValue(m_devices[1], m_semaphores[1][bufferIdx], ¤tGPUValue); + CHECK_VK_RESULT(res, "vkGetSemaphoreCounterValue\n"); + return currentGPUValue >= requiredValue; + } + + u64 pastValue = m_currentValues[stageId][bufferIdx]; + if (pastValue == 0) return true; + + u64 currentGPUValue = 0; + res = vkGetSemaphoreCounterValue(m_devices[stageId], m_semaphores[stageId][bufferIdx], ¤tGPUValue); + CHECK_VK_RESULT(res, "vkGetSemaphoreCounterValue\n"); + return currentGPUValue >= pastValue; + } + + /** + * @brief Packages the semaphore handles and value needed to pass a signal across devices. + * @note To be called right after a queue submission. + */ + SyncJob ExtractCrossDeviceSyncJob(u32 producerStageId, u32 bufferIdx) { + u32 consumerStageId = (producerStageId + 1) % m_stageCount; + + SyncJob job{}; + job.deviceSource = m_devices[producerStageId]; + job.semSource = m_semaphores[producerStageId][bufferIdx]; + job.deviceTarget = m_devices[consumerStageId]; + job.semTarget = m_semaphores[consumerStageId][bufferIdx]; + job.waitValue = m_currentValues[producerStageId][bufferIdx]; + + return job; + } + + /** + * @brief Generates wait/signal info for Stage 0 based on the current loop iteration. + */ + void GetRootStageSync(u64 iteration, WorkloadSyncPack& outSync, u32& outWriteIdx) { + outWriteIdx = iteration % m_bufferCount; + + outSync.wait.semaphore = m_semaphores[1][outWriteIdx]; + outSync.wait.value = m_currentValues[1][outWriteIdx]; + + m_currentValues[0][outWriteIdx]++; + outSync.signal.semaphore = m_semaphores[0][outWriteIdx]; + outSync.signal.value = m_currentValues[0][outWriteIdx]; + + m_lastWrittenIndexPerStage[0] = outWriteIdx; + m_isDataNewPerStage[0] = true; + } + + /** + * @brief Generates wait/signal info for any downstream consumer stage (1 to N). + */ + void GetSubsequentStageSync(u32 stageId, WorkloadSyncPack& outSync, u32 bufferIdx) { + u32 producerStage = stageId - 1; + + outSync.wait.semaphore = m_semaphores[producerStage][bufferIdx]; + outSync.wait.value = m_currentValues[producerStage][bufferIdx]; + + m_currentValues[stageId][bufferIdx]++; + outSync.signal.semaphore = m_semaphores[stageId][bufferIdx]; + outSync.signal.value = m_currentValues[stageId][bufferIdx]; + + m_lastWrittenIndexPerStage[stageId] = bufferIdx; + m_isDataNewPerStage[stageId] = true; + m_isDataNewPerStage[producerStage] = false; + } + + /** + * @brief Checks if the immediate parent stage has a slot ready. + * @param outIdx Populated with the available buffer index if the function returns true. + */ + bool IsDataAvailableFromProducer(u32 stageId, u32& outIdx) { + outIdx = m_lastWrittenIndexPerStage[stageId - 1]; + return m_isDataNewPerStage[stageId - 1]; + } + + private: + std::vector m_devices; + u32 m_stageCount; + u32 m_bufferCount; + + std::vector> m_semaphores; + std::vector> m_currentValues; + + std::vector m_lastWrittenIndexPerStage; + std::vector m_isDataNewPerStage; + }; + + +} // namespace + +#endif // WORKLOAD_HEADER + +#ifdef WORKLOAD_COMPONENT +#endif