diff --git a/src/EterLib/GameThreadPool.cpp b/src/EterLib/GameThreadPool.cpp index 8289d36..90fdd7a 100644 --- a/src/EterLib/GameThreadPool.cpp +++ b/src/EterLib/GameThreadPool.cpp @@ -4,7 +4,6 @@ CGameThreadPool::CGameThreadPool() : m_bShutdown(false) , m_bInitialized(false) - , m_iNextWorkerIndex(0) { } @@ -15,7 +14,9 @@ CGameThreadPool::~CGameThreadPool() void CGameThreadPool::Initialize(int iWorkerCount) { - if (m_bInitialized) + std::lock_guard lock(m_lifecycleMutex); + + if (m_bInitialized.load(std::memory_order_acquire)) { TraceError("CGameThreadPool::Initialize - Already initialized!"); return; @@ -35,30 +36,43 @@ void CGameThreadPool::Initialize(int iWorkerCount) Tracef("CGameThreadPool::Initialize - Creating %d worker threads\n", iWorkerCount); m_bShutdown.store(false, std::memory_order_release); + m_workers.clear(); m_workers.reserve(iWorkerCount); - // Initialize each worker + // First create all workers for (int i = 0; i < iWorkerCount; ++i) { - std::unique_ptr pWorker(new TWorkerThread()); - pWorker->pTaskQueue.reset(new SPSCQueue(QUEUE_SIZE)); - pWorker->bBusy.store(false, std::memory_order_relaxed); + auto pWorker = std::make_unique(); + pWorker->pTaskQueue = std::make_unique>(QUEUE_SIZE); pWorker->uTaskCount.store(0, std::memory_order_relaxed); - pWorker->thread = std::thread(&CGameThreadPool::WorkerThreadProc, this, i); m_workers.push_back(std::move(pWorker)); } + // Mark as initialized before starting threads m_bInitialized.store(true, std::memory_order_release); + + // Then start threads after all workers are created + for (int i = 0; i < iWorkerCount; ++i) + { + TWorkerThread* pWorker = m_workers[i].get(); + // Pass worker pointer directly instead of index + pWorker->thread = std::thread(&CGameThreadPool::WorkerThreadProc, this, pWorker); + } } void CGameThreadPool::Destroy() { - if (!m_bInitialized) + std::lock_guard lock(m_lifecycleMutex); + + if (!m_bInitialized.load(std::memory_order_acquire)) return; Tracef("CGameThreadPool::Destroy - Shutting down %d worker threads\n", GetWorkerCount()); + // Signal shutdown first m_bShutdown.store(true, std::memory_order_release); + + // Mark as not initialized to prevent new enqueues m_bInitialized.store(false, std::memory_order_release); // Join all worker threads @@ -71,17 +85,23 @@ void CGameThreadPool::Destroy() m_workers.clear(); } -void CGameThreadPool::WorkerThreadProc(int iWorkerIndex) +void CGameThreadPool::WorkerThreadProc(TWorkerThread* pWorker) { - TWorkerThread* pWorker = m_workers[iWorkerIndex].get(); int iIdleCount = 0; while (!m_bShutdown.load(std::memory_order_acquire)) { TTask task; - if (pWorker->pTaskQueue->Pop(task)) + + // Pop from queue with minimal locking + bool bHasTask = false; + { + std::lock_guard lock(pWorker->queueMutex); + bHasTask = pWorker->pTaskQueue->Pop(task); + } + + if (bHasTask) { - pWorker->bBusy.store(true, std::memory_order_relaxed); iIdleCount = 0; // Execute the task @@ -91,15 +111,14 @@ void CGameThreadPool::WorkerThreadProc(int iWorkerIndex) } catch (const std::exception& e) { - TraceError("CGameThreadPool::WorkerThreadProc - Exception in worker %d: %s", iWorkerIndex, e.what()); + TraceError("CGameThreadPool::WorkerThreadProc - Exception: %s", e.what()); } catch (...) { - TraceError("CGameThreadPool::WorkerThreadProc - Unknown exception in worker %d", iWorkerIndex); + TraceError("CGameThreadPool::WorkerThreadProc - Unknown exception"); } pWorker->uTaskCount.fetch_sub(1, std::memory_order_relaxed); - pWorker->bBusy.store(false, std::memory_order_relaxed); } else { @@ -120,9 +139,41 @@ void CGameThreadPool::WorkerThreadProc(int iWorkerIndex) { // Longer sleep for extended idle std::this_thread::sleep_for(std::chrono::milliseconds(1)); + // Reset idle count to prevent overflow + if (iIdleCount > 10000) + iIdleCount = 1000; } } } + + // Process remaining tasks before shutdown + TTask task; + while (true) + { + bool bHasTask = false; + { + std::lock_guard lock(pWorker->queueMutex); + bHasTask = pWorker->pTaskQueue->Pop(task); + } + + if (!bHasTask) + break; + + try + { + task(); + } + catch (const std::exception& e) + { + TraceError("CGameThreadPool::WorkerThreadProc - Exception during shutdown: %s", e.what()); + } + catch (...) + { + TraceError("CGameThreadPool::WorkerThreadProc - Unknown exception during shutdown"); + } + + pWorker->uTaskCount.fetch_sub(1, std::memory_order_relaxed); + } } int CGameThreadPool::SelectLeastBusyWorker() const diff --git a/src/EterLib/GameThreadPool.h b/src/EterLib/GameThreadPool.h index d4c9059..a14ffb4 100644 --- a/src/EterLib/GameThreadPool.h +++ b/src/EterLib/GameThreadPool.h @@ -8,6 +8,7 @@ #include #include #include +#include class CGameThreadPool : public CSingleton { @@ -35,30 +36,29 @@ public: size_t GetPendingTaskCount() const; // Check if pool is initialized - bool IsInitialized() const { return m_bInitialized; } + bool IsInitialized() const { return m_bInitialized.load(std::memory_order_acquire); } private: struct TWorkerThread { std::thread thread; std::unique_ptr> pTaskQueue; - std::atomic bBusy; + std::mutex queueMutex; // Mutex to protect SPSC queue from multiple producers std::atomic uTaskCount; TWorkerThread() - : bBusy(false) - , uTaskCount(0) + : uTaskCount(0) { } }; - void WorkerThreadProc(int iWorkerIndex); + void WorkerThreadProc(TWorkerThread* pWorker); int SelectLeastBusyWorker() const; std::vector> m_workers; std::atomic m_bShutdown; std::atomic m_bInitialized; - std::atomic m_iNextWorkerIndex; // For round-robin distribution + mutable std::mutex m_lifecycleMutex; // Protects initialization/destruction static const size_t QUEUE_SIZE = 8192; }; @@ -67,9 +67,14 @@ private: template std::future CGameThreadPool::Enqueue(TFunc&& func) { - if (!m_bInitialized) + // Lock to ensure thread pool isn't being destroyed + std::unique_lock lock(m_lifecycleMutex); + + if (!m_bInitialized.load(std::memory_order_acquire)) { // If not initialized, execute on calling thread + lock.unlock(); // No need to hold lock + auto promise = std::make_shared>(); auto future = promise->get_future(); try @@ -109,10 +114,24 @@ std::future CGameThreadPool::Enqueue(TFunc&& func) int iWorkerIndex = SelectLeastBusyWorker(); TWorkerThread* pWorker = m_workers[iWorkerIndex].get(); - // Try to enqueue the task - if (!pWorker->pTaskQueue->Push(std::move(task))) + // Increment task count before pushing + pWorker->uTaskCount.fetch_add(1, std::memory_order_relaxed); + + // Try to enqueue the task with mutex protection for SPSC queue + bool bPushed = false; { - // Queue is full, execute on calling thread as fallback + std::lock_guard queueLock(pWorker->queueMutex); + bPushed = pWorker->pTaskQueue->Push(std::move(task)); + } + + if (!bPushed) + { + // Queue is full, decrement count and execute on calling thread as fallback + pWorker->uTaskCount.fetch_sub(1, std::memory_order_relaxed); + + // Release lifecycle lock before executing task + lock.unlock(); + try { (*pFunc)(); @@ -123,10 +142,6 @@ std::future CGameThreadPool::Enqueue(TFunc&& func) promise->set_exception(std::current_exception()); } } - else - { - pWorker->uTaskCount.fetch_add(1, std::memory_order_relaxed); - } return future; }