timers,feature: make timers thread-safe by using atomic to store timers state.

master
Francisco 4 years ago committed by Francisco Paisana
parent dddb3ede71
commit f36f5271d3

@ -25,6 +25,7 @@
#include <algorithm> #include <algorithm>
#include <cstdint> #include <cstdint>
#include <deque> #include <deque>
#include <inttypes.h>
#include <limits> #include <limits>
#include <mutex> #include <mutex>
@ -37,8 +38,9 @@ public:
}; };
/** /**
* Class that manages stack timers. It allows creation of unique_timers, with different ids. Each unique_timer duration, * Class that manages stack timers. It allows creation of unique_timers with different ids. Each unique_timer duration,
* and callback can be set via the set(...) method. A timer can be started/stopped via run()/stop() methods. * and callback can be set via the set(...) method. A timer can be started/stopped via run()/stop() methods.
* The timers access/alteration is thread-safe. Just beware non-atomic uses of its getters.
* Internal Data structures: * Internal Data structures:
* - timer_list - std::deque that stores timer objects via push_back() to keep pointer/reference validity. * - timer_list - std::deque that stores timer objects via push_back() to keep pointer/reference validity.
* The timer index in the timer_list matches the timer object id field. * The timer index in the timer_list matches the timer object id field.
@ -53,20 +55,34 @@ public:
*/ */
class timer_handler class timer_handler
{ {
using tic_diff_t = uint32_t; using tic_diff_t = uint32_t;
using tic_t = uint32_t; using tic_t = uint32_t;
constexpr static uint32_t INVALID_ID = std::numeric_limits<uint32_t>::max(); constexpr static uint32_t INVALID_ID = std::numeric_limits<uint32_t>::max();
constexpr static tic_diff_t INVALID_TIME_DIFF = std::numeric_limits<tic_diff_t>::max(); constexpr static size_t WHEEL_SHIFT = 16U;
constexpr static size_t WHEEL_SHIFT = 16U; constexpr static size_t WHEEL_SIZE = 1U << WHEEL_SHIFT;
constexpr static size_t WHEEL_SIZE = 1U << WHEEL_SHIFT; constexpr static size_t WHEEL_MASK = WHEEL_SIZE - 1U;
constexpr static size_t WHEEL_MASK = WHEEL_SIZE - 1U;
constexpr static uint64_t STOPPED_FLAG = 0U;
constexpr static uint64_t RUNNING_FLAG = static_cast<uint64_t>(1U) << 63U;
constexpr static uint64_t EXPIRED_FLAG = static_cast<uint64_t>(1U) << 62U;
constexpr static tic_diff_t MAX_TIMER_DURATION = 0x3FFFFFFFU;
static bool decode_is_running(uint64_t value) { return (value & RUNNING_FLAG) != 0; }
static bool decode_is_expired(uint64_t value) { return (value & EXPIRED_FLAG) != 0; }
static tic_diff_t decode_duration(uint64_t value) { return (value >> 32U) & MAX_TIMER_DURATION; }
static tic_t decode_timeout(uint64_t value) { return static_cast<uint32_t>(value & 0xFFFFFFFFU); }
static uint64_t encode_state(uint64_t mode_flag, uint32_t duration, uint32_t timeout)
{
return mode_flag + (static_cast<uint64_t>(duration) << 32U) + timeout;
}
struct timer_impl : public intrusive_double_linked_list_element<>, public intrusive_forward_list_element<> { struct timer_impl : public intrusive_double_linked_list_element<>, public intrusive_forward_list_element<> {
timer_handler& parent; // const
const uint32_t id; const uint32_t id;
tic_diff_t duration = INVALID_TIME_DIFF; timer_handler& parent;
tic_t timeout = 0; // writes protected by backend lock
enum state_t : int8_t { empty, stopped, running, expired } state = empty; bool allocated = false;
std::atomic<uint64_t> state{0}; ///< read can be without lock, thus writes must be atomic
srsran::move_callback<void(uint32_t)> callback; srsran::move_callback<void(uint32_t)> callback;
explicit timer_impl(timer_handler& parent_, uint32_t id_) : parent(parent_), id(id_) {} explicit timer_impl(timer_handler& parent_, uint32_t id_) : parent(parent_), id(id_) {}
@ -75,32 +91,38 @@ class timer_handler
timer_impl& operator=(const timer_impl&) = delete; timer_impl& operator=(const timer_impl&) = delete;
timer_impl& operator=(timer_impl&&) = delete; timer_impl& operator=(timer_impl&&) = delete;
bool is_empty() const { return state == empty; } // unprotected
bool is_running() const { return state == running; } bool is_running_() const { return decode_is_running(state.load(std::memory_order_relaxed)); }
bool is_expired() const { return state == expired; } bool is_expired_() const { return decode_is_expired(state.load(std::memory_order_relaxed)); }
tic_diff_t time_left() const { return is_running() ? timeout - parent.cur_time : (is_expired() ? 0 : duration); } uint32_t duration_() const { return decode_duration(state.load(std::memory_order_relaxed)); }
uint32_t time_elapsed() const { return duration - time_left(); } bool is_set_() const { return duration_() > 0; }
tic_diff_t time_elapsed_() const
{
uint64_t state_snapshot = state.load(std::memory_order_relaxed);
bool running = decode_is_running(state_snapshot), expired = decode_is_expired(state_snapshot);
uint32_t duration = decode_duration(state_snapshot), timeout = decode_timeout(state_snapshot);
return running ? duration - (timeout - parent.cur_time) : (expired ? duration : 0);
}
bool set(uint32_t duration_) void set(uint32_t duration_)
{ {
duration = std::max(duration_, 1U); // the next step will be one place ahead of current one srsran_assert(duration_ <= MAX_TIMER_DURATION,
if (is_running()) { "Invalid timer duration=%" PRIu32 ">%" PRIu32,
// if already running, just extends timer lifetime duration_,
run(); MAX_TIMER_DURATION);
} else { std::lock_guard<std::mutex> lock(parent.mutex);
state = stopped; set_(duration_);
timeout = 0;
}
return true;
} }
bool set(uint32_t duration_, srsran::move_callback<void(uint32_t)> callback_) void set(uint32_t duration_, srsran::move_callback<void(uint32_t)> callback_)
{ {
if (set(duration_)) { srsran_assert(duration_ <= MAX_TIMER_DURATION,
callback = std::move(callback_); "Invalid timer duration=%" PRIu32 ">%" PRIu32,
return true; duration_,
} MAX_TIMER_DURATION);
return false; std::lock_guard<std::mutex> lock(parent.mutex);
set_(duration_);
callback = std::move(callback_);
} }
void run() void run()
@ -116,7 +138,25 @@ class timer_handler
parent.stop_timer_(*this, false); parent.stop_timer_(*this, false);
} }
void deallocate() { parent.dealloc_timer(*this); } void deallocate()
{
std::lock_guard<std::mutex> lock(parent.mutex);
parent.dealloc_timer_(*this);
}
private:
void set_(uint32_t duration_)
{
duration_ = std::max(duration_, 1U); // the next step will be one place ahead of current one
// called in locked context
uint64_t old_state = state.load(std::memory_order_relaxed);
if (decode_is_running(old_state)) {
// if already running, just extends timer lifetime
parent.start_run_(*this, duration_);
} else {
state.store(encode_state(STOPPED_FLAG, duration_, 0), std::memory_order_relaxed);
}
}
}; };
public: public:
@ -151,17 +191,12 @@ public:
handle->set(duration_); handle->set(duration_);
} }
bool is_set() const { return is_valid() and handle->duration != INVALID_TIME_DIFF; } uint32_t id() const { return is_valid() ? handle->id : INVALID_ID; }
bool is_set() const { return is_valid() and handle->is_set_(); }
bool is_running() const { return is_valid() and handle->is_running(); } bool is_running() const { return is_valid() and handle->is_running_(); }
bool is_expired() const { return is_valid() and handle->is_expired_(); }
bool is_expired() const { return is_valid() and handle->is_expired(); } tic_diff_t time_elapsed() const { return is_valid() ? handle->time_elapsed_() : 0; }
tic_diff_t duration() const { return is_valid() ? handle->duration_() : 0; }
tic_diff_t time_elapsed() const { return is_valid() ? handle->time_elapsed() : INVALID_TIME_DIFF; }
uint32_t id() const { return is_valid() ? handle->id : INVALID_ID; }
tic_diff_t duration() const { return is_valid() ? handle->duration : INVALID_TIME_DIFF; }
void run() void run()
{ {
@ -205,13 +240,13 @@ public:
void step_all() void step_all()
{ {
std::unique_lock<std::mutex> lock(mutex); std::unique_lock<std::mutex> lock(mutex);
cur_time++; uint32_t cur_time_local = cur_time.load(std::memory_order_relaxed) + 1;
auto& wheel_list = time_wheel[cur_time & WHEEL_MASK]; auto& wheel_list = time_wheel[cur_time_local & WHEEL_MASK];
for (auto it = wheel_list.begin(); it != wheel_list.end();) { for (auto it = wheel_list.begin(); it != wheel_list.end();) {
timer_impl& timer = timer_list[it->id]; timer_impl& timer = timer_list[it->id];
++it; ++it;
if (timer.timeout == cur_time) { if (decode_timeout(timer.state.load(std::memory_order_relaxed)) == cur_time_local) {
// stop timer (callback has to see the timer has already expired) // stop timer (callback has to see the timer has already expired)
stop_timer_(timer, true); stop_timer_(timer, true);
@ -227,6 +262,8 @@ public:
} }
} }
} }
cur_time.fetch_add(1, std::memory_order_relaxed);
} }
void stop_all() void stop_all()
@ -252,6 +289,8 @@ public:
return nof_timers_running_; return nof_timers_running_;
} }
constexpr static uint32_t max_timer_duration() { return MAX_TIMER_DURATION; }
template <typename F> template <typename F>
void defer_callback(uint32_t duration, const F& func) void defer_callback(uint32_t duration, const F& func)
{ {
@ -275,7 +314,7 @@ private:
timer_impl* t; timer_impl* t;
if (not free_list.empty()) { if (not free_list.empty()) {
t = &free_list.front(); t = &free_list.front();
srsran_assert(t->is_empty(), "Invalid timer id=%d state", t->id); srsran_assert(not t->allocated, "Invalid timer id=%d state", t->id);
free_list.pop_front(); free_list.pop_front();
nof_free_timers--; nof_free_timers--;
} else { } else {
@ -283,63 +322,71 @@ private:
timer_list.emplace_back(*this, timer_list.size()); timer_list.emplace_back(*this, timer_list.size());
t = &timer_list.back(); t = &timer_list.back();
} }
t->state = timer_impl::stopped; t->allocated = true;
return *t; return *t;
} }
void dealloc_timer(timer_impl& timer) void dealloc_timer_(timer_impl& timer)
{ {
std::lock_guard<std::mutex> lock(mutex); if (not timer.allocated) {
if (timer.is_empty()) {
// already deallocated // already deallocated
return; return;
} }
stop_timer_(timer, false); stop_timer_(timer, false);
timer.state = timer_impl::empty; timer.allocated = false;
timer.duration = INVALID_TIME_DIFF; timer.state.store(encode_state(STOPPED_FLAG, 0, 0), std::memory_order_relaxed);
timer.timeout = 0;
timer.callback = srsran::move_callback<void(uint32_t)>(); timer.callback = srsran::move_callback<void(uint32_t)>();
free_list.push_front(&timer); free_list.push_front(&timer);
nof_free_timers++; nof_free_timers++;
// leave id unchanged. // leave id unchanged.
} }
void start_run_(timer_impl& timer) void start_run_(timer_impl& timer, uint32_t duration_ = 0)
{ {
uint32_t timeout = cur_time + timer.duration; uint64_t timer_old_state = timer.state.load(std::memory_order_relaxed);
size_t new_wheel_pos = timeout & WHEEL_MASK; duration_ = duration_ == 0 ? decode_duration(timer_old_state) : duration_;
if (timer.is_running() and (timer.timeout & WHEEL_MASK) == new_wheel_pos) { uint32_t new_timeout = cur_time.load(std::memory_order_relaxed) + duration_;
size_t new_wheel_pos = new_timeout & WHEEL_MASK;
uint32_t old_timeout = decode_timeout(timer_old_state);
bool was_running = decode_is_running(timer_old_state);
if (was_running and (old_timeout & WHEEL_MASK) == new_wheel_pos) {
// If no change in timer wheel position. Just update absolute timeout // If no change in timer wheel position. Just update absolute timeout
timer.timeout = timeout; timer.state.store(encode_state(RUNNING_FLAG, duration_, new_timeout), std::memory_order_relaxed);
return; return;
} }
// Stop timer if it was running, removing it from wheel in the process // Stop timer if it was running, removing it from wheel in the process
stop_timer_(timer, false); if (was_running) {
time_wheel[old_timeout & WHEEL_MASK].pop(&timer);
nof_timers_running_--;
}
// Insert timer in wheel // Insert timer in wheel
time_wheel[new_wheel_pos].push_front(&timer); time_wheel[new_wheel_pos].push_front(&timer);
timer.timeout = timeout; timer.state.store(encode_state(RUNNING_FLAG, duration_, new_timeout), std::memory_order_relaxed);
timer.state = timer_impl::running;
nof_timers_running_++; nof_timers_running_++;
} }
/// called when user manually stops timer (as an alternative to expiry) /// called when user manually stops timer (as an alternative to expiry)
void stop_timer_(timer_impl& timer, bool expiry) void stop_timer_(timer_impl& timer, bool expiry)
{ {
if (not timer.is_running()) { uint64_t timer_old_state = timer.state.load(std::memory_order_relaxed);
if (not decode_is_running(timer_old_state)) {
return; return;
} }
// If already running, need to disconnect it from previous wheel // If already running, need to disconnect it from previous wheel
time_wheel[timer.timeout & WHEEL_MASK].pop(&timer); uint32_t old_timeout = decode_timeout(timer_old_state);
time_wheel[old_timeout & WHEEL_MASK].pop(&timer);
timer.state = expiry ? timer_impl::expired : timer_impl::stopped; uint64_t new_state =
encode_state(expiry ? EXPIRED_FLAG : STOPPED_FLAG, decode_duration(timer_old_state), old_timeout);
timer.state.store(new_state, std::memory_order_relaxed);
nof_timers_running_--; nof_timers_running_--;
} }
tic_t cur_time = 0; std::atomic<tic_t> cur_time{0};
size_t nof_timers_running_ = 0, nof_free_timers = 0; size_t nof_timers_running_ = 0, nof_free_timers = 0;
// using a deque to maintain reference validity on emplace_back. Also, this deque will only grow. // using a deque to maintain reference validity on emplace_back. Also, this deque will only grow.
std::deque<timer_impl> timer_list; std::deque<timer_impl> timer_list;
srsran::intrusive_forward_list<timer_impl> free_list; srsran::intrusive_forward_list<timer_impl> free_list;

