simplification of multiqueue design.

- the consumer does multi-staged waiting:
  1. it first spins across all queues in a round-robin (RR) fashion
  2. each queue access is done with a try_lock
  3. if the try_lock fails, the number of spins needed is increased
  4. if no queue had data, the consumer sleeps for 100 usec
- queues are no longer differentiated by notification mode: the per-queue notify flag is removed
master
Francisco 4 years ago committed by Ismael Gomez
parent 60a8ee0af9
commit f2a56c9139

@ -47,9 +47,7 @@ class multiqueue_handler
class input_port_impl class input_port_impl
{ {
public: public:
input_port_impl(uint32_t cap, bool notify_flag_, multiqueue_handler<myobj>* parent_) : input_port_impl(uint32_t cap, multiqueue_handler<myobj>* parent_) : buffer(cap), parent(parent_) {}
buffer(cap), notify_flag(notify_flag_), consumer_notify_needed(notify_flag_), parent(parent_)
{}
input_port_impl(const input_port_impl&) = delete; input_port_impl(const input_port_impl&) = delete;
input_port_impl(input_port_impl&&) = delete; input_port_impl(input_port_impl&&) = delete;
input_port_impl& operator=(const input_port_impl&) = delete; input_port_impl& operator=(const input_port_impl&) = delete;
@ -57,7 +55,6 @@ class multiqueue_handler
~input_port_impl() { deactivate_blocking(); } ~input_port_impl() { deactivate_blocking(); }
size_t capacity() const { return buffer.max_size(); } size_t capacity() const { return buffer.max_size(); }
bool get_notify_mode() const { return notify_flag; }
size_t size() const size_t size() const
{ {
std::lock_guard<std::mutex> lock(q_mutex); std::lock_guard<std::mutex> lock(q_mutex);
@ -76,7 +73,6 @@ class multiqueue_handler
return; return;
} }
active_ = val; active_ = val;
consumer_notify_needed = notify_flag;
if (not active_) { if (not active_) {
buffer.clear(); buffer.clear();
@ -150,26 +146,16 @@ class multiqueue_handler
} }
} }
buffer.push(std::forward<T>(*o)); buffer.push(std::forward<T>(*o));
if (consumer_notify_needed) {
// Note: The consumer thread only needs to be notified and awaken when queues transition from empty to non-empty
// To ensure that the consumer noticed that the queue was empty before a push, we store the last
// try_pop() return in a member variable.
// Doing this reduces the contention of multiple producers for the same condition variable
lock.unlock();
parent->signal_pushed_data();
}
return true; return true;
} }
bool pop_(std::unique_lock<std::mutex>& lock, myobj& obj) bool pop_(std::unique_lock<std::mutex>& lock, myobj& obj)
{ {
if (buffer.empty()) { if (buffer.empty()) {
consumer_notify_needed = notify_flag;
return false; return false;
} }
obj = std::move(buffer.top()); obj = std::move(buffer.top());
buffer.pop(); buffer.pop();
consumer_notify_needed = false;
if (nof_waiting > 0) { if (nof_waiting > 0) {
lock.unlock(); lock.unlock();
cv_full.notify_one(); cv_full.notify_one();
@ -183,8 +169,6 @@ class multiqueue_handler
srsran::dyn_circular_buffer<myobj> buffer; srsran::dyn_circular_buffer<myobj> buffer;
std::condition_variable cv_full, cv_exit; std::condition_variable cv_full, cv_exit;
bool active_ = true; bool active_ = true;
bool consumer_notify_needed = false;
bool notify_flag = false;
int nof_waiting = 0; int nof_waiting = 0;
}; };
@ -242,7 +226,6 @@ public:
q.set_active(false); q.set_active(false);
} }
while (consumer_state) { while (consumer_state) {
cv_empty.notify_one();
cv_exit.wait(lock); cv_exit.wait(lock);
} }
for (auto& q : queues) { for (auto& q : queues) {
@ -256,22 +239,21 @@ public:
* @param capacity_ The capacity of the queue. * @param capacity_ The capacity of the queue.
* @return The index of the newly created (or reused) queue within the vector of queues. * @return The index of the newly created (or reused) queue within the vector of queues.
*/ */
queue_handle add_queue(uint32_t capacity_, bool notify_flag = false) queue_handle add_queue(uint32_t capacity_)
{ {
uint32_t qidx = 0; uint32_t qidx = 0;
std::lock_guard<std::mutex> lock(mutex); std::lock_guard<std::mutex> lock(mutex);
if (not running) { if (not running) {
return queue_handle(); return queue_handle();
} }
while (qidx < queues.size() and (queues[qidx].active() or (queues[qidx].capacity() != capacity_) or while (qidx < queues.size() and (queues[qidx].active() or (queues[qidx].capacity() != capacity_))) {
(queues[qidx].get_notify_mode() == notify_flag))) {
++qidx; ++qidx;
} }
// check if there is a free queue of the required size // check if there is a free queue of the required size
if (qidx == queues.size()) { if (qidx == queues.size()) {
// create new queue // create new queue
queues.emplace_back(capacity_, notify_flag, this); queues.emplace_back(capacity_, this);
qidx = queues.size() - 1; // update qidx to the last element qidx = queues.size() - 1; // update qidx to the last element
} else { } else {
queues[qidx].set_active(true); queues[qidx].set_active(true);
@ -283,7 +265,7 @@ public:
* Add queue using the default capacity of the underlying multiqueue * Add queue using the default capacity of the underlying multiqueue
* @return The queue index * @return The queue index
*/ */
queue_handle add_queue(bool notify_flag) { return add_queue(default_capacity, notify_flag); } queue_handle add_queue() { return add_queue(default_capacity); }
uint32_t nof_queues() const uint32_t nof_queues() const
{ {
@ -304,7 +286,9 @@ public:
consumer_state = false; consumer_state = false;
return true; return true;
} }
cv_empty.wait_for(lock, std::chrono::microseconds(100)); lock.unlock();
std::this_thread::sleep_for(std::chrono::microseconds(100));
lock.lock();
} }
consumer_state = false; consumer_state = false;
lock.unlock(); lock.unlock();
@ -340,11 +324,9 @@ private:
} }
return false; return false;
} }
/// Called by the producer threads to signal the consumer to unlock in wait_pop
void signal_pushed_data() { cv_empty.notify_one(); }
mutable std::mutex mutex; mutable std::mutex mutex;
std::condition_variable cv_empty, cv_exit; std::condition_variable cv_exit;
uint32_t spin_idx = 0; uint32_t spin_idx = 0;
bool running = true, consumer_state = false; bool running = true, consumer_state = false;
std::deque<input_port_impl> queues; std::deque<input_port_impl> queues;

