fix multiqueue producer to consumer notification to avoid deadlocks

master
Francisco 4 years ago committed by Francisco Paisana
parent 0d800eb8f6
commit 99abae9e6a

@ -70,6 +70,7 @@ class multiqueue_handler
{ {
std::unique_lock<std::mutex> lock(q_mutex); std::unique_lock<std::mutex> lock(q_mutex);
if (val == active_) { if (val == active_) {
// no-op
return; return;
} }
active_ = val; active_ = val;
@ -78,6 +79,7 @@ class multiqueue_handler
if (not active_) { if (not active_) {
buffer.clear(); buffer.clear();
lock.unlock(); lock.unlock();
// unlock blocked pushing threads
cv_full.notify_all(); cv_full.notify_all();
} }
} }
@ -130,27 +132,26 @@ class multiqueue_handler
template <typename T> template <typename T>
bool push_(T* o, bool blocking) noexcept bool push_(T* o, bool blocking) noexcept
{ {
{ std::unique_lock<std::mutex> lock(q_mutex);
std::unique_lock<std::mutex> lock(q_mutex); while (active_ and blocking and buffer.full()) {
while (active_ and blocking and buffer.full()) { nof_waiting++;
nof_waiting++; cv_full.wait(lock);
cv_full.wait(lock); nof_waiting--;
nof_waiting--;
}
if (not active_) {
lock.unlock();
cv_exit.notify_one();
return false;
}
buffer.push(std::forward<T>(*o));
} }
if (not active_) {
lock.unlock();
cv_exit.notify_one();
return false;
}
buffer.push(std::forward<T>(*o));
if (consumer_notify_needed) { if (consumer_notify_needed) {
// Note: The consumer thread only needs to be notified and awaken when queues transition from empty to non-empty // Note: The consumer thread only needs to be notified and awaken when queues transition from empty to non-empty
// To ensure that the consumer noticed that the queue was empty before a push, we store the last // To ensure that the consumer noticed that the queue was empty before a push, we store the last
// try_pop() return in a member variable. // try_pop() return in a member variable.
// Doing this reduces the contention of multiple producers for the same condition variable // Doing this reduces the contention of multiple producers for the same condition variable
parent->cv_empty.notify_one();
consumer_notify_needed = false; consumer_notify_needed = false;
lock.unlock();
parent->signal_pushed_data();
} }
return true; return true;
} }
@ -218,7 +219,8 @@ public:
// signal deactivation to pushing threads in a non-blocking way // signal deactivation to pushing threads in a non-blocking way
q.set_active(false); q.set_active(false);
} }
while (wait_pop_state) { while (wait_state) {
pushed_data = true;
cv_empty.notify_one(); cv_empty.notify_one();
cv_exit.wait(lock); cv_exit.wait(lock);
} }
@ -277,13 +279,14 @@ public:
if (round_robin_pop_(value)) { if (round_robin_pop_(value)) {
return true; return true;
} }
wait_pop_state = true; pushed_data = false;
cv_empty.wait(lock); wait_state = true;
wait_pop_state = false; while (not pushed_data) {
} cv_empty.wait(lock);
if (not running) { }
cv_exit.notify_one(); wait_state = false;
} }
cv_exit.notify_one();
return false; return false;
} }
@ -310,11 +313,23 @@ private:
} }
return false; return false;
} }
/// Called by the producer threads to signal the consumer to unlock in wait_pop
void signal_pushed_data()
{
{
std::lock_guard<std::mutex> lock(mutex);
if (pushed_data) {
return;
}
pushed_data = true;
}
cv_empty.notify_one();
}
mutable std::mutex mutex; mutable std::mutex mutex;
std::condition_variable cv_empty, cv_exit; std::condition_variable cv_empty, cv_exit;
uint32_t spin_idx = 0; uint32_t spin_idx = 0;
bool running = true, wait_pop_state = false; bool running = true, pushed_data = false, wait_state = false;
std::deque<input_port_impl> queues; std::deque<input_port_impl> queues;
uint32_t default_capacity = 0; uint32_t default_capacity = 0;
}; };

@ -16,6 +16,7 @@
#include "srsran/common/thread_pool.h" #include "srsran/common/thread_pool.h"
#include <iostream> #include <iostream>
#include <map> #include <map>
#include <random>
#include <thread> #include <thread>
#include <unistd.h> #include <unistd.h>
@ -234,14 +235,17 @@ int test_multiqueue_threading4()
int capacity = 4; int capacity = 4;
multiqueue_handler<int> multiqueue(capacity); multiqueue_handler<int> multiqueue(capacity);
auto qid1 = multiqueue.add_queue(); auto qid1 = multiqueue.add_queue();
auto qid2 = multiqueue.add_queue(); auto qid2 = multiqueue.add_queue();
auto qid3 = multiqueue.add_queue(); auto qid3 = multiqueue.add_queue();
auto qid4 = multiqueue.add_queue(); auto qid4 = multiqueue.add_queue();
auto pop_blocking_func = [&multiqueue](bool* success) { std::mutex mutex;
int number = 0, count = 0; int last_number;
auto pop_blocking_func = [&multiqueue, &last_number, &mutex](bool* success) {
int number = 0;
while (multiqueue.wait_pop(&number)) { while (multiqueue.wait_pop(&number)) {
TESTASSERT(number == count++); std::lock_guard<std::mutex> lock(mutex);
last_number = std::max(last_number, number);
} }
*success = true; *success = true;
}; };
@ -249,8 +253,12 @@ int test_multiqueue_threading4()
bool t1_success = false; bool t1_success = false;
std::thread t1(pop_blocking_func, &t1_success); std::thread t1(pop_blocking_func, &t1_success);
for (int i = 0; i < 1000; ++i) { std::random_device rd;
switch (i % 3) { std::mt19937 gen(rd());
std::uniform_int_distribution<> dist{0, 2};
for (int i = 0; i < 10000; ++i) {
int qidx = dist(gen);
switch (qidx) {
case 0: case 0:
qid1.push(i); qid1.push(i);
break; break;
@ -263,7 +271,17 @@ int test_multiqueue_threading4()
default: default:
break; break;
} }
usleep(10); if (i % 20 == 0) {
int count = 0;
std::unique_lock<std::mutex> lock(mutex);
while (last_number != i) {
lock.unlock();
usleep(100);
count++;
TESTASSERT(count < 100000);
lock.lock();
}
}
} }
// Should be able to unlock all // Should be able to unlock all

Loading…
Cancel
Save