@ -19,6 +19,8 @@
using namespace srsran; using namespace srsran;
static_assert(timer_handler::max_timer_duration() == 1073741823, "Invalid max duration");
int timers_test1() int timers_test1()
{ {
timer_handler timers; timer_handler timers;
@ -179,19 +181,23 @@ int timers_test3()
} }
struct timers_test4_ctxt { struct timers_test4_ctxt {
std::vector<timer_handler::unique_timer> timers; std::vector<unique_timer> timers;
srsran::tti_sync_cv tti_sync1; srsran::tti_sync_cv tti_sync1;
srsran::tti_sync_cv tti_sync2; srsran::tti_sync_cv tti_sync2;
const uint32_t duration = 1000; const uint32_t duration = 1000;
}; };
static void timers2_test4_thread(timers_test4_ctxt* ctx) static void timers2_test4_thread(timers_test4_ctxt* ctx)
{ {
std::mt19937 mt19937(4); std::random_device rd;
std::mt19937 mt19937(rd());
std::uniform_real_distribution<float> real_dist(0.0f, 1.0f); std::uniform_real_distribution<float> real_dist(0.0f, 1.0f);
for (uint32_t d = 0; d < ctx->duration; d++) { for (uint32_t d = 0; d < ctx->duration; d++) {
// make random events // make random events
for (uint32_t i = 1; i < ctx->timers.size(); i++) { for (uint32_t i = 1; i < ctx->timers.size(); i++) {
// ensure the getters always return reasonable values
TESTASSERT(ctx->timers[i].time_elapsed() <= ctx->duration);
if (0.1f > real_dist(mt19937)) { if (0.1f > real_dist(mt19937)) {
ctx->timers[i].run(); ctx->timers[i].run();
} }
@ -214,76 +220,82 @@ static void timers2_test4_thread(timers_test4_ctxt* ctx)
int timers_test4() int timers_test4()
{ {
timers_test4_ctxt* ctx = new timers_test4_ctxt;
timer_handler timers; timer_handler timers;
timers_test4_ctxt ctx;
uint32_t nof_timers = 32; uint32_t nof_timers = 32;
std::mt19937 mt19937(4); std::mt19937 mt19937(4);
std::uniform_real_distribution<float> real_dist(0.0f, 1.0f); std::uniform_real_distribution<float> real_dist(0.0f, 1.0f);
// Generate all timers and start them // Generate all timers and start them
for (uint32_t i = 0; i < nof_timers; i++) { for (uint32_t i = 0; i < nof_timers; i++) {
ctx->timers.push_back(timers.get_unique_timer()); ctx.timers.push_back(timers.get_unique_timer());
ctx->timers[i].set(ctx->duration); ctx.timers[i].set(ctx.duration);
ctx->timers[i].run(); ctx.timers[i].run();
} }
/* ========== multithreaded region begin =========== */
// Create side thread // Create side thread
std::thread thread(timers2_test4_thread, ctx); std::thread thread(timers2_test4_thread, &ctx);
for (uint32_t d = 0; d < ctx->duration; d++) { for (uint32_t d = 0; d < ctx.duration; d++) {
// make random events // make random events
for (uint32_t i = 1; i < nof_timers; i++) { for (uint32_t i = 1; i < nof_timers; i++) {
// ensure the getters always return reasonable values
TESTASSERT(ctx.timers[i].time_elapsed() <= ctx.duration);
if (0.1f > real_dist(mt19937)) { if (0.1f > real_dist(mt19937)) {
ctx->timers[i].run(); ctx.timers[i].run(); // restart run
} }
if (0.1f > real_dist(mt19937)) { if (0.1f > real_dist(mt19937)) {
ctx->timers[i].stop(); ctx.timers[i].stop(); // stop run
} }
if (0.1f > real_dist(mt19937)) { if (0.1f > real_dist(mt19937)) {
ctx->timers[i].set(static_cast<uint32_t>(ctx->duration * real_dist(mt19937))); ctx.timers[i].set(static_cast<uint32_t>(ctx.duration * real_dist(mt19937)));
ctx->timers[i].run(); ctx.timers[i].run(); // start run with new duration
} }
} }
// first times, does not have event, it shall keep running // first timer does not get updated, so it shall keep running
TESTASSERT(ctx->timers[0].is_running()); TESTASSERT(ctx.timers[0].is_running());
// Increment time // Increment time
timers.step_all(); timers.step_all();
// wait second thread to finish events // wait second thread to finish events
ctx->tti_sync1.wait(); ctx.tti_sync1.wait();
// assert no timer got wrong values // assert no timer got wrong values
for (uint32_t i = 0; i < nof_timers; i++) { for (uint32_t i = 0; i < nof_timers; i++) {
if (ctx->timers[i].is_running()) { if (ctx.timers[i].is_running()) {
TESTASSERT(ctx->timers[i].time_elapsed() <= ctx->timers[i].duration()); TESTASSERT(ctx.timers[i].time_elapsed() <= ctx.timers[i].duration());
TESTASSERT(ctx.timers[i].duration() <= ctx.duration);
} }
} }
// Start new TTI // Start new TTI
ctx->tti_sync2.increase(); ctx.tti_sync2.increase();
} }
// Finish asynchronous thread // Finish asynchronous thread
thread.join(); thread.join();
/* ========== multithreaded region end =========== */
// First timer should have expired // First timer should have expired
TESTASSERT(ctx->timers[0].is_expired()); TESTASSERT(ctx.timers[0].is_expired());
TESTASSERT(not ctx->timers[0].is_running()); TESTASSERT(not ctx.timers[0].is_running());
// Run for the maximum period // Run for the maximum period
for (uint32_t d = 0; d < ctx->duration; d++) { for (uint32_t d = 0; d < ctx.duration; d++) {
timers.step_all(); timers.step_all();
} }
// No timer should be running // No timer should be running
for (uint32_t i = 0; i < nof_timers; i++) { for (uint32_t i = 0; i < nof_timers; i++) {
TESTASSERT(not ctx->timers[i].is_running()); TESTASSERT(not ctx.timers[i].is_running());
} }
delete ctx;
return SRSRAN_SUCCESS; return SRSRAN_SUCCESS;
} }
@ -355,12 +367,12 @@ int timers_test6()
std::vector<int> vals; std::vector<int> vals;
// Event: Add a timer that gets erased 1 tti after. // Event: Add a timer that gets erased 1 tti after, and before expiring.
{ {
timer_handler::unique_timer t = timers.get_unique_timer(); timer_handler::unique_timer t = timers.get_unique_timer();
t.set(2, [&vals](uint32_t tid) { vals.push_back(1); }); t.set(2, [&vals](uint32_t tid) { vals.push_back(1); });
t.run(); t.run();
TESTASSERT(timers.nof_running_timers() == 1); TESTASSERT(timers.nof_running_timers() == 1 and t.duration() == 2 and t.is_running());
timers.step_all(); timers.step_all();
} }
TESTASSERT(timers.nof_running_timers() == 0); TESTASSERT(timers.nof_running_timers() == 0);
@ -375,7 +387,7 @@ int timers_test6()
timer_handler::unique_timer t = timers.get_unique_timer(); timer_handler::unique_timer t = timers.get_unique_timer();
t.set(2, [&vals](uint32_t tid) { vals.push_back(2); }); t.set(2, [&vals](uint32_t tid) { vals.push_back(2); });
t.run(); t.run();
TESTASSERT(timers.nof_running_timers() == 1); TESTASSERT(timers.nof_running_timers() == 1 and t.is_running());
timers.step_all(); timers.step_all();
TESTASSERT(t.time_elapsed() == 1); TESTASSERT(t.time_elapsed() == 1);
} }

Loading…
Cancel
Save