diff --git a/lib/include/srsran/common/multiqueue.h b/lib/include/srsran/common/multiqueue.h index f0076cdb3..3e1fe4f28 100644 --- a/lib/include/srsran/common/multiqueue.h +++ b/lib/include/srsran/common/multiqueue.h @@ -32,290 +32,6 @@ namespace srsran { #define MULTIQUEUE_DEFAULT_CAPACITY (8192) // Default per-queue capacity -// template -// class multiqueue_handler -//{ -// class circular_buffer -// { -// public: -// circular_buffer(uint32_t cap) : buffer(cap + 1) {} -// circular_buffer(circular_buffer&& other) noexcept -// { -// active = other.active; -// other.active = false; -// widx = other.widx; -// ridx = other.ridx; -// buffer = std::move(other.buffer); -// } -// -// std::condition_variable cv_full; -// bool active = true; -// -// bool empty() const { return widx == ridx; } -// size_t size() const { return widx >= ridx ? widx - ridx : widx + (buffer.size() - ridx); } -// bool full() const { return (ridx > 0) ? widx == ridx - 1 : widx == buffer.size() - 1; } -// size_t capacity() const { return buffer.size() - 1; } -// -// template -// void push(T&& o) noexcept -// { -// buffer[widx++] = std::forward(o); -// if (widx >= buffer.size()) { -// widx = 0; -// } -// } -// -// void pop() noexcept -// { -// ridx++; -// if (ridx >= buffer.size()) { -// ridx = 0; -// } -// } -// -// myobj& front() noexcept { return buffer[ridx]; } -// const myobj& front() const noexcept { return buffer[ridx]; } -// -// private: -// std::vector buffer; -// size_t widx = 0, ridx = 0; -// }; -// -// public: -// class queue_handle -// { -// public: -// queue_handle() = default; -// queue_handle(multiqueue_handler* parent_, int id) : parent(parent_), queue_id(id) {} -// template -// void push(FwdRef&& value) -// { -// parent->push(queue_id, std::forward(value)); -// } -// bool try_push(const myobj& value) { return parent->try_push(queue_id, value); } -// std::pair try_push(myobj&& value) { return parent->try_push(queue_id, std::move(value)); } -// size_t size() { return parent->size(queue_id); } -// -// private: -// multiqueue_handler* parent = nullptr; -// int queue_id = -1; -// }; -// -// explicit multiqueue_handler(uint32_t capacity_ = MULTIQUEUE_DEFAULT_CAPACITY) : capacity(capacity_) {} -// ~multiqueue_handler() { reset(); } -// -// void reset() -// { -// std::unique_lock lock(mutex); -// running = false; -// while (nof_threads_waiting > 0) { -// uint32_t size = queues.size(); -// cv_empty.notify_one(); -// for (uint32_t i = 0; i < size; ++i) { -// queues[i].cv_full.notify_all(); -// } -// // wait for all threads to unblock -// cv_exit.wait(lock); -// } -// queues.clear(); -// } -// -// /** -// * Adds a new queue with fixed capacity -// * @param capacity_ The capacity of the queue. -// * @return The index of the newly created (or reused) queue within the vector of queues. -// */ -// int add_queue(uint32_t capacity_) -// { -// uint32_t qidx = 0; -// std::lock_guard lock(mutex); -// if (not running) { -// return -1; -// } -// for (; qidx < queues.size() and queues[qidx].active; ++qidx) -// ; -// -// // check if there is a free queue of the required size -// if (qidx == queues.size() || queues[qidx].capacity() != capacity_) { -// // create new queue -// queues.emplace_back(capacity_); -// qidx = queues.size() - 1; // update qidx to the last element -// } else { -// queues[qidx].active = true; -// } -// return (int)qidx; -// } -// -// /** -// * Add queue using the default capacity of the underlying multiqueue -// * @return The queue index -// */ -// int add_queue() { return add_queue(capacity); } -// -// int nof_queues() -// { -// std::lock_guard lock(mutex); -// uint32_t count = 0; -// for (uint32_t i = 0; i < queues.size(); ++i) { -// count += queues[i].active ? 1 : 0; -// } -// return count; -// } -// -// template -// void push(int q_idx, FwdRef&& value) -// { -// { -// std::unique_lock lock(mutex); -// while (is_queue_active_(q_idx) and queues[q_idx].full()) { -// nof_threads_waiting++; -// queues[q_idx].cv_full.wait(lock); -// nof_threads_waiting--; -// } -// if (not is_queue_active_(q_idx)) { -// cv_exit.notify_one(); -// return; -// } -// queues[q_idx].push(std::forward(value)); -// } -// cv_empty.notify_one(); -// } -// -// bool try_push(int q_idx, const myobj& value) -// { -// { -// std::lock_guard lock(mutex); -// if (not is_queue_active_(q_idx) or queues[q_idx].full()) { -// return false; -// } -// queues[q_idx].push(value); -// } -// cv_empty.notify_one(); -// return true; -// } -// -// std::pair try_push(int q_idx, myobj&& value) -// { -// { -// std::lock_guard lck(mutex); -// if (not is_queue_active_(q_idx) or queues[q_idx].full()) { -// return {false, std::move(value)}; -// } -// queues[q_idx].push(std::move(value)); -// } -// cv_empty.notify_one(); -// return {true, std::move(value)}; -// } -// -// int wait_pop(myobj* value) -// { -// std::unique_lock lock(mutex); -// while (running) { -// if (round_robin_pop_(value)) { -// if (nof_threads_waiting > 0) { -// lock.unlock(); -// queues[spin_idx].cv_full.notify_one(); -// } -// return spin_idx; -// } -// nof_threads_waiting++; -// cv_empty.wait(lock); -// nof_threads_waiting--; -// } -// cv_exit.notify_one(); -// return -1; -// } -// -// int try_pop(myobj* value) -// { -// std::unique_lock lock(mutex); -// if (running) { -// if (round_robin_pop_(value)) { -// if (nof_threads_waiting > 0) { -// lock.unlock(); -// queues[spin_idx].cv_full.notify_one(); -// } -// return spin_idx; -// } -// // didn't find any task -// return -1; -// } -// cv_exit.notify_one(); -// return -1; -// } -// -// bool empty(int qidx) -// { -// std::lock_guard lck(mutex); -// return queues[qidx].empty(); -// } -// -// size_t size(int qidx) -// { -// std::lock_guard lck(mutex); -// return queues[qidx].size(); -// } -// -// size_t max_size(int qidx) -// { -// std::lock_guard lck(mutex); -// return queues[qidx].capacity(); -// } -// -// const myobj& front(int qidx) -// { -// std::lock_guard lck(mutex); -// return queues[qidx].front(); -// } -// -// void erase_queue(int qidx) -// { -// std::lock_guard lck(mutex); -// if (is_queue_active_(qidx)) { -// queues[qidx].active = false; -// while (not queues[qidx].empty()) { -// queues[qidx].pop(); -// } -// } -// } -// -// bool is_queue_active(int qidx) -// { -// std::lock_guard lck(mutex); -// return is_queue_active_(qidx); -// } -// -// queue_handle get_queue_handler() { return {this, add_queue()}; } -// queue_handle get_queue_handler(uint32_t size) { return {this, add_queue(size)}; } -// -// private: -// bool is_queue_active_(int qidx) const { return running and queues[qidx].active; } -// -// bool round_robin_pop_(myobj* value) -// { -// // Round-robin for all queues -// for (const circular_buffer& q : queues) { -// spin_idx = (spin_idx + 1) % queues.size(); -// if (is_queue_active_(spin_idx) and not queues[spin_idx].empty()) { -// if (value) { -// *value = std::move(queues[spin_idx].front()); -// } -// queues[spin_idx].pop(); -// return true; -// } -// } -// return false; -// } -// -// std::mutex mutex; -// std::condition_variable cv_empty, cv_exit; -// uint32_t spin_idx = 0; -// bool running = true; -// std::vector queues; -// uint32_t capacity = 0; -// uint32_t nof_threads_waiting = 0; -// }; - /** * N-to-1 Message-Passing Broker that manages the creation, destruction of input ports, and popping of messages that * are pushed to these ports. @@ -332,15 +48,11 @@ class multiqueue_handler { public: input_port_impl(uint32_t cap, multiqueue_handler* parent_) : buffer(cap), parent(parent_) {} - input_port_impl(input_port_impl&& other) noexcept - { - std::lock_guard lock(other.q_mutex); - active_ = other.active_; - parent = other.parent_; - other.active_ = false; - buffer = std::move(other.buffer); - } - ~input_port_impl() { set_active_blocking(false); } + input_port_impl(const input_port_impl&) = delete; + input_port_impl(input_port_impl&&) = delete; + input_port_impl& operator=(const input_port_impl&) = delete; + input_port_impl& operator=(input_port_impl&&) = delete; + ~input_port_impl() { deactivate_blocking(); } size_t capacity() const { return buffer.max_size(); } size_t size() const @@ -369,16 +81,14 @@ class multiqueue_handler } } - void set_active_blocking(bool val) + void deactivate_blocking() { - set_active(val); + set_active(false); - if (not val) { - // wait for all the pushers to unlock - std::unique_lock lock(q_mutex); - while (nof_waiting > 0) { - cv_exit.wait(lock); - } + // wait for all the pushers to unlock + std::unique_lock lock(q_mutex); + while (nof_waiting > 0) { + cv_exit.wait(lock); } } @@ -448,8 +158,7 @@ public: class queue_handle { public: - queue_handle() = default; - queue_handle(input_port_impl* impl_) : impl(impl_) {} + explicit queue_handle(input_port_impl* impl_ = nullptr) : impl(impl_) {} template void push(FwdRef&& value) { @@ -460,7 +169,7 @@ public: void reset() { if (impl != nullptr) { - impl->set_active_blocking(false); + impl->deactivate_blocking(); impl = nullptr; } } @@ -474,10 +183,20 @@ public: bool operator!=(const queue_handle& other) const { return impl != other.impl; } private: - input_port_impl* impl = nullptr; + struct recycle_op { + void operator()(input_port_impl* p) + { + if (p != nullptr) { + p->deactivate_blocking(); + } + } + }; + std::unique_ptr impl; }; - explicit multiqueue_handler(uint32_t default_capacity_ = MULTIQUEUE_DEFAULT_CAPACITY) : capacity(default_capacity_) {} + explicit multiqueue_handler(uint32_t default_capacity_ = MULTIQUEUE_DEFAULT_CAPACITY) : + default_capacity(default_capacity_) + {} ~multiqueue_handler() { reset(); } void reset() @@ -492,10 +211,7 @@ public: cv_empty.notify_one(); cv_exit.wait(lock); } - for (auto& q : queues) { - // ensure that all queues are completed with the deactivation before clearing the memory - q.set_active_blocking(false); - } + // queue destructor ensures that the pushing threads have been notified of the queue deactivation in a blocking way queues.clear(); } @@ -529,7 +245,7 @@ public: * Add queue using the default capacity of the underlying multiqueue * @return The queue index */ - queue_handle add_queue() { return add_queue(capacity); } + queue_handle add_queue() { return add_queue(default_capacity); } uint32_t nof_queues() const { @@ -561,10 +277,7 @@ public: bool try_pop(myobj* value) { std::unique_lock lock(mutex); - if (running and round_robin_pop_(value)) { - return true; - } - return false; + return running and round_robin_pop_(value); } private: @@ -590,7 +303,7 @@ private: uint32_t spin_idx = 0; bool running = true, wait_pop_state = false; std::deque queues; - uint32_t capacity = 0; + uint32_t default_capacity = 0; }; template