/* * Copyright (c) 2026 Mykhailo Mamedov. All rights reserved. * * RESEARCH PREVIEW / REFERENCE ONLY: * This source code is provided solely for the purpose of reviewing * the author's research methods and implementation. * * NO LICENSE GRANTED: * This code is NOT for distribution, modification, or use in any * project (commercial or otherwise). Unauthorized copying or * use of this code is strictly prohibited. * * For inquiries regarding use or licensing, contact: ua.modin@gmail.com */ #ifndef WORKLOAD_HEADER #define WORKLOAD_HEADER #include #include #include #include namespace M_CORE { /** * @brief Container for a timeline semaphore handle and its target value. */ struct SubmissionSync { VkSemaphore semaphore = VK_NULL_HANDLE; u64 value = 0; }; /** * @brief Holds the input (wait) and output (signal) information for a queue submission. */ struct WorkloadSyncPack { SubmissionSync wait; SubmissionSync signal; }; /** * @brief Manages non-blocking CPU coordination and timeline semaphore tracking * for dynamic ring-buffered workloads operating across multiple Vulkan devices. */ class VulkanPipelineTracker { public: /** * @brief Allocates and initializes timeline semaphores for all stages and buffers. * @param stageCount The total number of sequential workloads/submissions in the pipeline. * @param stageDevices Array mapping each logical stage ID to its owning VkDevice handle. * @param bufferCount Total slots in the ring buffer. */ VulkanPipelineTracker(u32 stageCount, const std::vector& stageDevices, u32 bufferCount = 3) : m_devices(stageDevices), m_stageCount(stageCount), m_bufferCount(bufferCount) { VkResult res = VK_SUCCESS; m_semaphores.resize(m_stageCount, std::vector(bufferCount, VK_NULL_HANDLE)); m_currentValues.resize(m_stageCount, std::vector(bufferCount, 0)); m_lastWrittenIndexPerStage.resize(m_stageCount, 0); m_isDataNewPerStage.resize(m_stageCount, false); VkSemaphoreTypeCreateInfo typeInfo{ VK_STRUCTURE_TYPE_SEMAPHORE_TYPE_CREATE_INFO }; typeInfo.semaphoreType = VK_SEMAPHORE_TYPE_TIMELINE; typeInfo.initialValue = 0; VkSemaphoreCreateInfo createInfo{ VK_STRUCTURE_TYPE_SEMAPHORE_CREATE_INFO }; createInfo.pNext = &typeInfo; // Allocate a dedicated timeline semaphore for every stage-buffer intersection for (u32 s = 0; s < m_stageCount; ++s) { for (u32 b = 0; b < bufferCount; ++b) { res = vkCreateSemaphore(m_devices[s], &createInfo, nullptr, &m_semaphores[s][b]); CHECK_VK_RESULT(res, "vkCreateSemaphore\n"); } } // Prime Stage 1 (Index 1) to value 1 so Stage 0 is clear to write immediately on frame 0 for (u32 b = 0; b < bufferCount; ++b) { m_currentValues[1][b] = 1; VkSemaphoreSignalInfo signalInfo{ VK_STRUCTURE_TYPE_SEMAPHORE_SIGNAL_INFO }; signalInfo.semaphore = m_semaphores[1][b]; signalInfo.value = 1; res = vkSignalSemaphore(m_devices[1], &signalInfo); CHECK_VK_RESULT(res, "vkSignalSemaphore\n"); } } /** * @brief Destructor. */ ~VulkanPipelineTracker() { for (u32 s = 0; s < m_stageCount; ++s) { for (u32 b = 0; b < m_bufferCount; ++b) { if (m_semaphores[s][b] != VK_NULL_HANDLE) { vkDestroySemaphore(m_devices[s], m_semaphores[s][b], nullptr); } } } } /** * @brief Instantly queries the GPU without blocking to see if a stage is clear to submit. * @details Stage 0 checks if Stage 1 is done reading. Downstream stages check their own history. */ bool IsStageReady(u32 stageId, u32 bufferIdx) { VkResult res = VK_SUCCESS; if (stageId == 0) { u64 requiredValue = m_currentValues[1][bufferIdx]; u64 currentGPUValue = 0; res = vkGetSemaphoreCounterValue(m_devices[1], m_semaphores[1][bufferIdx], ¤tGPUValue); CHECK_VK_RESULT(res, "vkGetSemaphoreCounterValue\n"); return currentGPUValue >= requiredValue; } u64 pastValue = m_currentValues[stageId][bufferIdx]; if (pastValue == 0) return true; u64 currentGPUValue = 0; res = vkGetSemaphoreCounterValue(m_devices[stageId], m_semaphores[stageId][bufferIdx], ¤tGPUValue); CHECK_VK_RESULT(res, "vkGetSemaphoreCounterValue\n"); return currentGPUValue >= pastValue; } /** * @brief Packages the semaphore handles and value needed to pass a signal across devices. * @note To be called right after a queue submission. */ SyncJob ExtractCrossDeviceSyncJob(u32 producerStageId, u32 bufferIdx) { u32 consumerStageId = (producerStageId + 1) % m_stageCount; SyncJob job{}; job.deviceSource = m_devices[producerStageId]; job.semSource = m_semaphores[producerStageId][bufferIdx]; job.deviceTarget = m_devices[consumerStageId]; job.semTarget = m_semaphores[consumerStageId][bufferIdx]; job.waitValue = m_currentValues[producerStageId][bufferIdx]; return job; } /** * @brief Generates wait/signal info for Stage 0 based on the current loop iteration. */ void GetRootStageSync(u64 iteration, WorkloadSyncPack& outSync, u32& outWriteIdx) { outWriteIdx = iteration % m_bufferCount; outSync.wait.semaphore = m_semaphores[1][outWriteIdx]; outSync.wait.value = m_currentValues[1][outWriteIdx]; m_currentValues[0][outWriteIdx]++; outSync.signal.semaphore = m_semaphores[0][outWriteIdx]; outSync.signal.value = m_currentValues[0][outWriteIdx]; m_lastWrittenIndexPerStage[0] = outWriteIdx; m_isDataNewPerStage[0] = true; } /** * @brief Generates wait/signal info for any downstream consumer stage (1 to N). */ void GetSubsequentStageSync(u32 stageId, WorkloadSyncPack& outSync, u32 bufferIdx) { u32 producerStage = stageId - 1; outSync.wait.semaphore = m_semaphores[producerStage][bufferIdx]; outSync.wait.value = m_currentValues[producerStage][bufferIdx]; m_currentValues[stageId][bufferIdx]++; outSync.signal.semaphore = m_semaphores[stageId][bufferIdx]; outSync.signal.value = m_currentValues[stageId][bufferIdx]; m_lastWrittenIndexPerStage[stageId] = bufferIdx; m_isDataNewPerStage[stageId] = true; m_isDataNewPerStage[producerStage] = false; } /** * @brief Checks if the immediate parent stage has a slot ready. * @param outIdx Populated with the available buffer index if the function returns true. */ bool IsDataAvailableFromProducer(u32 stageId, u32& outIdx) { outIdx = m_lastWrittenIndexPerStage[stageId - 1]; return m_isDataNewPerStage[stageId - 1]; } private: std::vector m_devices; u32 m_stageCount; u32 m_bufferCount; std::vector> m_semaphores; std::vector> m_currentValues; std::vector m_lastWrittenIndexPerStage; std::vector m_isDataNewPerStage; }; } // namespace #endif // WORKLOAD_HEADER #ifdef WORKLOAD_COMPONENT #endif