runtime: replace virtual checkpoint timer with watchdog
This commit is contained in:
@@ -88,6 +88,7 @@ int thecore_idle(void)
|
||||
|
||||
void thecore_destroy(void)
|
||||
{
|
||||
signal_destroy();
|
||||
pid_deinit();
|
||||
log_destroy();
|
||||
}
|
||||
|
||||
@@ -1,10 +1,100 @@
|
||||
#include "stdafx.h"
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <condition_variable>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
|
||||
namespace
|
||||
{
|
||||
// Tick counter consumed (and reset to 0) by the SIGVTALRM checkpoint handler.
std::atomic<int> s_checkpoint_ticks{0};
// Monotonic progress counter sampled by the watchdog thread; it only ever
// compares successive samples for inequality, so the absolute value is irrelevant.
std::atomic<uint64_t> s_checkpoint_progress{0};
|
||||
|
||||
#ifndef OS_WINDOWS
|
||||
// Watchdog state. Everything below is read and written only while holding
// s_checkpoint_mutex; s_checkpoint_cv is notified after each change.
std::mutex s_checkpoint_mutex;
std::condition_variable s_checkpoint_cv;
// Handle of the watchdog thread; joinable() doubles as the "is running" flag.
std::thread s_checkpoint_thread;
// Set to ask the watchdog loop to exit.
bool s_checkpoint_shutdown = false;
// True while stall detection is armed.
bool s_checkpoint_enabled = false;
// Seconds without progress tolerated before aborting; <= 0 means disarmed.
int s_checkpoint_timeout_seconds = 0;
// Bumped on every (re)configuration so the watchdog restarts its measurement.
uint64_t s_checkpoint_generation = 0;
|
||||
|
||||
void checkpoint_watchdog_loop()
|
||||
{
|
||||
uint64_t last_progress = s_checkpoint_progress.load(std::memory_order_relaxed);
|
||||
auto last_change = std::chrono::steady_clock::now();
|
||||
uint64_t observed_generation = 0;
|
||||
|
||||
std::unique_lock<std::mutex> lock(s_checkpoint_mutex);
|
||||
|
||||
while (!s_checkpoint_shutdown)
|
||||
{
|
||||
if (!s_checkpoint_enabled || s_checkpoint_timeout_seconds <= 0)
|
||||
{
|
||||
s_checkpoint_cv.wait(lock, []()
|
||||
{
|
||||
return s_checkpoint_shutdown || (s_checkpoint_enabled && s_checkpoint_timeout_seconds > 0);
|
||||
});
|
||||
|
||||
last_progress = s_checkpoint_progress.load(std::memory_order_relaxed);
|
||||
last_change = std::chrono::steady_clock::now();
|
||||
observed_generation = s_checkpoint_generation;
|
||||
continue;
|
||||
}
|
||||
|
||||
const int timeout_seconds = s_checkpoint_timeout_seconds;
|
||||
const uint64_t generation = s_checkpoint_generation;
|
||||
const auto poll_interval = std::chrono::seconds(1);
|
||||
|
||||
const bool reconfigured = s_checkpoint_cv.wait_for(lock, poll_interval, [generation]()
|
||||
{
|
||||
return s_checkpoint_shutdown || s_checkpoint_generation != generation;
|
||||
});
|
||||
|
||||
if (s_checkpoint_shutdown)
|
||||
break;
|
||||
|
||||
if (reconfigured || observed_generation != s_checkpoint_generation)
|
||||
{
|
||||
last_progress = s_checkpoint_progress.load(std::memory_order_relaxed);
|
||||
last_change = std::chrono::steady_clock::now();
|
||||
observed_generation = s_checkpoint_generation;
|
||||
continue;
|
||||
}
|
||||
|
||||
const uint64_t current_progress = s_checkpoint_progress.load(std::memory_order_relaxed);
|
||||
const auto now = std::chrono::steady_clock::now();
|
||||
|
||||
if (current_progress != last_progress)
|
||||
{
|
||||
last_progress = current_progress;
|
||||
last_change = now;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (now - last_change >= std::chrono::seconds(timeout_seconds))
|
||||
{
|
||||
lock.unlock();
|
||||
sys_err("CHECKPOINT shutdown: no progress observed for %d seconds.", timeout_seconds);
|
||||
abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void checkpoint_ensure_started()
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(s_checkpoint_mutex);
|
||||
|
||||
if (s_checkpoint_thread.joinable())
|
||||
return;
|
||||
|
||||
s_checkpoint_shutdown = false;
|
||||
s_checkpoint_enabled = false;
|
||||
s_checkpoint_timeout_seconds = 0;
|
||||
s_checkpoint_generation = 0;
|
||||
s_checkpoint_thread = std::thread(checkpoint_watchdog_loop);
|
||||
}
|
||||
#endif
|
||||
|
||||
const char* signal_checkpoint_backend_name_impl(ECheckpointBackend backend)
|
||||
{
|
||||
@@ -14,6 +104,8 @@ const char* signal_checkpoint_backend_name_impl(ECheckpointBackend backend)
|
||||
return "none";
|
||||
case CHECKPOINT_BACKEND_VIRTUAL_TIMER:
|
||||
return "virtual-timer";
|
||||
case CHECKPOINT_BACKEND_WATCHDOG_THREAD:
|
||||
return "watchdog-thread";
|
||||
default:
|
||||
return "unknown";
|
||||
}
|
||||
@@ -22,6 +114,7 @@ const char* signal_checkpoint_backend_name_impl(ECheckpointBackend backend)
|
||||
|
||||
#ifdef OS_WINDOWS
|
||||
// Windows build: the signal/checkpoint API is compiled as no-ops.
void signal_setup() {}
void signal_destroy() {}
void signal_timer_disable() {}
void signal_timer_enable(int timeout_seconds) {}
void signal_mark_progress() {}
|
||||
@@ -37,18 +130,6 @@ RETSIGTYPE reap(int sig)
|
||||
}
|
||||
|
||||
|
||||
/**
 * Tick-based checkpoint signal handler.
 *
 * If s_checkpoint_ticks is still zero when the handler runs, no progress was
 * recorded since the previous invocation: the condition is logged and the
 * process aborts. Otherwise the counter is reset for the next period.
 *
 * NOTE(review): this handler logs from signal context, which is generally not
 * async-signal-safe. It looks like the legacy virtual-timer path that the
 * watchdog thread is meant to replace; confirm whether it is still needed.
 *
 * @param sig  Signal number (unused).
 */
RETSIGTYPE checkpointing(int sig)
{
	if (!s_checkpoint_ticks.load())
	{
		sys_err("CHECKPOINT shutdown: tics did not updated.");
		abort();
	}
	else
		s_checkpoint_ticks.store(0);
}
|
||||
|
||||
|
||||
RETSIGTYPE hupsig(int sig)
|
||||
{
|
||||
shutdowned = TRUE;
|
||||
@@ -62,56 +143,79 @@ RETSIGTYPE usrsig(int sig)
|
||||
|
||||
void signal_timer_disable(void)
|
||||
{
|
||||
struct itimerval itime;
|
||||
struct timeval interval;
|
||||
checkpoint_ensure_started();
|
||||
|
||||
interval.tv_sec = 0;
|
||||
interval.tv_usec = 0;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(s_checkpoint_mutex);
|
||||
s_checkpoint_enabled = false;
|
||||
s_checkpoint_timeout_seconds = 0;
|
||||
++s_checkpoint_generation;
|
||||
}
|
||||
|
||||
itime.it_interval = interval;
|
||||
itime.it_value = interval;
|
||||
|
||||
setitimer(ITIMER_VIRTUAL, &itime, NULL);
|
||||
s_checkpoint_cv.notify_all();
|
||||
}
|
||||
|
||||
void signal_timer_enable(int sec)
|
||||
{
|
||||
struct itimerval itime;
|
||||
struct timeval interval;
|
||||
checkpoint_ensure_started();
|
||||
|
||||
interval.tv_sec = sec;
|
||||
interval.tv_usec = 0;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(s_checkpoint_mutex);
|
||||
s_checkpoint_enabled = sec > 0;
|
||||
s_checkpoint_timeout_seconds = sec;
|
||||
++s_checkpoint_generation;
|
||||
}
|
||||
|
||||
itime.it_interval = interval;
|
||||
itime.it_value = interval;
|
||||
|
||||
setitimer(ITIMER_VIRTUAL, &itime, NULL);
|
||||
s_checkpoint_cv.notify_all();
|
||||
}
|
||||
|
||||
void signal_setup(void)
|
||||
{
|
||||
signal_timer_enable(30);
|
||||
checkpoint_ensure_started();
|
||||
signal_timer_enable(30);
|
||||
|
||||
signal(SIGVTALRM, checkpointing);
|
||||
/* just to be on the safe side: */
|
||||
signal(SIGHUP, hupsig);
|
||||
signal(SIGCHLD, reap);
|
||||
signal(SIGINT, hupsig);
|
||||
signal(SIGTERM, hupsig);
|
||||
signal(SIGPIPE, SIG_IGN);
|
||||
signal(SIGALRM, SIG_IGN);
|
||||
signal(SIGUSR1, usrsig);
|
||||
|
||||
/* just to be on the safe side: */
|
||||
signal(SIGHUP, hupsig);
|
||||
signal(SIGCHLD, reap);
|
||||
signal(SIGINT, hupsig);
|
||||
signal(SIGTERM, hupsig);
|
||||
signal(SIGPIPE, SIG_IGN);
|
||||
signal(SIGALRM, SIG_IGN);
|
||||
signal(SIGUSR1, usrsig);
|
||||
sys_log(0, "[STARTUP] checkpoint backend=%s", signal_checkpoint_backend_name(signal_checkpoint_backend()));
|
||||
}
|
||||
|
||||
void signal_destroy()
|
||||
{
|
||||
std::thread checkpoint_thread;
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(s_checkpoint_mutex);
|
||||
|
||||
if (!s_checkpoint_thread.joinable())
|
||||
return;
|
||||
|
||||
s_checkpoint_shutdown = true;
|
||||
s_checkpoint_enabled = false;
|
||||
s_checkpoint_timeout_seconds = 0;
|
||||
++s_checkpoint_generation;
|
||||
checkpoint_thread = std::move(s_checkpoint_thread);
|
||||
}
|
||||
|
||||
s_checkpoint_cv.notify_all();
|
||||
checkpoint_thread.join();
|
||||
s_checkpoint_progress.store(0, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
void signal_mark_progress()
|
||||
{
|
||||
s_checkpoint_ticks.fetch_add(1, std::memory_order_relaxed);
|
||||
s_checkpoint_progress.fetch_add(1, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
/**
 * Reports which mechanism performs checkpoint (stall) detection.
 *
 * @return CHECKPOINT_BACKEND_WATCHDOG_THREAD: detection is done by the
 *         watchdog thread that checkpoint_ensure_started() spawns.
 */
ECheckpointBackend signal_checkpoint_backend()
{
	return CHECKPOINT_BACKEND_WATCHDOG_THREAD;
}
|
||||
|
||||
const char* signal_checkpoint_backend_name(ECheckpointBackend backend)
|
||||
|
||||
@@ -4,9 +4,11 @@ enum ECheckpointBackend
|
||||
{
|
||||
CHECKPOINT_BACKEND_NONE = 0,
|
||||
CHECKPOINT_BACKEND_VIRTUAL_TIMER = 1,
|
||||
CHECKPOINT_BACKEND_WATCHDOG_THREAD = 2,
|
||||
};
|
||||
|
||||
// Installs the process signal handlers and arms checkpoint stall detection.
void signal_setup();
// Stops checkpoint stall detection and releases its resources.
void signal_destroy();
// Disarms checkpoint stall detection until signal_timer_enable() is called again.
void signal_timer_disable();
// Arms checkpoint stall detection; a timeout_seconds value <= 0 leaves it disarmed.
void signal_timer_enable(int timeout_seconds);
// Records forward progress; call regularly from the main loop while detection is armed.
void signal_mark_progress();
|
||||
|
||||
Reference in New Issue
Block a user