// Process signal handling plus a "checkpoint" watchdog.
//
// On non-Windows builds a background thread polls a progress counter once per
// second; if the counter has not changed for the configured number of seconds
// while the watchdog is enabled, the process logs an error and abort()s.
// On Windows builds every entry point is a no-op.

#include "stdafx.h"

#include <atomic>
#include <cerrno>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

namespace
{
    // Counter bumped by signal_mark_progress(). The watchdog only compares it
    // for inequality between polls, so relaxed ordering is sufficient.
    std::atomic<uint64_t> s_checkpoint_progress { 0 };

#ifndef OS_WINDOWS
    // Everything below except s_checkpoint_thread's join is read and written
    // under s_checkpoint_mutex.
    std::mutex s_checkpoint_mutex;
    std::condition_variable s_checkpoint_cv;
    std::thread s_checkpoint_thread;
    bool s_checkpoint_shutdown = false;
    bool s_checkpoint_enabled = false;
    int s_checkpoint_timeout_seconds = 0;
    // Incremented on every (re)configuration so the watchdog can tell that its
    // stall baseline is stale.
    uint64_t s_checkpoint_generation = 0;

    // Body of the watchdog thread. Runs until s_checkpoint_shutdown is set.
    void checkpoint_watchdog_loop()
    {
        uint64_t last_progress = s_checkpoint_progress.load(std::memory_order_relaxed);
        auto last_change = std::chrono::steady_clock::now();
        uint64_t observed_generation = 0;

        std::unique_lock<std::mutex> lock(s_checkpoint_mutex);

        // Restart stall measurement from the current counter value and time.
        // Must be called with `lock` held (reads s_checkpoint_generation).
        const auto rebaseline = [&]() {
            last_progress = s_checkpoint_progress.load(std::memory_order_relaxed);
            last_change = std::chrono::steady_clock::now();
            observed_generation = s_checkpoint_generation;
        };

        while (!s_checkpoint_shutdown)
        {
            // Disabled (or non-positive timeout): sleep until enabled or shut down.
            if (!s_checkpoint_enabled || s_checkpoint_timeout_seconds <= 0)
            {
                s_checkpoint_cv.wait(lock, []() {
                    return s_checkpoint_shutdown
                        || (s_checkpoint_enabled && s_checkpoint_timeout_seconds > 0);
                });
                rebaseline();
                continue;
            }

            const int timeout_seconds = s_checkpoint_timeout_seconds;
            const uint64_t generation = s_checkpoint_generation;
            const auto poll_interval = std::chrono::seconds(1);

            // Wakes early (returning true) on shutdown or reconfiguration;
            // otherwise returns false after one poll interval.
            const bool reconfigured = s_checkpoint_cv.wait_for(lock, poll_interval, [generation]() {
                return s_checkpoint_shutdown || s_checkpoint_generation != generation;
            });

            if (s_checkpoint_shutdown)
                break;

            // A configuration change invalidates the baseline: the time spent
            // under the previous settings must not count toward the new timeout.
            if (reconfigured || observed_generation != s_checkpoint_generation)
            {
                rebaseline();
                continue;
            }

            const uint64_t current_progress = s_checkpoint_progress.load(std::memory_order_relaxed);
            const auto now = std::chrono::steady_clock::now();

            if (current_progress != last_progress)
            {
                last_progress = current_progress;
                last_change = now;
                continue;
            }

            if (now - last_change >= std::chrono::seconds(timeout_seconds))
            {
                // Release the mutex before logging so the lock is not held
                // while sys_err runs.
                lock.unlock();
                sys_err("CHECKPOINT shutdown: no progress observed for %d seconds.", timeout_seconds);
                abort();
            }
        }
    }

    // Starts the watchdog thread if it is not already running, resetting the
    // configuration to "disabled". No-op when the thread is joinable.
    void checkpoint_ensure_started()
    {
        std::lock_guard<std::mutex> lock(s_checkpoint_mutex);

        if (s_checkpoint_thread.joinable())
            return;

        s_checkpoint_shutdown = false;
        s_checkpoint_enabled = false;
        s_checkpoint_timeout_seconds = 0;
        s_checkpoint_generation = 0;
        // The new thread blocks on s_checkpoint_mutex until this function returns.
        s_checkpoint_thread = std::thread(checkpoint_watchdog_loop);
    }

    // Publishes a new watchdog configuration and wakes the watchdog thread
    // (if any). Does not start the thread.
    void checkpoint_configure(bool enabled, int timeout_seconds)
    {
        {
            std::lock_guard<std::mutex> lock(s_checkpoint_mutex);
            s_checkpoint_enabled = enabled;
            s_checkpoint_timeout_seconds = timeout_seconds;
            ++s_checkpoint_generation;
        }

        s_checkpoint_cv.notify_all();
    }
#endif

