add GameThreadPool

This commit is contained in:
savis
2026-01-05 17:20:12 +01:00
parent dbdfd57c41
commit 7718c65d7a
12 changed files with 575 additions and 285 deletions

View File

@@ -0,0 +1,158 @@
#include "StdAfx.h"
#include "GameThreadPool.h"
CGameThreadPool::CGameThreadPool()
: m_bShutdown(false)
, m_bInitialized(false)
, m_iNextWorkerIndex(0)
{
}
CGameThreadPool::~CGameThreadPool()
{
Destroy();
}
void CGameThreadPool::Initialize(int iWorkerCount)
{
if (m_bInitialized)
{
TraceError("CGameThreadPool::Initialize - Already initialized!");
return;
}
// Determine worker count
if (iWorkerCount <= 0)
{
iWorkerCount = static_cast<int>(std::thread::hardware_concurrency());
if (iWorkerCount <= 0)
iWorkerCount = 4; // Fallback to 4 workers
}
// Clamp worker count to reasonable range
iWorkerCount = std::max(2, std::min(16, iWorkerCount));
Tracef("CGameThreadPool::Initialize - Creating %d worker threads\n", iWorkerCount);
m_bShutdown.store(false, std::memory_order_release);
m_workers.reserve(iWorkerCount);
// Initialize each worker
for (int i = 0; i < iWorkerCount; ++i)
{
std::unique_ptr<TWorkerThread> pWorker(new TWorkerThread());
pWorker->pTaskQueue.reset(new SPSCQueue<TTask>(QUEUE_SIZE));
pWorker->bBusy.store(false, std::memory_order_relaxed);
pWorker->uTaskCount.store(0, std::memory_order_relaxed);
pWorker->thread = std::thread(&CGameThreadPool::WorkerThreadProc, this, i);
m_workers.push_back(std::move(pWorker));
}
m_bInitialized.store(true, std::memory_order_release);
}
void CGameThreadPool::Destroy()
{
if (!m_bInitialized)
return;
Tracef("CGameThreadPool::Destroy - Shutting down %d worker threads\n", GetWorkerCount());
m_bShutdown.store(true, std::memory_order_release);
m_bInitialized.store(false, std::memory_order_release);
// Join all worker threads
for (auto& pWorker : m_workers)
{
if (pWorker->thread.joinable())
pWorker->thread.join();
}
m_workers.clear();
}
void CGameThreadPool::WorkerThreadProc(int iWorkerIndex)
{
TWorkerThread* pWorker = m_workers[iWorkerIndex].get();
int iIdleCount = 0;
while (!m_bShutdown.load(std::memory_order_acquire))
{
TTask task;
if (pWorker->pTaskQueue->Pop(task))
{
pWorker->bBusy.store(true, std::memory_order_relaxed);
iIdleCount = 0;
// Execute the task
try
{
task();
}
catch (const std::exception& e)
{
TraceError("CGameThreadPool::WorkerThreadProc - Exception in worker %d: %s", iWorkerIndex, e.what());
}
catch (...)
{
TraceError("CGameThreadPool::WorkerThreadProc - Unknown exception in worker %d", iWorkerIndex);
}
pWorker->uTaskCount.fetch_sub(1, std::memory_order_relaxed);
pWorker->bBusy.store(false, std::memory_order_relaxed);
}
else
{
// No work available - idle strategy
++iIdleCount;
if (iIdleCount < 100)
{
// Spin briefly for immediate work
std::this_thread::yield();
}
else if (iIdleCount < 1000)
{
// Short sleep for moderate idle
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
else
{
// Longer sleep for extended idle
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
}
}
int CGameThreadPool::SelectLeastBusyWorker() const
{
if (m_workers.empty())
return 0;
// Simple load balancing: find worker with least pending tasks
int iBestWorker = 0;
uint32_t uMinTasks = m_workers[0]->uTaskCount.load(std::memory_order_relaxed);
for (size_t i = 1; i < m_workers.size(); ++i)
{
uint32_t uTasks = m_workers[i]->uTaskCount.load(std::memory_order_relaxed);
if (uTasks < uMinTasks)
{
uMinTasks = uTasks;
iBestWorker = static_cast<int>(i);
}
}
return iBestWorker;
}
size_t CGameThreadPool::GetPendingTaskCount() const
{
size_t uTotal = 0;
for (const auto& pWorker : m_workers)
{
uTotal += pWorker->uTaskCount.load(std::memory_order_relaxed);
}
return uTotal;
}

View File

@@ -0,0 +1,132 @@
#pragma once
#include "SPSCQueue.h"
#include "EterBase/Singleton.h"
#include <thread>
#include <vector>
#include <functional>
#include <future>
#include <atomic>
#include <memory>
class CGameThreadPool : public CSingleton<CGameThreadPool>
{
public:
using TTask = std::function<void()>;
CGameThreadPool();
~CGameThreadPool();
// Initialize thread pool with specified worker count
// If count <= 0, uses hardware_concurrency
void Initialize(int iWorkerCount = -1);
// Shutdown and join all worker threads
void Destroy();
// Enqueue a task and get a future to track completion
template<typename TFunc>
std::future<void> Enqueue(TFunc&& func);
// Get number of active workers
int GetWorkerCount() const { return static_cast<int>(m_workers.size()); }
// Get approximate number of pending tasks across all queues
size_t GetPendingTaskCount() const;
// Check if pool is initialized
bool IsInitialized() const { return m_bInitialized; }
private:
struct TWorkerThread
{
std::thread thread;
std::unique_ptr<SPSCQueue<TTask>> pTaskQueue;
std::atomic<bool> bBusy;
std::atomic<uint32_t> uTaskCount;
TWorkerThread()
: bBusy(false)
, uTaskCount(0)
{
}
};
void WorkerThreadProc(int iWorkerIndex);
int SelectLeastBusyWorker() const;
std::vector<std::unique_ptr<TWorkerThread>> m_workers;
std::atomic<bool> m_bShutdown;
std::atomic<bool> m_bInitialized;
std::atomic<int> m_iNextWorkerIndex; // For round-robin distribution
static const size_t QUEUE_SIZE = 8192;
};
// Template implementation
template<typename TFunc>
std::future<void> CGameThreadPool::Enqueue(TFunc&& func)
{
if (!m_bInitialized)
{
// If not initialized, execute on calling thread
auto promise = std::make_shared<std::promise<void>>();
auto future = promise->get_future();
try
{
func();
promise->set_value();
}
catch (...)
{
promise->set_exception(std::current_exception());
}
return future;
}
// Create a promise to track task completion
auto promise = std::make_shared<std::promise<void>>();
auto future = promise->get_future();
// Wrap function in shared_ptr to avoid move issues with std::function
auto pFunc = std::make_shared<typename std::decay<TFunc>::type>(std::forward<TFunc>(func));
// Wrap the task with promise completion
TTask task = [promise, pFunc]()
{
try
{
(*pFunc)();
promise->set_value();
}
catch (...)
{
promise->set_exception(std::current_exception());
}
};
// Select worker with least load
int iWorkerIndex = SelectLeastBusyWorker();
TWorkerThread* pWorker = m_workers[iWorkerIndex].get();
// Try to enqueue the task
if (!pWorker->pTaskQueue->Push(std::move(task)))
{
// Queue is full, execute on calling thread as fallback
try
{
(*pFunc)();
promise->set_value();
}
catch (...)
{
promise->set_exception(std::current_exception());
}
}
else
{
pWorker->uTaskCount.fetch_add(1, std::memory_order_relaxed);
}
return future;
}

View File

@@ -252,6 +252,9 @@ void CResourceManager::RegisterResourceNewFunctionByTypePointer(int iType, CReso
CResource * CResourceManager::InsertResourcePointer(DWORD dwFileCRC, CResource* pResource)
{
// Thread-safe check and insert
std::lock_guard<std::mutex> lock(m_ResourceMapMutex);
TResourcePointerMap::iterator itor = m_pResMap.find(dwFileCRC);
if (m_pResMap.end() != itor)

View File

@@ -7,6 +7,7 @@
#include <set>
#include <map>
#include <string>
#include <mutex>
class CTextureCache;
@@ -76,6 +77,8 @@ class CResourceManager : public CSingleton<CResourceManager>
static CFileLoaderThread ms_loadingThread;
CFileLoaderThreadPool* m_pLoaderThreadPool;
CTextureCache* m_pTextureCache;
mutable std::mutex m_ResourceMapMutex; // Thread-safe resource map access
};
extern int g_iLoadingDelayTime;