From f0c4b0c8d7f02031761dd3de8b367bf3f5a8ef48 Mon Sep 17 00:00:00 2001 From: savis <106487343+savisxss@users.noreply.github.com> Date: Mon, 29 Dec 2025 01:14:31 +0100 Subject: [PATCH] modernize AsyncSQL with C++17 features --- src/game/guild.h | 2 + src/libsql/AsyncSQL.cpp | 592 ++++++++++++++++------------------------ src/libsql/AsyncSQL.h | 245 ++++++++++------- 3 files changed, 371 insertions(+), 468 deletions(-) diff --git a/src/game/guild.h b/src/game/guild.h index 6243d5b..024cab7 100644 --- a/src/game/guild.h +++ b/src/game/guild.h @@ -3,6 +3,8 @@ #include "skill.h" +// Forward declaration removed - SQLMsg is now defined in libsql/AsyncSQL.h +struct _SQLMsg; typedef struct _SQLMsg SQLMsg; enum diff --git a/src/libsql/AsyncSQL.cpp b/src/libsql/AsyncSQL.cpp index fc6134b..184703b 100644 --- a/src/libsql/AsyncSQL.cpp +++ b/src/libsql/AsyncSQL.cpp @@ -1,37 +1,19 @@ -#ifndef OS_WINDOWS +#ifndef OS_WINDOWS #include #endif #include #include +#include #include "AsyncSQL.h" -// TODO: Consider providing platform-independent mutex class. -#ifndef OS_WINDOWS -#define MUTEX_LOCK(mtx) pthread_mutex_lock(mtx) -#define MUTEX_UNLOCK(mtx) pthread_mutex_unlock(mtx) -#else -#define MUTEX_LOCK(mtx) ::EnterCriticalSection(mtx) -#define MUTEX_UNLOCK(mtx) ::LeaveCriticalSection(mtx) -#endif - CAsyncSQL::CAsyncSQL() : m_stHost(""), m_stUser(""), m_stPassword(""), m_stDB(""), m_stLocale(""), - m_iMsgCount(0), m_bEnd(false), -#ifndef OS_WINDOWS - m_hThread(0), -#else - m_hThread(INVALID_HANDLE_VALUE), -#endif - m_mtxQuery(nullptr), m_mtxResult(nullptr), - m_iQueryFinished(0), m_ulThreadID(0), m_bConnected(false), m_iCopiedQuery(0), - m_iPort(0) + m_iPort(0), m_thread(nullptr), m_bEnd(false), m_bConnected(false), + m_iMsgCount(0), m_iQueryFinished(0), m_iCopiedQuery(0), m_ulThreadID(0) { - memset( &m_hDB, 0, sizeof(m_hDB) ); - - m_aiPipe[0] = 0; - m_aiPipe[1] = 0; + memset(&m_hDB, 0, sizeof(m_hDB)); } CAsyncSQL::~CAsyncSQL() @@ -46,54 +28,19 @@ void CAsyncSQL::Destroy() { sys_log(0, "AsyncSQL: closing mysql connection."); mysql_close(&m_hDB); - m_hDB.host = NULL; + m_hDB.host = nullptr; } - - if (m_mtxQuery) - { -#ifndef OS_WINDOWS - pthread_mutex_destroy(m_mtxQuery.get()); -#else - ::DeleteCriticalSection(m_mtxQuery.get()); -#endif - m_mtxQuery.reset(); - } - - if (m_mtxResult) - { -#ifndef OS_WINDOWS - pthread_mutex_destroy(m_mtxResult.get()); -#else - ::DeleteCriticalSection(m_mtxResult.get()); -#endif - m_mtxResult.reset(); - } -} - -#ifndef OS_WINDOWS -void * AsyncSQLThread(void * arg) -#else -unsigned int __stdcall AsyncSQLThread(void* arg) -#endif -{ - CAsyncSQL * pSQL = ((CAsyncSQL *) arg); - - if (!pSQL->Connect()) - return NULL; - - pSQL->ChildLoop(); - return NULL; } bool CAsyncSQL::QueryLocaleSet() { - if (0 == m_stLocale.length()) + if (m_stLocale.empty()) { sys_err("m_stLocale == 0"); return true; } - else if (m_stLocale == "ascii") + if (m_stLocale == "ascii") { sys_err("m_stLocale == ascii"); return true; @@ -101,71 +48,64 @@ bool CAsyncSQL::QueryLocaleSet() if (mysql_set_character_set(&m_hDB, m_stLocale.c_str())) { - sys_err("cannot set locale %s by 'mysql_set_character_set', errno %u %s", m_stLocale.c_str(), mysql_errno(&m_hDB) , mysql_error(&m_hDB)); - return false; + sys_err("cannot set locale %s by 'mysql_set_character_set', errno %u %s", + m_stLocale.c_str(), mysql_errno(&m_hDB), mysql_error(&m_hDB)); + return false; } sys_log(0, "\t--mysql_set_character_set(%s)", m_stLocale.c_str()); - return true; } bool CAsyncSQL::Connect() { - if (0 == mysql_init(&m_hDB)) + if (mysql_init(&m_hDB) == nullptr) { fprintf(stderr, "mysql_init failed\n"); return false; } - //mysql_options(&m_hDB, MYSQL_SET_CHARSET_NAME, m_stLocale.c_str()); if (!m_stLocale.empty()) { - //mysql_options(&m_hDB, MYSQL_SET_CHARSET_DIR , " /usr/local/share/mysql/charsets/"); - //mysql_options(&m_hDB, MYSQL_SET_CHARSET_DIR , "/usr/local/share/mysql/charsets"); - //mysql_options(&m_hDB, MYSQL_SET_CHARSET_DIR , "/usr/local/share/mysql"); if (mysql_options(&m_hDB, MYSQL_SET_CHARSET_NAME, m_stLocale.c_str()) != 0) { fprintf(stderr, "mysql_option failed : MYSQL_SET_CHARSET_NAME %s ", mysql_error(&m_hDB)); } } - if (!mysql_real_connect(&m_hDB, m_stHost.c_str(), m_stUser.c_str(), m_stPassword.c_str(), m_stDB.c_str(), m_iPort, NULL, CLIENT_MULTI_STATEMENTS)) + if (!mysql_real_connect(&m_hDB, m_stHost.c_str(), m_stUser.c_str(), + m_stPassword.c_str(), m_stDB.c_str(), m_iPort, nullptr, CLIENT_MULTI_STATEMENTS)) { fprintf(stderr, "mysql_real_connect: %s\n", mysql_error(&m_hDB)); return false; } my_bool reconnect = true; - - if (0 != mysql_options(&m_hDB, MYSQL_OPT_RECONNECT, &reconnect)) + if (mysql_options(&m_hDB, MYSQL_OPT_RECONNECT, &reconnect) != 0) + { fprintf(stderr, "mysql_option: %s\n", mysql_error(&m_hDB)); + } fprintf(stdout, "AsyncSQL: connected to %s (reconnect %d)\n", m_stHost.c_str(), reconnect); - // db cache는 common db의 LOCALE 테이블에서 locale을 알아오고, 이후 character set을 수정한다. - // 따라서 최초 Connection을 맺을 때에는 locale을 모르기 때문에 character set을 정할 수가 없음에도 불구하고, - // 강제로 character set을 euckr로 정하도록 되어있어 이 부분을 주석처리 하였다. - // (아래 주석을 풀면 mysql에 euckr이 안 깔려있는 디비에 접근할 수가 없다.) - //while (!QueryLocaleSet()); - m_ulThreadID = mysql_thread_id(&m_hDB); - - m_bConnected = true; + m_ulThreadID.store(mysql_thread_id(&m_hDB), std::memory_order_release); + m_bConnected.store(true, std::memory_order_release); return true; } -bool CAsyncSQL::Setup(CAsyncSQL * sql, bool bNoThread) +bool CAsyncSQL::Setup(CAsyncSQL* sql, bool bNoThread) { return Setup(sql->m_stHost.c_str(), - sql->m_stUser.c_str(), - sql->m_stPassword.c_str(), - sql->m_stDB.c_str(), - sql->m_stLocale.c_str(), - bNoThread, - sql->m_iPort); + sql->m_stUser.c_str(), + sql->m_stPassword.c_str(), + sql->m_stDB.c_str(), + sql->m_stLocale.c_str(), + bNoThread, + sql->m_iPort); } -bool CAsyncSQL::Setup(const char * c_pszHost, const char * c_pszUser, const char * c_pszPassword, const char * c_pszDB, const char * c_pszLocale, bool bNoThread, int iPort) +bool CAsyncSQL::Setup(const char* c_pszHost, const char* c_pszUser, const char* c_pszPassword, + const char* c_pszDB, const char* c_pszLocale, bool bNoThread, int iPort) { m_stHost = c_pszHost; m_stUser = c_pszUser; @@ -181,91 +121,54 @@ bool CAsyncSQL::Setup(const char * c_pszHost, const char * c_pszUser, const char if (!bNoThread) { - /* - if (!mysql_thread_safe())// - { - fprintf(stderr, "FATAL ERROR!! mysql client library was not compiled with thread safety\n"); - return false; - } - */ -#ifndef OS_WINDOWS - m_mtxQuery = std::make_unique(); - m_mtxResult = std::make_unique(); - - if (0 != pthread_mutex_init(m_mtxQuery.get(), NULL)) - { - perror("pthread_mutex_init"); - exit(0); - } - - if (0 != pthread_mutex_init(m_mtxResult.get(), NULL)) - { - perror("pthread_mutex_init"); - exit(0); - } - - pthread_create(&m_hThread, NULL, AsyncSQLThread, this); -#else - m_mtxQuery = std::make_unique(); - m_mtxResult = std::make_unique(); - - ::InitializeCriticalSection(m_mtxQuery.get()); - ::InitializeCriticalSection(m_mtxResult.get()); - - m_hThread = (HANDLE)::_beginthreadex(NULL, 0, AsyncSQLThread, this, 0, NULL); - if (m_hThread == INVALID_HANDLE_VALUE) { - perror("CAsyncSQL::Setup"); - return false; - } -#endif + // Create worker thread using modern C++ thread + m_thread = std::make_unique([this]() { + if (!Connect()) + return; + ChildLoop(); + }); return true; } else + { return Connect(); + } } void CAsyncSQL::Quit() { - m_bEnd = true; - m_sem.Release(); + m_bEnd.store(true, std::memory_order_release); + m_cvQuery.notify_all(); -#ifndef OS_WINDOWS - if (m_hThread) + if (m_thread && m_thread->joinable()) { - pthread_join(m_hThread, NULL); - m_hThread = NULL; + m_thread->join(); + m_thread.reset(); } -#else - if (m_hThread != INVALID_HANDLE_VALUE) { - ::WaitForSingleObject(m_hThread, INFINITE); - m_hThread = INVALID_HANDLE_VALUE; - } -#endif } std::unique_ptr CAsyncSQL::DirectQuery(const char* c_pszQuery) { - if (m_ulThreadID != mysql_thread_id(&m_hDB)) + unsigned long currentThreadID = mysql_thread_id(&m_hDB); + if (m_ulThreadID.load(std::memory_order_acquire) != currentThreadID) { sys_err("MySQL connection was reconnected. querying locale set"); while (!QueryLocaleSet()); - m_ulThreadID = mysql_thread_id(&m_hDB); + m_ulThreadID.store(currentThreadID, std::memory_order_release); } auto p = std::make_unique(); - p->m_pkSQL = &m_hDB; - p->iID = ++m_iMsgCount; + p->iID = m_iMsgCount.fetch_add(1, std::memory_order_acq_rel) + 1; p->stQuery = c_pszQuery; if (mysql_real_query(&m_hDB, p->stQuery.c_str(), p->stQuery.length())) { char buf[1024]; - snprintf(buf, sizeof(buf), - "AsyncSQL::DirectQuery : mysql_query error: %s\nquery: %s", - mysql_error(&m_hDB), p->stQuery.c_str()); + "AsyncSQL::DirectQuery : mysql_query error: %s\nquery: %s", + mysql_error(&m_hDB), p->stQuery.c_str()); sys_err(buf); p->uiSQLErrno = mysql_errno(&m_hDB); @@ -275,359 +178,326 @@ std::unique_ptr CAsyncSQL::DirectQuery(const char* c_pszQuery) return p; } -void CAsyncSQL::AsyncQuery(const char * c_pszQuery) +void CAsyncSQL::AsyncQuery(const char* c_pszQuery) { - SQLMsg * p = new SQLMsg; - + auto p = std::make_unique(); p->m_pkSQL = &m_hDB; - p->iID = ++m_iMsgCount; + p->iID = m_iMsgCount.fetch_add(1, std::memory_order_acq_rel) + 1; p->stQuery = c_pszQuery; - PushQuery(p); + PushQuery(std::move(p)); } -void CAsyncSQL::ReturnQuery(const char * c_pszQuery, void * pvUserData) +void CAsyncSQL::ReturnQuery(const char* c_pszQuery, void* pvUserData) { - SQLMsg * p = new SQLMsg; - + auto p = std::make_unique(); p->m_pkSQL = &m_hDB; - p->iID = ++m_iMsgCount; + p->iID = m_iMsgCount.fetch_add(1, std::memory_order_acq_rel) + 1; p->stQuery = c_pszQuery; p->bReturn = true; p->pvUserData = pvUserData; - PushQuery(p); + PushQuery(std::move(p)); } -void CAsyncSQL::PushResult(SQLMsg * p) +void CAsyncSQL::PushResult(std::unique_ptr p) { - MUTEX_LOCK(m_mtxResult.get()); - - m_queue_result.push(p); - - MUTEX_UNLOCK(m_mtxResult.get()); + std::lock_guard lock(m_mtxResult); + m_queue_result.push(std::move(p)); } -bool CAsyncSQL::PopResult(SQLMsg ** pp) +bool CAsyncSQL::PopResult(std::unique_ptr& p) { - MUTEX_LOCK(m_mtxResult.get()); + std::lock_guard lock(m_mtxResult); if (m_queue_result.empty()) - { - MUTEX_UNLOCK(m_mtxResult.get()); return false; - } - *pp = m_queue_result.front(); + p = std::move(m_queue_result.front()); m_queue_result.pop(); - MUTEX_UNLOCK(m_mtxResult.get()); return true; } -void CAsyncSQL::PushQuery(SQLMsg * p) +// Legacy API for backward compatibility +bool CAsyncSQL::PopResult(SQLMsg** pp) { - MUTEX_LOCK(m_mtxQuery.get()); + std::lock_guard lock(m_mtxResult); - m_queue_query.push(p); - //m_map_kSQLMsgUnfinished.insert(std::make_pair(p->iID, p)); + if (m_queue_result.empty()) + return false; - m_sem.Release(); - - MUTEX_UNLOCK(m_mtxQuery.get()); + *pp = m_queue_result.front().release(); + m_queue_result.pop(); + return true; } -bool CAsyncSQL::PeekQuery(SQLMsg ** pp) +void CAsyncSQL::PushQuery(std::unique_ptr p) { - MUTEX_LOCK(m_mtxQuery.get()); + { + std::lock_guard lock(m_mtxQuery); + m_queue_query.push(std::move(p)); + } + m_cvQuery.notify_one(); +} + +bool CAsyncSQL::PeekQuery(SQLMsg** pp) +{ + std::lock_guard lock(m_mtxQuery); if (m_queue_query.empty()) - { - MUTEX_UNLOCK(m_mtxQuery.get()); return false; - } - *pp = m_queue_query.front(); - MUTEX_UNLOCK(m_mtxQuery.get()); + *pp = m_queue_query.front().get(); return true; } bool CAsyncSQL::PopQuery(int iID) { - MUTEX_LOCK(m_mtxQuery.get()); + std::lock_guard lock(m_mtxQuery); if (m_queue_query.empty()) - { - MUTEX_UNLOCK(m_mtxQuery.get()); return false; - } m_queue_query.pop(); - //m_map_kSQLMsgUnfinished.erase(iID); - - MUTEX_UNLOCK(m_mtxQuery.get()); return true; } -bool CAsyncSQL::PeekQueryFromCopyQueue(SQLMsg ** pp) +bool CAsyncSQL::PeekQueryFromCopyQueue(SQLMsg** pp) { if (m_queue_query_copy.empty()) return false; - *pp = m_queue_query_copy.front(); + *pp = m_queue_query_copy.front().get(); return true; } int CAsyncSQL::CopyQuery() { - MUTEX_LOCK(m_mtxQuery.get()); + std::lock_guard lock(m_mtxQuery); if (m_queue_query.empty()) - { - MUTEX_UNLOCK(m_mtxQuery.get()); return -1; - } while (!m_queue_query.empty()) { - SQLMsg * p = m_queue_query.front(); - m_queue_query_copy.push(p); + m_queue_query_copy.push(std::move(m_queue_query.front())); m_queue_query.pop(); } - //m_map_kSQLMsgUnfinished.erase(iID); - - int count = m_queue_query_copy.size(); - - MUTEX_UNLOCK(m_mtxQuery.get()); - return count; + return static_cast(m_queue_query_copy.size()); } bool CAsyncSQL::PopQueryFromCopyQueue() { if (m_queue_query_copy.empty()) - { return false; - } m_queue_query_copy.pop(); - //m_map_kSQLMsgUnfinished.erase(iID); - return true; } -int CAsyncSQL::GetCopiedQueryCount() + +int CAsyncSQL::GetCopiedQueryCount() const { - return m_iCopiedQuery; -} -void CAsyncSQL::ResetCopiedQueryCount() -{ - m_iCopiedQuery = 0; + return m_iCopiedQuery.load(std::memory_order_acquire); } -void CAsyncSQL::AddCopiedQueryCount(int iCopiedQuery) +void CAsyncSQL::ResetCopiedQueryCount() { - m_iCopiedQuery += iCopiedQuery; + m_iCopiedQuery.store(0, std::memory_order_release); } - +void CAsyncSQL::AddCopiedQueryCount(int iCopiedQuery) +{ + m_iCopiedQuery.fetch_add(iCopiedQuery, std::memory_order_acq_rel); +} DWORD CAsyncSQL::CountQuery() { - return m_queue_query.size(); + std::lock_guard lock(m_mtxQuery); + return static_cast(m_queue_query.size()); } DWORD CAsyncSQL::CountResult() { - return m_queue_result.size(); -} - -void __timediff(struct timeval *a, struct timeval *b, struct timeval *rslt) -{ - if (a->tv_sec < b->tv_sec) - rslt->tv_sec = rslt->tv_usec = 0; - else if (a->tv_sec == b->tv_sec) - { - if (a->tv_usec < b->tv_usec) - rslt->tv_sec = rslt->tv_usec = 0; - else - { - rslt->tv_sec = 0; - rslt->tv_usec = a->tv_usec - b->tv_usec; - } - } - else - { /* a->tv_sec > b->tv_sec */ - rslt->tv_sec = a->tv_sec - b->tv_sec; - - if (a->tv_usec < b->tv_usec) - { - rslt->tv_usec = a->tv_usec + 1000000 - b->tv_usec; - rslt->tv_sec--; - } else - rslt->tv_usec = a->tv_usec - b->tv_usec; - } + std::lock_guard lock(m_mtxResult); + return static_cast(m_queue_result.size()); } +// Modern profiler using chrono class cProfiler { public: - cProfiler() + cProfiler(int nInterval = 500000) + : m_nInterval(nInterval) { - m_nInterval = 0 ; - - memset( &prev, 0, sizeof(prev) ); - memset( &now, 0, sizeof(now) ); - memset( &interval, 0, sizeof(interval) ); - - Start(); - } - - cProfiler(int nInterval = 100000) - { - m_nInterval = nInterval; - - memset( &prev, 0, sizeof(prev) ); - memset( &now, 0, sizeof(now) ); - memset( &interval, 0, sizeof(interval) ); - - Start(); + Start(); } void Start() { - gettimeofday (&prev , (struct timezone *) 0); + m_start = std::chrono::steady_clock::now(); } void Stop() { - gettimeofday(&now, (struct timezone*) 0); - __timediff(&now, &prev, &interval); + m_end = std::chrono::steady_clock::now(); } - bool IsOk() - { - if (interval.tv_sec > (m_nInterval / 1000000)) - return false; - - if (interval.tv_usec > m_nInterval) - return false; - - return true; + bool IsOk() const + { + auto duration = std::chrono::duration_cast(m_end - m_start); + return duration.count() <= m_nInterval; } - struct timeval * GetResult() { return &interval; } - long GetResultSec() { return interval.tv_sec; } - long GetResultUSec() { return interval.tv_usec; } + long GetResultSec() const + { + auto duration = std::chrono::duration_cast(m_end - m_start); + return static_cast(duration.count()); + } + + long GetResultUSec() const + { + auto duration = std::chrono::duration_cast(m_end - m_start); + return static_cast(duration.count() % 1000000); + } private: int m_nInterval; - struct timeval prev; - struct timeval now; - struct timeval interval; + std::chrono::steady_clock::time_point m_start; + std::chrono::steady_clock::time_point m_end; }; void CAsyncSQL::ChildLoop() { - cProfiler profiler(500000); // 0.5초 + cProfiler profiler(500000); // 0.5 seconds - while (!m_bEnd) + while (!m_bEnd.load(std::memory_order_acquire)) { - m_sem.Wait(); + // Wait for queries using condition variable + std::unique_lock lock(m_mtxQuery); + m_cvQuery.wait(lock, [this] { + return !m_queue_query.empty() || m_bEnd.load(std::memory_order_acquire); + }); + lock.unlock(); + + if (m_bEnd.load(std::memory_order_acquire) && m_queue_query.empty()) + break; int count = CopyQuery(); - if (count <= 0) continue; AddCopiedQueryCount(count); - SQLMsg * p; - while (count--) { - //시간 체크 시작 - profiler.Start(); - - if (!PeekQueryFromCopyQueue(&p)) + if (m_queue_query_copy.empty()) continue; - if (m_ulThreadID != mysql_thread_id(&m_hDB)) + // Peek first, don't pop yet (for retry logic) + SQLMsg* p = m_queue_query_copy.front().get(); + bool shouldRetry = false; + + profiler.Start(); + + // Check for reconnection + unsigned long currentThreadID = mysql_thread_id(&m_hDB); + if (m_ulThreadID.load(std::memory_order_acquire) != currentThreadID) { sys_err("MySQL connection was reconnected. querying locale set"); while (!QueryLocaleSet()); - m_ulThreadID = mysql_thread_id(&m_hDB); + m_ulThreadID.store(currentThreadID, std::memory_order_release); } if (mysql_real_query(&m_hDB, p->stQuery.c_str(), p->stQuery.length())) { p->uiSQLErrno = mysql_errno(&m_hDB); - sys_err("AsyncSQL: query failed: %s (query: %s errno: %d)", - mysql_error(&m_hDB), p->stQuery.c_str(), p->uiSQLErrno); + sys_err("AsyncSQL: query failed: %s (query: %s errno: %d)", + mysql_error(&m_hDB), p->stQuery.c_str(), p->uiSQLErrno); + // Retry on connection errors switch (p->uiSQLErrno) { - case CR_SOCKET_CREATE_ERROR: - case CR_CONNECTION_ERROR: - case CR_IPSOCK_ERROR: - case CR_UNKNOWN_HOST: - case CR_SERVER_GONE_ERROR: - case CR_CONN_HOST_ERROR: - case ER_NOT_KEYFILE: - case ER_CRASHED_ON_USAGE: - case ER_CANT_OPEN_FILE: - case ER_HOST_NOT_PRIVILEGED: - case ER_HOST_IS_BLOCKED: - case ER_PASSWORD_NOT_ALLOWED: - case ER_PASSWORD_NO_MATCH: - case ER_CANT_CREATE_THREAD: - case ER_INVALID_USE_OF_NULL: - m_sem.Release(); - sys_err("AsyncSQL: retrying"); - continue; + case CR_SOCKET_CREATE_ERROR: + case CR_CONNECTION_ERROR: + case CR_IPSOCK_ERROR: + case CR_UNKNOWN_HOST: + case CR_SERVER_GONE_ERROR: + case CR_CONN_HOST_ERROR: + case ER_NOT_KEYFILE: + case ER_CRASHED_ON_USAGE: + case ER_CANT_OPEN_FILE: + case ER_HOST_NOT_PRIVILEGED: + case ER_HOST_IS_BLOCKED: + case ER_PASSWORD_NOT_ALLOWED: + case ER_PASSWORD_NO_MATCH: + case ER_CANT_CREATE_THREAD: + case ER_INVALID_USE_OF_NULL: + sys_err("AsyncSQL: retrying"); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + shouldRetry = true; + break; } } profiler.Stop(); - - // 0.5초 이상 걸렸으면 로그에 남기기 - if (!profiler.IsOk()) - sys_log(0, "[QUERY : LONG INTERVAL(OverSec %ld.%ld)] : %s", - profiler.GetResultSec(), profiler.GetResultUSec(), p->stQuery.c_str()); - PopQueryFromCopyQueue(); + // If retry, don't pop - continue to next iteration + if (shouldRetry) + continue; + + // Log slow queries (> 0.5 seconds) + if (!profiler.IsOk()) + { + sys_log(0, "[QUERY : LONG INTERVAL(OverSec %ld.%ld)] : %s", + profiler.GetResultSec(), profiler.GetResultUSec(), p->stQuery.c_str()); + } + + // Now pop and move ownership + auto pMsg = std::move(m_queue_query_copy.front()); + m_queue_query_copy.pop(); if (p->bReturn) { p->Store(); - PushResult(p); + // Move ownership to result queue + PushResult(std::move(pMsg)); } - else - delete p; + // else: pMsg will be automatically deleted when it goes out of scope - ++m_iQueryFinished; + m_iQueryFinished.fetch_add(1, std::memory_order_acq_rel); } } - SQLMsg * p; - - while (PeekQuery(&p)) + // Process remaining queries during shutdown { - if (m_ulThreadID != mysql_thread_id(&m_hDB)) - { - sys_err("MySQL connection was reconnected. querying locale set"); - while (!QueryLocaleSet()); - m_ulThreadID = mysql_thread_id(&m_hDB); - } + std::lock_guard lock(m_mtxQuery); - if (mysql_real_query(&m_hDB, p->stQuery.c_str(), p->stQuery.length())) + while (!m_queue_query.empty()) { - p->uiSQLErrno = mysql_errno(&m_hDB); + auto pMsg = std::move(m_queue_query.front()); + SQLMsg* p = pMsg.get(); + m_queue_query.pop(); - sys_err("AsyncSQL::ChildLoop : mysql_query error: %s:\nquery: %s", + unsigned long currentThreadID = mysql_thread_id(&m_hDB); + if (m_ulThreadID.load(std::memory_order_acquire) != currentThreadID) + { + sys_err("MySQL connection was reconnected. querying locale set"); + while (!QueryLocaleSet()); + m_ulThreadID.store(currentThreadID, std::memory_order_release); + } + + if (mysql_real_query(&m_hDB, p->stQuery.c_str(), p->stQuery.length())) + { + p->uiSQLErrno = mysql_errno(&m_hDB); + + sys_err("AsyncSQL::ChildLoop : mysql_query error: %s:\nquery: %s", mysql_error(&m_hDB), p->stQuery.c_str()); - switch (p->uiSQLErrno) - { + // Retry on connection errors + switch (p->uiSQLErrno) + { case CR_SOCKET_CREATE_ERROR: case CR_CONNECTION_ERROR: case CR_IPSOCK_ERROR: @@ -644,60 +514,57 @@ void CAsyncSQL::ChildLoop() case ER_CANT_CREATE_THREAD: case ER_INVALID_USE_OF_NULL: continue; + } } + + sys_log(0, "QUERY_FLUSH: %s", p->stQuery.c_str()); + + if (p->bReturn) + { + p->Store(); + PushResult(std::move(pMsg)); + } + + m_iQueryFinished.fetch_add(1, std::memory_order_acq_rel); } - - sys_log(0, "QUERY_FLUSH: %s", p->stQuery.c_str()); - - PopQuery(p->iID); - - if (p->bReturn) - { - p->Store(); - PushResult(p); - } - else - delete p; - - ++m_iQueryFinished; } } -int CAsyncSQL::CountQueryFinished() +int CAsyncSQL::CountQueryFinished() const { - return m_iQueryFinished; + return m_iQueryFinished.load(std::memory_order_acquire); } void CAsyncSQL::ResetQueryFinished() { - m_iQueryFinished = 0; + m_iQueryFinished.store(0, std::memory_order_release); } -MYSQL * CAsyncSQL::GetSQLHandle() +MYSQL* CAsyncSQL::GetSQLHandle() { return &m_hDB; } -size_t CAsyncSQL::EscapeString(char* dst, size_t dstSize, const char *src, size_t srcSize) +size_t CAsyncSQL::EscapeString(char* dst, size_t dstSize, const char* src, size_t srcSize) { - if (0 == srcSize) + if (srcSize == 0) { memset(dst, 0, dstSize); return 0; } - if (0 == dstSize) + if (dstSize == 0) return 0; if (dstSize < srcSize * 2 + 1) { - // \0이 안붙어있을 때를 대비해서 256 바이트만 복사해서 로그로 출력 char tmp[256]; - size_t tmpLen = sizeof(tmp) > srcSize ? srcSize : sizeof(tmp); // 둘 중에 작은 크기 + size_t tmpLen = sizeof(tmp) > srcSize ? srcSize : sizeof(tmp); strlcpy(tmp, src, tmpLen); sys_err("FATAL ERROR!! not enough buffer size (dstSize %u srcSize %u src%s: %s)", - dstSize, srcSize, tmpLen != srcSize ? "(trimmed to 255 characters)" : "", tmp); + static_cast(dstSize), static_cast(srcSize), + tmpLen != srcSize ? "(trimmed to 255 characters)" : "", tmp); dst[0] = '\0'; return 0; @@ -706,9 +573,8 @@ size_t CAsyncSQL::EscapeString(char* dst, size_t dstSize, const char *src, size_ return mysql_real_escape_string(GetSQLHandle(), dst, src, srcSize); } -void CAsyncSQL2::SetLocale(const std::string & stLocale) +void CAsyncSQL2::SetLocale(const std::string& stLocale) { m_stLocale = stLocale; QueryLocaleSet(); } - diff --git a/src/libsql/AsyncSQL.h b/src/libsql/AsyncSQL.h index 7681329..8494097 100644 --- a/src/libsql/AsyncSQL.h +++ b/src/libsql/AsyncSQL.h @@ -1,4 +1,4 @@ -#ifndef __INC_METIN_II_ASYNCSQL_H__ +#ifndef __INC_METIN_II_ASYNCSQL_H__ #define __INC_METIN_II_ASYNCSQL_H__ #include "libthecore/stdafx.h" @@ -7,85 +7,131 @@ #include #include #include -#include #include +#include +#include +#include +#include #include #include #include -#include "Semaphore.h" - #define QUERY_MAX_LEN 8192 -typedef struct _SQLResult +// Modern RAII wrapper for MySQL results +struct SQLResult { - _SQLResult() - : pSQLResult(NULL), uiNumRows(0), uiAffectedRows(0), uiInsertID(0) + SQLResult() noexcept + : pSQLResult(nullptr), uiNumRows(0), uiAffectedRows(0), uiInsertID(0) { } - ~_SQLResult() + ~SQLResult() { if (pSQLResult) { mysql_free_result(pSQLResult); - pSQLResult = NULL; + pSQLResult = nullptr; } } - MYSQL_RES * pSQLResult; - uint32_t uiNumRows; - uint32_t uiAffectedRows; - uint32_t uiInsertID; -} SQLResult; + // Delete copy constructor and assignment operator (non-copyable) + SQLResult(const SQLResult&) = delete; + SQLResult& operator=(const SQLResult&) = delete; + // Allow move semantics + SQLResult(SQLResult&& other) noexcept + : pSQLResult(other.pSQLResult), + uiNumRows(other.uiNumRows), + uiAffectedRows(other.uiAffectedRows), + uiInsertID(other.uiInsertID) + { + other.pSQLResult = nullptr; + other.uiNumRows = 0; + other.uiAffectedRows = 0; + other.uiInsertID = 0; + } + + SQLResult& operator=(SQLResult&& other) noexcept + { + if (this != &other) + { + if (pSQLResult) + { + mysql_free_result(pSQLResult); + } + + pSQLResult = other.pSQLResult; + uiNumRows = other.uiNumRows; + uiAffectedRows = other.uiAffectedRows; + uiInsertID = other.uiInsertID; + + other.pSQLResult = nullptr; + other.uiNumRows = 0; + other.uiAffectedRows = 0; + other.uiInsertID = 0; + } + return *this; + } + + MYSQL_RES* pSQLResult; + uint32_t uiNumRows; + uint32_t uiAffectedRows; + uint32_t uiInsertID; +}; + +// SQL Message with improved memory management typedef struct _SQLMsg { - _SQLMsg() : m_pkSQL(NULL), iID(0), uiResultPos(0), pvUserData(NULL), bReturn(false), uiSQLErrno(0) + _SQLMsg() noexcept + : m_pkSQL(nullptr), iID(0), uiResultPos(0), pvUserData(nullptr), + bReturn(false), uiSQLErrno(0) { } ~_SQLMsg() { - std::vector::iterator first = vec_pkResult.begin(); - std::vector::iterator past = vec_pkResult.end(); - - while (first != past) - delete *(first++); - vec_pkResult.clear(); } + // Delete copy operations + _SQLMsg(const _SQLMsg&) = delete; + _SQLMsg& operator=(const _SQLMsg&) = delete; + + // Allow move operations + _SQLMsg(_SQLMsg&&) noexcept = default; + _SQLMsg& operator=(_SQLMsg&&) noexcept = default; + void Store() { do { - SQLResult * pRes = new SQLResult; + auto pRes = std::make_unique(); pRes->pSQLResult = mysql_store_result(m_pkSQL); - pRes->uiInsertID = mysql_insert_id(m_pkSQL); - pRes->uiAffectedRows = mysql_affected_rows(m_pkSQL); + pRes->uiInsertID = static_cast(mysql_insert_id(m_pkSQL)); + pRes->uiAffectedRows = static_cast(mysql_affected_rows(m_pkSQL)); if (pRes->pSQLResult) { - pRes->uiNumRows = mysql_num_rows(pRes->pSQLResult); + pRes->uiNumRows = static_cast(mysql_num_rows(pRes->pSQLResult)); } else { pRes->uiNumRows = 0; } - vec_pkResult.push_back(pRes); - } while (!mysql_next_result(m_pkSQL)); + vec_pkResult.push_back(std::move(pRes)); + } while (mysql_next_result(m_pkSQL) == 0); } - SQLResult * Get() + SQLResult* Get() { if (uiResultPos >= vec_pkResult.size()) - return NULL; + return nullptr; - return vec_pkResult[uiResultPos]; + return vec_pkResult[uiResultPos].get(); } bool Next() @@ -97,17 +143,14 @@ typedef struct _SQLMsg return true; } - MYSQL * m_pkSQL; - int iID; - std::string stQuery; - - std::vector vec_pkResult; // result 벡터 - unsigned int uiResultPos; // 현재 result 위치 - - void * pvUserData; - bool bReturn; - - unsigned int uiSQLErrno; + MYSQL* m_pkSQL; + int iID; + std::string stQuery; + std::vector> vec_pkResult; + unsigned int uiResultPos; + void* pvUserData; + bool bReturn; + unsigned int uiSQLErrno; } SQLMsg; class CAsyncSQL @@ -116,97 +159,89 @@ class CAsyncSQL CAsyncSQL(); virtual ~CAsyncSQL(); - void Quit(); + void Quit(); - bool Setup(const char * c_pszHost, const char * c_pszUser, const char * c_pszPassword, const char * c_pszDB, const char * c_pszLocale, - bool bNoThread = false, int iPort = 0); - bool Setup(CAsyncSQL * sql, bool bNoThread = false); + bool Setup(const char* c_pszHost, const char* c_pszUser, const char* c_pszPassword, + const char* c_pszDB, const char* c_pszLocale, bool bNoThread = false, int iPort = 0); + bool Setup(CAsyncSQL* sql, bool bNoThread = false); - bool Connect(); - bool IsConnected() { return m_bConnected; } - bool QueryLocaleSet(); + bool Connect(); + bool IsConnected() const { return m_bConnected.load(std::memory_order_acquire); } + bool QueryLocaleSet(); - void AsyncQuery(const char * c_pszQuery); - void ReturnQuery(const char * c_pszQuery, void * pvUserData); - std::unique_ptr DirectQuery(const char* c_pszQuery); + void AsyncQuery(const char* c_pszQuery); + void ReturnQuery(const char* c_pszQuery, void* pvUserData); + std::unique_ptr DirectQuery(const char* c_pszQuery); - DWORD CountQuery(); - DWORD CountResult(); + DWORD CountQuery(); + DWORD CountResult(); - void PushResult(SQLMsg * p); - bool PopResult(SQLMsg ** pp); + void PushResult(std::unique_ptr p); + bool PopResult(std::unique_ptr& p); - void ChildLoop(); + // Legacy API compatibility - deprecated, use PopResult(unique_ptr&) instead + bool PopResult(SQLMsg** pp); - MYSQL * GetSQLHandle(); + void ChildLoop(); - int CountQueryFinished(); - void ResetQueryFinished(); + MYSQL* GetSQLHandle(); - size_t EscapeString(char* dst, size_t dstSize, const char *src, size_t srcSize); + int CountQueryFinished() const; + void ResetQueryFinished(); + + size_t EscapeString(char* dst, size_t dstSize, const char* src, size_t srcSize); protected: - void Destroy(); - - void PushQuery(SQLMsg * p); - - bool PeekQuery(SQLMsg ** pp); - bool PopQuery(int iID); - - bool PeekQueryFromCopyQueue(SQLMsg ** pp ); - INT CopyQuery(); - bool PopQueryFromCopyQueue(); + void Destroy(); + void PushQuery(std::unique_ptr p); + bool PeekQuery(SQLMsg** pp); + bool PopQuery(int iID); + bool PeekQueryFromCopyQueue(SQLMsg** pp); + int CopyQuery(); + bool PopQueryFromCopyQueue(); public: - int GetCopiedQueryCount(); - void ResetCopiedQueryCount(); - void AddCopiedQueryCount( int iCopiedQuery ); + int GetCopiedQueryCount() const; + void ResetCopiedQueryCount(); + void AddCopiedQueryCount(int iCopiedQuery); - //private: protected: + // MySQL connection MYSQL m_hDB; - std::string m_stHost; - std::string m_stUser; - std::string m_stPassword; - std::string m_stDB; - std::string m_stLocale; - - int m_iMsgCount; - int m_aiPipe[2]; + // Connection info + std::string m_stHost; + std::string m_stUser; + std::string m_stPassword; + std::string m_stDB; + std::string m_stLocale; int m_iPort; - std::queue m_queue_query; - std::queue m_queue_query_copy; - //std::map m_map_kSQLMsgUnfinished; + // Thread control + std::unique_ptr m_thread; + std::atomic m_bEnd; + std::atomic m_bConnected; - std::queue m_queue_result; + // Query queues with mutex protection + std::queue> m_queue_query; + std::queue> m_queue_query_copy; + std::queue> m_queue_result; - volatile bool m_bEnd; + std::mutex m_mtxQuery; + std::mutex m_mtxResult; + std::condition_variable m_cvQuery; -#ifndef OS_WINDOWS - pthread_t m_hThread; - std::unique_ptr m_mtxQuery; - std::unique_ptr m_mtxResult; -#else - HANDLE m_hThread; - std::unique_ptr m_mtxQuery; - std::unique_ptr m_mtxResult; -#endif - - CSemaphore m_sem; - - int m_iQueryFinished; - - unsigned long m_ulThreadID; - bool m_bConnected; - int m_iCopiedQuery; + // Counters + std::atomic m_iMsgCount; + std::atomic m_iQueryFinished; + std::atomic m_iCopiedQuery; + std::atomic m_ulThreadID; }; class CAsyncSQL2 : public CAsyncSQL { public: - void SetLocale ( const std::string & stLocale ); + void SetLocale(const std::string& stLocale); }; #endif