Refactor GameThreadPool for improved thread-safety and lifecycle management

This commit is contained in:
savis
2026-01-07 15:26:37 +01:00
parent 3b32269a74
commit b94b971e32
2 changed files with 95 additions and 29 deletions

View File

@@ -4,7 +4,6 @@
CGameThreadPool::CGameThreadPool()
: m_bShutdown(false)
, m_bInitialized(false)
, m_iNextWorkerIndex(0)
{
}
@@ -15,7 +14,9 @@ CGameThreadPool::~CGameThreadPool()
void CGameThreadPool::Initialize(int iWorkerCount)
{
if (m_bInitialized)
std::lock_guard<std::mutex> lock(m_lifecycleMutex);
if (m_bInitialized.load(std::memory_order_acquire))
{
TraceError("CGameThreadPool::Initialize - Already initialized!");
return;
@@ -35,30 +36,43 @@ void CGameThreadPool::Initialize(int iWorkerCount)
Tracef("CGameThreadPool::Initialize - Creating %d worker threads\n", iWorkerCount);
m_bShutdown.store(false, std::memory_order_release);
m_workers.clear();
m_workers.reserve(iWorkerCount);
// Initialize each worker
// First create all workers
for (int i = 0; i < iWorkerCount; ++i)
{
std::unique_ptr<TWorkerThread> pWorker(new TWorkerThread());
pWorker->pTaskQueue.reset(new SPSCQueue<TTask>(QUEUE_SIZE));
pWorker->bBusy.store(false, std::memory_order_relaxed);
auto pWorker = std::make_unique<TWorkerThread>();
pWorker->pTaskQueue = std::make_unique<SPSCQueue<TTask>>(QUEUE_SIZE);
pWorker->uTaskCount.store(0, std::memory_order_relaxed);
pWorker->thread = std::thread(&CGameThreadPool::WorkerThreadProc, this, i);
m_workers.push_back(std::move(pWorker));
}
// Mark as initialized before starting threads
m_bInitialized.store(true, std::memory_order_release);
// Then start threads after all workers are created
for (int i = 0; i < iWorkerCount; ++i)
{
TWorkerThread* pWorker = m_workers[i].get();
// Pass worker pointer directly instead of index
pWorker->thread = std::thread(&CGameThreadPool::WorkerThreadProc, this, pWorker);
}
}
void CGameThreadPool::Destroy()
{
if (!m_bInitialized)
std::lock_guard<std::mutex> lock(m_lifecycleMutex);
if (!m_bInitialized.load(std::memory_order_acquire))
return;
Tracef("CGameThreadPool::Destroy - Shutting down %d worker threads\n", GetWorkerCount());
// Signal shutdown first
m_bShutdown.store(true, std::memory_order_release);
// Mark as not initialized to prevent new enqueues
m_bInitialized.store(false, std::memory_order_release);
// Join all worker threads
@@ -71,17 +85,23 @@ void CGameThreadPool::Destroy()
m_workers.clear();
}
void CGameThreadPool::WorkerThreadProc(int iWorkerIndex)
void CGameThreadPool::WorkerThreadProc(TWorkerThread* pWorker)
{
TWorkerThread* pWorker = m_workers[iWorkerIndex].get();
int iIdleCount = 0;
while (!m_bShutdown.load(std::memory_order_acquire))
{
TTask task;
if (pWorker->pTaskQueue->Pop(task))
// Pop from queue with minimal locking
bool bHasTask = false;
{
std::lock_guard<std::mutex> lock(pWorker->queueMutex);
bHasTask = pWorker->pTaskQueue->Pop(task);
}
if (bHasTask)
{
pWorker->bBusy.store(true, std::memory_order_relaxed);
iIdleCount = 0;
// Execute the task
@@ -91,15 +111,14 @@ void CGameThreadPool::WorkerThreadProc(int iWorkerIndex)
}
catch (const std::exception& e)
{
TraceError("CGameThreadPool::WorkerThreadProc - Exception in worker %d: %s", iWorkerIndex, e.what());
TraceError("CGameThreadPool::WorkerThreadProc - Exception: %s", e.what());
}
catch (...)
{
TraceError("CGameThreadPool::WorkerThreadProc - Unknown exception in worker %d", iWorkerIndex);
TraceError("CGameThreadPool::WorkerThreadProc - Unknown exception");
}
pWorker->uTaskCount.fetch_sub(1, std::memory_order_relaxed);
pWorker->bBusy.store(false, std::memory_order_relaxed);
}
else
{
@@ -120,9 +139,41 @@ void CGameThreadPool::WorkerThreadProc(int iWorkerIndex)
{
// Longer sleep for extended idle
std::this_thread::sleep_for(std::chrono::milliseconds(1));
// Reset idle count to prevent overflow
if (iIdleCount > 10000)
iIdleCount = 1000;
}
}
}
// Process remaining tasks before shutdown
TTask task;
while (true)
{
bool bHasTask = false;
{
std::lock_guard<std::mutex> lock(pWorker->queueMutex);
bHasTask = pWorker->pTaskQueue->Pop(task);
}
if (!bHasTask)
break;
try
{
task();
}
catch (const std::exception& e)
{
TraceError("CGameThreadPool::WorkerThreadProc - Exception during shutdown: %s", e.what());
}
catch (...)
{
TraceError("CGameThreadPool::WorkerThreadProc - Unknown exception during shutdown");
}
pWorker->uTaskCount.fetch_sub(1, std::memory_order_relaxed);
}
}
int CGameThreadPool::SelectLeastBusyWorker() const

