modernize AsyncSQL with C++17 features

This commit is contained in:
savis
2025-12-29 01:14:31 +01:00
parent c34a721328
commit f0c4b0c8d7
3 changed files with 371 additions and 468 deletions

View File

@@ -3,6 +3,8 @@
#include "skill.h" #include "skill.h"
// Forward declaration removed - SQLMsg is now defined in libsql/AsyncSQL.h
struct _SQLMsg;
typedef struct _SQLMsg SQLMsg; typedef struct _SQLMsg SQLMsg;
enum enum

View File

@@ -1,37 +1,19 @@
#ifndef OS_WINDOWS #ifndef OS_WINDOWS
#include <sys/time.h> #include <sys/time.h>
#endif #endif
#include <cstdlib> #include <cstdlib>
#include <cstring> #include <cstring>
#include <chrono>
#include "AsyncSQL.h" #include "AsyncSQL.h"
// TODO: Consider providing platform-independent mutex class.
#ifndef OS_WINDOWS
#define MUTEX_LOCK(mtx) pthread_mutex_lock(mtx)
#define MUTEX_UNLOCK(mtx) pthread_mutex_unlock(mtx)
#else
#define MUTEX_LOCK(mtx) ::EnterCriticalSection(mtx)
#define MUTEX_UNLOCK(mtx) ::LeaveCriticalSection(mtx)
#endif
CAsyncSQL::CAsyncSQL() CAsyncSQL::CAsyncSQL()
: m_stHost(""), m_stUser(""), m_stPassword(""), m_stDB(""), m_stLocale(""), : m_stHost(""), m_stUser(""), m_stPassword(""), m_stDB(""), m_stLocale(""),
m_iMsgCount(0), m_bEnd(false), m_iPort(0), m_thread(nullptr), m_bEnd(false), m_bConnected(false),
#ifndef OS_WINDOWS m_iMsgCount(0), m_iQueryFinished(0), m_iCopiedQuery(0), m_ulThreadID(0)
m_hThread(0),
#else
m_hThread(INVALID_HANDLE_VALUE),
#endif
m_mtxQuery(nullptr), m_mtxResult(nullptr),
m_iQueryFinished(0), m_ulThreadID(0), m_bConnected(false), m_iCopiedQuery(0),
m_iPort(0)
{ {
memset( &m_hDB, 0, sizeof(m_hDB) ); memset(&m_hDB, 0, sizeof(m_hDB));
m_aiPipe[0] = 0;
m_aiPipe[1] = 0;
} }
CAsyncSQL::~CAsyncSQL() CAsyncSQL::~CAsyncSQL()
@@ -46,54 +28,19 @@ void CAsyncSQL::Destroy()
{ {
sys_log(0, "AsyncSQL: closing mysql connection."); sys_log(0, "AsyncSQL: closing mysql connection.");
mysql_close(&m_hDB); mysql_close(&m_hDB);
m_hDB.host = NULL; m_hDB.host = nullptr;
} }
if (m_mtxQuery)
{
#ifndef OS_WINDOWS
pthread_mutex_destroy(m_mtxQuery.get());
#else
::DeleteCriticalSection(m_mtxQuery.get());
#endif
m_mtxQuery.reset();
}
if (m_mtxResult)
{
#ifndef OS_WINDOWS
pthread_mutex_destroy(m_mtxResult.get());
#else
::DeleteCriticalSection(m_mtxResult.get());
#endif
m_mtxResult.reset();
}
}
#ifndef OS_WINDOWS
void * AsyncSQLThread(void * arg)
#else
unsigned int __stdcall AsyncSQLThread(void* arg)
#endif
{
CAsyncSQL * pSQL = ((CAsyncSQL *) arg);
if (!pSQL->Connect())
return NULL;
pSQL->ChildLoop();
return NULL;
} }
bool CAsyncSQL::QueryLocaleSet() bool CAsyncSQL::QueryLocaleSet()
{ {
if (0 == m_stLocale.length()) if (m_stLocale.empty())
{ {
sys_err("m_stLocale == 0"); sys_err("m_stLocale == 0");
return true; return true;
} }
else if (m_stLocale == "ascii") if (m_stLocale == "ascii")
{ {
sys_err("m_stLocale == ascii"); sys_err("m_stLocale == ascii");
return true; return true;
@@ -101,71 +48,64 @@ bool CAsyncSQL::QueryLocaleSet()
if (mysql_set_character_set(&m_hDB, m_stLocale.c_str())) if (mysql_set_character_set(&m_hDB, m_stLocale.c_str()))
{ {
sys_err("cannot set locale %s by 'mysql_set_character_set', errno %u %s", m_stLocale.c_str(), mysql_errno(&m_hDB) , mysql_error(&m_hDB)); sys_err("cannot set locale %s by 'mysql_set_character_set', errno %u %s",
return false; m_stLocale.c_str(), mysql_errno(&m_hDB), mysql_error(&m_hDB));
return false;
} }
sys_log(0, "\t--mysql_set_character_set(%s)", m_stLocale.c_str()); sys_log(0, "\t--mysql_set_character_set(%s)", m_stLocale.c_str());
return true; return true;
} }
bool CAsyncSQL::Connect() bool CAsyncSQL::Connect()
{ {
if (0 == mysql_init(&m_hDB)) if (mysql_init(&m_hDB) == nullptr)
{ {
fprintf(stderr, "mysql_init failed\n"); fprintf(stderr, "mysql_init failed\n");
return false; return false;
} }
//mysql_options(&m_hDB, MYSQL_SET_CHARSET_NAME, m_stLocale.c_str());
if (!m_stLocale.empty()) if (!m_stLocale.empty())
{ {
//mysql_options(&m_hDB, MYSQL_SET_CHARSET_DIR , " /usr/local/share/mysql/charsets/");
//mysql_options(&m_hDB, MYSQL_SET_CHARSET_DIR , "/usr/local/share/mysql/charsets");
//mysql_options(&m_hDB, MYSQL_SET_CHARSET_DIR , "/usr/local/share/mysql");
if (mysql_options(&m_hDB, MYSQL_SET_CHARSET_NAME, m_stLocale.c_str()) != 0) if (mysql_options(&m_hDB, MYSQL_SET_CHARSET_NAME, m_stLocale.c_str()) != 0)
{ {
fprintf(stderr, "mysql_option failed : MYSQL_SET_CHARSET_NAME %s ", mysql_error(&m_hDB)); fprintf(stderr, "mysql_option failed : MYSQL_SET_CHARSET_NAME %s ", mysql_error(&m_hDB));
} }
} }
if (!mysql_real_connect(&m_hDB, m_stHost.c_str(), m_stUser.c_str(), m_stPassword.c_str(), m_stDB.c_str(), m_iPort, NULL, CLIENT_MULTI_STATEMENTS)) if (!mysql_real_connect(&m_hDB, m_stHost.c_str(), m_stUser.c_str(),
m_stPassword.c_str(), m_stDB.c_str(), m_iPort, nullptr, CLIENT_MULTI_STATEMENTS))
{ {
fprintf(stderr, "mysql_real_connect: %s\n", mysql_error(&m_hDB)); fprintf(stderr, "mysql_real_connect: %s\n", mysql_error(&m_hDB));
return false; return false;
} }
my_bool reconnect = true; my_bool reconnect = true;
if (mysql_options(&m_hDB, MYSQL_OPT_RECONNECT, &reconnect) != 0)
if (0 != mysql_options(&m_hDB, MYSQL_OPT_RECONNECT, &reconnect)) {
fprintf(stderr, "mysql_option: %s\n", mysql_error(&m_hDB)); fprintf(stderr, "mysql_option: %s\n", mysql_error(&m_hDB));
}
fprintf(stdout, "AsyncSQL: connected to %s (reconnect %d)\n", m_stHost.c_str(), reconnect); fprintf(stdout, "AsyncSQL: connected to %s (reconnect %d)\n", m_stHost.c_str(), reconnect);
// db cache는 common db의 LOCALE 테이블에서 locale을 알아오고, 이후 character set을 수정한다. m_ulThreadID.store(mysql_thread_id(&m_hDB), std::memory_order_release);
// 따라서 최초 Connection을 맺을 때에는 locale을 모르기 때문에 character set을 정할 수가 없음에도 불구하고, m_bConnected.store(true, std::memory_order_release);
// 강제로 character set을 euckr로 정하도록 되어있어 이 부분을 주석처리 하였다.
// (아래 주석을 풀면 mysql에 euckr이 안 깔려있는 디비에 접근할 수가 없다.)
//while (!QueryLocaleSet());
m_ulThreadID = mysql_thread_id(&m_hDB);
m_bConnected = true;
return true; return true;
} }
bool CAsyncSQL::Setup(CAsyncSQL * sql, bool bNoThread) bool CAsyncSQL::Setup(CAsyncSQL* sql, bool bNoThread)
{ {
return Setup(sql->m_stHost.c_str(), return Setup(sql->m_stHost.c_str(),
sql->m_stUser.c_str(), sql->m_stUser.c_str(),
sql->m_stPassword.c_str(), sql->m_stPassword.c_str(),
sql->m_stDB.c_str(), sql->m_stDB.c_str(),
sql->m_stLocale.c_str(), sql->m_stLocale.c_str(),
bNoThread, bNoThread,
sql->m_iPort); sql->m_iPort);
} }
bool CAsyncSQL::Setup(const char * c_pszHost, const char * c_pszUser, const char * c_pszPassword, const char * c_pszDB, const char * c_pszLocale, bool bNoThread, int iPort) bool CAsyncSQL::Setup(const char* c_pszHost, const char* c_pszUser, const char* c_pszPassword,
const char* c_pszDB, const char* c_pszLocale, bool bNoThread, int iPort)
{ {
m_stHost = c_pszHost; m_stHost = c_pszHost;
m_stUser = c_pszUser; m_stUser = c_pszUser;
@@ -181,91 +121,54 @@ bool CAsyncSQL::Setup(const char * c_pszHost, const char * c_pszUser, const char
if (!bNoThread) if (!bNoThread)
{ {
/* // Create worker thread using modern C++ thread
if (!mysql_thread_safe())// m_thread = std::make_unique<std::thread>([this]() {
{ if (!Connect())
fprintf(stderr, "FATAL ERROR!! mysql client library was not compiled with thread safety\n"); return;
return false; ChildLoop();
} });
*/
#ifndef OS_WINDOWS
m_mtxQuery = std::make_unique<pthread_mutex_t>();
m_mtxResult = std::make_unique<pthread_mutex_t>();
if (0 != pthread_mutex_init(m_mtxQuery.get(), NULL))
{
perror("pthread_mutex_init");
exit(0);
}
if (0 != pthread_mutex_init(m_mtxResult.get(), NULL))
{
perror("pthread_mutex_init");
exit(0);
}
pthread_create(&m_hThread, NULL, AsyncSQLThread, this);
#else
m_mtxQuery = std::make_unique<CRITICAL_SECTION>();
m_mtxResult = std::make_unique<CRITICAL_SECTION>();
::InitializeCriticalSection(m_mtxQuery.get());
::InitializeCriticalSection(m_mtxResult.get());
m_hThread = (HANDLE)::_beginthreadex(NULL, 0, AsyncSQLThread, this, 0, NULL);
if (m_hThread == INVALID_HANDLE_VALUE) {
perror("CAsyncSQL::Setup");
return false;
}
#endif
return true; return true;
} }
else else
{
return Connect(); return Connect();
}
} }
void CAsyncSQL::Quit() void CAsyncSQL::Quit()
{ {
m_bEnd = true; m_bEnd.store(true, std::memory_order_release);
m_sem.Release(); m_cvQuery.notify_all();
#ifndef OS_WINDOWS if (m_thread && m_thread->joinable())
if (m_hThread)
{ {
pthread_join(m_hThread, NULL); m_thread->join();
m_hThread = NULL; m_thread.reset();
} }
#else
if (m_hThread != INVALID_HANDLE_VALUE) {
::WaitForSingleObject(m_hThread, INFINITE);
m_hThread = INVALID_HANDLE_VALUE;
}
#endif
} }
std::unique_ptr<SQLMsg> CAsyncSQL::DirectQuery(const char* c_pszQuery) std::unique_ptr<SQLMsg> CAsyncSQL::DirectQuery(const char* c_pszQuery)
{ {
if (m_ulThreadID != mysql_thread_id(&m_hDB)) unsigned long currentThreadID = mysql_thread_id(&m_hDB);
if (m_ulThreadID.load(std::memory_order_acquire) != currentThreadID)
{ {
sys_err("MySQL connection was reconnected. querying locale set"); sys_err("MySQL connection was reconnected. querying locale set");
while (!QueryLocaleSet()); while (!QueryLocaleSet());
m_ulThreadID = mysql_thread_id(&m_hDB); m_ulThreadID.store(currentThreadID, std::memory_order_release);
} }
auto p = std::make_unique<SQLMsg>(); auto p = std::make_unique<SQLMsg>();
p->m_pkSQL = &m_hDB; p->m_pkSQL = &m_hDB;
p->iID = ++m_iMsgCount; p->iID = m_iMsgCount.fetch_add(1, std::memory_order_acq_rel) + 1;
p->stQuery = c_pszQuery; p->stQuery = c_pszQuery;
if (mysql_real_query(&m_hDB, p->stQuery.c_str(), p->stQuery.length())) if (mysql_real_query(&m_hDB, p->stQuery.c_str(), p->stQuery.length()))
{ {
char buf[1024]; char buf[1024];
snprintf(buf, sizeof(buf), snprintf(buf, sizeof(buf),
"AsyncSQL::DirectQuery : mysql_query error: %s\nquery: %s", "AsyncSQL::DirectQuery : mysql_query error: %s\nquery: %s",
mysql_error(&m_hDB), p->stQuery.c_str()); mysql_error(&m_hDB), p->stQuery.c_str());
sys_err(buf); sys_err(buf);
p->uiSQLErrno = mysql_errno(&m_hDB); p->uiSQLErrno = mysql_errno(&m_hDB);
@@ -275,359 +178,326 @@ std::unique_ptr<SQLMsg> CAsyncSQL::DirectQuery(const char* c_pszQuery)
return p; return p;
} }
void CAsyncSQL::AsyncQuery(const char * c_pszQuery) void CAsyncSQL::AsyncQuery(const char* c_pszQuery)
{ {
SQLMsg * p = new SQLMsg; auto p = std::make_unique<SQLMsg>();
p->m_pkSQL = &m_hDB; p->m_pkSQL = &m_hDB;
p->iID = ++m_iMsgCount; p->iID = m_iMsgCount.fetch_add(1, std::memory_order_acq_rel) + 1;
p->stQuery = c_pszQuery; p->stQuery = c_pszQuery;
PushQuery(p); PushQuery(std::move(p));
} }
void CAsyncSQL::ReturnQuery(const char * c_pszQuery, void * pvUserData) void CAsyncSQL::ReturnQuery(const char* c_pszQuery, void* pvUserData)
{ {
SQLMsg * p = new SQLMsg; auto p = std::make_unique<SQLMsg>();
p->m_pkSQL = &m_hDB; p->m_pkSQL = &m_hDB;
p->iID = ++m_iMsgCount; p->iID = m_iMsgCount.fetch_add(1, std::memory_order_acq_rel) + 1;
p->stQuery = c_pszQuery; p->stQuery = c_pszQuery;
p->bReturn = true; p->bReturn = true;
p->pvUserData = pvUserData; p->pvUserData = pvUserData;
PushQuery(p); PushQuery(std::move(p));
} }
void CAsyncSQL::PushResult(SQLMsg * p) void CAsyncSQL::PushResult(std::unique_ptr<SQLMsg> p)
{ {
MUTEX_LOCK(m_mtxResult.get()); std::lock_guard<std::mutex> lock(m_mtxResult);
m_queue_result.push(std::move(p));
m_queue_result.push(p);
MUTEX_UNLOCK(m_mtxResult.get());
} }
bool CAsyncSQL::PopResult(SQLMsg ** pp) bool CAsyncSQL::PopResult(std::unique_ptr<SQLMsg>& p)
{ {
MUTEX_LOCK(m_mtxResult.get()); std::lock_guard<std::mutex> lock(m_mtxResult);
if (m_queue_result.empty()) if (m_queue_result.empty())
{
MUTEX_UNLOCK(m_mtxResult.get());
return false; return false;
}
*pp = m_queue_result.front(); p = std::move(m_queue_result.front());
m_queue_result.pop(); m_queue_result.pop();
MUTEX_UNLOCK(m_mtxResult.get());
return true; return true;
} }
void CAsyncSQL::PushQuery(SQLMsg * p) // Legacy API for backward compatibility
bool CAsyncSQL::PopResult(SQLMsg** pp)
{ {
MUTEX_LOCK(m_mtxQuery.get()); std::lock_guard<std::mutex> lock(m_mtxResult);
m_queue_query.push(p); if (m_queue_result.empty())
//m_map_kSQLMsgUnfinished.insert(std::make_pair(p->iID, p)); return false;
m_sem.Release(); *pp = m_queue_result.front().release();
m_queue_result.pop();
MUTEX_UNLOCK(m_mtxQuery.get()); return true;
} }
bool CAsyncSQL::PeekQuery(SQLMsg ** pp) void CAsyncSQL::PushQuery(std::unique_ptr<SQLMsg> p)
{ {
MUTEX_LOCK(m_mtxQuery.get()); {
std::lock_guard<std::mutex> lock(m_mtxQuery);
m_queue_query.push(std::move(p));
}
m_cvQuery.notify_one();
}
bool CAsyncSQL::PeekQuery(SQLMsg** pp)
{
std::lock_guard<std::mutex> lock(m_mtxQuery);
if (m_queue_query.empty()) if (m_queue_query.empty())
{
MUTEX_UNLOCK(m_mtxQuery.get());
return false; return false;
}
*pp = m_queue_query.front(); *pp = m_queue_query.front().get();
MUTEX_UNLOCK(m_mtxQuery.get());
return true; return true;
} }
bool CAsyncSQL::PopQuery(int iID) bool CAsyncSQL::PopQuery(int iID)
{ {
MUTEX_LOCK(m_mtxQuery.get()); std::lock_guard<std::mutex> lock(m_mtxQuery);
if (m_queue_query.empty()) if (m_queue_query.empty())
{
MUTEX_UNLOCK(m_mtxQuery.get());
return false; return false;
}
m_queue_query.pop(); m_queue_query.pop();
//m_map_kSQLMsgUnfinished.erase(iID);
MUTEX_UNLOCK(m_mtxQuery.get());
return true; return true;
} }
bool CAsyncSQL::PeekQueryFromCopyQueue(SQLMsg ** pp) bool CAsyncSQL::PeekQueryFromCopyQueue(SQLMsg** pp)
{ {
if (m_queue_query_copy.empty()) if (m_queue_query_copy.empty())
return false; return false;
*pp = m_queue_query_copy.front(); *pp = m_queue_query_copy.front().get();
return true; return true;
} }
int CAsyncSQL::CopyQuery() int CAsyncSQL::CopyQuery()
{ {
MUTEX_LOCK(m_mtxQuery.get()); std::lock_guard<std::mutex> lock(m_mtxQuery);
if (m_queue_query.empty()) if (m_queue_query.empty())
{
MUTEX_UNLOCK(m_mtxQuery.get());
return -1; return -1;
}
while (!m_queue_query.empty()) while (!m_queue_query.empty())
{ {
SQLMsg * p = m_queue_query.front(); m_queue_query_copy.push(std::move(m_queue_query.front()));
m_queue_query_copy.push(p);
m_queue_query.pop(); m_queue_query.pop();
} }
//m_map_kSQLMsgUnfinished.erase(iID); return static_cast<int>(m_queue_query_copy.size());
int count = m_queue_query_copy.size();
MUTEX_UNLOCK(m_mtxQuery.get());
return count;
} }
bool CAsyncSQL::PopQueryFromCopyQueue() bool CAsyncSQL::PopQueryFromCopyQueue()
{ {
if (m_queue_query_copy.empty()) if (m_queue_query_copy.empty())
{
return false; return false;
}
m_queue_query_copy.pop(); m_queue_query_copy.pop();
//m_map_kSQLMsgUnfinished.erase(iID);
return true; return true;
} }
int CAsyncSQL::GetCopiedQueryCount()
int CAsyncSQL::GetCopiedQueryCount() const
{ {
return m_iCopiedQuery; return m_iCopiedQuery.load(std::memory_order_acquire);
}
void CAsyncSQL::ResetCopiedQueryCount()
{
m_iCopiedQuery = 0;
} }
void CAsyncSQL::AddCopiedQueryCount(int iCopiedQuery) void CAsyncSQL::ResetCopiedQueryCount()
{ {
m_iCopiedQuery += iCopiedQuery; m_iCopiedQuery.store(0, std::memory_order_release);
} }
void CAsyncSQL::AddCopiedQueryCount(int iCopiedQuery)
{
m_iCopiedQuery.fetch_add(iCopiedQuery, std::memory_order_acq_rel);
}
DWORD CAsyncSQL::CountQuery() DWORD CAsyncSQL::CountQuery()
{ {
return m_queue_query.size(); std::lock_guard<std::mutex> lock(m_mtxQuery);
return static_cast<DWORD>(m_queue_query.size());
} }
DWORD CAsyncSQL::CountResult() DWORD CAsyncSQL::CountResult()
{ {
return m_queue_result.size(); std::lock_guard<std::mutex> lock(m_mtxResult);
} return static_cast<DWORD>(m_queue_result.size());
void __timediff(struct timeval *a, struct timeval *b, struct timeval *rslt)
{
if (a->tv_sec < b->tv_sec)
rslt->tv_sec = rslt->tv_usec = 0;
else if (a->tv_sec == b->tv_sec)
{
if (a->tv_usec < b->tv_usec)
rslt->tv_sec = rslt->tv_usec = 0;
else
{
rslt->tv_sec = 0;
rslt->tv_usec = a->tv_usec - b->tv_usec;
}
}
else
{ /* a->tv_sec > b->tv_sec */
rslt->tv_sec = a->tv_sec - b->tv_sec;
if (a->tv_usec < b->tv_usec)
{
rslt->tv_usec = a->tv_usec + 1000000 - b->tv_usec;
rslt->tv_sec--;
} else
rslt->tv_usec = a->tv_usec - b->tv_usec;
}
} }
// Modern profiler using chrono
class cProfiler class cProfiler
{ {
public: public:
cProfiler() cProfiler(int nInterval = 500000)
: m_nInterval(nInterval)
{ {
m_nInterval = 0 ; Start();
memset( &prev, 0, sizeof(prev) );
memset( &now, 0, sizeof(now) );
memset( &interval, 0, sizeof(interval) );
Start();
}
cProfiler(int nInterval = 100000)
{
m_nInterval = nInterval;
memset( &prev, 0, sizeof(prev) );
memset( &now, 0, sizeof(now) );
memset( &interval, 0, sizeof(interval) );
Start();
} }
void Start() void Start()
{ {
gettimeofday (&prev , (struct timezone *) 0); m_start = std::chrono::steady_clock::now();
} }
void Stop() void Stop()
{ {
gettimeofday(&now, (struct timezone*) 0); m_end = std::chrono::steady_clock::now();
__timediff(&now, &prev, &interval);
} }
bool IsOk() bool IsOk() const
{ {
if (interval.tv_sec > (m_nInterval / 1000000)) auto duration = std::chrono::duration_cast<std::chrono::microseconds>(m_end - m_start);
return false; return duration.count() <= m_nInterval;
if (interval.tv_usec > m_nInterval)
return false;
return true;
} }
struct timeval * GetResult() { return &interval; } long GetResultSec() const
long GetResultSec() { return interval.tv_sec; } {
long GetResultUSec() { return interval.tv_usec; } auto duration = std::chrono::duration_cast<std::chrono::seconds>(m_end - m_start);
return static_cast<long>(duration.count());
}
long GetResultUSec() const
{
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(m_end - m_start);
return static_cast<long>(duration.count() % 1000000);
}
private: private:
int m_nInterval; int m_nInterval;
struct timeval prev; std::chrono::steady_clock::time_point m_start;
struct timeval now; std::chrono::steady_clock::time_point m_end;
struct timeval interval;
}; };
void CAsyncSQL::ChildLoop() void CAsyncSQL::ChildLoop()
{ {
cProfiler profiler(500000); // 0.5 cProfiler profiler(500000); // 0.5 seconds
while (!m_bEnd) while (!m_bEnd.load(std::memory_order_acquire))
{ {
m_sem.Wait(); // Wait for queries using condition variable
std::unique_lock<std::mutex> lock(m_mtxQuery);
m_cvQuery.wait(lock, [this] {
return !m_queue_query.empty() || m_bEnd.load(std::memory_order_acquire);
});
lock.unlock();
if (m_bEnd.load(std::memory_order_acquire) && m_queue_query.empty())
break;
int count = CopyQuery(); int count = CopyQuery();
if (count <= 0) if (count <= 0)
continue; continue;
AddCopiedQueryCount(count); AddCopiedQueryCount(count);
SQLMsg * p;
while (count--) while (count--)
{ {
//시간 체크 시작 if (m_queue_query_copy.empty())
profiler.Start();
if (!PeekQueryFromCopyQueue(&p))
continue; continue;
if (m_ulThreadID != mysql_thread_id(&m_hDB)) // Peek first, don't pop yet (for retry logic)
SQLMsg* p = m_queue_query_copy.front().get();
bool shouldRetry = false;
profiler.Start();
// Check for reconnection
unsigned long currentThreadID = mysql_thread_id(&m_hDB);
if (m_ulThreadID.load(std::memory_order_acquire) != currentThreadID)
{ {
sys_err("MySQL connection was reconnected. querying locale set"); sys_err("MySQL connection was reconnected. querying locale set");
while (!QueryLocaleSet()); while (!QueryLocaleSet());
m_ulThreadID = mysql_thread_id(&m_hDB); m_ulThreadID.store(currentThreadID, std::memory_order_release);
} }
if (mysql_real_query(&m_hDB, p->stQuery.c_str(), p->stQuery.length())) if (mysql_real_query(&m_hDB, p->stQuery.c_str(), p->stQuery.length()))
{ {
p->uiSQLErrno = mysql_errno(&m_hDB); p->uiSQLErrno = mysql_errno(&m_hDB);
sys_err("AsyncSQL: query failed: %s (query: %s errno: %d)", sys_err("AsyncSQL: query failed: %s (query: %s errno: %d)",
mysql_error(&m_hDB), p->stQuery.c_str(), p->uiSQLErrno); mysql_error(&m_hDB), p->stQuery.c_str(), p->uiSQLErrno);
// Retry on connection errors
switch (p->uiSQLErrno) switch (p->uiSQLErrno)
{ {
case CR_SOCKET_CREATE_ERROR: case CR_SOCKET_CREATE_ERROR:
case CR_CONNECTION_ERROR: case CR_CONNECTION_ERROR:
case CR_IPSOCK_ERROR: case CR_IPSOCK_ERROR:
case CR_UNKNOWN_HOST: case CR_UNKNOWN_HOST:
case CR_SERVER_GONE_ERROR: case CR_SERVER_GONE_ERROR:
case CR_CONN_HOST_ERROR: case CR_CONN_HOST_ERROR:
case ER_NOT_KEYFILE: case ER_NOT_KEYFILE:
case ER_CRASHED_ON_USAGE: case ER_CRASHED_ON_USAGE:
case ER_CANT_OPEN_FILE: case ER_CANT_OPEN_FILE:
case ER_HOST_NOT_PRIVILEGED: case ER_HOST_NOT_PRIVILEGED:
case ER_HOST_IS_BLOCKED: case ER_HOST_IS_BLOCKED:
case ER_PASSWORD_NOT_ALLOWED: case ER_PASSWORD_NOT_ALLOWED:
case ER_PASSWORD_NO_MATCH: case ER_PASSWORD_NO_MATCH:
case ER_CANT_CREATE_THREAD: case ER_CANT_CREATE_THREAD:
case ER_INVALID_USE_OF_NULL: case ER_INVALID_USE_OF_NULL:
m_sem.Release(); sys_err("AsyncSQL: retrying");
sys_err("AsyncSQL: retrying"); std::this_thread::sleep_for(std::chrono::milliseconds(100));
continue; shouldRetry = true;
break;
} }
} }
profiler.Stop(); profiler.Stop();
// 0.5초 이상 걸렸으면 로그에 남기기
if (!profiler.IsOk())
sys_log(0, "[QUERY : LONG INTERVAL(OverSec %ld.%ld)] : %s",
profiler.GetResultSec(), profiler.GetResultUSec(), p->stQuery.c_str());
PopQueryFromCopyQueue(); // If retry, don't pop - continue to next iteration
if (shouldRetry)
continue;
// Log slow queries (> 0.5 seconds)
if (!profiler.IsOk())
{
sys_log(0, "[QUERY : LONG INTERVAL(OverSec %ld.%ld)] : %s",
profiler.GetResultSec(), profiler.GetResultUSec(), p->stQuery.c_str());
}
// Now pop and move ownership
auto pMsg = std::move(m_queue_query_copy.front());
m_queue_query_copy.pop();
if (p->bReturn) if (p->bReturn)
{ {
p->Store(); p->Store();
PushResult(p); // Move ownership to result queue
PushResult(std::move(pMsg));
} }
else // else: pMsg will be automatically deleted when it goes out of scope
delete p;
++m_iQueryFinished; m_iQueryFinished.fetch_add(1, std::memory_order_acq_rel);
} }
} }
SQLMsg * p; // Process remaining queries during shutdown
while (PeekQuery(&p))
{ {
if (m_ulThreadID != mysql_thread_id(&m_hDB)) std::lock_guard<std::mutex> lock(m_mtxQuery);
{
sys_err("MySQL connection was reconnected. querying locale set");
while (!QueryLocaleSet());
m_ulThreadID = mysql_thread_id(&m_hDB);
}
if (mysql_real_query(&m_hDB, p->stQuery.c_str(), p->stQuery.length())) while (!m_queue_query.empty())
{ {
p->uiSQLErrno = mysql_errno(&m_hDB); auto pMsg = std::move(m_queue_query.front());
SQLMsg* p = pMsg.get();
m_queue_query.pop();
sys_err("AsyncSQL::ChildLoop : mysql_query error: %s:\nquery: %s", unsigned long currentThreadID = mysql_thread_id(&m_hDB);
if (m_ulThreadID.load(std::memory_order_acquire) != currentThreadID)
{
sys_err("MySQL connection was reconnected. querying locale set");
while (!QueryLocaleSet());
m_ulThreadID.store(currentThreadID, std::memory_order_release);
}
if (mysql_real_query(&m_hDB, p->stQuery.c_str(), p->stQuery.length()))
{
p->uiSQLErrno = mysql_errno(&m_hDB);
sys_err("AsyncSQL::ChildLoop : mysql_query error: %s:\nquery: %s",
mysql_error(&m_hDB), p->stQuery.c_str()); mysql_error(&m_hDB), p->stQuery.c_str());
switch (p->uiSQLErrno) // Retry on connection errors
{ switch (p->uiSQLErrno)
{
case CR_SOCKET_CREATE_ERROR: case CR_SOCKET_CREATE_ERROR:
case CR_CONNECTION_ERROR: case CR_CONNECTION_ERROR:
case CR_IPSOCK_ERROR: case CR_IPSOCK_ERROR:
@@ -644,60 +514,57 @@ void CAsyncSQL::ChildLoop()
case ER_CANT_CREATE_THREAD: case ER_CANT_CREATE_THREAD:
case ER_INVALID_USE_OF_NULL: case ER_INVALID_USE_OF_NULL:
continue; continue;
}
} }
sys_log(0, "QUERY_FLUSH: %s", p->stQuery.c_str());
if (p->bReturn)
{
p->Store();
PushResult(std::move(pMsg));
}
m_iQueryFinished.fetch_add(1, std::memory_order_acq_rel);
} }
sys_log(0, "QUERY_FLUSH: %s", p->stQuery.c_str());
PopQuery(p->iID);
if (p->bReturn)
{
p->Store();
PushResult(p);
}
else
delete p;
++m_iQueryFinished;
} }
} }
int CAsyncSQL::CountQueryFinished() int CAsyncSQL::CountQueryFinished() const
{ {
return m_iQueryFinished; return m_iQueryFinished.load(std::memory_order_acquire);
} }
void CAsyncSQL::ResetQueryFinished() void CAsyncSQL::ResetQueryFinished()
{ {
m_iQueryFinished = 0; m_iQueryFinished.store(0, std::memory_order_release);
} }
MYSQL * CAsyncSQL::GetSQLHandle() MYSQL* CAsyncSQL::GetSQLHandle()
{ {
return &m_hDB; return &m_hDB;
} }
size_t CAsyncSQL::EscapeString(char* dst, size_t dstSize, const char *src, size_t srcSize) size_t CAsyncSQL::EscapeString(char* dst, size_t dstSize, const char* src, size_t srcSize)
{ {
if (0 == srcSize) if (srcSize == 0)
{ {
memset(dst, 0, dstSize); memset(dst, 0, dstSize);
return 0; return 0;
} }
if (0 == dstSize) if (dstSize == 0)
return 0; return 0;
if (dstSize < srcSize * 2 + 1) if (dstSize < srcSize * 2 + 1)
{ {
// \0이 안붙어있을 때를 대비해서 256 바이트만 복사해서 로그로 출력
char tmp[256]; char tmp[256];
size_t tmpLen = sizeof(tmp) > srcSize ? srcSize : sizeof(tmp); // 둘 중에 작은 크기 size_t tmpLen = sizeof(tmp) > srcSize ? srcSize : sizeof(tmp);
strlcpy(tmp, src, tmpLen); strlcpy(tmp, src, tmpLen);
sys_err("FATAL ERROR!! not enough buffer size (dstSize %u srcSize %u src%s: %s)", sys_err("FATAL ERROR!! not enough buffer size (dstSize %u srcSize %u src%s: %s)",
dstSize, srcSize, tmpLen != srcSize ? "(trimmed to 255 characters)" : "", tmp); static_cast<unsigned int>(dstSize), static_cast<unsigned int>(srcSize),
tmpLen != srcSize ? "(trimmed to 255 characters)" : "", tmp);
dst[0] = '\0'; dst[0] = '\0';
return 0; return 0;
@@ -706,9 +573,8 @@ size_t CAsyncSQL::EscapeString(char* dst, size_t dstSize, const char *src, size_
return mysql_real_escape_string(GetSQLHandle(), dst, src, srcSize); return mysql_real_escape_string(GetSQLHandle(), dst, src, srcSize);
} }
void CAsyncSQL2::SetLocale(const std::string & stLocale) void CAsyncSQL2::SetLocale(const std::string& stLocale)
{ {
m_stLocale = stLocale; m_stLocale = stLocale;
QueryLocaleSet(); QueryLocaleSet();
} }

View File

@@ -1,4 +1,4 @@
#ifndef __INC_METIN_II_ASYNCSQL_H__ #ifndef __INC_METIN_II_ASYNCSQL_H__
#define __INC_METIN_II_ASYNCSQL_H__ #define __INC_METIN_II_ASYNCSQL_H__
#include "libthecore/stdafx.h" #include "libthecore/stdafx.h"
@@ -7,85 +7,131 @@
#include <string> #include <string>
#include <queue> #include <queue>
#include <vector> #include <vector>
#include <map>
#include <memory> #include <memory>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <atomic>
#include <mysql.h> #include <mysql.h>
#include <errmsg.h> #include <errmsg.h>
#include <mysqld_error.h> #include <mysqld_error.h>
#include "Semaphore.h"
#define QUERY_MAX_LEN 8192 #define QUERY_MAX_LEN 8192
typedef struct _SQLResult // Modern RAII wrapper for MySQL results
struct SQLResult
{ {
_SQLResult() SQLResult() noexcept
: pSQLResult(NULL), uiNumRows(0), uiAffectedRows(0), uiInsertID(0) : pSQLResult(nullptr), uiNumRows(0), uiAffectedRows(0), uiInsertID(0)
{ {
} }
~_SQLResult() ~SQLResult()
{ {
if (pSQLResult) if (pSQLResult)
{ {
mysql_free_result(pSQLResult); mysql_free_result(pSQLResult);
pSQLResult = NULL; pSQLResult = nullptr;
} }
} }
MYSQL_RES * pSQLResult; // Delete copy constructor and assignment operator (non-copyable)
uint32_t uiNumRows; SQLResult(const SQLResult&) = delete;
uint32_t uiAffectedRows; SQLResult& operator=(const SQLResult&) = delete;
uint32_t uiInsertID;
} SQLResult;
// Allow move semantics
SQLResult(SQLResult&& other) noexcept
: pSQLResult(other.pSQLResult),
uiNumRows(other.uiNumRows),
uiAffectedRows(other.uiAffectedRows),
uiInsertID(other.uiInsertID)
{
other.pSQLResult = nullptr;
other.uiNumRows = 0;
other.uiAffectedRows = 0;
other.uiInsertID = 0;
}
SQLResult& operator=(SQLResult&& other) noexcept
{
if (this != &other)
{
if (pSQLResult)
{
mysql_free_result(pSQLResult);
}
pSQLResult = other.pSQLResult;
uiNumRows = other.uiNumRows;
uiAffectedRows = other.uiAffectedRows;
uiInsertID = other.uiInsertID;
other.pSQLResult = nullptr;
other.uiNumRows = 0;
other.uiAffectedRows = 0;
other.uiInsertID = 0;
}
return *this;
}
MYSQL_RES* pSQLResult;
uint32_t uiNumRows;
uint32_t uiAffectedRows;
uint32_t uiInsertID;
};
// SQL Message with improved memory management
typedef struct _SQLMsg typedef struct _SQLMsg
{ {
_SQLMsg() : m_pkSQL(NULL), iID(0), uiResultPos(0), pvUserData(NULL), bReturn(false), uiSQLErrno(0) _SQLMsg() noexcept
: m_pkSQL(nullptr), iID(0), uiResultPos(0), pvUserData(nullptr),
bReturn(false), uiSQLErrno(0)
{ {
} }
~_SQLMsg() ~_SQLMsg()
{ {
std::vector<SQLResult *>::iterator first = vec_pkResult.begin();
std::vector<SQLResult *>::iterator past = vec_pkResult.end();
while (first != past)
delete *(first++);
vec_pkResult.clear(); vec_pkResult.clear();
} }
// Delete copy operations
_SQLMsg(const _SQLMsg&) = delete;
_SQLMsg& operator=(const _SQLMsg&) = delete;
// Allow move operations
_SQLMsg(_SQLMsg&&) noexcept = default;
_SQLMsg& operator=(_SQLMsg&&) noexcept = default;
void Store() void Store()
{ {
do do
{ {
SQLResult * pRes = new SQLResult; auto pRes = std::make_unique<SQLResult>();
pRes->pSQLResult = mysql_store_result(m_pkSQL); pRes->pSQLResult = mysql_store_result(m_pkSQL);
pRes->uiInsertID = mysql_insert_id(m_pkSQL); pRes->uiInsertID = static_cast<uint32_t>(mysql_insert_id(m_pkSQL));
pRes->uiAffectedRows = mysql_affected_rows(m_pkSQL); pRes->uiAffectedRows = static_cast<uint32_t>(mysql_affected_rows(m_pkSQL));
if (pRes->pSQLResult) if (pRes->pSQLResult)
{ {
pRes->uiNumRows = mysql_num_rows(pRes->pSQLResult); pRes->uiNumRows = static_cast<uint32_t>(mysql_num_rows(pRes->pSQLResult));
} }
else else
{ {
pRes->uiNumRows = 0; pRes->uiNumRows = 0;
} }
vec_pkResult.push_back(pRes); vec_pkResult.push_back(std::move(pRes));
} while (!mysql_next_result(m_pkSQL)); } while (mysql_next_result(m_pkSQL) == 0);
} }
SQLResult * Get() SQLResult* Get()
{ {
if (uiResultPos >= vec_pkResult.size()) if (uiResultPos >= vec_pkResult.size())
return NULL; return nullptr;
return vec_pkResult[uiResultPos]; return vec_pkResult[uiResultPos].get();
} }
bool Next() bool Next()
@@ -97,17 +143,14 @@ typedef struct _SQLMsg
return true; return true;
} }
MYSQL * m_pkSQL; MYSQL* m_pkSQL;
int iID; int iID;
std::string stQuery; std::string stQuery;
std::vector<std::unique_ptr<SQLResult>> vec_pkResult;
std::vector<SQLResult *> vec_pkResult; // result 벡터 unsigned int uiResultPos;
unsigned int uiResultPos; // 현재 result 위치 void* pvUserData;
bool bReturn;
void * pvUserData; unsigned int uiSQLErrno;
bool bReturn;
unsigned int uiSQLErrno;
} SQLMsg; } SQLMsg;
class CAsyncSQL class CAsyncSQL
@@ -116,97 +159,89 @@ class CAsyncSQL
CAsyncSQL(); CAsyncSQL();
virtual ~CAsyncSQL(); virtual ~CAsyncSQL();
void Quit(); void Quit();
bool Setup(const char * c_pszHost, const char * c_pszUser, const char * c_pszPassword, const char * c_pszDB, const char * c_pszLocale, bool Setup(const char* c_pszHost, const char* c_pszUser, const char* c_pszPassword,
bool bNoThread = false, int iPort = 0); const char* c_pszDB, const char* c_pszLocale, bool bNoThread = false, int iPort = 0);
bool Setup(CAsyncSQL * sql, bool bNoThread = false); bool Setup(CAsyncSQL* sql, bool bNoThread = false);
bool Connect(); bool Connect();
bool IsConnected() { return m_bConnected; } bool IsConnected() const { return m_bConnected.load(std::memory_order_acquire); }
bool QueryLocaleSet(); bool QueryLocaleSet();
void AsyncQuery(const char * c_pszQuery); void AsyncQuery(const char* c_pszQuery);
void ReturnQuery(const char * c_pszQuery, void * pvUserData); void ReturnQuery(const char* c_pszQuery, void* pvUserData);
std::unique_ptr<SQLMsg> DirectQuery(const char* c_pszQuery); std::unique_ptr<SQLMsg> DirectQuery(const char* c_pszQuery);
DWORD CountQuery(); DWORD CountQuery();
DWORD CountResult(); DWORD CountResult();
void PushResult(SQLMsg * p); void PushResult(std::unique_ptr<SQLMsg> p);
bool PopResult(SQLMsg ** pp); bool PopResult(std::unique_ptr<SQLMsg>& p);
void ChildLoop(); // Legacy API compatibility - deprecated, use PopResult(unique_ptr&) instead
bool PopResult(SQLMsg** pp);
MYSQL * GetSQLHandle(); void ChildLoop();
int CountQueryFinished(); MYSQL* GetSQLHandle();
void ResetQueryFinished();
size_t EscapeString(char* dst, size_t dstSize, const char *src, size_t srcSize); int CountQueryFinished() const;
void ResetQueryFinished();
size_t EscapeString(char* dst, size_t dstSize, const char* src, size_t srcSize);
protected: protected:
void Destroy(); void Destroy();
void PushQuery(std::unique_ptr<SQLMsg> p);
void PushQuery(SQLMsg * p); bool PeekQuery(SQLMsg** pp);
bool PopQuery(int iID);
bool PeekQuery(SQLMsg ** pp); bool PeekQueryFromCopyQueue(SQLMsg** pp);
bool PopQuery(int iID); int CopyQuery();
bool PopQueryFromCopyQueue();
bool PeekQueryFromCopyQueue(SQLMsg ** pp );
INT CopyQuery();
bool PopQueryFromCopyQueue();
public: public:
int GetCopiedQueryCount(); int GetCopiedQueryCount() const;
void ResetCopiedQueryCount(); void ResetCopiedQueryCount();
void AddCopiedQueryCount( int iCopiedQuery ); void AddCopiedQueryCount(int iCopiedQuery);
//private:
protected: protected:
// MySQL connection
MYSQL m_hDB; MYSQL m_hDB;
std::string m_stHost; // Connection info
std::string m_stUser; std::string m_stHost;
std::string m_stPassword; std::string m_stUser;
std::string m_stDB; std::string m_stPassword;
std::string m_stLocale; std::string m_stDB;
std::string m_stLocale;
int m_iMsgCount;
int m_aiPipe[2];
int m_iPort; int m_iPort;
std::queue<SQLMsg *> m_queue_query; // Thread control
std::queue<SQLMsg *> m_queue_query_copy; std::unique_ptr<std::thread> m_thread;
//std::map<int, SQLMsg *> m_map_kSQLMsgUnfinished; std::atomic<bool> m_bEnd;
std::atomic<bool> m_bConnected;
std::queue<SQLMsg *> m_queue_result; // Query queues with mutex protection
std::queue<std::unique_ptr<SQLMsg>> m_queue_query;
std::queue<std::unique_ptr<SQLMsg>> m_queue_query_copy;
std::queue<std::unique_ptr<SQLMsg>> m_queue_result;
volatile bool m_bEnd; std::mutex m_mtxQuery;
std::mutex m_mtxResult;
std::condition_variable m_cvQuery;
#ifndef OS_WINDOWS // Counters
pthread_t m_hThread; std::atomic<int> m_iMsgCount;
std::unique_ptr<pthread_mutex_t> m_mtxQuery; std::atomic<int> m_iQueryFinished;
std::unique_ptr<pthread_mutex_t> m_mtxResult; std::atomic<int> m_iCopiedQuery;
#else std::atomic<unsigned long> m_ulThreadID;
HANDLE m_hThread;
std::unique_ptr<CRITICAL_SECTION> m_mtxQuery;
std::unique_ptr<CRITICAL_SECTION> m_mtxResult;
#endif
CSemaphore m_sem;
int m_iQueryFinished;
unsigned long m_ulThreadID;
bool m_bConnected;
int m_iCopiedQuery;
}; };
class CAsyncSQL2 : public CAsyncSQL class CAsyncSQL2 : public CAsyncSQL
{ {
public: public:
void SetLocale ( const std::string & stLocale ); void SetLocale(const std::string& stLocale);
}; };
#endif #endif