From 7718c65d7a87b3396603b40211a4eeedc16d72cb Mon Sep 17 00:00:00 2001 From: savis <106487343+savisxss@users.noreply.github.com> Date: Mon, 5 Jan 2026 17:20:12 +0100 Subject: [PATCH] add GameThreadPool --- src/EterLib/GameThreadPool.cpp | 158 ++++++++++++++ src/EterLib/GameThreadPool.h | 132 ++++++++++++ src/EterLib/ResourceManager.cpp | 3 + src/EterLib/ResourceManager.h | 3 + src/GameLib/ActorInstanceRender.cpp | 4 + src/GameLib/AreaLoaderThread.cpp | 268 +++++++----------------- src/GameLib/AreaLoaderThread.h | 51 +---- src/GameLib/RaceManager.cpp | 208 ++++++++++++++---- src/GameLib/RaceManager.h | 10 + src/UserInterface/PythonApplication.cpp | 10 + src/UserInterface/PythonApplication.h | 1 + src/UserInterface/UserInterface.cpp | 12 +- 12 files changed, 575 insertions(+), 285 deletions(-) create mode 100644 src/EterLib/GameThreadPool.cpp create mode 100644 src/EterLib/GameThreadPool.h diff --git a/src/EterLib/GameThreadPool.cpp b/src/EterLib/GameThreadPool.cpp new file mode 100644 index 0000000..8289d36 --- /dev/null +++ b/src/EterLib/GameThreadPool.cpp @@ -0,0 +1,158 @@ +#include "StdAfx.h" +#include "GameThreadPool.h" + +CGameThreadPool::CGameThreadPool() + : m_bShutdown(false) + , m_bInitialized(false) + , m_iNextWorkerIndex(0) +{ +} + +CGameThreadPool::~CGameThreadPool() +{ + Destroy(); +} + +void CGameThreadPool::Initialize(int iWorkerCount) +{ + if (m_bInitialized) + { + TraceError("CGameThreadPool::Initialize - Already initialized!"); + return; + } + + // Determine worker count + if (iWorkerCount <= 0) + { + iWorkerCount = static_cast(std::thread::hardware_concurrency()); + if (iWorkerCount <= 0) + iWorkerCount = 4; // Fallback to 4 workers + } + + // Clamp worker count to reasonable range + iWorkerCount = std::max(2, std::min(16, iWorkerCount)); + + Tracef("CGameThreadPool::Initialize - Creating %d worker threads\n", iWorkerCount); + + m_bShutdown.store(false, std::memory_order_release); + m_workers.reserve(iWorkerCount); + + // Initialize each worker + for (int i = 0; i < iWorkerCount; ++i) + { + std::unique_ptr pWorker(new TWorkerThread()); + pWorker->pTaskQueue.reset(new SPSCQueue(QUEUE_SIZE)); + pWorker->bBusy.store(false, std::memory_order_relaxed); + pWorker->uTaskCount.store(0, std::memory_order_relaxed); + pWorker->thread = std::thread(&CGameThreadPool::WorkerThreadProc, this, i); + m_workers.push_back(std::move(pWorker)); + } + + m_bInitialized.store(true, std::memory_order_release); +} + +void CGameThreadPool::Destroy() +{ + if (!m_bInitialized) + return; + + Tracef("CGameThreadPool::Destroy - Shutting down %d worker threads\n", GetWorkerCount()); + + m_bShutdown.store(true, std::memory_order_release); + m_bInitialized.store(false, std::memory_order_release); + + // Join all worker threads + for (auto& pWorker : m_workers) + { + if (pWorker->thread.joinable()) + pWorker->thread.join(); + } + + m_workers.clear(); +} + +void CGameThreadPool::WorkerThreadProc(int iWorkerIndex) +{ + TWorkerThread* pWorker = m_workers[iWorkerIndex].get(); + int iIdleCount = 0; + + while (!m_bShutdown.load(std::memory_order_acquire)) + { + TTask task; + if (pWorker->pTaskQueue->Pop(task)) + { + pWorker->bBusy.store(true, std::memory_order_relaxed); + iIdleCount = 0; + + // Execute the task + try + { + task(); + } + catch (const std::exception& e) + { + TraceError("CGameThreadPool::WorkerThreadProc - Exception in worker %d: %s", iWorkerIndex, e.what()); + } + catch (...) + { + TraceError("CGameThreadPool::WorkerThreadProc - Unknown exception in worker %d", iWorkerIndex); + } + + pWorker->uTaskCount.fetch_sub(1, std::memory_order_relaxed); + pWorker->bBusy.store(false, std::memory_order_relaxed); + } + else + { + // No work available - idle strategy + ++iIdleCount; + + if (iIdleCount < 100) + { + // Spin briefly for immediate work + std::this_thread::yield(); + } + else if (iIdleCount < 1000) + { + // Short sleep for moderate idle + std::this_thread::sleep_for(std::chrono::microseconds(10)); + } + else + { + // Longer sleep for extended idle + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + } + } +} + +int CGameThreadPool::SelectLeastBusyWorker() const +{ + if (m_workers.empty()) + return 0; + + // Simple load balancing: find worker with least pending tasks + int iBestWorker = 0; + uint32_t uMinTasks = m_workers[0]->uTaskCount.load(std::memory_order_relaxed); + + for (size_t i = 1; i < m_workers.size(); ++i) + { + uint32_t uTasks = m_workers[i]->uTaskCount.load(std::memory_order_relaxed); + if (uTasks < uMinTasks) + { + uMinTasks = uTasks; + iBestWorker = static_cast(i); + } + } + + return iBestWorker; +} + +size_t CGameThreadPool::GetPendingTaskCount() const +{ + size_t uTotal = 0; + for (const auto& pWorker : m_workers) + { + uTotal += pWorker->uTaskCount.load(std::memory_order_relaxed); + } + return uTotal; +} diff --git a/src/EterLib/GameThreadPool.h b/src/EterLib/GameThreadPool.h new file mode 100644 index 0000000..d4c9059 --- /dev/null +++ b/src/EterLib/GameThreadPool.h @@ -0,0 +1,132 @@ +#pragma once + +#include "SPSCQueue.h" +#include "EterBase/Singleton.h" +#include +#include +#include +#include +#include +#include + +class CGameThreadPool : public CSingleton +{ +public: + using TTask = std::function; + + CGameThreadPool(); + ~CGameThreadPool(); + + // Initialize thread pool with specified worker count + // If count <= 0, uses hardware_concurrency + void Initialize(int iWorkerCount = -1); + + // Shutdown and join all worker threads + void Destroy(); + + // Enqueue a task and get a future to track completion + template + std::future Enqueue(TFunc&& func); + + // Get number of active workers + int GetWorkerCount() const { return static_cast(m_workers.size()); } + + // Get approximate number of pending tasks across all queues + size_t GetPendingTaskCount() const; + + // Check if pool is initialized + bool IsInitialized() const { return m_bInitialized; } + +private: + struct TWorkerThread + { + std::thread thread; + std::unique_ptr> pTaskQueue; + std::atomic bBusy; + std::atomic uTaskCount; + + TWorkerThread() + : bBusy(false) + , uTaskCount(0) + { + } + }; + + void WorkerThreadProc(int iWorkerIndex); + int SelectLeastBusyWorker() const; + + std::vector> m_workers; + std::atomic m_bShutdown; + std::atomic m_bInitialized; + std::atomic m_iNextWorkerIndex; // For round-robin distribution + + static const size_t QUEUE_SIZE = 8192; +}; + +// Template implementation +template +std::future CGameThreadPool::Enqueue(TFunc&& func) +{ + if (!m_bInitialized) + { + // If not initialized, execute on calling thread + auto promise = std::make_shared>(); + auto future = promise->get_future(); + try + { + func(); + promise->set_value(); + } + catch (...) + { + promise->set_exception(std::current_exception()); + } + return future; + } + + // Create a promise to track task completion + auto promise = std::make_shared>(); + auto future = promise->get_future(); + + // Wrap function in shared_ptr to avoid move issues with std::function + auto pFunc = std::make_shared::type>(std::forward(func)); + + // Wrap the task with promise completion + TTask task = [promise, pFunc]() + { + try + { + (*pFunc)(); + promise->set_value(); + } + catch (...) + { + promise->set_exception(std::current_exception()); + } + }; + + // Select worker with least load + int iWorkerIndex = SelectLeastBusyWorker(); + TWorkerThread* pWorker = m_workers[iWorkerIndex].get(); + + // Try to enqueue the task + if (!pWorker->pTaskQueue->Push(std::move(task))) + { + // Queue is full, execute on calling thread as fallback + try + { + (*pFunc)(); + promise->set_value(); + } + catch (...) + { + promise->set_exception(std::current_exception()); + } + } + else + { + pWorker->uTaskCount.fetch_add(1, std::memory_order_relaxed); + } + + return future; +} diff --git a/src/EterLib/ResourceManager.cpp b/src/EterLib/ResourceManager.cpp index 8c6a714..e030137 100644 --- a/src/EterLib/ResourceManager.cpp +++ b/src/EterLib/ResourceManager.cpp @@ -252,6 +252,9 @@ void CResourceManager::RegisterResourceNewFunctionByTypePointer(int iType, CReso CResource * CResourceManager::InsertResourcePointer(DWORD dwFileCRC, CResource* pResource) { + // Thread-safe check and insert + std::lock_guard lock(m_ResourceMapMutex); + TResourcePointerMap::iterator itor = m_pResMap.find(dwFileCRC); if (m_pResMap.end() != itor) diff --git a/src/EterLib/ResourceManager.h b/src/EterLib/ResourceManager.h index 71ee2b4..9841d41 100644 --- a/src/EterLib/ResourceManager.h +++ b/src/EterLib/ResourceManager.h @@ -7,6 +7,7 @@ #include #include #include +#include class CTextureCache; @@ -76,6 +77,8 @@ class CResourceManager : public CSingleton static CFileLoaderThread ms_loadingThread; CFileLoaderThreadPool* m_pLoaderThreadPool; CTextureCache* m_pTextureCache; + + mutable std::mutex m_ResourceMapMutex; // Thread-safe resource map access }; extern int g_iLoadingDelayTime; \ No newline at end of file diff --git a/src/GameLib/ActorInstanceRender.cpp b/src/GameLib/ActorInstanceRender.cpp index 81066f0..e6e703a 100644 --- a/src/GameLib/ActorInstanceRender.cpp +++ b/src/GameLib/ActorInstanceRender.cpp @@ -32,6 +32,10 @@ void CActorInstance::SetMaterialAlpha(DWORD dwAlpha) void CActorInstance::OnRender() { + // Early out if race data is not loaded yet (async loading) + if (!m_pkCurRaceData) + return; + // MR-5: Fix effect rendering when actor is semi-transparent // Credits to d1str4ught if (GetAlphaValue() < 1.0f) diff --git a/src/GameLib/AreaLoaderThread.cpp b/src/GameLib/AreaLoaderThread.cpp index 268934d..ce84a25 100644 --- a/src/GameLib/AreaLoaderThread.cpp +++ b/src/GameLib/AreaLoaderThread.cpp @@ -5,6 +5,7 @@ #include "StdAfx.h" #include "EterLib/ResourceManager.h" +#include "EterLib/GameThreadPool.h" #include "AreaLoaderThread.h" #include "AreaTerrain.h" @@ -14,213 +15,112 @@ // Construction/Destruction ////////////////////////////////////////////////////////////////////// -TEMP_CAreaLoaderThread::TEMP_CAreaLoaderThread() : m_bShutdowned(false), m_pArg(NULL), m_hThread(NULL), m_uThreadID(0) +TEMP_CAreaLoaderThread::TEMP_CAreaLoaderThread() : m_bShutdowned(false) { } TEMP_CAreaLoaderThread::~TEMP_CAreaLoaderThread() { - Destroy(); + Shutdown(); } bool TEMP_CAreaLoaderThread::Create(void * arg) { - Arg(arg); - m_hThread = (HANDLE) _beginthreadex(NULL, 0, EntryPoint, this, 0, &m_uThreadID); - - if (!m_hThread) - return false; - - SetThreadPriority(m_hThread, THREAD_PRIORITY_NORMAL); + // Modern implementation doesn't need explicit thread creation + // The global CGameThreadPool handles threading + m_bShutdowned = false; return true; } -UINT TEMP_CAreaLoaderThread::Run(void * arg) -{ - if (!Setup()) - return 0; - - return (Execute(arg)); -} - -/* Static */ -UINT CALLBACK TEMP_CAreaLoaderThread::EntryPoint(void * pThis) -{ - TEMP_CAreaLoaderThread * pThread = (TEMP_CAreaLoaderThread *) pThis; - return pThread->Run(pThread->Arg()); -} - -////////////////////////////////////////////////////////////////////////// -void TEMP_CAreaLoaderThread::Destroy() -{ - if (m_hSemaphore) - { - CloseHandle(m_hSemaphore); - m_hSemaphore = NULL; - } - -/* - while(!m_pTerrainRequestDeque.empty()) - { - CTerrain * pTerrain = m_pTerrainRequestDeque.front(); - delete pTerrain; - pTerrain = NULL; - m_pTerrainRequestDeque.pop_front(); - } - - while(!m_pTerrainCompleteDeque.empty()) - { - CTerrain * pTerrain = m_pTerrainCompleteDeque.front(); - delete pTerrain; - pTerrain = NULL; - m_pTerrainCompleteDeque.pop_front(); - } -*/ - - /*stl_wipe(m_pTerrainRequestDeque); - stl_wipe(m_pTerrainCompleteDeque); - stl_wipe(m_pAreaRequestDeque); - stl_wipe(m_pAreaCompleteDeque);*/ -} - -UINT TEMP_CAreaLoaderThread::Setup() -{ - m_hSemaphore = CreateSemaphore(NULL, // no security attributes - 0, // initial count - 65535, // maximum count - NULL); // unnamed semaphore - if (!m_hSemaphore) - return 0; - - return 1; -} - void TEMP_CAreaLoaderThread::Shutdown() { - if (!m_hSemaphore) - return; - - BOOL bRet; - m_bShutdowned = true; - do + // Clear any pending completed items { - bRet = ReleaseSemaphore(m_hSemaphore, 1, NULL); - } - while (!bRet); - - WaitForSingleObject(m_hThread, 10000); // 쓰레드가 종료 되기를 10초 기다림 -} - -UINT TEMP_CAreaLoaderThread::Execute(void * pvArg) -{ - bool bProcessTerrain = true; - while (!m_bShutdowned) - { - DWORD dwWaitResult; - - dwWaitResult = WaitForSingleObject(m_hSemaphore, INFINITE); - - if (m_bShutdowned) - break; - - switch (dwWaitResult) - { - case WAIT_OBJECT_0: - if (bProcessTerrain) - ProcessTerrain(); - else - ProcessArea(); - break; - case WAIT_TIMEOUT: - TraceError("TEMP_CAreaLoaderThread::Execute: Timeout occured while time-out interval is INIFITE"); - break; - } + std::lock_guard lock(m_TerrainCompleteMutex); + m_pTerrainCompleteDeque.clear(); } - Destroy(); - return 1; + { + std::lock_guard lock(m_AreaCompleteMutex); + m_pAreaCompleteDeque.clear(); + } } -void TEMP_CAreaLoaderThread::Request(CTerrain * pTerrain) // called in main thread +void TEMP_CAreaLoaderThread::Request(CTerrain * pTerrain) { - m_TerrainRequestMutex.Lock(); - m_pTerrainRequestDeque.push_back(pTerrain); - m_TerrainRequestMutex.Unlock(); + if (m_bShutdowned) + return; - ++m_iRestSemCount; - - if (!ReleaseSemaphore(m_hSemaphore, m_iRestSemCount, NULL)) - TraceError("TEMP_CAreaLoaderThread::Request: ReleaseSemaphore error"); - - --m_iRestSemCount; + // Enqueue terrain loading to the global thread pool + CGameThreadPool* pThreadPool = CGameThreadPool::InstancePtr(); + if (pThreadPool) + { + pThreadPool->Enqueue([this, pTerrain]() + { + ProcessTerrain(pTerrain); + }); + } + else + { + // Fallback to synchronous loading if thread pool not available + ProcessTerrain(pTerrain); + } } -bool TEMP_CAreaLoaderThread::Fetch(CTerrain ** ppTerrain) // called in main thread +bool TEMP_CAreaLoaderThread::Fetch(CTerrain ** ppTerrain) { - m_TerrainCompleteMutex.Lock(); + std::lock_guard lock(m_TerrainCompleteMutex); if (m_pTerrainCompleteDeque.empty()) - { - m_TerrainCompleteMutex.Unlock(); return false; - } *ppTerrain = m_pTerrainCompleteDeque.front(); m_pTerrainCompleteDeque.pop_front(); - m_TerrainCompleteMutex.Unlock(); return true; } -void TEMP_CAreaLoaderThread::Request(CArea * pArea) // called in main thread +void TEMP_CAreaLoaderThread::Request(CArea * pArea) { - m_AreaRequestMutex.Lock(); - m_pAreaRequestDeque.push_back(pArea); - m_AreaRequestMutex.Unlock(); + if (m_bShutdowned) + return; - ++m_iRestSemCount; - - if (!ReleaseSemaphore(m_hSemaphore, m_iRestSemCount, NULL)) - TraceError("TEMP_CAreaLoaderThread::Request: ReleaseSemaphore error"); - - --m_iRestSemCount; + // Enqueue area loading to the global thread pool + CGameThreadPool* pThreadPool = CGameThreadPool::InstancePtr(); + if (pThreadPool) + { + pThreadPool->Enqueue([this, pArea]() + { + ProcessArea(pArea); + }); + } + else + { + // Fallback to synchronous loading if thread pool not available + ProcessArea(pArea); + } } -bool TEMP_CAreaLoaderThread::Fetch(CArea ** ppArea) // called in main thread +bool TEMP_CAreaLoaderThread::Fetch(CArea ** ppArea) { - m_AreaCompleteMutex.Lock(); + std::lock_guard lock(m_AreaCompleteMutex); if (m_pAreaCompleteDeque.empty()) - { - m_AreaCompleteMutex.Unlock(); return false; - } *ppArea = m_pAreaCompleteDeque.front(); m_pAreaCompleteDeque.pop_front(); - m_AreaCompleteMutex.Unlock(); return true; } -void TEMP_CAreaLoaderThread::ProcessArea() // called in loader thread +void TEMP_CAreaLoaderThread::ProcessArea(CArea * pArea) { - m_AreaRequestMutex.Lock(); - - if (m_pAreaRequestDeque.empty()) - { - m_AreaRequestMutex.Unlock(); + if (m_bShutdowned) return; - } - - CArea * pArea = m_pAreaRequestDeque.front(); - m_pAreaRequestDeque.pop_front(); - - Tracef("TEMP_CAreaLoaderThread::ProcessArea() RequestDeque Size : %d\n", m_pAreaRequestDeque.size()); - m_AreaRequestMutex.Unlock(); DWORD dwStartTime = ELTimer_GetMSec(); @@ -238,28 +138,17 @@ void TEMP_CAreaLoaderThread::ProcessArea() // called in loader thread Tracef("TEMP_CAreaLoaderThread::ProcessArea LoadArea : %d ms elapsed\n", ELTimer_GetMSec() - dwStartTime); - m_AreaCompleteMutex.Lock(); - m_pAreaCompleteDeque.push_back(pArea); - m_AreaCompleteMutex.Unlock(); - - Sleep(g_iLoadingDelayTime); + // Add to completed queue + { + std::lock_guard lock(m_AreaCompleteMutex); + m_pAreaCompleteDeque.push_back(pArea); + } } -void TEMP_CAreaLoaderThread::ProcessTerrain() // called in loader thread +void TEMP_CAreaLoaderThread::ProcessTerrain(CTerrain * pTerrain) { - m_TerrainRequestMutex.Lock(); - - if (m_pTerrainRequestDeque.empty()) - { - m_TerrainRequestMutex.Unlock(); + if (m_bShutdowned) return; - } - - CTerrain * pTerrain = m_pTerrainRequestDeque.front(); - m_pTerrainRequestDeque.pop_front(); - - Tracef("TEMP_CAreaLoaderThread::ProcessTerrain() RequestDeque Size : %d\n", m_pTerrainRequestDeque.size()); - m_TerrainRequestMutex.Unlock(); DWORD dwStartTime = ELTimer_GetMSec(); @@ -271,26 +160,24 @@ void TEMP_CAreaLoaderThread::ProcessTerrain() // called in loader thread const std::string & c_rStrMapName = pTerrain->GetOwner()->GetName(); char filename[256]; sprintf(filename, "%s\\%06u\\AreaProperty.txt", c_rStrMapName.c_str(), dwID); - + CTokenVectorMap stTokenVectorMap; - + if (!LoadMultipleTextData(filename, stTokenVectorMap)) return; - - Sleep(g_iLoadingDelayTime); if (stTokenVectorMap.end() == stTokenVectorMap.find("scripttype")) return; - + if (stTokenVectorMap.end() == stTokenVectorMap.find("areaname")) return; - + const std::string & c_rstrType = stTokenVectorMap["scripttype"][0]; const std::string & c_rstrAreaName = stTokenVectorMap["areaname"][0]; - + if (c_rstrType != "AreaProperty") return; - + char szRawHeightFieldname[64+1]; char szWaterMapName[64+1]; char szAttrMapName[64+1]; @@ -298,7 +185,7 @@ void TEMP_CAreaLoaderThread::ProcessTerrain() // called in loader thread char szShadowMapName[64+1]; char szMiniMapTexName[64+1]; char szSplatName[64+1]; - + _snprintf(szRawHeightFieldname, sizeof(szRawHeightFieldname), "%s\\%06u\\height.raw", c_rStrMapName.c_str(), dwID); _snprintf(szSplatName, sizeof(szSplatName), "%s\\%06u\\tile.raw", c_rStrMapName.c_str(), dwID); _snprintf(szAttrMapName, sizeof(szAttrMapName), "%s\\%06u\\attr.atr", c_rStrMapName.c_str(), dwID); @@ -306,35 +193,26 @@ void TEMP_CAreaLoaderThread::ProcessTerrain() // called in loader thread _snprintf(szShadowTexName, sizeof(szShadowTexName), "%s\\%06u\\shadowmap.dds", c_rStrMapName.c_str(), dwID); _snprintf(szShadowMapName, sizeof(szShadowMapName), "%s\\%06u\\shadowmap.raw", c_rStrMapName.c_str(), dwID); _snprintf(szMiniMapTexName, sizeof(szMiniMapTexName), "%s\\%06u\\minimap.dds", c_rStrMapName.c_str(), dwID); - + pTerrain->CopySettingFromGlobalSetting(); pTerrain->LoadWaterMap(szWaterMapName); - Sleep(g_iLoadingDelayTime); pTerrain->LoadHeightMap(szRawHeightFieldname); - Sleep(g_iLoadingDelayTime); pTerrain->LoadAttrMap(szAttrMapName); - Sleep(g_iLoadingDelayTime); pTerrain->RAW_LoadTileMap(szSplatName, true); - Sleep(g_iLoadingDelayTime); pTerrain->LoadShadowTexture(szShadowTexName); - Sleep(g_iLoadingDelayTime); pTerrain->LoadShadowMap(szShadowMapName); - Sleep(g_iLoadingDelayTime); pTerrain->LoadMiniMapTexture(szMiniMapTexName); - Sleep(g_iLoadingDelayTime); pTerrain->SetName(c_rstrAreaName.c_str()); - Sleep(g_iLoadingDelayTime); pTerrain->CalculateTerrainPatch(); - Sleep(g_iLoadingDelayTime); pTerrain->SetReady(); Tracef("TEMP_CAreaLoaderThread::ProcessTerrain LoadTerrain : %d ms elapsed\n", ELTimer_GetMSec() - dwStartTime); - m_TerrainCompleteMutex.Lock(); - m_pTerrainCompleteDeque.push_back(pTerrain); - m_TerrainCompleteMutex.Unlock(); - - Sleep(g_iLoadingDelayTime); + // Add to completed queue + { + std::lock_guard lock(m_TerrainCompleteMutex); + m_pTerrainCompleteDeque.push_back(pTerrain); + } } diff --git a/src/GameLib/AreaLoaderThread.h b/src/GameLib/AreaLoaderThread.h index dd42740..52a37fe 100644 --- a/src/GameLib/AreaLoaderThread.h +++ b/src/GameLib/AreaLoaderThread.h @@ -2,19 +2,15 @@ // ////////////////////////////////////////////////////////////////////// -#if !defined(AFX_AREALOADERTHREAD_H__E43FBE42_42F4_4F0E_B9DA_D7B7C5EA0753__INCLUDED_) -#define AFX_AREALOADERTHREAD_H__E43FBE42_42F4_4F0E_B9DA_D7B7C5EA0753__INCLUDED_ - -#if _MSC_VER > 1000 #pragma once -#endif // _MSC_VER > 1000 -#include "EterLib/Mutex.h" +#include +#include class CTerrain; class CArea; -class TEMP_CAreaLoaderThread +class TEMP_CAreaLoaderThread { public: TEMP_CAreaLoaderThread(); @@ -24,50 +20,21 @@ public: void Shutdown(); void Request(CTerrain * pTerrain); - bool Fetch(CTerrain ** ppTerrian); void Request(CArea * pArea); - bool Fetch(CArea ** ppArea); -protected: - static UINT CALLBACK EntryPoint(void * pThis); - UINT Run(void * arg); - - void * Arg() const { return m_pArg; } - void Arg(void * arg) { m_pArg = arg; } - - HANDLE m_hThread; - private: - void * m_pArg; - unsigned m_uThreadID; - -protected: - UINT Setup(); - UINT Execute(void * pvArg); - void Destroy(); - void ProcessTerrain(); - void ProcessArea(); - + void ProcessTerrain(CTerrain * pTerrain); + void ProcessArea(CArea * pArea); + private: - std::deque m_pTerrainRequestDeque; - Mutex m_TerrainRequestMutex; - std::deque m_pTerrainCompleteDeque; - Mutex m_TerrainCompleteMutex; - - std::deque m_pAreaRequestDeque; - Mutex m_AreaRequestMutex; - + std::mutex m_TerrainCompleteMutex; + std::deque m_pAreaCompleteDeque; - Mutex m_AreaCompleteMutex; + std::mutex m_AreaCompleteMutex; - HANDLE m_hSemaphore; - int m_iRestSemCount; bool m_bShutdowned; - }; - -#endif // !defined(AFX_AREALOADERTHREAD_H__E43FBE42_42F4_4F0E_B9DA_D7B7C5EA0753__INCLUDED_) diff --git a/src/GameLib/RaceManager.cpp b/src/GameLib/RaceManager.cpp index 03ab625..306a4cb 100644 --- a/src/GameLib/RaceManager.cpp +++ b/src/GameLib/RaceManager.cpp @@ -2,6 +2,7 @@ #include "RaceManager.h" #include "RaceMotionData.h" #include "PackLib/PackManager.h" +#include "EterLib/GameThreadPool.h" #include #include #include @@ -374,25 +375,61 @@ CRaceData * CRaceManager::GetSelectedRaceDataPointer() BOOL CRaceManager::GetRaceDataPointer(DWORD dwRaceIndex, CRaceData ** ppRaceData) { - TRaceDataIterator itor = m_RaceDataMap.find(dwRaceIndex); - - if (m_RaceDataMap.end() == itor) + // Thread-safe lookup { - CRaceData* pRaceData = __LoadRaceData(dwRaceIndex); + std::lock_guard lock(m_RaceDataMapMutex); + TRaceDataIterator itor = m_RaceDataMap.find(dwRaceIndex); - if (pRaceData) + if (m_RaceDataMap.end() != itor) { - m_RaceDataMap.insert(TRaceDataMap::value_type(dwRaceIndex, pRaceData)); - *ppRaceData = pRaceData; + *ppRaceData = itor->second; return TRUE; } - - TraceError("CRaceManager::GetRaceDataPointer: cannot load data by dwRaceIndex %lu", dwRaceIndex); - return FALSE; } - *ppRaceData = itor->second; - return TRUE; + // Check if already loading asynchronously + { + std::lock_guard lock(m_LoadingRacesMutex); + if (m_LoadingRaces.find(dwRaceIndex) != m_LoadingRaces.end()) + { + // Race is being loaded asynchronously + // Wait for it to complete by loading synchronously now (needed for immediate visibility) + Tracef("CRaceManager::GetRaceDataPointer: Race %lu is loading async, switching to sync load\n", dwRaceIndex); + } + } + + // Not found - load synchronously to ensure immediate availability + CRaceData* pRaceData = __LoadRaceData(dwRaceIndex); + + if (pRaceData) + { + std::lock_guard lock(m_RaceDataMapMutex); + // Check again in case another thread loaded it + TRaceDataIterator itor = m_RaceDataMap.find(dwRaceIndex); + if (m_RaceDataMap.end() != itor) + { + // Already loaded by another thread, use that one + delete pRaceData; + *ppRaceData = itor->second; + } + else + { + // Insert our newly loaded data + m_RaceDataMap.insert(TRaceDataMap::value_type(dwRaceIndex, pRaceData)); + *ppRaceData = pRaceData; + } + + // Remove from loading set if present + { + std::lock_guard lock2(m_LoadingRacesMutex); + m_LoadingRaces.erase(dwRaceIndex); + } + + return TRUE; + } + + *ppRaceData = NULL; + return FALSE; } void CRaceManager::SetPathName(const char * c_szPathName) @@ -462,27 +499,19 @@ void CRaceManager::PreloadPlayerRaceMotions() CRaceManager& rkRaceMgr = CRaceManager::Instance(); - // Phase 1: Parallel Load Race Data (MSM) - std::vector> raceLoadFutures; - for (DWORD dwRace = 0; dwRace <= 7; ++dwRace) { TRaceDataIterator it = rkRaceMgr.m_RaceDataMap.find(dwRace); - if (it == rkRaceMgr.m_RaceDataMap.end()) { - raceLoadFutures.push_back(std::async(std::launch::async, [&rkRaceMgr, dwRace]() { - return rkRaceMgr.__LoadRaceData(dwRace); - })); + if (it == rkRaceMgr.m_RaceDataMap.end()) + { + CRaceData* pRaceData = rkRaceMgr.__LoadRaceData(dwRace); + if (pRaceData) + { + rkRaceMgr.m_RaceDataMap.insert(TRaceDataMap::value_type(pRaceData->GetRaceIndex(), pRaceData)); + } } } - for (auto& f : raceLoadFutures) { - CRaceData* pRaceData = f.get(); - if (pRaceData) { - rkRaceMgr.m_RaceDataMap.insert(TRaceDataMap::value_type(pRaceData->GetRaceIndex(), pRaceData)); - } - } - - // Phase 2: Parallel Load Motions std::set uniqueMotions; for (DWORD dwRace = 0; dwRace <= 7; ++dwRace) @@ -518,28 +547,117 @@ void CRaceManager::PreloadPlayerRaceMotions() std::vector motionVec(uniqueMotions.begin(), uniqueMotions.end()); size_t total = motionVec.size(); - if (total > 0) { - size_t threadCount = std::thread::hardware_concurrency(); - if (threadCount == 0) threadCount = 4; - - size_t chunkSize = (total + threadCount - 1) / threadCount; - std::vector> motionFutures; + if (total > 0) + { + CGameThreadPool* pThreadPool = CGameThreadPool::InstancePtr(); + if (pThreadPool && pThreadPool->IsInitialized()) + { + size_t workerCount = pThreadPool->GetWorkerCount(); + size_t chunkSize = (total + workerCount - 1) / workerCount; - for (size_t i = 0; i < threadCount; ++i) { - size_t start = i * chunkSize; - size_t end = std::min(start + chunkSize, total); - - if (start < end) { - motionFutures.push_back(std::async(std::launch::async, [start, end, &motionVec]() { - for (size_t k = start; k < end; ++k) { - motionVec[k]->AddReference(); - } - })); + std::vector> futures; + futures.reserve(workerCount); + + for (size_t i = 0; i < workerCount; ++i) + { + size_t start = i * chunkSize; + size_t end = std::min(start + chunkSize, total); + + if (start < end) + { + // Copy values instead of capturing by reference + futures.push_back(pThreadPool->Enqueue([start, end, motionVec]() { + for (size_t k = start; k < end; ++k) + { + motionVec[k]->AddReference(); + } + })); + } + } + + // Wait for all tasks to complete + for (auto& f : futures) + { + f.wait(); + } + } + else + { + // Fallback to sequential if thread pool not available + for (auto* pMotion : motionVec) + { + pMotion->AddReference(); } } - - for (auto& f : motionFutures) f.get(); } s_bPreloaded = true; } + +void CRaceManager::RequestAsyncRaceLoad(DWORD dwRaceIndex) +{ + // Mark as loading + { + std::lock_guard lock(m_LoadingRacesMutex); + if (m_LoadingRaces.find(dwRaceIndex) != m_LoadingRaces.end()) + { + // Already loading + return; + } + m_LoadingRaces.insert(dwRaceIndex); + } + + // Enqueue async load to game thread pool + CGameThreadPool* pThreadPool = CGameThreadPool::InstancePtr(); + if (pThreadPool) + { + pThreadPool->Enqueue([this, dwRaceIndex]() + { + CRaceData* pRaceData = __LoadRaceData(dwRaceIndex); + + if (pRaceData) + { + // Thread-safe insertion + { + std::lock_guard lock(m_RaceDataMapMutex); + m_RaceDataMap.insert(TRaceDataMap::value_type(dwRaceIndex, pRaceData)); + } + + Tracef("CRaceManager::RequestAsyncRaceLoad: Successfully loaded race %lu asynchronously\n", dwRaceIndex); + } + else + { + TraceError("CRaceManager::RequestAsyncRaceLoad: Failed to load race %lu", dwRaceIndex); + } + + // Remove from loading set + { + std::lock_guard lock(m_LoadingRacesMutex); + m_LoadingRaces.erase(dwRaceIndex); + } + }); + } + else + { + // Fallback to synchronous loading if thread pool not available + CRaceData* pRaceData = __LoadRaceData(dwRaceIndex); + + if (pRaceData) + { + std::lock_guard lock(m_RaceDataMapMutex); + m_RaceDataMap.insert(TRaceDataMap::value_type(dwRaceIndex, pRaceData)); + } + + // Remove from loading set + { + std::lock_guard lock(m_LoadingRacesMutex); + m_LoadingRaces.erase(dwRaceIndex); + } + } +} + +bool CRaceManager::IsRaceLoading(DWORD dwRaceIndex) const +{ + std::lock_guard lock(m_LoadingRacesMutex); + return m_LoadingRaces.find(dwRaceIndex) != m_LoadingRaces.end(); +} diff --git a/src/GameLib/RaceManager.h b/src/GameLib/RaceManager.h index 88fb01d..e2e5f27 100644 --- a/src/GameLib/RaceManager.h +++ b/src/GameLib/RaceManager.h @@ -1,6 +1,8 @@ #pragma once #include "RaceData.h" +#include +#include class CRaceManager : public CSingleton { @@ -29,6 +31,10 @@ class CRaceManager : public CSingleton BOOL GetRaceDataPointer(DWORD dwRaceIndex, CRaceData ** ppRaceData); + // Async race loading + void RequestAsyncRaceLoad(DWORD dwRaceIndex); + bool IsRaceLoading(DWORD dwRaceIndex) const; + // Race motion preloading static void PreloadPlayerRaceMotions(); static bool IsPreloaded() { return s_bPreloaded; } @@ -42,6 +48,10 @@ class CRaceManager : public CSingleton protected: TRaceDataMap m_RaceDataMap; + mutable std::mutex m_RaceDataMapMutex; + + std::set m_LoadingRaces; + mutable std::mutex m_LoadingRacesMutex; std::map m_kMap_stRaceName_stSrcName; std::map m_kMap_dwRaceKey_stRaceName; diff --git a/src/UserInterface/PythonApplication.cpp b/src/UserInterface/PythonApplication.cpp index fcd103e..fe40388 100644 --- a/src/UserInterface/PythonApplication.cpp +++ b/src/UserInterface/PythonApplication.cpp @@ -939,6 +939,13 @@ unsigned __GetWindowMode(bool windowed) bool CPythonApplication::Create(PyObject * poSelf, const char * c_szName, int width, int height, int Windowed) { + // Initialize Game Thread Pool first - required by other systems + CGameThreadPool* pThreadPool = CGameThreadPool::InstancePtr(); + if (pThreadPool) + { + pThreadPool->Initialize(); + } + NANOBEGIN Windowed = CPythonSystem::Instance().IsWindowed() ? 1 : 0; @@ -1250,6 +1257,9 @@ void CPythonApplication::Destroy() m_kEftMgr.Destroy(); m_LightManager.Destroy(); + // Game Thread Pool + CGameThreadPool::Instance().Destroy(); + // DEFAULT_FONT DefaultFont_Cleanup(); // END_OF_DEFAULT_FONT diff --git a/src/UserInterface/PythonApplication.h b/src/UserInterface/PythonApplication.h index 169515e..3d01812 100644 --- a/src/UserInterface/PythonApplication.h +++ b/src/UserInterface/PythonApplication.h @@ -6,6 +6,7 @@ #include "eterLib/GrpDevice.h" #include "eterLib/NetDevice.h" #include "eterLib/GrpLightManager.h" +#include "eterLib/GameThreadPool.h" #include "EffectLib/EffectManager.h" #include "gamelib/RaceManager.h" #include "gamelib/ItemManager.h" diff --git a/src/UserInterface/UserInterface.cpp b/src/UserInterface/UserInterface.cpp index cafbdfe..9670490 100644 --- a/src/UserInterface/UserInterface.cpp +++ b/src/UserInterface/UserInterface.cpp @@ -10,6 +10,7 @@ #endif #include "eterLib/Util.h" +#include "EterLib/GameThreadPool.h" #include "EterBase/lzo.h" #include "PackLib/PackManager.h" @@ -180,14 +181,16 @@ bool PackInitialize(const char * c_pszFolder) const size_t packsPerThread = (packFiles.size() + numThreads - 1) / numThreads; std::vector threads; + threads.reserve(numThreads); // Pre-allocate to prevent reallocation std::atomic failedCount(0); + // Create all threads first (prevents vector reallocation during emplace_back) for (size_t t = 0; t < numThreads; ++t) { - threads.emplace_back([&, t]() { - size_t start = t * packsPerThread; - size_t end = std::min(start + packsPerThread, packFiles.size()); + size_t start = t * packsPerThread; + size_t end = std::min(start + packsPerThread, packFiles.size()); + threads.emplace_back([&failedCount, &packFiles, c_pszFolder, start, end]() { for (size_t i = start; i < end; ++i) { std::string packPath = std::format("{}/{}.pck", c_pszFolder, packFiles[i]); @@ -296,6 +299,9 @@ static bool Main(HINSTANCE hInstance, LPSTR lpCmdLine) return false; } + // Create game thread pool singleton before CPythonApplication + static CGameThreadPool gameThreadPool; + auto app = new CPythonApplication; app->Initialize (hInstance); CPythonLauncher pyLauncher;