@ -26,7 +26,7 @@ public:
explicit task_scheduler(uint32_t default_extern_tasks_size = 512, uint32_t nof_timers_prealloc = 100) : explicit task_scheduler(uint32_t default_extern_tasks_size = 512, uint32_t nof_timers_prealloc = 100) :
external_tasks{default_extern_tasks_size}, timers{nof_timers_prealloc} external_tasks{default_extern_tasks_size}, timers{nof_timers_prealloc}
{ {
background_queue = external_tasks.add_queue(false); background_queue = external_tasks.add_queue();
} }
task_scheduler(const task_scheduler&) = delete; task_scheduler(const task_scheduler&) = delete;
task_scheduler(task_scheduler&&) = delete; task_scheduler(task_scheduler&&) = delete;
@ -38,11 +38,8 @@ public:
srsran::unique_timer get_unique_timer() { return timers.get_unique_timer(); } srsran::unique_timer get_unique_timer() { return timers.get_unique_timer(); }
//! Creates new queue for tasks coming from external thread //! Creates new queue for tasks coming from external thread
srsran::task_queue_handle make_task_queue(bool notify_mode) { return external_tasks.add_queue(notify_mode); } srsran::task_queue_handle make_task_queue() { return external_tasks.add_queue(); }
srsran::task_queue_handle make_task_queue(uint32_t qsize, bool notify_mode) srsran::task_queue_handle make_task_queue(uint32_t qsize) { return external_tasks.add_queue(qsize); }
{
return external_tasks.add_queue(qsize, notify_mode);
}
//! Delays a task processing by duration_ms //! Delays a task processing by duration_ms
template <typename F> template <typename F>
@ -127,7 +124,7 @@ public:
sched->defer_callback(duration_ms, std::forward<F>(func)); sched->defer_callback(duration_ms, std::forward<F>(func));
} }
void defer_task(srsran::move_task_t func) { sched->defer_task(std::move(func)); } void defer_task(srsran::move_task_t func) { sched->defer_task(std::move(func)); }
srsran::task_queue_handle make_task_queue() { return sched->make_task_queue(false); } srsran::task_queue_handle make_task_queue() { return sched->make_task_queue(); }
private: private:
task_scheduler* sched; task_scheduler* sched;
@ -144,7 +141,7 @@ public:
{ {
sched->notify_background_task_result(std::move(task)); sched->notify_background_task_result(std::move(task));
} }
srsran::task_queue_handle make_task_queue() { return sched->make_task_queue(false); } srsran::task_queue_handle make_task_queue() { return sched->make_task_queue(); }
template <typename F> template <typename F>
void defer_callback(uint32_t duration_ms, F&& func) void defer_callback(uint32_t duration_ms, F&& func)
{ {

@ -32,7 +32,7 @@ int test_multiqueue()
TESTASSERT(multiqueue.nof_queues() == 0); TESTASSERT(multiqueue.nof_queues() == 0);
// test push/pop and size for one queue // test push/pop and size for one queue
queue_handle<int> qid1 = multiqueue.add_queue(true); queue_handle<int> qid1 = multiqueue.add_queue();
TESTASSERT(qid1.active()); TESTASSERT(qid1.active());
TESTASSERT(qid1.size() == 0 and qid1.empty()); TESTASSERT(qid1.size() == 0 and qid1.empty());
TESTASSERT(multiqueue.nof_queues() == 1); TESTASSERT(multiqueue.nof_queues() == 1);
@ -45,7 +45,7 @@ int test_multiqueue()
TESTASSERT(number == 2 and qid1.empty()); TESTASSERT(number == 2 and qid1.empty());
// test push/pop and size for two queues // test push/pop and size for two queues
queue_handle<int> qid2 = multiqueue.add_queue(true); queue_handle<int> qid2 = multiqueue.add_queue();
TESTASSERT(qid2.active()); TESTASSERT(qid2.active());
TESTASSERT(multiqueue.nof_queues() == 2 and qid1.active()); TESTASSERT(multiqueue.nof_queues() == 2 and qid1.active());
TESTASSERT(qid2.try_push(3).has_value()); TESTASSERT(qid2.try_push(3).has_value());
@ -55,7 +55,7 @@ int test_multiqueue()
// check if erasing a queue breaks anything // check if erasing a queue breaks anything
qid1.reset(); qid1.reset();
TESTASSERT(multiqueue.nof_queues() == 1 and not qid1.active()); TESTASSERT(multiqueue.nof_queues() == 1 and not qid1.active());
qid1 = multiqueue.add_queue(true); qid1 = multiqueue.add_queue();
TESTASSERT(qid1.empty() and qid1.active()); TESTASSERT(qid1.empty() and qid1.active());
TESTASSERT(qid2.size() == 1 and not qid2.empty()); TESTASSERT(qid2.size() == 1 and not qid2.empty());
multiqueue.wait_pop(&number); multiqueue.wait_pop(&number);
@ -89,15 +89,15 @@ int test_multiqueue()
// check that adding a queue of different capacity works // check that adding a queue of different capacity works
{ {
qid1 = multiqueue.add_queue(true); qid1 = multiqueue.add_queue();
qid2 = multiqueue.add_queue(true); qid2 = multiqueue.add_queue();
// remove first queue again // remove first queue again
qid1.reset(); qid1.reset();
TESTASSERT(multiqueue.nof_queues() == 1); TESTASSERT(multiqueue.nof_queues() == 1);
// add queue with non-default capacity // add queue with non-default capacity
auto qid3 = multiqueue.add_queue(10, true); auto qid3 = multiqueue.add_queue(10);
TESTASSERT(qid3.capacity() == 10); TESTASSERT(qid3.capacity() == 10);
// make sure neither a new queue index is returned // make sure neither a new queue index is returned
@ -117,7 +117,7 @@ int test_multiqueue_threading()
int capacity = 4, number = 0, start_number = 2, nof_pushes = capacity + 1; int capacity = 4, number = 0, start_number = 2, nof_pushes = capacity + 1;
multiqueue_handler<int> multiqueue(capacity); multiqueue_handler<int> multiqueue(capacity);
auto qid1 = multiqueue.add_queue(true); auto qid1 = multiqueue.add_queue();
auto push_blocking_func = [](queue_handle<int>* qid, int start_value, int nof_pushes, bool* is_running) { auto push_blocking_func = [](queue_handle<int>* qid, int start_value, int nof_pushes, bool* is_running) {
for (int i = 0; i < nof_pushes; ++i) { for (int i = 0; i < nof_pushes; ++i) {
qid->push(start_value + i); qid->push(start_value + i);
@ -165,7 +165,7 @@ int test_multiqueue_threading2()
int capacity = 4, start_number = 2, nof_pushes = capacity + 1; int capacity = 4, start_number = 2, nof_pushes = capacity + 1;
multiqueue_handler<int> multiqueue(capacity); multiqueue_handler<int> multiqueue(capacity);
auto qid1 = multiqueue.add_queue(true); auto qid1 = multiqueue.add_queue();
auto push_blocking_func = [](queue_handle<int>* qid, int start_value, int nof_pushes, bool* is_running) { auto push_blocking_func = [](queue_handle<int>* qid, int start_value, int nof_pushes, bool* is_running) {
for (int i = 0; i < nof_pushes; ++i) { for (int i = 0; i < nof_pushes; ++i) {
qid->push(start_value + i); qid->push(start_value + i);
@ -199,7 +199,7 @@ int test_multiqueue_threading3()
int capacity = 4; int capacity = 4;
multiqueue_handler<int> multiqueue(capacity); multiqueue_handler<int> multiqueue(capacity);
auto qid1 = multiqueue.add_queue(true); auto qid1 = multiqueue.add_queue();
auto pop_blocking_func = [&multiqueue](bool* success) { auto pop_blocking_func = [&multiqueue](bool* success) {
int number = 0; int number = 0;
bool ret = multiqueue.wait_pop(&number); bool ret = multiqueue.wait_pop(&number);
@ -235,10 +235,10 @@ int test_multiqueue_threading4()
int capacity = 4; int capacity = 4;
multiqueue_handler<int> multiqueue(capacity); multiqueue_handler<int> multiqueue(capacity);
auto qid1 = multiqueue.add_queue(true); auto qid1 = multiqueue.add_queue();
auto qid2 = multiqueue.add_queue(true); auto qid2 = multiqueue.add_queue();
auto qid3 = multiqueue.add_queue(true); auto qid3 = multiqueue.add_queue();
auto qid4 = multiqueue.add_queue(true); auto qid4 = multiqueue.add_queue();
std::mutex mutex; std::mutex mutex;
int last_number = -1; int last_number = -1;
auto pop_blocking_func = [&multiqueue, &last_number, &mutex](bool* success) { auto pop_blocking_func = [&multiqueue, &last_number, &mutex](bool* success) {
@ -349,7 +349,8 @@ int test_task_thread_pool2()
// to be completed, and does not get stuck. // to be completed, and does not get stuck.
uint32_t nof_workers = 4; uint32_t nof_workers = 4;
uint8_t workers_started = 0, workers_finished = 0; std::atomic<uint8_t> workers_started{0};
uint8_t workers_finished = 0;
std::mutex mut; std::mutex mut;
task_thread_pool thread_pool(nof_workers); task_thread_pool thread_pool(nof_workers);
@ -360,7 +361,7 @@ int test_task_thread_pool2()
std::lock_guard<std::mutex> lock(mut); std::lock_guard<std::mutex> lock(mut);
workers_started++; workers_started++;
} }
sleep(1); std::this_thread::sleep_for(std::chrono::seconds{1});
std::lock_guard<std::mutex> lock(mut); std::lock_guard<std::mutex> lock(mut);
std::cout << "worker has finished\n"; std::cout << "worker has finished\n";
workers_finished++; workers_finished++;

@ -23,7 +23,7 @@ struct rx_thread_tester {
std::thread t; std::thread t;
rx_thread_tester() : rx_thread_tester() :
task_queue(task_sched.make_task_queue(true)), task_queue(task_sched.make_task_queue()),
t([this]() { t([this]() {
stop_token.store(false); stop_token.store(false);
while (not stop_token.load(std::memory_order_relaxed)) { while (not stop_token.load(std::memory_order_relaxed)) {

@ -41,8 +41,8 @@ enb_stack_lte::enb_stack_lte(srslog::sink& log_sink) :
pending_stack_metrics(64) pending_stack_metrics(64)
{ {
get_background_workers().set_nof_workers(2); get_background_workers().set_nof_workers(2);
enb_task_queue = task_sched.make_task_queue(true); enb_task_queue = task_sched.make_task_queue();
metrics_task_queue = task_sched.make_task_queue(false); metrics_task_queue = task_sched.make_task_queue();
// sync_queue is added in init() // sync_queue is added in init()
} }
@ -115,7 +115,7 @@ int enb_stack_lte::init(const stack_args_t& args_, const rrc_cfg_t& rrc_cfg_)
} }
// add sync queue // add sync queue
sync_task_queue = task_sched.make_task_queue(args.sync_queue_size, true); sync_task_queue = task_sched.make_task_queue(args.sync_queue_size);
// Init all layers // Init all layers
if (!mac.init(args.mac, rrc_cfg.cell_list, phy, &rlc, &rrc)) { if (!mac.init(args.mac, rrc_cfg.cell_list, phy, &rlc, &rrc)) {

@ -26,10 +26,10 @@ gnb_stack_nr::gnb_stack_nr() : task_sched{512, 128}, thread("gNB"), rlc_logger(s
m_gw.reset(new srsue::gw()); m_gw.reset(new srsue::gw());
// m_gtpu.reset(new srsenb::gtpu()); // m_gtpu.reset(new srsenb::gtpu());
ue_task_queue = task_sched.make_task_queue(true); ue_task_queue = task_sched.make_task_queue();
sync_task_queue = task_sched.make_task_queue(true); sync_task_queue = task_sched.make_task_queue();
gw_task_queue = task_sched.make_task_queue(false); gw_task_queue = task_sched.make_task_queue();
mac_task_queue = task_sched.make_task_queue(false); mac_task_queue = task_sched.make_task_queue();
} }
gnb_stack_nr::~gnb_stack_nr() gnb_stack_nr::~gnb_stack_nr()

@ -52,9 +52,9 @@ ue_stack_lte::ue_stack_lte() :
tti_tprof("tti_tprof", "STCK", TTI_STAT_PERIOD) tti_tprof("tti_tprof", "STCK", TTI_STAT_PERIOD)
{ {
get_background_workers().set_nof_workers(2); get_background_workers().set_nof_workers(2);
ue_task_queue = task_sched.make_task_queue(true); ue_task_queue = task_sched.make_task_queue();
gw_queue_id = task_sched.make_task_queue(false); gw_queue_id = task_sched.make_task_queue();
cfg_task_queue = task_sched.make_task_queue(false); cfg_task_queue = task_sched.make_task_queue();
// sync_queue is added in init() // sync_queue is added in init()
} }
@ -198,7 +198,7 @@ int ue_stack_lte::init(const stack_args_t& args_)
} }
// add sync queue // add sync queue
sync_task_queue = task_sched.make_task_queue(args.sync_queue_size, true); sync_task_queue = task_sched.make_task_queue(args.sync_queue_size);
mac.init(phy, &rlc, &rrc); mac.init(phy, &rlc, &rrc);
rlc.init(&pdcp, &rrc, &rrc_nr, task_sched.get_timer_handler(), 0 /* RB_ID_SRB0 */); rlc.init(&pdcp, &rrc, &rrc_nr, task_sched.get_timer_handler(), 0 /* RB_ID_SRB0 */);

@ -33,9 +33,9 @@ ue_stack_nr::ue_stack_nr() :
// setup logging for pool, RLC and PDCP // setup logging for pool, RLC and PDCP
byte_buffer_pool::get_instance()->enable_logger(true); byte_buffer_pool::get_instance()->enable_logger(true);
ue_task_queue = task_sched.make_task_queue(true); ue_task_queue = task_sched.make_task_queue();
sync_task_queue = task_sched.make_task_queue(true); sync_task_queue = task_sched.make_task_queue();
gw_task_queue = task_sched.make_task_queue(false); gw_task_queue = task_sched.make_task_queue();
} }
ue_stack_nr::~ue_stack_nr() ue_stack_nr::~ue_stack_nr()

Loading…
Cancel
Save