implementation of time wheel-based timer handler, using a circular array and intrusive list

master
Francisco 4 years ago committed by Francisco Paisana
parent 77b11b82ac
commit 60896e30b5

@ -0,0 +1,231 @@
/**
*
* \section COPYRIGHT
*
* Copyright 2013-2021 Software Radio Systems Limited
*
* By using this file, you agree to the terms and conditions set
* forth in the LICENSE file which can be found at the top level of
* the distribution.
*
*/
#ifndef SRSRAN_INTRUSIVE_LIST_H
#define SRSRAN_INTRUSIVE_LIST_H
#include <type_traits>
namespace srsran {
struct default_intrusive_tag;
template <typename Tag = default_intrusive_tag>
struct intrusive_forward_list_element {
intrusive_forward_list_element<Tag>* next_node = nullptr;
};
template <typename T, typename Tag = default_intrusive_tag>
class intrusive_forward_list
{
using node_t = intrusive_forward_list_element<Tag>;
template <typename U>
class iterator_impl
{
using elem_t = typename std::conditional<std::is_const<U>::value, const node_t, node_t>::type;
public:
using iterator_category = std::forward_iterator_tag;
using value_type = U;
using difference_type = ptrdiff_t;
using pointer = U*;
using reference = U&;
explicit iterator_impl(elem_t* node_ = nullptr) : node(node_) {}
iterator_impl<U>& operator++()
{
node = node->next_node;
return *this;
}
pointer operator->() { return static_cast<pointer>(node); }
reference operator*() { return static_cast<reference>(*node); }
bool operator==(const iterator_impl<U>& other) const { return node == other.node; }
bool operator!=(const iterator_impl<U>& other) const { return node != other.node; }
private:
elem_t* node;
};
public:
using iterator = iterator_impl<T>;
using const_iterator = iterator_impl<const T>;
intrusive_forward_list()
{
static_assert(std::is_base_of<node_t, T>::value,
"Provided template argument T must have intrusive_forward_list_element<Tag> as base class");
}
intrusive_forward_list(const intrusive_forward_list&) = default;
intrusive_forward_list(intrusive_forward_list&& other) noexcept : node(other.node) { other.node = nullptr; }
intrusive_forward_list& operator=(const intrusive_forward_list&) = default;
intrusive_forward_list& operator =(intrusive_forward_list&& other) noexcept
{
node = other.node;
other.node = nullptr;
return *this;
}
T& front() const { return *static_cast<T*>(node); }
void push_front(T* t)
{
node_t* new_head = static_cast<node_t*>(t);
new_head->next_node = node;
node = new_head;
}
T* pop_front()
{
node_t* ret = node;
node = node->next_node;
return static_cast<T*>(ret);
}
void clear()
{
while (node != nullptr) {
node_t* torem = node;
node = node->next_node;
torem->next_node = nullptr;
}
}
bool empty() const { return node == nullptr; }
iterator begin() { return iterator(node); }
iterator end() { return iterator(nullptr); }
const_iterator begin() const { return const_iterator(node); }
const_iterator end() const { return const_iterator(nullptr); }
private:
node_t* node = nullptr;
};
template <typename Tag = default_intrusive_tag>
struct intrusive_double_linked_list_element {
intrusive_double_linked_list_element<Tag>* next_node = nullptr;
intrusive_double_linked_list_element<Tag>* prev_node = nullptr;
};
template <typename T, typename Tag = default_intrusive_tag>
class intrusive_double_linked_list
{
using node_t = intrusive_double_linked_list_element<Tag>;
template <typename U>
class iterator_impl
{
using elem_t = typename std::conditional<std::is_const<U>::value, const node_t, node_t>::type;
public:
using iterator_category = std::bidirectional_iterator_tag;
using value_type = U;
using difference_type = ptrdiff_t;
using pointer = U*;
using reference = U&;
explicit iterator_impl(elem_t* node_ = nullptr) : node(node_) {}
iterator_impl<U>& operator++()
{
node = node->next_node;
return *this;
}
iterator_impl<U>& operator--()
{
node = node->prev_node;
return *this;
}
pointer operator->() { return static_cast<pointer>(node); }
reference operator*() { return static_cast<reference>(*node); }
bool operator==(const iterator_impl<U>& other) const { return node == other.node; }
bool operator!=(const iterator_impl<U>& other) const { return node != other.node; }
private:
elem_t* node;
};
public:
using iterator = iterator_impl<T>;
using const_iterator = iterator_impl<const T>;
intrusive_double_linked_list()
{
static_assert(std::is_base_of<node_t, T>::value,
"Provided template argument T must have intrusive_forward_list_element<Tag> as base class");
}
intrusive_double_linked_list(const intrusive_double_linked_list&) = default;
intrusive_double_linked_list(intrusive_double_linked_list&& other) noexcept : node(other.node)
{
other.node = nullptr;
}
intrusive_double_linked_list& operator=(const intrusive_double_linked_list&) = default;
intrusive_double_linked_list& operator=(intrusive_double_linked_list&& other) noexcept
{
node = other.node;
other.node = nullptr;
return *this;
}
~intrusive_double_linked_list() { clear(); }
T& front() const { return *static_cast<T*>(node); }
void push_front(T* t)
{
node_t* new_head = static_cast<node_t*>(t);
new_head->prev_node = nullptr;
new_head->next_node = node;
if (node != nullptr) {
node->prev_node = new_head;
}
node = new_head;
}
void pop(T* t)
{
node_t* to_rem = static_cast<node_t*>(t);
if (to_rem == node) {
node = to_rem->next_node;
}
if (to_rem->prev_node != nullptr) {
to_rem->prev_node->next_node = to_rem->next_node;
}
if (to_rem->next_node != nullptr) {
to_rem->next_node->prev_node = to_rem->prev_node;
}
to_rem->next_node = nullptr;
to_rem->prev_node = nullptr;
}
void pop_front() { pop(static_cast<T*>(node)); }
void clear()
{
while (node != nullptr) {
node_t* torem = node;
node = node->next_node;
torem->next_node = nullptr;
torem->prev_node = nullptr;
}
}
bool empty() const { return node == nullptr; }
iterator begin() { return iterator(node); }
iterator end() { return iterator(nullptr); }
const_iterator begin() const { return const_iterator(node); }
const_iterator end() const { return const_iterator(nullptr); }
private:
node_t* node = nullptr;
};
} // namespace srsran
#endif // SRSRAN_INTRUSIVE_LIST_H

