simplification of timers handler design, relying solely on one time wheel.

master
Francisco 4 years ago committed by Francisco Paisana
parent 60896e30b5
commit e37968410e

@ -13,17 +13,26 @@
#ifndef SRSRAN_INTRUSIVE_LIST_H #ifndef SRSRAN_INTRUSIVE_LIST_H
#define SRSRAN_INTRUSIVE_LIST_H #define SRSRAN_INTRUSIVE_LIST_H
#include <iterator>
#include <type_traits> #include <type_traits>
namespace srsran { namespace srsran {
struct default_intrusive_tag; struct default_intrusive_tag;
/// Base class of T, where T is a node of intrusive_forward_list<T>
template <typename Tag = default_intrusive_tag> template <typename Tag = default_intrusive_tag>
struct intrusive_forward_list_element { struct intrusive_forward_list_element {
intrusive_forward_list_element<Tag>* next_node = nullptr; intrusive_forward_list_element<Tag>* next_node = nullptr;
}; };
/**
* Forward linked list of pointers of type "T" that doesn't rely on allocations.
* It leverages each node's internal pointer (thus intrusive) to store the next node of the list.
* It supports push_front/pop_front, iteration, clear, etc.
* @tparam T node type. It must be a subclass of intrusive_forward_list_element<Tag>
* @tparam Tag useful to differentiate multiple intrusive lists in the same node
*/
template <typename T, typename Tag = default_intrusive_tag> template <typename T, typename Tag = default_intrusive_tag>
class intrusive_forward_list class intrusive_forward_list
{ {
@ -37,7 +46,7 @@ class intrusive_forward_list
public: public:
using iterator_category = std::forward_iterator_tag; using iterator_category = std::forward_iterator_tag;
using value_type = U; using value_type = U;
using difference_type = ptrdiff_t; using difference_type = std::ptrdiff_t;
using pointer = U*; using pointer = U*;
using reference = U&; using reference = U&;
@ -116,6 +125,12 @@ struct intrusive_double_linked_list_element {
intrusive_double_linked_list_element<Tag>* prev_node = nullptr; intrusive_double_linked_list_element<Tag>* prev_node = nullptr;
}; };
/**
* Double Linked List of pointers of type "T" that doesn't rely on allocations.
* Instead, it leverages T's internal pointers to store the next and previous nodes
* @tparam T node type. Must be a subclass of intrusive_double_linked_list_element<Tag>
* @tparam Tag tag of nodes. Useful to differentiate separate intrusive lists inside the same T node
*/
template <typename T, typename Tag = default_intrusive_tag> template <typename T, typename Tag = default_intrusive_tag>
class intrusive_double_linked_list class intrusive_double_linked_list
{ {

@ -20,10 +20,8 @@
#ifndef SRSRAN_TIMERS_H #ifndef SRSRAN_TIMERS_H
#define SRSRAN_TIMERS_H #define SRSRAN_TIMERS_H
#include "srsran/adt/circular_array.h"
#include "srsran/adt/intrusive_list.h" #include "srsran/adt/intrusive_list.h"
#include "srsran/adt/move_callback.h" #include "srsran/adt/move_callback.h"
#include "srsran/phy/utils/debug.h"
#include <algorithm> #include <algorithm>
#include <cstdint> #include <cstdint>
#include <deque> #include <deque>
@ -38,23 +36,44 @@ public:
virtual void timer_expired(uint32_t timer_id) = 0; virtual void timer_expired(uint32_t timer_id) = 0;
}; };
/**
* Class that manages stack timers. It allows creation of unique_timers, with different ids. Each unique_timer duration,
* and callback can be set via the set(...) method. A timer can be started/stopped via run()/stop() methods.
* Internal Data structures:
* - The timer objects are stored in a deque via push_back() to keep pointer/reference consistency. The timer index
* in the deque matches the timer id field.
* This deque will only grow in size. Erased timers are just tagged in deque as empty, and can be reused for the
* creation of new timers. To avoid unnecessary runtime allocations, the user can set an initial capacity.
* - A free_list std::vector is used as a stack (LIFO) to keep track of the empty timers and speed up timer creation.
* - A large circular vector of size WHEEL_SIZE which works as a time wheel, storing and circularly indexing the
* currently running timers by their respective timeout value.
* For a number of running timers N, and uniform distribution of timeout values, the step_all() complexity
* should be O(N/WHEEL_SIZE). Thus, the performance should improve with a larger WHEEL_SIZE, at the expense of more
* wasted memory.
*/
class timer_handler class timer_handler
{ {
using tic_diff_t = uint32_t; using tic_diff_t = uint32_t;
using tic_t = uint32_t; using tic_t = uint32_t;
constexpr static size_t WHEEL_SIZE = 1024; constexpr static uint32_t INVALID_ID = std::numeric_limits<uint32_t>::max();
constexpr static tic_t invalid_tic = std::numeric_limits<tic_t>::max(); constexpr static tic_diff_t INVALID_TIME_DIFF = std::numeric_limits<tic_diff_t>::max();
constexpr static uint32_t MAX_TIMER_DURATION = std::numeric_limits<tic_diff_t>::max() / 4; constexpr static size_t WHEEL_SHIFT = 16U;
constexpr static size_t WHEEL_SIZE = 1U << WHEEL_SHIFT;
constexpr static size_t WHEEL_MASK = WHEEL_SIZE - 1U;
struct timer_impl : public intrusive_double_linked_list_element<> { struct timer_impl : public intrusive_double_linked_list_element<> {
timer_handler& parent; timer_handler& parent;
const size_t id; const uint32_t id;
tic_diff_t duration = 0; tic_diff_t duration = INVALID_TIME_DIFF;
tic_t timeout = 0; tic_t timeout = 0;
enum state_t : int8_t { empty, stopped, running, expired } state = empty; enum state_t : int8_t { empty, stopped, running, expired } state = empty;
srsran::move_callback<void(uint32_t)> callback; srsran::move_callback<void(uint32_t)> callback;
explicit timer_impl(timer_handler& parent_, size_t id_) : parent(parent_), id(id_) {} explicit timer_impl(timer_handler& parent_, uint32_t id_) : parent(parent_), id(id_) {}
timer_impl(const timer_impl&) = delete;
timer_impl(timer_impl&&) = delete;
timer_impl& operator=(const timer_impl&) = delete;
timer_impl& operator=(timer_impl&&) = delete;
bool is_empty() const { return state == empty; } bool is_empty() const { return state == empty; }
bool is_running() const { return state == running; } bool is_running() const { return state == running; }
@ -64,11 +83,7 @@ class timer_handler
bool set(uint32_t duration_) bool set(uint32_t duration_)
{ {
if (duration_ > MAX_TIMER_DURATION) { duration = std::max(duration_, 1U); // the next step will be one place ahead of current one
ERROR("Error: timer durations above %u are not supported", MAX_TIMER_DURATION);
return false;
}
duration = std::max(duration_, 1u);
if (is_running()) { if (is_running()) {
// if already running, just extends timer lifetime // if already running, just extends timer lifetime
run(); run();
@ -98,10 +113,10 @@ class timer_handler
{ {
std::lock_guard<std::mutex> lock(parent.mutex); std::lock_guard<std::mutex> lock(parent.mutex);
// does not call callback // does not call callback
parent.stop_timer_(*this); parent.stop_timer_(*this, false);
} }
void clear() { parent.dealloc_timer(*this); } void deallocate() { parent.dealloc_timer(*this); }
}; };
public: public:
@ -136,17 +151,17 @@ public:
handle->set(duration_); handle->set(duration_);
} }
bool is_set() const { return is_valid() and handle->duration > 0; } bool is_set() const { return is_valid() and handle->duration != INVALID_TIME_DIFF; }
bool is_running() const { return is_valid() and handle->is_running(); } bool is_running() const { return is_valid() and handle->is_running(); }
bool is_expired() const { return is_valid() and handle->is_expired(); } bool is_expired() const { return is_valid() and handle->is_expired(); }
tic_diff_t time_elapsed() const { return is_valid() ? handle->time_elapsed() : -1; } tic_diff_t time_elapsed() const { return is_valid() ? handle->time_elapsed() : INVALID_TIME_DIFF; }
uint32_t id() const { return is_valid() ? handle->id : -1; } uint32_t id() const { return is_valid() ? handle->id : INVALID_ID; }
tic_diff_t duration() const { return is_valid() ? handle->duration : -1; } tic_diff_t duration() const { return is_valid() ? handle->duration : INVALID_TIME_DIFF; }
void run() void run()
{ {
@ -164,7 +179,7 @@ public:
void release() void release()
{ {
if (is_valid()) { if (is_valid()) {
handle->clear(); handle->deallocate();
handle = nullptr; handle = nullptr;
} }
} }
@ -175,51 +190,39 @@ public:
explicit timer_handler(uint32_t capacity = 64) explicit timer_handler(uint32_t capacity = 64)
{ {
time_wheel.resize(WHEEL_SIZE);
// Pre-reserve timers // Pre-reserve timers
free_list.reserve(capacity);
while (timer_list.size() < capacity) { while (timer_list.size() < capacity) {
timer_list.emplace_back(*this, timer_list.size()); timer_list.emplace_back(*this, timer_list.size());
free_list.push_back(&timer_list.back());
} }
std::reverse(free_list.begin(), free_list.end()); // reverse to keep clear tid allocation for tests
} }
void step_all() void step_all()
{ {
std::unique_lock<std::mutex> lock(mutex); std::unique_lock<std::mutex> lock(mutex);
cur_time++; cur_time++;
if (cur_time == WHEEL_SIZE) { auto& wheel_list = time_wheel[cur_time & WHEEL_MASK];
// Promote timers from 2nd wheel to first if needed
for (size_t i = 0; i < WHEEL_SIZE; ++i) {
for (auto it = second_wheel[i].begin(); it != second_wheel[i].end();) {
auto& timer = timer_list[it->id];
timer.timeout -= WHEEL_SIZE;
++it;
if (timer.timeout < WHEEL_SIZE) {
second_wheel[i].pop(&timer);
first_wheel[i].push_front(&timer);
}
}
}
cur_time = 0;
}
auto& wheel_list = first_wheel[cur_time % WHEEL_SIZE]; for (auto it = wheel_list.begin(); it != wheel_list.end();) {
while (not wheel_list.empty()) { timer_impl& timer = timer_list[it->id];
// Remove timer from wheel ++it;
timer_impl& timer = wheel_list.front(); if (timer.timeout == cur_time) {
wheel_list.pop_front(); // stop timer (callback has to see the timer has already expired)
stop_timer_(timer, true);
// update timer state // Call callback if configured
timer.state = timer_impl::expired; if (not timer.callback.is_empty()) {
nof_timers_running_--; // unlock mutex. It can happen that the callback tries to run a timer too
lock.unlock();
// Call callback timer.callback(timer.id);
if (not timer.callback.is_empty()) {
// unlock mutex, it could be that the callback tries to run a timer too
lock.unlock();
timer.callback(timer.id); // Lock again to keep protecting the wheel
lock.lock();
// Lock again to keep protecting the wheel }
lock.lock();
} }
} }
} }
@ -229,7 +232,7 @@ public:
std::lock_guard<std::mutex> lock(mutex); std::lock_guard<std::mutex> lock(mutex);
// does not call callback // does not call callback
for (timer_impl& timer : timer_list) { for (timer_impl& timer : timer_list) {
stop_timer_(timer); stop_timer_(timer, false);
} }
} }
@ -238,7 +241,7 @@ public:
uint32_t nof_timers() const uint32_t nof_timers() const
{ {
std::lock_guard<std::mutex> lock(mutex); std::lock_guard<std::mutex> lock(mutex);
return nof_timers_; return timer_list.size() - free_list.size();
} }
uint32_t nof_running_timers() const uint32_t nof_running_timers() const
@ -254,7 +257,7 @@ public:
srsran::move_callback<void(uint32_t)> c = [func, &timer](uint32_t tid) { srsran::move_callback<void(uint32_t)> c = [func, &timer](uint32_t tid) {
func(); func();
// auto-deletes timer // auto-deletes timer
timer.clear(); timer.deallocate();
}; };
timer.set(duration, std::move(c)); timer.set(duration, std::move(c));
timer.run(); timer.run();
@ -264,22 +267,18 @@ private:
timer_impl& alloc_timer() timer_impl& alloc_timer()
{ {
std::lock_guard<std::mutex> lock(mutex); std::lock_guard<std::mutex> lock(mutex);
nof_timers_++; timer_impl* t;
if (nof_timers_ > timer_list.size()) { if (not free_list.empty()) {
t = free_list.back();
srsran_assert(t->is_empty(), "Invalid timer id=%d state", t->id);
free_list.pop_back();
} else {
// Need to increase deque // Need to increase deque
timer_list.emplace_back(*this, timer_list.size()); timer_list.emplace_back(*this, timer_list.size());
timer_impl& ret = timer_list.back(); t = &timer_list.back();
ret.state = timer_impl::stopped;
return ret;
}
for (auto& timer : timer_list) {
if (timer.is_empty()) {
timer.state = timer_impl::stopped;
return timer;
}
} }
srsran_terminate("Failed to allocate timer"); t->state = timer_impl::stopped;
return *t;
} }
void dealloc_timer(timer_impl& timer) void dealloc_timer(timer_impl& timer)
@ -289,62 +288,55 @@ private:
// already deallocated // already deallocated
return; return;
} }
stop_timer_(timer); stop_timer_(timer, false);
timer.state = timer_impl::empty; timer.state = timer_impl::empty;
timer.duration = 0; timer.duration = INVALID_TIME_DIFF;
timer.timeout = 0; timer.timeout = 0;
timer.callback = srsran::move_callback<void(uint32_t)>(); timer.callback = srsran::move_callback<void(uint32_t)>();
nof_timers_--; free_list.push_back(&timer);
// leave id unchanged. // leave id unchanged.
} }
void start_run_(timer_impl& timer) void start_run_(timer_impl& timer)
{ {
uint32_t timeout = cur_time + timer.duration; uint32_t timeout = cur_time + timer.duration;
if (timer.is_running() and timer.timeout == timeout) { size_t new_wheel_pos = timeout & WHEEL_MASK;
// If no change in timeout, no need to change wheel position if (timer.is_running() and (timer.timeout & WHEEL_MASK) == new_wheel_pos) {
// If no change in timer wheel position
return; return;
} }
// Stop timer if it was running, removing it from wheel in the process // Stop timer if it was running, removing it from wheel in the process
stop_timer_(timer); stop_timer_(timer, false);
// Insert timer in wheel // Insert timer in wheel
if (timeout < WHEEL_SIZE) { time_wheel[new_wheel_pos].push_front(&timer);
first_wheel[timeout].push_front(&timer);
} else {
second_wheel[timeout % WHEEL_SIZE].push_front(&timer);
}
timer.timeout = timeout; timer.timeout = timeout;
timer.state = timer_impl::running; timer.state = timer_impl::running;
nof_timers_running_++; nof_timers_running_++;
} }
/// called when user manually stops timer (as an alternative to expiry) /// called when user manually stops timer (as an alternative to expiry)
void stop_timer_(timer_impl& timer) void stop_timer_(timer_impl& timer, bool expiry)
{ {
if (not timer.is_running()) { if (not timer.is_running()) {
return; return;
} }
// If already running, need to disconnect it from previous wheel // If already running, need to disconnect it from previous wheel
if (timer.timeout < WHEEL_SIZE) { time_wheel[timer.timeout & WHEEL_MASK].pop(&timer);
first_wheel[timer.timeout].pop(&timer);
} else {
second_wheel[timer.timeout % WHEEL_SIZE].pop(&timer);
}
timer.state = timer_impl::stopped; timer.state = expiry ? timer_impl::expired : timer_impl::stopped;
nof_timers_running_--; nof_timers_running_--;
} }
uint32_t cur_time = 0; tic_t cur_time = 0;
size_t nof_timers_ = 0; size_t nof_timers_running_ = 0;
size_t nof_timers_running_ = 0; // using a deque to maintain reference validity on emplace_back. Also, this deque will only grow.
std::deque<timer_impl> timer_list; std::deque<timer_impl> timer_list;
srsran::circular_array<srsran::intrusive_double_linked_list<timer_impl>, WHEEL_SIZE> first_wheel; std::vector<timer_impl*> free_list;
srsran::circular_array<srsran::intrusive_double_linked_list<timer_impl>, WHEEL_SIZE> second_wheel; std::vector<srsran::intrusive_double_linked_list<timer_impl> > time_wheel;
mutable std::mutex mutex; // Protect priority queue mutable std::mutex mutex; // Protect priority queue
}; };
using unique_timer = timer_handler::unique_timer; using unique_timer = timer_handler::unique_timer;

@ -12,6 +12,7 @@
#include "srsran/mac/pdu_queue.h" #include "srsran/mac/pdu_queue.h"
#include "srsran/common/log_helper.h" #include "srsran/common/log_helper.h"
#include "srsran/phy/utils/debug.h"
namespace srsran { namespace srsran {

@ -312,10 +312,7 @@ int timers_test5()
std::string string = "test string"; std::string string = "test string";
timers.defer_callback(2, [&vals, string]() { timers.defer_callback(2, [&vals, string]() {
vals.push_back(2); vals.push_back(2);
if (string != "test string") { srsran_assert(string == "test string", "string was not captured correctly");
ERROR("string was not captured correctly");
exit(-1);
}
}); });
} }
timers.defer_callback(6, [&vals]() { vals.push_back(3); }); timers.defer_callback(6, [&vals]() { vals.push_back(3); });

Loading…
Cancel
Save