View File

@@ -8,6 +8,7 @@
#include <future>
#include <atomic>
#include <memory>
#include <mutex>
class CGameThreadPool : public CSingleton<CGameThreadPool>
{
@@ -35,30 +36,29 @@ public:
size_t GetPendingTaskCount() const;
// Check if pool is initialized
bool IsInitialized() const { return m_bInitialized; }
bool IsInitialized() const { return m_bInitialized.load(std::memory_order_acquire); }
private:
struct TWorkerThread
{
std::thread thread;
std::unique_ptr<SPSCQueue<TTask>> pTaskQueue;
std::atomic<bool> bBusy;
std::mutex queueMutex; // Mutex to protect SPSC queue from multiple producers
std::atomic<uint32_t> uTaskCount;
TWorkerThread()
: bBusy(false)
, uTaskCount(0)
: uTaskCount(0)
{
}
};
void WorkerThreadProc(int iWorkerIndex);
void WorkerThreadProc(TWorkerThread* pWorker);
int SelectLeastBusyWorker() const;
std::vector<std::unique_ptr<TWorkerThread>> m_workers;
std::atomic<bool> m_bShutdown;
std::atomic<bool> m_bInitialized;
std::atomic<int> m_iNextWorkerIndex; // For round-robin distribution
mutable std::mutex m_lifecycleMutex; // Protects initialization/destruction
static const size_t QUEUE_SIZE = 8192;
};
@@ -67,9 +67,14 @@ private:
template<typename TFunc>
std::future<void> CGameThreadPool::Enqueue(TFunc&& func)
{
if (!m_bInitialized)
// Lock to ensure thread pool isn't being destroyed
std::unique_lock<std::mutex> lock(m_lifecycleMutex);
if (!m_bInitialized.load(std::memory_order_acquire))
{
// If not initialized, execute on calling thread
lock.unlock(); // No need to hold lock
auto promise = std::make_shared<std::promise<void>>();
auto future = promise->get_future();
try
@@ -109,10 +114,24 @@ std::future<void> CGameThreadPool::Enqueue(TFunc&& func)
int iWorkerIndex = SelectLeastBusyWorker();
TWorkerThread* pWorker = m_workers[iWorkerIndex].get();
// Try to enqueue the task
if (!pWorker->pTaskQueue->Push(std::move(task)))
// Increment task count before pushing
pWorker->uTaskCount.fetch_add(1, std::memory_order_relaxed);
// Try to enqueue the task with mutex protection for SPSC queue
bool bPushed = false;
{
// Queue is full, execute on calling thread as fallback
std::lock_guard<std::mutex> queueLock(pWorker->queueMutex);
bPushed = pWorker->pTaskQueue->Push(std::move(task));
}
if (!bPushed)
{
// Queue is full, decrement count and execute on calling thread as fallback
pWorker->uTaskCount.fetch_sub(1, std::memory_order_relaxed);
// Release lifecycle lock before executing task
lock.unlock();
try
{
(*pFunc)();
@@ -123,10 +142,6 @@ std::future<void> CGameThreadPool::Enqueue(TFunc&& func)
promise->set_exception(std::current_exception());
}
}
else
{
pWorker->uTaskCount.fetch_add(1, std::memory_order_relaxed);
}
return future;
}