@ -20,17 +20,15 @@
#ifndef SRSRAN_TIMERS_H
#define SRSRAN_TIMERS_H
#include "srsran/adt/circular_array.h"
#include "srsran/adt/intrusive_list.h"
#include "srsran/adt/move_callback.h"
#include "srsran/phy/utils/debug.h"
#include <algorithm>
#include <functional>
#include <cstdint>
#include <deque>
#include <limits>
#include <mutex>
#include <queue>
#include <stdint.h>
#include <stdio.h>
#include <time.h>
#include <vector>
namespace srsran {
@ -42,25 +40,27 @@ public:
class timer_handler
{
constexpr static uint32_t MAX_TIMER_DURATION = std::numeric_limits<uint32_t>::max() / 4;
constexpr static uint32_t MAX_TIMER_VALUE = std::numeric_limits<uint32_t>::max() / 2;
struct timer_impl {
timer_handler* parent;
uint32_t duration = 0, timeout = 0;
bool running = false;
bool active = false;
using tic_diff_t = uint32_t;
using tic_t = uint32_t;
constexpr static size_t WHEEL_SIZE = 1024;
constexpr static tic_t invalid_tic = std::numeric_limits<tic_t>::max();
constexpr static uint32_t MAX_TIMER_DURATION = std::numeric_limits<tic_diff_t>::max() / 4;
struct timer_impl : public intrusive_double_linked_list_element<> {
timer_handler& parent;
const size_t id;
tic_diff_t duration = 0;
tic_t timeout = 0;
enum state_t : int8_t { empty, stopped, running, expired } state = empty;
srsran::move_callback<void(uint32_t)> callback;
explicit timer_impl(timer_handler* parent_) : parent(parent_) {}
explicit timer_impl(timer_handler& parent_, size_t id_) : parent(parent_), id(id_) {}
uint32_t id() const { return std::distance((const timer_handler::timer_impl*)&parent->timer_list[0], this); }
bool is_running() const { return active and running and timeout > 0; }
bool is_expired() const { return active and not running and timeout > 0 and timeout <= parent->cur_time; }
uint32_t time_elapsed() const { return std::min(duration, parent->cur_time - (timeout - duration)); }
bool is_empty() const { return state == empty; }
bool is_running() const { return state == running; }
bool is_expired() const { return state == expired; }
tic_diff_t time_left() const { return is_running() ? timeout - parent.cur_time : (is_expired() ? 0 : duration); }
uint32_t time_elapsed() const { return duration - time_left(); }
bool set(uint32_t duration_)
{
@ -68,14 +68,13 @@ class timer_handler
ERROR("Error: timer durations above %u are not supported", MAX_TIMER_DURATION);
return false;
}
if (not active) {
ERROR("Error: setting inactive timer id=%d", id());
return false;
}
duration = duration_;
duration = std::max(duration_, 1u);
if (is_running()) {
// if already running, just extends timer lifetime
run();
} else {
state = stopped;
timeout = 0;
}
return true;
}
@ -91,156 +90,135 @@ class timer_handler
void run()
{
std::unique_lock<std::mutex> lock(parent->mutex);
if (not active) {
ERROR("Error: calling run() for inactive timer id=%d", id());
return;
}
timeout = parent->cur_time + duration;
parent->running_timers.emplace(id(), timeout);
running = true;
std::lock_guard<std::mutex> lock(parent.mutex);
parent.start_run_(*this);
}
void stop()
{
running = false; // invalidates trigger
if (not is_expired()) {
timeout = 0; // if it has already expired, then do not alter is_expired() state
}
std::lock_guard<std::mutex> lock(parent.mutex);
// does not call callback
parent.stop_timer_(*this);
}
void clear()
{
stop();
duration = 0;
active = false;
callback = srsran::move_callback<void(uint32_t)>();
// leave run_id unchanged. Since the timeout was changed, we shall not get spurious triggering
}
void trigger()
{
if (is_running()) {
running = false;
if (not callback.is_empty()) {
callback(id());
}
}
}
void clear() { parent.dealloc_timer(*this); }
};
public:
class unique_timer
{
public:
unique_timer() : timer_id(std::numeric_limits<decltype(timer_id)>::max()) {}
explicit unique_timer(timer_handler* parent_, uint32_t timer_id_) : parent(parent_), timer_id(timer_id_) {}
unique_timer() = default;
explicit unique_timer(timer_impl* handle_) : handle(handle_) {}
unique_timer(const unique_timer&) = delete;
unique_timer(unique_timer&& other) noexcept : parent(other.parent), timer_id(other.timer_id)
{
other.parent = nullptr;
}
~unique_timer()
{
if (parent != nullptr) {
// does not call callback
impl()->clear();
}
}
unique_timer(unique_timer&& other) noexcept : handle(other.handle) { other.handle = nullptr; }
~unique_timer() { release(); }
unique_timer& operator=(const unique_timer&) = delete;
unique_timer& operator=(unique_timer&& other) noexcept
unique_timer& operator =(unique_timer&& other) noexcept
{
if (this != &other) {
timer_id = other.timer_id;
parent = other.parent;
other.parent = nullptr;
handle = other.handle;
other.handle = nullptr;
}
return *this;
}
bool is_valid() const { return parent != nullptr; }
bool is_valid() const { return handle != nullptr; }
void set(uint32_t duration_, move_callback<void(uint32_t)> callback_)
{
impl()->set(duration_, std::move(callback_));
srsran_assert(is_valid(), "Trying to setup empty timer handle");
handle->set(duration_, std::move(callback_));
}
void set(uint32_t duration_)
{
srsran_assert(is_valid(), "Trying to setup empty timer handle");
handle->set(duration_);
}
void set(uint32_t duration_) { impl()->set(duration_); }
bool is_set() const { return (impl()->duration != 0); }
bool is_set() const { return is_valid() and handle->duration > 0; }
bool is_running() const { return impl()->is_running(); }
bool is_running() const { return is_valid() and handle->is_running(); }
bool is_expired() const { return impl()->is_expired(); }
bool is_expired() const { return is_valid() and handle->is_expired(); }
uint32_t time_elapsed() const { return impl()->time_elapsed(); }
tic_diff_t time_elapsed() const { return is_valid() ? handle->time_elapsed() : -1; }
void run() { impl()->run(); }
uint32_t id() const { return is_valid() ? handle->id : -1; }
void stop() { impl()->stop(); }
tic_diff_t duration() const { return is_valid() ? handle->duration : -1; }
void release()
void run()
{
impl()->clear();
parent = nullptr;
srsran_assert(is_valid(), "Starting invalid timer");
handle->run();
}
uint32_t id() const { return timer_id; }
void stop()
{
if (is_valid()) {
handle->stop();
}
}
uint32_t duration() const { return impl()->duration; }
void release()
{
if (is_valid()) {
handle->clear();
handle = nullptr;
}
}
private:
timer_impl* impl() { return &parent->timer_list[timer_id]; }
const timer_impl* impl() const { return &parent->timer_list[timer_id]; }
timer_handler* parent = nullptr;
uint32_t timer_id;
timer_impl* handle = nullptr;
};
explicit timer_handler(uint32_t capacity = 64)
{
timer_list.reserve(capacity);
// reserve a priority queue using a vector
std::vector<timer_run> v;
v.reserve(capacity);
std::priority_queue<timer_run> q(std::less<timer_run>(), std::move(v));
running_timers = std::move(q);
// Pre-reserve timers
while (timer_list.size() < capacity) {
timer_list.emplace_back(*this, timer_list.size());
}
}
void step_all()
{
std::unique_lock<std::mutex> lock(mutex);
cur_time++;
while (not running_timers.empty()) {
uint32_t next_timeout = running_timers.top().timeout;
timer_impl* ptr = &timer_list[running_timers.top().timer_id];
if (not ptr->is_running() or next_timeout != ptr->timeout) {
// remove timers that were explicitly stopped, or re-run, to avoid unnecessary priority_queue growth
running_timers.pop();
continue;
}
if (cur_time < next_timeout) {
break;
if (cur_time == WHEEL_SIZE) {
// Promote timers from 2nd wheel to first if needed
for (size_t i = 0; i < WHEEL_SIZE; ++i) {
for (auto it = second_wheel[i].begin(); it != second_wheel[i].end();) {
auto& timer = timer_list[it->id];
timer.timeout -= WHEEL_SIZE;
++it;
if (timer.timeout < WHEEL_SIZE) {
second_wheel[i].pop(&timer);
first_wheel[i].push_front(&timer);
}
}
}
// if the timer_run and timer_impl timeouts do not match, it means that timer_impl::timeout was overwritten.
// in such case, do not trigger
uint32_t timeout = running_timers.top().timeout;
running_timers.pop();
cur_time = 0;
}
auto& wheel_list = first_wheel[cur_time % WHEEL_SIZE];
while (not wheel_list.empty()) {
// Remove timer from wheel
timer_impl& timer = wheel_list.front();
wheel_list.pop_front();
if (ptr->timeout == timeout) {
// update timer state
timer.state = timer_impl::expired;
nof_timers_running_--;
// Call callback
if (not timer.callback.is_empty()) {
// unlock mutex, it could be that the callback tries to run a timer too
lock.unlock();
// Call callback
ptr->trigger();
timer.callback(timer.id);
// Lock again to keep protecting the queue
// Lock again to keep protecting the wheel
lock.lock();
}
}
@ -248,78 +226,125 @@ public:
void stop_all()
{
std::lock_guard<std::mutex> lock(mutex);
// does not call callback
while (not running_timers.empty()) {
running_timers.pop();
}
for (auto& i : timer_list) {
i.running = false;
for (timer_impl& timer : timer_list) {
stop_timer_(timer);
}
}
unique_timer get_unique_timer() { return unique_timer(this, alloc_timer()); }
uint32_t get_cur_time() const { return cur_time; }
unique_timer get_unique_timer() { return unique_timer(&alloc_timer()); }
uint32_t nof_timers() const
{
return std::count_if(timer_list.begin(), timer_list.end(), [](const timer_impl& t) { return t.active; });
std::lock_guard<std::mutex> lock(mutex);
return nof_timers_;
}
uint32_t nof_running_timers() const
{
return std::count_if(timer_list.begin(), timer_list.end(), [](const timer_impl& t) { return t.is_running(); });
std::lock_guard<std::mutex> lock(mutex);
return nof_timers_running_;
}
template <typename F>
void defer_callback(uint32_t duration, const F& func)
{
uint32_t id = alloc_timer();
srsran::move_callback<void(uint32_t)> c = [func, this, id](uint32_t tid) {
timer_impl& timer = alloc_timer();
srsran::move_callback<void(uint32_t)> c = [func, &timer](uint32_t tid) {
func();
// auto-deletes timer
timer_list[id].clear();
timer.clear();
};
timer_list[id].set(duration, std::move(c));
timer_list[id].run();
timer.set(duration, std::move(c));
timer.run();
}
private:
struct timer_run {
uint32_t timer_id;
uint32_t timeout;
timer_run(uint32_t timer_id_, uint32_t timeout_) : timer_id(timer_id_), timeout(timeout_) {}
timer_impl& alloc_timer()
{
std::lock_guard<std::mutex> lock(mutex);
nof_timers_++;
if (nof_timers_ > timer_list.size()) {
// Need to increase deque
timer_list.emplace_back(*this, timer_list.size());
timer_impl& ret = timer_list.back();
ret.state = timer_impl::stopped;
return ret;
}
bool operator<(const timer_run& other) const
{
// returns true, if other.timeout is lower than timeout, accounting for wrap around
if (timeout > other.timeout) {
return (timeout - other.timeout) < MAX_TIMER_VALUE / 2;
for (auto& timer : timer_list) {
if (timer.is_empty()) {
timer.state = timer_impl::stopped;
return timer;
}
return (other.timeout - timeout) > MAX_TIMER_VALUE / 2;
}
};
srsran_terminate("Failed to allocate timer");
}
uint32_t alloc_timer()
void dealloc_timer(timer_impl& timer)
{
uint32_t i = 0;
for (; i < timer_list.size(); ++i) {
if (not timer_list[i].active) {
break;
}
std::lock_guard<std::mutex> lock(mutex);
if (timer.is_empty()) {
// already deallocated
return;
}
if (i == timer_list.size()) {
timer_list.emplace_back(this);
stop_timer_(timer);
timer.state = timer_impl::empty;
timer.duration = 0;
timer.timeout = 0;
timer.callback = srsran::move_callback<void(uint32_t)>();
nof_timers_--;
// leave id unchanged.
}
void start_run_(timer_impl& timer)
{
uint32_t timeout = cur_time + timer.duration;
if (timer.is_running() and timer.timeout == timeout) {
// If no change in timeout, no need to change wheel position
return;
}
timer_list[i].active = true;
return i;
// Stop timer if it was running, removing it from wheel in the process
stop_timer_(timer);
// Insert timer in wheel
if (timeout < WHEEL_SIZE) {
first_wheel[timeout].push_front(&timer);
} else {
second_wheel[timeout % WHEEL_SIZE].push_front(&timer);
}
timer.timeout = timeout;
timer.state = timer_impl::running;
nof_timers_running_++;
}
/// called when user manually stops timer (as an alternative to expiry)
void stop_timer_(timer_impl& timer)
{
if (not timer.is_running()) {
return;
}
// If already running, need to disconnect it from previous wheel
if (timer.timeout < WHEEL_SIZE) {
first_wheel[timer.timeout].pop(&timer);
} else {
second_wheel[timer.timeout % WHEEL_SIZE].pop(&timer);
}
timer.state = timer_impl::stopped;
nof_timers_running_--;
}
std::vector<timer_impl> timer_list;
std::priority_queue<timer_run> running_timers;
uint32_t cur_time = 0;
std::mutex mutex; // Protect priority queue
uint32_t cur_time = 0;
size_t nof_timers_ = 0;
size_t nof_timers_running_ = 0;
std::deque<timer_impl> timer_list;
srsran::circular_array<srsran::intrusive_double_linked_list<timer_impl>, WHEEL_SIZE> first_wheel;
srsran::circular_array<srsran::intrusive_double_linked_list<timer_impl>, WHEEL_SIZE> second_wheel;
mutable std::mutex mutex; // Protect priority queue
};
using unique_timer = timer_handler::unique_timer;

@ -26,15 +26,13 @@ class stack_test_dummy : public stack_interface_rrc
public:
stack_test_dummy() {}
srsran::tti_point get_current_tti() override
{
return srsran::tti_point{task_sched.get_timer_handler()->get_cur_time() % 10240};
}
srsran::tti_point get_current_tti() override { return srsran::tti_point{tti % 10240}; }
// Testing utility functions
void run_tti()
{
// update clock and run internal tasks
tti++;
task_sched.tic();
task_sched.run_pending_tasks();
@ -43,6 +41,7 @@ public:
// run pending tasks without updating timers
void run_pending_tasks() { task_sched.run_pending_tasks(); }
uint32_t tti = 0;
srsran::task_scheduler task_sched{512, 100};
};

@ -10,20 +10,13 @@
*
*/
#include "srsran/common/test_common.h"
#include "srsran/common/timers.h"
#include <iostream>
#include <random>
#include <srsran/common/tti_sync_cv.h>
#include <thread>
#define TESTASSERT(cond) \
do { \
if (!(cond)) { \
std::cout << "[" << __FUNCTION__ << "][Line " << __LINE__ << "]: FAIL at " << (#cond) << std::endl; \
return -1; \
} \
} while (0)
using namespace srsran;
int timers_test1()
@ -42,8 +35,7 @@ int timers_test1()
// TEST: Run multiple times with the same duration
bool callback_called = false;
t.set(dur, [&callback_called](int) { callback_called = true; });
TESTASSERT(timers.get_cur_time() == 0);
t.set(dur, [&callback_called](int tid) { callback_called = true; });
for (uint32_t runs = 0; runs < 3; ++runs) {
callback_called = false;
TESTASSERT(not t.is_running());
@ -57,7 +49,6 @@ int timers_test1()
TESTASSERT(not t.is_running() and t.is_expired());
TESTASSERT(callback_called);
}
TESTASSERT(timers.get_cur_time() == 3 * dur);
// TEST: interrupt a timer. check if callback was called
callback_called = false;

Loading…
Cancel
Save