From 60a8ee0af96452d555787f662bfb2145461c162e Mon Sep 17 00:00:00 2001 From: Francisco Date: Wed, 19 May 2021 18:49:06 +0100 Subject: [PATCH] multiqueue optimization - use condition_var wait_for() method, use queue try_lock in the consumer side --- lib/include/srsran/common/multiqueue.h | 79 ++++++++++++++------------ 1 file changed, 42 insertions(+), 37 deletions(-) diff --git a/lib/include/srsran/common/multiqueue.h b/lib/include/srsran/common/multiqueue.h index 6883fbdc7..0947179b7 100644 --- a/lib/include/srsran/common/multiqueue.h +++ b/lib/include/srsran/common/multiqueue.h @@ -116,18 +116,14 @@ class multiqueue_handler bool try_pop(myobj& obj) { std::unique_lock lock(q_mutex); - if (buffer.empty()) { - consumer_notify_needed = notify_flag; - return false; - } - obj = std::move(buffer.top()); - buffer.pop(); - consumer_notify_needed = false; - if (nof_waiting > 0) { - lock.unlock(); - cv_full.notify_one(); - } - return true; + return pop_(lock, obj); + } + + bool try_pop(myobj& obj, bool& try_lock_success) + { + std::unique_lock lock(q_mutex, std::try_to_lock); + try_lock_success = lock.owns_lock(); + return try_lock_success ? pop_(lock, obj) : false; } private: @@ -165,6 +161,22 @@ class multiqueue_handler return true; } + bool pop_(std::unique_lock& lock, myobj& obj) + { + if (buffer.empty()) { + consumer_notify_needed = notify_flag; + return false; + } + obj = std::move(buffer.top()); + buffer.pop(); + consumer_notify_needed = false; + if (nof_waiting > 0) { + lock.unlock(); + cv_full.notify_one(); + } + return true; + } + multiqueue_handler* parent = nullptr; mutable std::mutex q_mutex; @@ -229,8 +241,7 @@ public: // signal deactivation to pushing threads in a non-blocking way q.set_active(false); } - while (wait_state) { - pushed_data = true; + while (consumer_state) { cv_empty.notify_one(); cv_exit.wait(lock); } @@ -287,17 +298,16 @@ public: bool wait_pop(myobj* value) { std::unique_lock lock(mutex); + consumer_state = true; while (running) { if (round_robin_pop_(value)) { + consumer_state = false; return true; } - pushed_data = false; - wait_state = true; - while (not pushed_data) { - cv_empty.wait(lock); - } - wait_state = false; + cv_empty.wait_for(lock, std::chrono::microseconds(100)); } + consumer_state = false; + lock.unlock(); cv_exit.notify_one(); return false; } @@ -312,36 +322,31 @@ private: bool round_robin_pop_(myobj* value) { // Round-robin for all queues - auto it = queues.begin() + spin_idx; + auto q_it = queues.begin() + spin_idx; uint32_t count = 0; - for (; count < queues.size(); ++count, ++it) { - if (it == queues.end()) { - it = queues.begin(); // wrap-around + for (; count < queues.size(); ++count, ++q_it) { + if (q_it == queues.end()) { + q_it = queues.begin(); // wrap-around } - if (it->try_pop(*value)) { + bool try_lock_success = true; + if (q_it->try_pop(*value, try_lock_success)) { spin_idx = (spin_idx + count + 1) % queues.size(); return true; } + if (not try_lock_success) { + // restart RR search, as there was a collision with a producer + count = 0; + } } return false; } /// Called by the producer threads to signal the consumer to unlock in wait_pop - void signal_pushed_data() - { - { - std::lock_guard lock(mutex); - if (pushed_data) { - return; - } - pushed_data = true; - } - cv_empty.notify_one(); - } + void signal_pushed_data() { cv_empty.notify_one(); } mutable std::mutex mutex; std::condition_variable cv_empty, cv_exit; uint32_t spin_idx = 0; - bool running = true, pushed_data = false, wait_state = false; + bool running = true, consumer_state = false; std::deque queues; uint32_t default_capacity = 0; };