    // Maps a backend enumerator to its log-friendly name; "unknown" for any
    // value not listed.
    const char* signal_checkpoint_backend_name_impl(ECheckpointBackend backend)
    {
        switch (backend)
        {
            case CHECKPOINT_BACKEND_NONE:
                return "none";
            case CHECKPOINT_BACKEND_VIRTUAL_TIMER:
                return "virtual-timer";
            case CHECKPOINT_BACKEND_WATCHDOG_THREAD:
                return "watchdog-thread";
            default:
                return "unknown";
        }
    }
}

#ifdef OS_WINDOWS

// Windows build: no signal handling and no watchdog; all entry points are no-ops.
void signal_setup() {}
void signal_destroy() {}
void signal_timer_disable() {}
void signal_timer_enable(int /*timeout_seconds*/) {}
void signal_mark_progress() {}

ECheckpointBackend signal_checkpoint_backend()
{
    return CHECKPOINT_BACKEND_NONE;
}

const char* signal_checkpoint_backend_name(ECheckpointBackend backend)
{
    return signal_checkpoint_backend_name_impl(backend);
}

#else

#define RETSIGTYPE void

// SIGCHLD handler: reaps every child that has already exited, without blocking,
// then re-installs itself.
RETSIGTYPE reap(int sig)
{
    // waitpid()/signal() may overwrite errno; restore it so the code this
    // handler interrupted does not observe a changed value.
    const int saved_errno = errno;

    while (waitpid(-1, nullptr, WNOHANG) > 0)
    {
    }

    signal(SIGCHLD, reap);

    errno = saved_errno;
}

// Handler for SIGHUP / SIGINT / SIGTERM: flags shutdown and logs it.
// NOTE(review): sys_err is invoked from signal context; whether it is
// async-signal-safe cannot be determined from this file - confirm.
RETSIGTYPE hupsig(int sig)
{
    shutdowned = TRUE;
    sys_err("SIGHUP, SIGINT, SIGTERM signal has been received. shutting down.");
}

// SIGUSR1 handler: calls core_dump().
RETSIGTYPE usrsig(int sig)
{
    core_dump();
}

// Disables the stall watchdog.
//
// Intentionally does not start the watchdog thread: a thread that is not
// running is already "disabled", and starting one here would re-create the
// thread when called after signal_destroy(), leaving a joinable std::thread
// that nothing joins.
void signal_timer_disable(void)
{
    checkpoint_configure(false, 0);
}

// Enables the stall watchdog with a timeout of `sec` seconds, starting the
// watchdog thread if necessary. A non-positive `sec` leaves it disabled.
void signal_timer_enable(int sec)
{
    checkpoint_ensure_started();
    checkpoint_configure(sec > 0, sec);
}

// Arms the watchdog with a 30 second timeout and installs the process signal
// handlers.
void signal_setup(void)
{
    // Also starts the watchdog thread if it is not running yet.
    signal_timer_enable(30);

    /* just to be on the safe side: */
    signal(SIGHUP, hupsig);
    signal(SIGCHLD, reap);
    signal(SIGINT, hupsig);
    signal(SIGTERM, hupsig);
    signal(SIGPIPE, SIG_IGN);
    signal(SIGALRM, SIG_IGN);
    signal(SIGUSR1, usrsig);

    sys_log(0, "[STARTUP] checkpoint backend=%s",
            signal_checkpoint_backend_name(signal_checkpoint_backend()));
}

// Stops and joins the watchdog thread (if running) and resets the progress
// counter to zero.
void signal_destroy()
{
    std::thread checkpoint_thread;

    {
        std::lock_guard<std::mutex> lock(s_checkpoint_mutex);

        if (!s_checkpoint_thread.joinable())
            return;

        s_checkpoint_shutdown = true;
        s_checkpoint_enabled = false;
        s_checkpoint_timeout_seconds = 0;
        ++s_checkpoint_generation;
        // Take ownership so the join below happens outside the mutex; the
        // watchdog needs the mutex to observe the shutdown flag.
        checkpoint_thread = std::move(s_checkpoint_thread);
    }

    s_checkpoint_cv.notify_all();
    checkpoint_thread.join();
    s_checkpoint_progress.store(0, std::memory_order_relaxed);
}

// Records that the caller made progress; the watchdog treats any change of
// the counter as liveness.
void signal_mark_progress()
{
    s_checkpoint_progress.fetch_add(1, std::memory_order_relaxed);
}

ECheckpointBackend signal_checkpoint_backend()
{
    return CHECKPOINT_BACKEND_WATCHDOG_THREAD;
}

const char* signal_checkpoint_backend_name(ECheckpointBackend backend)
{
    return signal_checkpoint_backend_name_impl(backend);
}

#endif