* Preserve interface of std::condition_variable through "perfect
  forwarding"
* Decouple SignalHandler and AtomicCondition
* Callback function for signals
* Improve readme
* Bump version to v0.2
This commit is contained in:
Tom 2019-10-13 15:00:20 +02:00
parent 9aaa7180fc
commit 9af897fa3b
5 changed files with 248 additions and 211 deletions

View File

@ -1,5 +1,5 @@
cmake_minimum_required(VERSION 3.8 FATAL_ERROR)
project(signal-handler VERSION 0.1.0)
project(signal-handler VERSION 0.2.0)
# Add the top-level cmake module directory to CMAKE_MODULE_PATH
list(INSERT CMAKE_MODULE_PATH 0 ${PROJECT_SOURCE_DIR}/cmake)

View File

@ -1,60 +1,51 @@
Signal handler for multithreaded C++ applications on Linux
==========================================================
Signal handler that uses [pthread_sigmask](http://man7.org/linux/man-pages/man3/pthread_sigmask.3.html) and [sigwait](http://man7.org/linux/man-pages/man3/sigwait.3.html).
## Dependencies
* C++17
* Clang or GCC
* linux
* pthread
* cmake (recommended, but optional)
## Example usage
```
{
// Block signals
sgnl::SignalHandler signal_handler({SIGINT, SIGINT});
// Wait for a signal
int signal_number = signal_handler.sigwait();
// Or, pass a handler
auto handler = [](int signum) {
if( signum == SIGINT )
// continue waiting for signals
return false;
if( signum == SIGTERM )
// stop waiting for signals
return true;
};
int last_signal = signal_handler.sigwait_handler(handler);
} // signals are unblocked again
```
## Build & Install
```
mkdir -p build/ && cd build/
cmake ..
make
make test
# build and run tests
make sgnl-test && ./test/sgnl-test
# install headers and CMake config
make install
```
Example usage:
```
bool worker(sgnl::AtomicCondition<bool>& exit_condition)
{
while( true )
{
exit_condition.wait_for(std::chrono::milliseconds(1000), true);
if( exit_condition.get() )
return true;
}
}
sgnl::AtomicCondition exit_condition(false);
sgnl::SignalHandler signal_handler(
{{SIGINT, true}, {SIGTERM, true}},
exit_condition);
std::promise<pthread_t> signal_handler_thread_id;
std::future<int> ft_sig_handler =
std::async(std::launch::async, [&]() {
signal_handler_thread_id.set_value(pthread_self());
return signal_handler();
});
std::vector<std::future<bool>> futures;
for(int i = 0; i < 10; ++i)
futures.push_back(
std::async(
std::launch::async,
&worker,
std::ref(exit_condition)));
// simulate [ctrl]+[c], which sends SIGINT
std::this_thread::sleep_for(std::chrono::milliseconds(100));
pthread_kill(
signal_handler_thread_id.get_future().get(),
SIGINT);
for(auto& future : futures)
future.get();
int signal = ft_sig_handler.get();
```

View File

@ -7,17 +7,22 @@
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <utility>
namespace sgnl {
/// Paires a `std::condition_variable` with a `std::atomic<ValueType>`,
/// protected by a mutex.
/// The interface of `std::condition_variable` is preserved through "perfect
/// forwarding".
template<typename ValueType>
class AtomicCondition
{
public:
explicit AtomicCondition(ValueType val)
: value_(val)
explicit AtomicCondition(ValueType initial_value)
: value_(initial_value)
, mutex_()
, condvar_()
{
@ -30,38 +35,36 @@ public:
void set(ValueType val) noexcept
{
{
// This ensures that wait_for is either
// 1. not running, or
// 2. in a waiting state
// to avoid a data race between value_.load and cond_var.wait_for.
std::unique_lock<std::mutex> lock(this->mutex_);
// This lock is required to avoid a data race between calls to
// `AtomicCondition::get` when called in `AtomicCondition::wait_*` through
// a predicate.
std::unique_lock lock(this->mutex_);
this->value_.store(val);
}
this->condvar_.notify_all();
auto native_handle() { return this->condvar_.native_handle(); }
void notify_one() const noexcept { this->condvar_.notify_one(); }
void notify_all() const noexcept { this->condvar_.notify_all(); }
template<typename... Args>
auto wait(Args&&... args) const
{
std::unique_lock lock(this->mutex_);
return this->condvar_.wait(lock, std::forward<Args>(args)...);
}
template<typename Rep, typename Period>
void wait_for(const std::chrono::duration<Rep, Period>& time,
ValueType val) const
template<typename... Args>
auto wait_for(Args&&... args) const
{
std::unique_lock<std::mutex> lock(this->mutex_);
while( this->value_.load() != val )
if( this->condvar_.wait_for(lock, time) == std::cv_status::timeout )
return;
std::unique_lock lock(this->mutex_);
return this->condvar_.wait_for(lock, std::forward<Args>(args)...);
}
template<typename Rep, typename Period, typename Predicate>
void wait_for(const std::chrono::duration<Rep, Period>& time,
Predicate pred) const
template<typename... Args>
auto wait_until(Args&&... args) const
{
std::unique_lock<std::mutex> lock(this->mutex_);
while( !pred() )
if( this->condvar_.wait_for(lock, time) == std::cv_status::timeout )
return;
std::unique_lock lock(this->mutex_);
return this->condvar_.wait_until(lock, std::forward<Args>(args)...);
}
private:

View File

@ -3,13 +3,12 @@
#pragma once
#include <sgnl/AtomicCondition.h>
#include <csignal>
#include <cstring>
#include <map>
#include <functional>
#include <initializer_list>
#include <stdexcept>
#include <utility>
#include <string>
namespace sgnl {
@ -21,21 +20,26 @@ class SignalHandlerException : public std::runtime_error
};
template<typename ValueType>
/// When constructed, SignalHandler blocks the given `signals` in the calling
/// thread.
///
/// Signals can be polled by calling `SignalHandler::sigwait()` or
/// `SignalHandler::sigwait_handler(handler)`.
/// `handler` is a callable that accepts a signal number as its first and only
/// argument. `handler` returns false if the waiting should be continued.
///
/// When destructed, SignalHandler unblocks the `signals`.
class SignalHandler
{
public:
SignalHandler(std::map<int, ValueType> signal_map,
AtomicCondition<ValueType>& condition)
: signal_map_(std::move(signal_map))
, set_()
, condition_(condition)
explicit SignalHandler(const std::initializer_list<int>& signals)
: set_()
{
if( sigemptyset(&this->set_) != 0 )
throw SignalHandlerException("sigemptyset error");
for( const auto& p : this->signal_map_ )
if( sigaddset(&this->set_, p.first) != 0 )
for( int signum : signals )
if( sigaddset(&this->set_, signum) != 0 )
throw SignalHandlerException("sigaddset error");
int s = pthread_sigmask(SIG_BLOCK, &this->set_, nullptr);
@ -44,29 +48,38 @@ public:
std::string("pthread_sigmask: ") + std::strerror(s));
}
int operator()()
~SignalHandler()
{
while( true )
pthread_sigmask(SIG_UNBLOCK, &this->set_, nullptr);
}
SignalHandler(const SignalHandler& other) = delete;
SignalHandler(SignalHandler&& other) = delete;
SignalHandler& operator=(const SignalHandler& other) = delete;
SignalHandler& operator=(SignalHandler&& other) = delete;
int sigwait()
{
int signum = 0;
int ret = sigwait(&this->set_, &signum);
int ret = ::sigwait(&this->set_, &signum);
if( ret != 0 )
throw SignalHandlerException(
std::string("sigwait: ") + std::strerror(ret));
if( auto it = this->signal_map_.find(signum);
it != this->signal_map_.end() )
{
this->condition_.set(it->second);
return it->first;
return signum;
}
int sigwait_handler(std::function<bool (int)> handler)
{
while( true )
{
int signum = this->sigwait();
if( handler(signum) )
return signum;
}
}
private:
std::map<int, ValueType> signal_map_;
sigset_t set_;
AtomicCondition<ValueType>& condition_;
};

View File

@ -3,31 +3,36 @@
#include <catch2/catch.hpp>
#include <sgnl/AtomicCondition.h>
#include <sgnl/SignalHandler.h>
#include <pthread.h>
#include <signal.h>
#include <sys/types.h>
#include <chrono>
#include <future>
#include <map>
#include <vector>
namespace {
bool worker(sgnl::AtomicCondition<bool>& exit_condition)
bool Worker(const sgnl::AtomicCondition<bool>& exit_condition)
{
exit_condition.wait_for(std::chrono::hours(1000), true);
exit_condition.wait_for(
std::chrono::hours(1000),
[&exit_condition](){ return exit_condition.get() == true; });
return exit_condition.get();
}
int looping_worker(sgnl::AtomicCondition<bool>& exit_condition)
int LoopingWorker(const sgnl::AtomicCondition<bool>& exit_condition)
{
int i = 0;
while( exit_condition.get() == false )
{
exit_condition.wait_for(std::chrono::milliseconds(2), true);
exit_condition.wait_for(
std::chrono::milliseconds(2),
[&exit_condition](){ return exit_condition.get() == true; });
++i;
}
@ -46,184 +51,209 @@ TEST_CASE("condition-get-set")
REQUIRE( condition.get() == 42 );
}
TEST_CASE("wait-for-value")
TEST_CASE("wait")
{
sgnl::AtomicCondition<int> condition(23);
std::future<void> future =
sgnl::AtomicCondition condition(false);
std::future<bool> future =
std::async(
std::launch::async,
[&] { condition.wait_for(std::chrono::hours(1000), 42); });
[&condition](){ condition.wait(); return true; });
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::this_thread::yield();
condition.set(42);
future.get();
condition.notify_one();
REQUIRE( future.get() == true );
}
REQUIRE( condition.get() == 42 );
TEST_CASE("wait-pred")
{
sgnl::AtomicCondition condition(false);
std::future<bool> future =
std::async(
std::launch::async,
[&condition](){ condition.wait([](){ return true; }); return true; });
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::this_thread::yield();
condition.notify_one();
REQUIRE( future.get() == true );
}
TEST_CASE("wait-for-predicate")
{
sgnl::AtomicCondition<int> condition(23);
sgnl::AtomicCondition condition(23);
auto pred = [&condition](){ return condition.get() == 42; };
std::future<void> future =
std::async(
std::launch::async,
[&] {
auto pred = [&]() { return condition.get() == 42; };
condition.wait_for(std::chrono::hours(1000), pred);
});
[&condition, &pred](){
condition.wait_for(std::chrono::hours(1000), pred); });
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::this_thread::yield();
condition.set(42);
future.get();
condition.notify_all();
future.wait();
REQUIRE( condition.get() == 42 );
}
TEST_CASE("constructor-thread-blocks-signals")
TEST_CASE("wait-until-predicate")
{
sgnl::AtomicCondition<bool> condition(false);
sgnl::SignalHandler signal_handler({{SIGTERM, true}, {SIGINT, true}}, condition);
std::promise<pthread_t> signal_handler_thread_id;
std::future<int> ft_sig_handler =
std::async(std::launch::async, [&]() {
signal_handler_thread_id.set_value(pthread_self());
return signal_handler();
});
sgnl::AtomicCondition condition(23);
auto pred = [&condition](){ return condition.get() == 42; };
std::future<void> future =
std::async(
std::launch::async,
[&condition, &pred](){
condition.wait_until(
std::chrono::system_clock::now() + std::chrono::hours(1),
pred); });
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::this_thread::yield();
// send terminate signal to main thread ..
REQUIRE( pthread_kill(pthread_self(), SIGTERM) == 0 );
condition.set(42);
condition.notify_all();
future.wait();
// .. but we're still running
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::this_thread::yield();
// send interrupt signal to signal handler thread
REQUIRE(
pthread_kill(
signal_handler_thread_id.get_future().get(),
SIGINT) == 0 );
CHECK( ft_sig_handler.get() == SIGINT );
CHECK( condition.get() == true );
CHECK( signal_handler() == SIGTERM );
REQUIRE( condition.get() == 42 );
}
TEST_CASE("exit-condition")
TEST_CASE("sigwait")
{
std::vector<int> signals({SIGINT, SIGTERM, SIGUSR1, SIGUSR2});
sgnl::SignalHandler signal_handler({SIGUSR1});
kill(0, SIGUSR1);
REQUIRE( signal_handler.sigwait() == SIGUSR1 );
}
TEST_CASE("sigwait_handler")
{
auto handler = [](int){
return true;
};
sgnl::SignalHandler signal_handler({SIGUSR2});
kill(0, SIGUSR2);
REQUIRE( signal_handler.sigwait_handler(handler) == SIGUSR2 );
}
TEST_CASE("constructor-thread-blocks-signals")
{
std::atomic last_signal(0);
sgnl::SignalHandler signal_handler({SIGTERM, SIGINT});
auto handler = [&last_signal](int signum) {
last_signal.store(signum);
return signum == SIGINT;
};
std::future<int> ft_sig_handler =
std::async(
std::launch::async,
&sgnl::SignalHandler::sigwait_handler,
&signal_handler,
std::ref(handler));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::this_thread::yield();
REQUIRE( kill(0, SIGTERM) == 0 );
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::this_thread::yield();
CHECK( last_signal.load() == SIGTERM );
REQUIRE( kill(0, SIGINT) == 0 );
REQUIRE( ft_sig_handler.get() == SIGINT );
REQUIRE( last_signal.load() == SIGINT );
}
TEST_CASE("sleeping-workers-with-exit-condition")
{
sgnl::AtomicCondition exit_condition(false);
std::initializer_list signals({SIGINT, SIGTERM, SIGUSR1, SIGUSR2});
for( auto test_signal : signals )
{
sgnl::AtomicCondition exit_condition(false);
exit_condition.set(false);
auto handler = [&exit_condition, test_signal](int signum) {
exit_condition.set(true);
exit_condition.notify_all();
return test_signal == signum;
};
sgnl::SignalHandler signal_handler({{test_signal, true}}, exit_condition);
std::promise<pthread_t> signal_handler_thread_id;
sgnl::SignalHandler signal_handler({test_signal});
std::future<int> ft_sig_handler =
std::async(std::launch::async, [&]() {
signal_handler_thread_id.set_value(pthread_self());
return signal_handler();
});
std::async(
std::launch::async,
&sgnl::SignalHandler::sigwait_handler,
&signal_handler,
std::ref(handler));
std::vector<std::future<bool>> futures;
for(int i = 0; i < 50; ++i)
futures.push_back(
std::async(
std::launch::async,
&worker,
Worker,
std::ref(exit_condition)));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::this_thread::yield();
REQUIRE(
pthread_kill(
signal_handler_thread_id.get_future().get(),
test_signal) == 0 );
REQUIRE( kill(0, test_signal) == 0 );
for(auto& future : futures)
REQUIRE(future.get() == true);
int signal = ft_sig_handler.get();
REQUIRE( signal == test_signal );
REQUIRE( ft_sig_handler.get() == test_signal );
}
}
TEST_CASE("exit-condition-looping")
TEST_CASE("looping-workers-with-exit-condition")
{
std::vector<int> signals({SIGINT, SIGTERM, SIGUSR1, SIGUSR2});
std::atomic last_signal(0);
sgnl::AtomicCondition exit_condition(false);
std::initializer_list signals({SIGINT, SIGTERM, SIGUSR1, SIGUSR2});
for( auto test_signal : signals )
{
sgnl::AtomicCondition exit_condition(false);
exit_condition.set(false);
auto handler = [&exit_condition, test_signal](int signum) {
exit_condition.set(true);
exit_condition.notify_all();
return test_signal == signum;
};
sgnl::SignalHandler signal_handler({{test_signal, true}}, exit_condition);
std::promise<pthread_t> signal_handler_thread_id;
sgnl::SignalHandler signal_handler({test_signal});
std::future<int> ft_sig_handler =
std::async(std::launch::async, [&]() {
signal_handler_thread_id.set_value(pthread_self());
return signal_handler();
});
std::async(
std::launch::async,
&sgnl::SignalHandler::sigwait_handler,
&signal_handler,
std::ref(handler));
std::vector<std::future<int>> futures;
for(int i = 0; i < 10; ++i)
futures.push_back(
std::async(
std::launch::async,
&looping_worker,
LoopingWorker,
std::ref(exit_condition)));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::this_thread::yield();
REQUIRE(
pthread_kill(
signal_handler_thread_id.get_future().get(),
test_signal) == 0 );
REQUIRE( kill(0, test_signal) == 0 );
for(auto& future : futures)
// After 100 milliseconds, each worker thread should
// have looped at least 10 times.
// This might break if system is under heavy load
// or really slow.
REQUIRE(future.get() > 10);
REQUIRE( future.get() > 10 );
int signal = ft_sig_handler.get();
REQUIRE( signal == test_signal );
}
}
TEST_CASE("signal-handler-reuse")
{
std::map<int, int> signal_map({{SIGINT, 1 << 0},
{SIGTERM, 1 << 1},
{SIGUSR1, 1 << 2},
{SIGUSR2, 1 << 3}});
sgnl::AtomicCondition<int> condition(0);
sgnl::SignalHandler signal_handler(signal_map, condition);
for( auto p : signal_map )
{
std::promise<pthread_t> signal_handler_thread_id;
std::future<int> ft_sig_handler =
std::async(std::launch::async, [&]() {
signal_handler_thread_id.set_value(pthread_self());
return signal_handler();
});
REQUIRE(
pthread_kill(
signal_handler_thread_id.get_future().get(),
p.first) == 0 );
REQUIRE( ft_sig_handler.get() == p.first );
REQUIRE( condition.get() == p.second );
REQUIRE( ft_sig_handler.get() == test_signal );
}
}