diff --git a/lib/include/srsran/common/multiqueue.h b/lib/include/srsran/common/multiqueue.h index 1b181be01..16915acbb 100644 --- a/lib/include/srsran/common/multiqueue.h +++ b/lib/include/srsran/common/multiqueue.h @@ -70,6 +70,7 @@ class multiqueue_handler { std::unique_lock lock(q_mutex); if (val == active_) { + // no-op return; } active_ = val; @@ -78,6 +79,7 @@ class multiqueue_handler if (not active_) { buffer.clear(); lock.unlock(); + // unlock blocked pushing threads cv_full.notify_all(); } } @@ -130,27 +132,26 @@ class multiqueue_handler template bool push_(T* o, bool blocking) noexcept { - { - std::unique_lock lock(q_mutex); - while (active_ and blocking and buffer.full()) { - nof_waiting++; - cv_full.wait(lock); - nof_waiting--; - } - if (not active_) { - lock.unlock(); - cv_exit.notify_one(); - return false; - } - buffer.push(std::forward(*o)); + std::unique_lock lock(q_mutex); + while (active_ and blocking and buffer.full()) { + nof_waiting++; + cv_full.wait(lock); + nof_waiting--; } + if (not active_) { + lock.unlock(); + cv_exit.notify_one(); + return false; + } + buffer.push(std::forward(*o)); if (consumer_notify_needed) { // Note: The consumer thread only needs to be notified and awaken when queues transition from empty to non-empty // To ensure that the consumer noticed that the queue was empty before a push, we store the last // try_pop() return in a member variable. // Doing this reduces the contention of multiple producers for the same condition variable - parent->cv_empty.notify_one(); consumer_notify_needed = false; + lock.unlock(); + parent->signal_pushed_data(); } return true; } @@ -218,7 +219,8 @@ public: // signal deactivation to pushing threads in a non-blocking way q.set_active(false); } - while (wait_pop_state) { + while (wait_state) { + pushed_data = true; cv_empty.notify_one(); cv_exit.wait(lock); } @@ -277,13 +279,14 @@ public: if (round_robin_pop_(value)) { return true; } - wait_pop_state = true; - cv_empty.wait(lock); - wait_pop_state = false; - } - if (not running) { - cv_exit.notify_one(); + pushed_data = false; + wait_state = true; + while (not pushed_data) { + cv_empty.wait(lock); + } + wait_state = false; } + cv_exit.notify_one(); return false; } @@ -310,11 +313,23 @@ private: } return false; } + /// Called by the producer threads to signal the consumer to unlock in wait_pop + void signal_pushed_data() + { + { + std::lock_guard lock(mutex); + if (pushed_data) { + return; + } + pushed_data = true; + } + cv_empty.notify_one(); + } mutable std::mutex mutex; std::condition_variable cv_empty, cv_exit; uint32_t spin_idx = 0; - bool running = true, wait_pop_state = false; + bool running = true, pushed_data = false, wait_state = false; std::deque queues; uint32_t default_capacity = 0; }; diff --git a/lib/test/common/multiqueue_test.cc b/lib/test/common/multiqueue_test.cc index beaebd472..391979fe8 100644 --- a/lib/test/common/multiqueue_test.cc +++ b/lib/test/common/multiqueue_test.cc @@ -16,6 +16,7 @@ #include "srsran/common/thread_pool.h" #include #include +#include #include #include @@ -234,14 +235,17 @@ int test_multiqueue_threading4() int capacity = 4; multiqueue_handler multiqueue(capacity); - auto qid1 = multiqueue.add_queue(); - auto qid2 = multiqueue.add_queue(); - auto qid3 = multiqueue.add_queue(); - auto qid4 = multiqueue.add_queue(); - auto pop_blocking_func = [&multiqueue](bool* success) { - int number = 0, count = 0; + auto qid1 = multiqueue.add_queue(); + auto qid2 = multiqueue.add_queue(); + auto qid3 = multiqueue.add_queue(); + auto qid4 = multiqueue.add_queue(); + std::mutex mutex; + int last_number; + auto pop_blocking_func = [&multiqueue, &last_number, &mutex](bool* success) { + int number = 0; while (multiqueue.wait_pop(&number)) { - TESTASSERT(number == count++); + std::lock_guard lock(mutex); + last_number = std::max(last_number, number); } *success = true; }; @@ -249,8 +253,12 @@ int test_multiqueue_threading4() bool t1_success = false; std::thread t1(pop_blocking_func, &t1_success); - for (int i = 0; i < 1000; ++i) { - switch (i % 3) { + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> dist{0, 2}; + for (int i = 0; i < 10000; ++i) { + int qidx = dist(gen); + switch (qidx) { case 0: qid1.push(i); break; @@ -263,7 +271,17 @@ int test_multiqueue_threading4() default: break; } - usleep(10); + if (i % 20 == 0) { + int count = 0; + std::unique_lock lock(mutex); + while (last_number != i) { + lock.unlock(); + usleep(100); + count++; + TESTASSERT(count < 100000); + lock.lock(); + } + } } // Should be able to unlock all