Pipeline tracker refactoring, added perf metrics, ditched external semaphores.

This commit is contained in:
mik
2026-06-02 15:10:56 -04:00
parent 9d6f3eb100
commit e1a001018a

View File

@@ -17,189 +17,359 @@
#ifndef WORKLOAD_HEADER #ifndef WORKLOAD_HEADER
#define WORKLOAD_HEADER #define WORKLOAD_HEADER
#include <array>
#include <cassert>
#include <cstdint>
#include <format>
#include <functional>
#include <string>
#include <unordered_map>
#include <vector>

#include <vulkan/vulkan.h>
namespace M_CORE { namespace M_CORE {
struct Submission2;
}
namespace M_CORE {
// Compile-time constants used by VulkanPipelineTracker.
namespace cfgPipe {
// Passed as the stage count when each per-device timestamp query pool is created;
// the pool then holds twice this many queries (one start/end pair per stage).
inline constexpr auto MAX_QUERYS_PER_DEVICE = 32U;
// Scale factor applied to (timestamp ticks * timestampPeriod) in getStageDurationMs().
// NOTE(review): timestampPeriod is nanoseconds per tick, so 1e-6 converts
// nanoseconds to milliseconds here despite the "microsecond" name — confirm and rename.
inline constexpr auto ONE_MICROSECOND = 1e-6f;
}
/** /**
* @brief Container for a timeline semaphore handle and its target value. * @brief Structured configuration defining a single pipe stage's execution environment.
*/ */
struct SubmissionSync { struct StageConfig {
VkSemaphore semaphore = VK_NULL_HANDLE; mDevice* deviceSrc = nullptr;
u64 value = 0; mDevice* deviceDst = nullptr;
const char* stageName = nullptr;
};
/**
 * @brief One producer -> consumer semaphore pair that MirrorSignals() keeps in step.
 */
struct CrossDeviceLink {
    /// Device that owns @ref srcSemaphore (the producer side).
    mDevice* srcDevice = nullptr;
    /// Timeline semaphore whose counter is polled on the producer device.
    /// VK_NULL_HANDLE (not nullptr) so the initializer also compiles where
    /// non-dispatchable handles are 64-bit integers.
    VkSemaphore srcSemaphore = VK_NULL_HANDLE;
    /// Device that owns @ref dstSemaphore (the consumer side).
    mDevice* dstDevice = nullptr;
    /// Timeline semaphore signalled from the host on the consumer device.
    VkSemaphore dstSemaphore = VK_NULL_HANDLE;
    /// The tracking floor for THIS cross-device link: last value signalled on dstSemaphore.
    u64 lastMirroredValue = 0;
};
/**
 * @brief Per-stage runtime state owned by VulkanPipelineTracker.
 */
struct StageContext {
    /// Copy of the configuration the stage was created from.
    StageConfig config;
    /// Device-local timeline semaphore signalled by this stage's submissions.
    sem localTimelineSem;
    /// Not read or written by the tracker code visible here.
    u64 lastProducerValue = 0;
    /// Last timeline value handed out by AdvanceStageSync(); 0 means nothing was scheduled yet.
    u64 m_gpuExecutionCounter = 0;
    /// Buffer index recorded by the most recent AdvanceStageSync() call.
    u32 readIdx = 0;
    /// Last time stamp passed to UpdateTime(); 0 doubles as "never updated".
    float lastTime = 0;
    /// Difference between the last two UpdateTime() time stamps.
    float deltaTime = 0;
    /// Timestamp query pool shared by every stage on the same source device.
    /// VK_NULL_HANDLE (not nullptr) so the initializer also compiles where
    /// non-dispatchable handles are 64-bit integers.
    VkQueryPool pool = VK_NULL_HANDLE;
};
/**
 * @brief Per-device bookkeeping used while the tracker is constructed.
 */
struct StageMetrics {
    /// Number of stages whose source device is this device.
    u32 stageCount = 0;
    /// Timestamp query pool created for this device.
    /// VK_NULL_HANDLE (not nullptr) so the initializer also compiles where
    /// non-dispatchable handles are 64-bit integers.
    VkQueryPool pool = VK_NULL_HANDLE;
};
/**
 * @brief Coordinates multi-stage workload pipelining and data flow across
 * independent Vulkan devices using single-threaded host synchronization.
 *
 * @details This class decouples multi-GPU synchronization from driver-dependent
 * external semaphore extensions. Instead of relying on native hardware casting,
 * which frequently fails or deadlocks across different GPU architectures, this
 * tracker isolates execution onto device-local timeline semaphores.
 *
 * Cross-device data boundaries are linked safely via a synchronous,
 * once-per-frame MirrorSignals() call executed on the main CPU loop. This function
 * peeks at the raw execution states of producer semaphores and monotonically
 * advances consumer landing pads by exactly one step per frame.
 */
class VulkanPipelineTracker { class VulkanPipelineTracker {
public: public:
/**
 * @brief Builds one StageContext per entry of @p stages and the per-device query pools.
 *
 * For every stage a timeline semaphore named "pipeLocal_S<index>_<stageName>" is
 * created through @p m_core on the stage's source device. When the stage has a
 * destination device with a logical device, a relay semaphore is requested and,
 * if the created semaphore exposes a remote handle, a cross-device link is registered.
 * Afterwards one timestamp query pool is created per distinct source VkDevice and
 * shared by all stages running on that device.
 *
 * @param m_core      Core object used to create the semaphores; must not be null.
 * @param stages      Stage configurations, in pipeline order; each needs a non-null deviceSrc.
 * @param bufferCount Number of ring-buffer slots; used as the modulus for buffer indices, must be > 0.
 */
VulkanPipelineTracker(mCore* m_core, const std::vector<StageConfig>& stages, u32 bufferCount=3)
    : m_stageCount(static_cast<u32>(stages.size())), m_bufferCount(bufferCount)
{
    assert(m_core && "A core object is required to create the stage semaphores.");
    assert(m_bufferCount > 0 && "bufferCount is used as a modulus and must be non-zero.");

    m_stageContexts.resize(m_stageCount);
    std::unordered_map<VkDevice, StageMetrics> countsMap;

    for (u32 s = 0; s < m_stageCount; ++s) {
        assert(stages[s].deviceSrc && "Every stage needs a source device.");

        // stageName defaults to nullptr; formatting a null C string is undefined behaviour.
        const char* stageLabel = stages[s].stageName ? stages[s].stageName : "unnamed";
        std::string semName = std::format("pipeLocal_S{}_{}", s, stageLabel);

        StageContext& stageContext = m_stageContexts[s];
        stageContext.config = stages[s];

        semaphoreInfo info = {
            stageContext.config.deviceSrc,
            VkSemaphoreType::VK_SEMAPHORE_TYPE_TIMELINE,
            0, // Initial value.
            semName.c_str()
        };
        if (stageContext.config.deviceDst && stageContext.config.deviceDst->virtualDevice) {
            info.flags = SEM_FLAGS::RELAY;
            info.remoteDevice = stageContext.config.deviceDst;
        }
        m_core->createSemaphore(info, stageContext.localTimelineSem);

        // A remote handle means the semaphore has a counterpart on the destination device.
        if (stageContext.localTimelineSem.semaphoreRemote) {
            AddCrossDeviceLink(
                stageContext.config.deviceSrc,
                stageContext.localTimelineSem.semaphore,
                stageContext.config.deviceDst,
                stageContext.localTimelineSem.semaphoreRemote, 0);
        }
        countsMap[stageContext.config.deviceSrc->virtualDevice].stageCount++;
    }

    // Create a query pool for each device.
    // Required for performance metrics for every stage.
    for (auto& [device, metrics] : countsMap) {
        CreateQueryStagePool(device, cfgPipe::MAX_QUERYS_PER_DEVICE, metrics.pool);
        assert(metrics.pool);
    }

    // Link query pool to stages.
    for (u32 s = 0; s < m_stageCount; ++s) {
        StageContext& stageContext = m_stageContexts[s];
        stageContext.pool = countsMap.at(stageContext.config.deviceSrc->virtualDevice).pool;
    }
}
/**
 * @brief Destructor. Destroys the timestamp query pools created by the constructor.
 *
 * The previous body executed `delete &localTimelineSem` on a member of a vector
 * element. That object was never allocated with `new`, so the call was undefined
 * behaviour and has been removed.
 *
 * @note The stage semaphores are created through mCore, so they have to be released
 *       through the matching mCore call. TODO: add that release (context destructor).
 * @note NOTE(review): the class is still implicitly copyable; a copy would destroy
 *       the same pools twice. Confirm trackers are never copied, or delete the copy operations.
 */
~VulkanPipelineTracker() {
    // Stages on the same device share one pool, so remember what was already destroyed.
    std::vector<VkQueryPool> destroyedPools;
    for (StageContext& stageContext : m_stageContexts) {
        if (stageContext.pool == VK_NULL_HANDLE || stageContext.config.deviceSrc == nullptr) {
            continue;
        }
        bool alreadyDestroyed = false;
        for (VkQueryPool seen : destroyedPools) {
            if (seen == stageContext.pool) {
                alreadyDestroyed = true;
                break;
            }
        }
        if (!alreadyDestroyed) {
            vkDestroyQueryPool(stageContext.config.deviceSrc->virtualDevice, stageContext.pool, nullptr);
            destroyedPools.push_back(stageContext.pool);
        }
        stageContext.pool = VK_NULL_HANDLE;
    }
}
/**
 * @brief Registers a cross-device pipeline handoff link.
 *
 * @param srcDev            Device owning the producer semaphore.
 * @param srcSem            Producer-side timeline semaphore that MirrorSignals() polls.
 * @param dstDev            Device owning the consumer semaphore.
 * @param dstSem            Consumer-side timeline semaphore that MirrorSignals() signals.
 * @param initialFloorValue Value the link treats as already mirrored.
 */
void AddCrossDeviceLink(mDevice* srcDev, VkSemaphore srcSem, mDevice* dstDev, VkSemaphore dstSem, u64 initialFloorValue = 0) {
    // Field order follows the CrossDeviceLink declaration.
    m_links.push_back(CrossDeviceLink{ srcDev, srcSem, dstDev, dstSem, initialFloorValue });
}
/**
* @brief Processes all registered cross-device signals.
* @details Call this exactly ONCE per CPU frame loop tick (for now).
* It increments the destination timeline by exactly 1 step per frame.
*/
void MirrorSignals() {
for (size_t i = 0; i < m_links.size(); ++i) {
CrossDeviceLink& link = m_links[i];
// Peek at the live source GPU execution state (Instant, non-blocking check)
u64 currentSrcValue = 0;
VkResult srcRes = vkGetSemaphoreCounterValue(link.srcDevice->virtualDevice, link.srcSemaphore, &currentSrcValue);
CHECK_VK_RESULT(srcRes, "vkGetSemaphoreCounterValue\n");
// If the source has jumped ahead, advance the target by exactly 1 single step
if (currentSrcValue > link.lastMirroredValue) {
link.lastMirroredValue++; // (e.g., 1 -> 2)
VkSemaphoreSignalInfo signalInfo{ VK_STRUCTURE_TYPE_SEMAPHORE_SIGNAL_INFO };
signalInfo.semaphore = link.dstSemaphore;
signalInfo.value = link.lastMirroredValue;
// Safe CPU signal. Because it only fires once per frame, it will never thrash the WDDM driver kernel.
VkResult res = vkSignalSemaphore(link.dstDevice->virtualDevice, &signalInfo);
CHECK_VK_RESULT(res, "vkSignalSemaphore\n");
} }
} }
} }
/** /**
* @brief Instantly queries the GPU without blocking to see if a stage is clear to submit. * @brief Says True if scheduled workload has been completed.
* @details Stage 0 checks if Stage 1 is done reading. Downstream stages check their own history. * Also True if no workload was ever scheduled.
*/ */
bool IsStageReady(u32 stageId, u32 bufferIdx) { bool IsStageComplete(u32 stageId, u64& timeLineValue) {
VkResult res = VK_SUCCESS;
if (stageId == 0) { u64 currentSelfValue = m_stageContexts[stageId].m_gpuExecutionCounter;
u64 requiredValue = m_currentValues[1][bufferIdx]; if (currentSelfValue > 0) {
u64 currentGPUValue = 0; u64 currentGPUValue = 0;
res = vkGetSemaphoreCounterValue(m_devices[1], m_semaphores[1][bufferIdx], &currentGPUValue);
VkResult res = vkGetSemaphoreCounterValue(
m_stageContexts[stageId].config.deviceSrc->virtualDevice,
m_stageContexts[stageId].localTimelineSem.semaphore, &currentGPUValue);
CHECK_VK_RESULT(res, "vkGetSemaphoreCounterValue\n"); CHECK_VK_RESULT(res, "vkGetSemaphoreCounterValue\n");
return currentGPUValue >= requiredValue; timeLineValue = currentGPUValue;
if (currentSelfValue != currentGPUValue)
return false;
} }
return true;
}
/**
 * @brief Temporary time counter. To be replaced with VkQueryPool timestamps.
 * DEPRECATED
 *
 * The first call only stores @p currentTime; later calls also store the difference
 * to the previously stored time in the stage's deltaTime.
 *
 * @param stageId     Index of the stage whose timing state is updated.
 * @param currentTime Current time stamp, in whatever unit the caller uses.
 * @note A stored time of exactly 0 is treated as "never updated", so a call made
 *       after storing 0 does not produce a delta.
 */
void UpdateTime(u32 stageId, float currentTime) {
    StageContext* context = GetStageContext(stageId);
    if (context->lastTime != 0) {
        context->deltaTime = currentTime - context->lastTime;
    }
    context->lastTime = currentTime;
}
/**
 * @brief Creates a timestamp query pool on the given device.
 * Every device needs its own pool.
 *
 * @param device     Logical device the pool is created on.
 * @param stageCount Number of stages to provision for; the pool gets two queries per stage.
 * @param resultPool Receives the created pool handle.
 */
void CreateQueryStagePool(VkDevice device, u32 stageCount, VkQueryPool& resultPool) {
    constexpr u32 queriesPerStage = 2; // One for start, one for end

    VkQueryPoolCreateInfo createInfo{};
    createInfo.sType = VK_STRUCTURE_TYPE_QUERY_POOL_CREATE_INFO;
    createInfo.queryType = VK_QUERY_TYPE_TIMESTAMP;
    createInfo.queryCount = stageCount * queriesPerStage;

    VkResult createResult = vkCreateQueryPool(device, &createInfo, nullptr, &resultPool);
    CHECK_VK_RESULT(createResult, "vkCreateQueryPool\n");
}
/**
 * @brief Records user commands bracketed by a start and an end GPU timestamp.
 *
 * The stage's two queries (indices stageId * 2 and stageId * 2 + 1 in the
 * context's pool) are reset, then a timestamp is written before and after
 * the commands recorded by @p recordCommands.
 *
 * @param cmdBuffer      Command buffer being recorded.
 * @param context        Stage context providing the query pool.
 * @param stageId        Stage index; selects the query pair inside the pool.
 * @param recordCommands Callback that records the commands to be measured.
 */
void recordTimedCommands(
    VkCommandBuffer cmdBuffer,
    StageContext* context,
    u32 stageId,
    std::function<void()> recordCommands)
{
    const u32 firstQuery = stageId * 2;
    VkQueryPool timestampPool = context->pool;

    // Both queries of the pair must be reset before they are written again.
    vkCmdResetQueryPool(cmdBuffer, timestampPool, firstQuery, 2);

    // Start timestamp (open question from the author: BOTTOM_OF_PIPE ?).
    vkCmdWriteTimestamp(cmdBuffer, VK_PIPELINE_STAGE_ALL_COMMANDS_BIT, timestampPool, firstQuery);

    // Commands supplied by the caller.
    recordCommands();

    // End timestamp goes into the second query of the pair.
    vkCmdWriteTimestamp(cmdBuffer, VK_PIPELINE_STAGE_ALL_COMMANDS_BIT, timestampPool, firstQuery + 1);
}
/**
 * @brief Calculates the duration of a GPU operation stage in milliseconds
 * by computing the difference between its recorded start and end timestamps.
 * TODO: Move away.
 *
 * @param context Stage context providing the device, query pool and timestamp period.
 * @param stageId Stage index; selects the query pair (stageId * 2, stageId * 2 + 1).
 * @return Elapsed time in milliseconds, or 0.0f when the timestamps are not
 *         available yet (the query is non-blocking).
 */
float getStageDurationMs(StageContext* context, u32 stageId) {
    assert(context && "Stage context must not be null.");
    // Pools are created with MAX_QUERYS_PER_DEVICE query pairs.
    assert(stageId < cfgPipe::MAX_QUERYS_PER_DEVICE && "Query pair is outside the pool.");

    // Zero-initialised so a not-ready query can never expose indeterminate values.
    u64 results[2] = { 0, 0 };

    // Retrieve timestamp ticks (64-bit). No WAIT flag is passed, so the call can
    // report VK_NOT_READY, which is a status code rather than an error.
    VkResult res = vkGetQueryPoolResults(
        context->config.deviceSrc->virtualDevice, context->pool,
        stageId * 2, 2,
        sizeof(results), results,
        sizeof(u64), VK_QUERY_RESULT_64_BIT
    );
    if (res == VK_NOT_READY) {
        return 0.0f;
    }
    CHECK_VK_RESULT(res, "vkGetQueryPoolResults\n");

    // ticks * timestampPeriod is in nanoseconds; the 1e-6 factor turns that into milliseconds.
    const u64 ticksDelta = results[1] - results[0];
    return static_cast<float>(ticksDelta)
        * context->config.deviceSrc->phyDeviceData.m_devProps.limits.timestampPeriod
        * cfgPipe::ONE_MICROSECOND;
}
/**
 * @brief Says True if the target index is not in use by the specified stage.
 * Also True if the stage has already finished everything it was scheduled to do.
 *
 * @param stageId     Index of the stage to check against.
 * @param indexTarget Buffer index the caller wants to write.
 * @return false only when @p indexTarget is the index recorded by the stage's last
 *         AdvanceStageSync() call and that scheduled work has not completed yet.
 *
 * @note Completion is decided on the raw timeline values. Comparing the values
 *       modulo the buffer count (as before) reported unfinished work as finished
 *       whenever the gap was a multiple of the buffer count — always, for a single
 *       buffer — and divided by zero for a buffer count of 0.
 */
bool IsIndexSafeToWrite(u32 stageId, u32 indexTarget) {
    const StageContext& stageContext = m_stageContexts[stageId];

    const u64 stageScheduledValue = stageContext.m_gpuExecutionCounter;
    if (stageScheduledValue == 0) {
        // Nothing was ever scheduled, so no index can be in flight.
        return true;
    }

    u64 currentGPUValue = 0;
    VkResult res = vkGetSemaphoreCounterValue(
        stageContext.config.deviceSrc->virtualDevice,
        stageContext.localTimelineSem.semaphore,
        &currentGPUValue);
    CHECK_VK_RESULT(res, "vkGetSemaphoreCounterValue\n");

    const bool targetIsActiveIndex = (indexTarget == stageContext.readIdx);
    const bool workStillInFlight = (stageScheduledValue != currentGPUValue);

    // Unsafe only if the stage is still working on exactly this index.
    return !(targetIsActiveIndex && workStillInFlight);
}
/**
 * @brief Returns the buffer index that corresponds to the stage's latest finished timeline value.
 *
 * @param stageId       Index of the stage to query.
 * @param timeLineValue Receives the stage semaphore's current counter value.
 * @return The counter value modulo the buffer count; 0 while the counter is still 0.
 *
 * @note The previous version had no return statement (and left @p timeLineValue
 *       unwritten) when the counter was 0, which is undefined behaviour.
 */
u32 getSafeIndex(u32 stageId, u64& timeLineValue) {
    assert(m_bufferCount > 0 && "bufferCount is used as a modulus and must be non-zero.");

    u64 currentGPUValue = 0;
    VkResult res = vkGetSemaphoreCounterValue(
        m_stageContexts[stageId].config.deviceSrc->virtualDevice,
        m_stageContexts[stageId].localTimelineSem.semaphore,
        &currentGPUValue);
    CHECK_VK_RESULT(res, "vkGetSemaphoreCounterValue\n");

    timeLineValue = currentGPUValue;
    return static_cast<u32>(currentGPUValue % m_bufferCount);
}
/** /**
* @brief Packages the semaphore handles and value needed to pass a signal across devices. * @brief Provides semaphore object for the device configured
* @note To be called right after a queue submission. * and a new timeline value to signal on submission and reference to schedule counter.
* Stage supposed to call this when previous workload confirmed finished in IsStageComplete()
*
*/ */
SyncJob ExtractCrossDeviceSyncJob(u32 producerStageId, u32 bufferIdx) { void AdvanceStageSync(u32 stageId, u64& targetValue, sem*& semObject, u32 readIdxTarget) {
u32 consumerStageId = (producerStageId + 1) % m_stageCount; m_stageContexts[stageId].m_gpuExecutionCounter++;
m_stageContexts[stageId].readIdx = readIdxTarget;
SyncJob job{}; targetValue = m_stageContexts[stageId].m_gpuExecutionCounter;
job.deviceSource = m_devices[producerStageId]; semObject = &m_stageContexts[stageId].localTimelineSem;
job.semSource = m_semaphores[producerStageId][bufferIdx];
job.deviceTarget = m_devices[consumerStageId];
job.semTarget = m_semaphores[consumerStageId][bufferIdx];
job.waitValue = m_currentValues[producerStageId][bufferIdx];
return job;
} }
/** StageContext* GetStageContext(u32 stageId) {
* @brief Generates wait/signal info for Stage 0 based on the current loop iteration. return &m_stageContexts[stageId];
*/
void GetRootStageSync(u64 iteration, WorkloadSyncPack& outSync, u32& outWriteIdx) {
outWriteIdx = iteration % m_bufferCount;
outSync.wait.semaphore = m_semaphores[1][outWriteIdx];
outSync.wait.value = m_currentValues[1][outWriteIdx];
m_currentValues[0][outWriteIdx]++;
outSync.signal.semaphore = m_semaphores[0][outWriteIdx];
outSync.signal.value = m_currentValues[0][outWriteIdx];
m_lastWrittenIndexPerStage[0] = outWriteIdx;
m_isDataNewPerStage[0] = true;
} }
/** VkDevice GetStageDevice(u32 stageId) {
* @brief Generates wait/signal info for any downstream consumer stage (1 to N). assert(stageId < m_stageContexts.size() && "Wrong ID requested.");
*/ return m_stageContexts[stageId].config.deviceSrc->virtualDevice;
void GetSubsequentStageSync(u32 stageId, WorkloadSyncPack& outSync, u32 bufferIdx) {
u32 producerStage = stageId - 1;
outSync.wait.semaphore = m_semaphores[producerStage][bufferIdx];
outSync.wait.value = m_currentValues[producerStage][bufferIdx];
m_currentValues[stageId][bufferIdx]++;
outSync.signal.semaphore = m_semaphores[stageId][bufferIdx];
outSync.signal.value = m_currentValues[stageId][bufferIdx];
m_lastWrittenIndexPerStage[stageId] = bufferIdx;
m_isDataNewPerStage[stageId] = true;
m_isDataNewPerStage[producerStage] = false;
}
/**
* @brief Checks if the immediate parent stage has a slot ready.
* @param outIdx Populated with the available buffer index if the function returns true.
*/
bool IsDataAvailableFromProducer(u32 stageId, u32& outIdx) {
outIdx = m_lastWrittenIndexPerStage[stageId - 1];
return m_isDataNewPerStage[stageId - 1];
} }
private: private:
std::vector<VkDevice> m_devices;
u32 m_stageCount; u32 m_stageCount;
u32 m_bufferCount; u32 m_bufferCount;
std::vector<CrossDeviceLink> m_links;
std::vector<std::vector<VkSemaphore>> m_semaphores; std::vector<StageContext> m_stageContexts;
std::vector<std::vector<u64>> m_currentValues;
std::vector<u32> m_lastWrittenIndexPerStage;
std::vector<bool> m_isDataNewPerStage;
}; };
} // namespace } // namespace
#endif // WORKLOAD_HEADER #endif // WORKLOAD_HEADER
#ifdef WORKLOAD_COMPONENT #ifdef WORKLOAD_COMPONENT
namespace M_CORE {
} // namespace
#endif #endif