diff --git a/lib/include/srsran/common/multiqueue.h b/lib/include/srsran/common/multiqueue.h index 5275c4ad1..f0076cdb3 100644 --- a/lib/include/srsran/common/multiqueue.h +++ b/lib/include/srsran/common/multiqueue.h @@ -19,6 +19,7 @@ #ifndef SRSRAN_MULTIQUEUE_H #define SRSRAN_MULTIQUEUE_H +#include "srsran/adt/circular_buffer.h" #include "srsran/adt/move_callback.h" #include #include @@ -31,53 +32,416 @@ namespace srsran { #define MULTIQUEUE_DEFAULT_CAPACITY (8192) // Default per-queue capacity +// template +// class multiqueue_handler +//{ +// class circular_buffer +// { +// public: +// circular_buffer(uint32_t cap) : buffer(cap + 1) {} +// circular_buffer(circular_buffer&& other) noexcept +// { +// active = other.active; +// other.active = false; +// widx = other.widx; +// ridx = other.ridx; +// buffer = std::move(other.buffer); +// } +// +// std::condition_variable cv_full; +// bool active = true; +// +// bool empty() const { return widx == ridx; } +// size_t size() const { return widx >= ridx ? widx - ridx : widx + (buffer.size() - ridx); } +// bool full() const { return (ridx > 0) ? widx == ridx - 1 : widx == buffer.size() - 1; } +// size_t capacity() const { return buffer.size() - 1; } +// +// template +// void push(T&& o) noexcept +// { +// buffer[widx++] = std::forward(o); +// if (widx >= buffer.size()) { +// widx = 0; +// } +// } +// +// void pop() noexcept +// { +// ridx++; +// if (ridx >= buffer.size()) { +// ridx = 0; +// } +// } +// +// myobj& front() noexcept { return buffer[ridx]; } +// const myobj& front() const noexcept { return buffer[ridx]; } +// +// private: +// std::vector buffer; +// size_t widx = 0, ridx = 0; +// }; +// +// public: +// class queue_handle +// { +// public: +// queue_handle() = default; +// queue_handle(multiqueue_handler* parent_, int id) : parent(parent_), queue_id(id) {} +// template +// void push(FwdRef&& value) +// { +// parent->push(queue_id, std::forward(value)); +// } +// bool try_push(const myobj& value) { return parent->try_push(queue_id, value); } +// std::pair try_push(myobj&& value) { return parent->try_push(queue_id, std::move(value)); } +// size_t size() { return parent->size(queue_id); } +// +// private: +// multiqueue_handler* parent = nullptr; +// int queue_id = -1; +// }; +// +// explicit multiqueue_handler(uint32_t capacity_ = MULTIQUEUE_DEFAULT_CAPACITY) : capacity(capacity_) {} +// ~multiqueue_handler() { reset(); } +// +// void reset() +// { +// std::unique_lock lock(mutex); +// running = false; +// while (nof_threads_waiting > 0) { +// uint32_t size = queues.size(); +// cv_empty.notify_one(); +// for (uint32_t i = 0; i < size; ++i) { +// queues[i].cv_full.notify_all(); +// } +// // wait for all threads to unblock +// cv_exit.wait(lock); +// } +// queues.clear(); +// } +// +// /** +// * Adds a new queue with fixed capacity +// * @param capacity_ The capacity of the queue. +// * @return The index of the newly created (or reused) queue within the vector of queues. +// */ +// int add_queue(uint32_t capacity_) +// { +// uint32_t qidx = 0; +// std::lock_guard lock(mutex); +// if (not running) { +// return -1; +// } +// for (; qidx < queues.size() and queues[qidx].active; ++qidx) +// ; +// +// // check if there is a free queue of the required size +// if (qidx == queues.size() || queues[qidx].capacity() != capacity_) { +// // create new queue +// queues.emplace_back(capacity_); +// qidx = queues.size() - 1; // update qidx to the last element +// } else { +// queues[qidx].active = true; +// } +// return (int)qidx; +// } +// +// /** +// * Add queue using the default capacity of the underlying multiqueue +// * @return The queue index +// */ +// int add_queue() { return add_queue(capacity); } +// +// int nof_queues() +// { +// std::lock_guard lock(mutex); +// uint32_t count = 0; +// for (uint32_t i = 0; i < queues.size(); ++i) { +// count += queues[i].active ? 1 : 0; +// } +// return count; +// } +// +// template +// void push(int q_idx, FwdRef&& value) +// { +// { +// std::unique_lock lock(mutex); +// while (is_queue_active_(q_idx) and queues[q_idx].full()) { +// nof_threads_waiting++; +// queues[q_idx].cv_full.wait(lock); +// nof_threads_waiting--; +// } +// if (not is_queue_active_(q_idx)) { +// cv_exit.notify_one(); +// return; +// } +// queues[q_idx].push(std::forward(value)); +// } +// cv_empty.notify_one(); +// } +// +// bool try_push(int q_idx, const myobj& value) +// { +// { +// std::lock_guard lock(mutex); +// if (not is_queue_active_(q_idx) or queues[q_idx].full()) { +// return false; +// } +// queues[q_idx].push(value); +// } +// cv_empty.notify_one(); +// return true; +// } +// +// std::pair try_push(int q_idx, myobj&& value) +// { +// { +// std::lock_guard lck(mutex); +// if (not is_queue_active_(q_idx) or queues[q_idx].full()) { +// return {false, std::move(value)}; +// } +// queues[q_idx].push(std::move(value)); +// } +// cv_empty.notify_one(); +// return {true, std::move(value)}; +// } +// +// int wait_pop(myobj* value) +// { +// std::unique_lock lock(mutex); +// while (running) { +// if (round_robin_pop_(value)) { +// if (nof_threads_waiting > 0) { +// lock.unlock(); +// queues[spin_idx].cv_full.notify_one(); +// } +// return spin_idx; +// } +// nof_threads_waiting++; +// cv_empty.wait(lock); +// nof_threads_waiting--; +// } +// cv_exit.notify_one(); +// return -1; +// } +// +// int try_pop(myobj* value) +// { +// std::unique_lock lock(mutex); +// if (running) { +// if (round_robin_pop_(value)) { +// if (nof_threads_waiting > 0) { +// lock.unlock(); +// queues[spin_idx].cv_full.notify_one(); +// } +// return spin_idx; +// } +// // didn't find any task +// return -1; +// } +// cv_exit.notify_one(); +// return -1; +// } +// +// bool empty(int qidx) +// { +// std::lock_guard lck(mutex); +// return queues[qidx].empty(); +// } +// +// size_t size(int qidx) +// { +// std::lock_guard lck(mutex); +// return queues[qidx].size(); +// } +// +// size_t max_size(int qidx) +// { +// std::lock_guard lck(mutex); +// return queues[qidx].capacity(); +// } +// +// const myobj& front(int qidx) +// { +// std::lock_guard lck(mutex); +// return queues[qidx].front(); +// } +// +// void erase_queue(int qidx) +// { +// std::lock_guard lck(mutex); +// if (is_queue_active_(qidx)) { +// queues[qidx].active = false; +// while (not queues[qidx].empty()) { +// queues[qidx].pop(); +// } +// } +// } +// +// bool is_queue_active(int qidx) +// { +// std::lock_guard lck(mutex); +// return is_queue_active_(qidx); +// } +// +// queue_handle get_queue_handler() { return {this, add_queue()}; } +// queue_handle get_queue_handler(uint32_t size) { return {this, add_queue(size)}; } +// +// private: +// bool is_queue_active_(int qidx) const { return running and queues[qidx].active; } +// +// bool round_robin_pop_(myobj* value) +// { +// // Round-robin for all queues +// for (const circular_buffer& q : queues) { +// spin_idx = (spin_idx + 1) % queues.size(); +// if (is_queue_active_(spin_idx) and not queues[spin_idx].empty()) { +// if (value) { +// *value = std::move(queues[spin_idx].front()); +// } +// queues[spin_idx].pop(); +// return true; +// } +// } +// return false; +// } +// +// std::mutex mutex; +// std::condition_variable cv_empty, cv_exit; +// uint32_t spin_idx = 0; +// bool running = true; +// std::vector queues; +// uint32_t capacity = 0; +// uint32_t nof_threads_waiting = 0; +// }; + +/** + * N-to-1 Message-Passing Broker that manages the creation, destruction of input ports, and popping of messages that + * are pushed to these ports. + * Each port provides a thread-safe push(...) / try_push(...) interface to enqueue messages + * The class will pop from the several created ports in a round-robin fashion. + * The popping() interface is not safe-thread. That means, that it is expected that only one thread will + * be popping tasks. + * @tparam myobj message type + */ template class multiqueue_handler { - class circular_buffer + class input_port_impl { public: - circular_buffer(uint32_t cap) : buffer(cap + 1) {} - circular_buffer(circular_buffer&& other) noexcept + input_port_impl(uint32_t cap, multiqueue_handler* parent_) : buffer(cap), parent(parent_) {} + input_port_impl(input_port_impl&& other) noexcept { - active = other.active; - other.active = false; - widx = other.widx; - ridx = other.ridx; - buffer = std::move(other.buffer); + std::lock_guard lock(other.q_mutex); + active_ = other.active_; + parent = other.parent_; + other.active_ = false; + buffer = std::move(other.buffer); } + ~input_port_impl() { set_active_blocking(false); } - std::condition_variable cv_full; - bool active = true; + size_t capacity() const { return buffer.max_size(); } + size_t size() const + { + std::lock_guard lock(q_mutex); + return buffer.size(); + } + bool active() const + { + std::lock_guard lock(q_mutex); + return active_; + } - bool empty() const { return widx == ridx; } - size_t size() const { return widx >= ridx ? widx - ridx : widx + (buffer.size() - ridx); } - bool full() const { return (ridx > 0) ? widx == ridx - 1 : widx == buffer.size() - 1; } - size_t capacity() const { return buffer.size() - 1; } + void set_active(bool val) + { + std::unique_lock lock(q_mutex); + if (val == active_) { + return; + } + active_ = val; + + if (not active_) { + buffer.clear(); + lock.unlock(); + cv_full.notify_all(); + } + } + + void set_active_blocking(bool val) + { + set_active(val); + + if (not val) { + // wait for all the pushers to unlock + std::unique_lock lock(q_mutex); + while (nof_waiting > 0) { + cv_exit.wait(lock); + } + } + } template void push(T&& o) noexcept { - buffer[widx++] = std::forward(o); - if (widx >= buffer.size()) { - widx = 0; - } + push_(&o, true); } - void pop() noexcept + bool try_push(const myobj& o) { return push_(&o, false); } + + srsran::error_type try_push(myobj&& o) { - ridx++; - if (ridx >= buffer.size()) { - ridx = 0; + if (push_(&o, false)) { + return {}; } + return {std::move(o)}; } - myobj& front() noexcept { return buffer[ridx]; } - const myobj& front() const noexcept { return buffer[ridx]; } + bool try_pop(myobj& obj) + { + std::unique_lock lock(q_mutex); + if (buffer.empty()) { + return false; + } + obj = std::move(buffer.top()); + buffer.pop(); + if (nof_waiting > 0) { + lock.unlock(); + cv_full.notify_one(); + } + return true; + } private: - std::vector buffer; - size_t widx = 0, ridx = 0; + template + bool push_(T* o, bool blocking) noexcept + { + { + std::unique_lock lock(q_mutex); + while (active_ and blocking and buffer.full()) { + nof_waiting++; + cv_full.wait(lock); + nof_waiting--; + } + if (not active_) { + lock.unlock(); + cv_exit.notify_one(); + return false; + } + buffer.push(std::forward(*o)); + } + parent->cv_empty.notify_one(); + return true; + } + + multiqueue_handler* parent = nullptr; + + mutable std::mutex q_mutex; + srsran::dyn_circular_buffer buffer; + std::condition_variable cv_full, cv_exit; + bool active_ = true; + int nof_waiting = 0; }; public: @@ -85,37 +449,53 @@ public: { public: queue_handle() = default; - queue_handle(multiqueue_handler* parent_, int id) : parent(parent_), queue_id(id) {} + queue_handle(input_port_impl* impl_) : impl(impl_) {} template void push(FwdRef&& value) { - parent->push(queue_id, std::forward(value)); + impl->push(std::forward(value)); + } + bool try_push(const myobj& value) { return impl->try_push(value); } + srsran::error_type try_push(myobj&& value) { return impl->try_push(std::move(value)); } + void reset() + { + if (impl != nullptr) { + impl->set_active_blocking(false); + impl = nullptr; + } } - bool try_push(const myobj& value) { return parent->try_push(queue_id, value); } - std::pair try_push(myobj&& value) { return parent->try_push(queue_id, std::move(value)); } - size_t size() { return parent->size(queue_id); } + + size_t size() { return impl->size(); } + size_t capacity() { return impl->capacity(); } + bool active() const { return impl != nullptr and impl->active(); } + bool empty() const { return impl->size() == 0; } + + bool operator==(const queue_handle& other) const { return impl == other.impl; } + bool operator!=(const queue_handle& other) const { return impl != other.impl; } private: - multiqueue_handler* parent = nullptr; - int queue_id = -1; + input_port_impl* impl = nullptr; }; - explicit multiqueue_handler(uint32_t capacity_ = MULTIQUEUE_DEFAULT_CAPACITY) : capacity(capacity_) {} + explicit multiqueue_handler(uint32_t default_capacity_ = MULTIQUEUE_DEFAULT_CAPACITY) : capacity(default_capacity_) {} ~multiqueue_handler() { reset(); } void reset() { std::unique_lock lock(mutex); running = false; - while (nof_threads_waiting > 0) { - uint32_t size = queues.size(); + for (auto& q : queues) { + // signal deactivation to pushing threads in a non-blocking way + q.set_active(false); + } + while (wait_pop_state) { cv_empty.notify_one(); - for (uint32_t i = 0; i < size; ++i) { - queues[i].cv_full.notify_all(); - } - // wait for all threads to unblock cv_exit.wait(lock); } + for (auto& q : queues) { + // ensure that all queues are completed with the deactivation before clearing the memory + q.set_active_blocking(false); + } queues.clear(); } @@ -124,197 +504,98 @@ public: * @param capacity_ The capacity of the queue. * @return The index of the newly created (or reused) queue within the vector of queues. */ - int add_queue(uint32_t capacity_) + queue_handle add_queue(uint32_t capacity_) { uint32_t qidx = 0; std::lock_guard lock(mutex); if (not running) { - return -1; + return queue_handle(); } - for (; qidx < queues.size() and queues[qidx].active; ++qidx) + for (; qidx < queues.size() and (queues[qidx].active() or (queues[qidx].capacity() != capacity_)); ++qidx) ; // check if there is a free queue of the required size - if (qidx == queues.size() || queues[qidx].capacity() != capacity_) { + if (qidx == queues.size()) { // create new queue - queues.emplace_back(capacity_); + queues.emplace_back(capacity_, this); qidx = queues.size() - 1; // update qidx to the last element } else { - queues[qidx].active = true; + queues[qidx].set_active(true); } - return (int)qidx; + return queue_handle(&queues[qidx]); } /** * Add queue using the default capacity of the underlying multiqueue * @return The queue index */ - int add_queue() { return add_queue(capacity); } + queue_handle add_queue() { return add_queue(capacity); } - int nof_queues() + uint32_t nof_queues() const { std::lock_guard lock(mutex); uint32_t count = 0; for (uint32_t i = 0; i < queues.size(); ++i) { - count += queues[i].active ? 1 : 0; + count += queues[i].active() ? 1 : 0; } return count; } - template - void push(int q_idx, FwdRef&& value) - { - { - std::unique_lock lock(mutex); - while (is_queue_active_(q_idx) and queues[q_idx].full()) { - nof_threads_waiting++; - queues[q_idx].cv_full.wait(lock); - nof_threads_waiting--; - } - if (not is_queue_active_(q_idx)) { - cv_exit.notify_one(); - return; - } - queues[q_idx].push(std::forward(value)); - } - cv_empty.notify_one(); - } - - bool try_push(int q_idx, const myobj& value) - { - { - std::lock_guard lock(mutex); - if (not is_queue_active_(q_idx) or queues[q_idx].full()) { - return false; - } - queues[q_idx].push(value); - } - cv_empty.notify_one(); - return true; - } - - std::pair try_push(int q_idx, myobj&& value) - { - { - std::lock_guard lck(mutex); - if (not is_queue_active_(q_idx) or queues[q_idx].full()) { - return {false, std::move(value)}; - } - queues[q_idx].push(std::move(value)); - } - cv_empty.notify_one(); - return {true, std::move(value)}; - } - - int wait_pop(myobj* value) + bool wait_pop(myobj* value) { std::unique_lock lock(mutex); while (running) { if (round_robin_pop_(value)) { - if (nof_threads_waiting > 0) { - lock.unlock(); - queues[spin_idx].cv_full.notify_one(); - } - return spin_idx; + return true; } - nof_threads_waiting++; + wait_pop_state = true; cv_empty.wait(lock); - nof_threads_waiting--; + wait_pop_state = false; } - cv_exit.notify_one(); - return -1; - } - - int try_pop(myobj* value) - { - std::unique_lock lock(mutex); - if (running) { - if (round_robin_pop_(value)) { - if (nof_threads_waiting > 0) { - lock.unlock(); - queues[spin_idx].cv_full.notify_one(); - } - return spin_idx; - } - // didn't find any task - return -1; + if (not running) { + cv_exit.notify_one(); } - cv_exit.notify_one(); - return -1; - } - - bool empty(int qidx) - { - std::lock_guard lck(mutex); - return queues[qidx].empty(); - } - - size_t size(int qidx) - { - std::lock_guard lck(mutex); - return queues[qidx].size(); - } - - size_t max_size(int qidx) - { - std::lock_guard lck(mutex); - return queues[qidx].capacity(); - } - - const myobj& front(int qidx) - { - std::lock_guard lck(mutex); - return queues[qidx].front(); + return false; } - void erase_queue(int qidx) + bool try_pop(myobj* value) { - std::lock_guard lck(mutex); - if (is_queue_active_(qidx)) { - queues[qidx].active = false; - while (not queues[qidx].empty()) { - queues[qidx].pop(); - } + std::unique_lock lock(mutex); + if (running and round_robin_pop_(value)) { + return true; } + return false; } - bool is_queue_active(int qidx) - { - std::lock_guard lck(mutex); - return is_queue_active_(qidx); - } - - queue_handle get_queue_handler() { return {this, add_queue()}; } - queue_handle get_queue_handler(uint32_t size) { return {this, add_queue(size)}; } - private: - bool is_queue_active_(int qidx) const { return running and queues[qidx].active; } - bool round_robin_pop_(myobj* value) { // Round-robin for all queues - for (const circular_buffer& q : queues) { - spin_idx = (spin_idx + 1) % queues.size(); - if (is_queue_active_(spin_idx) and not queues[spin_idx].empty()) { - if (value) { - *value = std::move(queues[spin_idx].front()); - } - queues[spin_idx].pop(); + auto it = queues.begin() + spin_idx; + uint32_t count = 0; + for (; count < queues.size(); ++count, ++it) { + if (it == queues.end()) { + it = queues.begin(); // wrap-around + } + if (it->try_pop(*value)) { + spin_idx = (spin_idx + count + 1) % queues.size(); return true; } } return false; } - std::mutex mutex; - std::condition_variable cv_empty, cv_exit; - uint32_t spin_idx = 0; - bool running = true; - std::vector queues; - uint32_t capacity = 0; - uint32_t nof_threads_waiting = 0; + mutable std::mutex mutex; + std::condition_variable cv_empty, cv_exit; + uint32_t spin_idx = 0; + bool running = true, wait_pop_state = false; + std::deque queues; + uint32_t capacity = 0; }; +template +using queue_handle = typename multiqueue_handler::queue_handle; + //! Specialization for tasks using task_multiqueue = multiqueue_handler; using task_queue_handle = task_multiqueue::queue_handle; diff --git a/lib/include/srsran/common/task_scheduler.h b/lib/include/srsran/common/task_scheduler.h index 5bba56fee..27b9bdeed 100644 --- a/lib/include/srsran/common/task_scheduler.h +++ b/lib/include/srsran/common/task_scheduler.h @@ -26,7 +26,7 @@ public: explicit task_scheduler(uint32_t default_extern_tasks_size = 512, uint32_t nof_timers_prealloc = 100) : external_tasks{default_extern_tasks_size}, timers{nof_timers_prealloc} { - background_queue_id = external_tasks.add_queue(); + background_queue = external_tasks.add_queue(); } task_scheduler(const task_scheduler&) = delete; task_scheduler(task_scheduler&&) = delete; @@ -38,8 +38,8 @@ public: srsran::unique_timer get_unique_timer() { return timers.get_unique_timer(); } //! Creates new queue for tasks coming from external thread - srsran::task_queue_handle make_task_queue() { return external_tasks.get_queue_handler(); } - srsran::task_queue_handle make_task_queue(uint32_t qsize) { return external_tasks.get_queue_handler(qsize); } + srsran::task_queue_handle make_task_queue() { return external_tasks.add_queue(); } + srsran::task_queue_handle make_task_queue(uint32_t qsize) { return external_tasks.add_queue(qsize); } //! Delays a task processing by duration_ms template @@ -55,7 +55,7 @@ public: void notify_background_task_result(srsran::move_task_t task) { // run the notification in next tic - external_tasks.push(background_queue_id, std::move(task)); + background_queue.push(std::move(task)); } //! Updates timers, and run any pending internal tasks. @@ -67,7 +67,7 @@ public: bool run_next_task() { srsran::move_task_t task{}; - if (external_tasks.wait_pop(&task) >= 0) { + if (external_tasks.wait_pop(&task)) { task(); run_all_internal_tasks(); return true; @@ -81,7 +81,7 @@ public: { run_all_internal_tasks(); srsran::move_task_t task{}; - while (external_tasks.try_pop(&task) >= 0) { + while (external_tasks.try_pop(&task)) { task(); run_all_internal_tasks(); } @@ -101,9 +101,9 @@ private: } } - int background_queue_id = -1; ///< Queue for handling the outcomes of tasks run in the background - srsran::task_multiqueue external_tasks; - srsran::timer_handler timers; + srsran::task_multiqueue external_tasks; + srsran::task_queue_handle background_queue; ///< Queue for handling the outcomes of tasks run in the background + srsran::timer_handler timers; std::deque internal_tasks; ///< enqueues stack tasks from within main thread. Avoids locking }; diff --git a/lib/test/common/multiqueue_test.cc b/lib/test/common/multiqueue_test.cc index cfd6cf6d5..67e36a3df 100644 --- a/lib/test/common/multiqueue_test.cc +++ b/lib/test/common/multiqueue_test.cc @@ -12,20 +12,13 @@ #include "srsran/adt/move_callback.h" #include "srsran/common/multiqueue.h" +#include "srsran/common/test_common.h" #include "srsran/common/thread_pool.h" #include #include #include #include -#define TESTASSERT(cond) \ - { \ - if (!(cond)) { \ - std::cout << "[" << __FUNCTION__ << "][Line " << __LINE__ << "]: FAIL at " << (#cond) << std::endl; \ - return -1; \ - } \ - } - using namespace srsran; int test_multiqueue() @@ -35,79 +28,80 @@ int test_multiqueue() int number = 2; multiqueue_handler multiqueue; - TESTASSERT(multiqueue.nof_queues() == 0) + TESTASSERT(multiqueue.nof_queues() == 0); // test push/pop and size for one queue - int qid1 = multiqueue.add_queue(); - TESTASSERT(qid1 == 0 and multiqueue.is_queue_active(qid1)) - TESTASSERT(multiqueue.size(qid1) == 0 and multiqueue.empty(qid1)) - TESTASSERT(multiqueue.nof_queues() == 1) - TESTASSERT(multiqueue.try_push(qid1, 5).first) - TESTASSERT(multiqueue.try_push(qid1, number)) - TESTASSERT(multiqueue.size(qid1) == 2 and not multiqueue.empty(qid1)) - TESTASSERT(multiqueue.wait_pop(&number) == qid1) - TESTASSERT(number == 5) - TESTASSERT(multiqueue.wait_pop(&number) == qid1) - TESTASSERT(number == 2 and multiqueue.empty(qid1) and multiqueue.size(qid1) == 0) + queue_handle qid1 = multiqueue.add_queue(); + TESTASSERT(qid1.active()); + TESTASSERT(qid1.size() == 0 and qid1.empty()); + TESTASSERT(multiqueue.nof_queues() == 1); + TESTASSERT(qid1.try_push(5).has_value()); + TESTASSERT(qid1.try_push(number)); + TESTASSERT(qid1.size() == 2 and not qid1.empty()); + TESTASSERT(multiqueue.wait_pop(&number)); + TESTASSERT(number == 5); + TESTASSERT(multiqueue.wait_pop(&number)); + TESTASSERT(number == 2 and qid1.empty()); // test push/pop and size for two queues - int qid2 = multiqueue.add_queue(); - TESTASSERT(qid2 == 1) - TESTASSERT(multiqueue.nof_queues() == 2 and multiqueue.is_queue_active(qid1)) - TESTASSERT(multiqueue.try_push(qid2, 3).first) - TESTASSERT(multiqueue.size(qid2) == 1 and not multiqueue.empty(qid2)) - TESTASSERT(multiqueue.empty(qid1) and multiqueue.size(qid1) == 0) + queue_handle qid2 = multiqueue.add_queue(); + TESTASSERT(qid2.active()); + TESTASSERT(multiqueue.nof_queues() == 2 and qid1.active()); + TESTASSERT(qid2.try_push(3).has_value()); + TESTASSERT(qid2.size() == 1 and not qid2.empty()); + TESTASSERT(qid1.empty()); // check if erasing a queue breaks anything - multiqueue.erase_queue(qid1); - TESTASSERT(multiqueue.nof_queues() == 1 and not multiqueue.is_queue_active(qid1)) + qid1.reset(); + TESTASSERT(multiqueue.nof_queues() == 1 and not qid1.active()); qid1 = multiqueue.add_queue(); - TESTASSERT(qid1 == 0) - TESTASSERT(multiqueue.empty(qid1) and multiqueue.is_queue_active(qid1)) + TESTASSERT(qid1.empty() and qid1.active()); + TESTASSERT(qid2.size() == 1 and not qid2.empty()); multiqueue.wait_pop(&number); // check round-robin for (int i = 0; i < 10; ++i) { - TESTASSERT(multiqueue.try_push(qid1, i)) + TESTASSERT(qid1.try_push(i)); } for (int i = 20; i < 35; ++i) { - TESTASSERT(multiqueue.try_push(qid2, i)) + TESTASSERT(qid2.try_push(i)); } - TESTASSERT(multiqueue.size(qid1) == 10) - TESTASSERT(multiqueue.size(qid2) == 15) - TESTASSERT(multiqueue.wait_pop(&number) == qid1 and number == 0) - TESTASSERT(multiqueue.wait_pop(&number) == qid2 and number == 20) - TESTASSERT(multiqueue.wait_pop(&number) == qid1 and number == 1) - TESTASSERT(multiqueue.wait_pop(&number) == qid2 and number == 21) - TESTASSERT(multiqueue.size(qid1) == 8) - TESTASSERT(multiqueue.size(qid2) == 13) + TESTASSERT(qid1.size() == 10); + TESTASSERT(qid2.size() == 15); + TESTASSERT(multiqueue.wait_pop(&number) and number == 0); + TESTASSERT(multiqueue.wait_pop(&number) and number == 20); + TESTASSERT(multiqueue.wait_pop(&number) and number == 1); + TESTASSERT(multiqueue.wait_pop(&number) and number == 21); + TESTASSERT(qid1.size() == 8); + TESTASSERT(qid2.size() == 13); for (int i = 0; i < 8 * 2; ++i) { multiqueue.wait_pop(&number); } - TESTASSERT(multiqueue.size(qid1) == 0) - TESTASSERT(multiqueue.size(qid2) == 5) - TESTASSERT(multiqueue.wait_pop(&number) == qid2 and number == 30) + TESTASSERT(qid1.size() == 0); + TESTASSERT(qid2.size() == 5); + TESTASSERT(multiqueue.wait_pop(&number) and number == 30); // remove existing queues - multiqueue.erase_queue(qid1); - multiqueue.erase_queue(qid2); - TESTASSERT(multiqueue.nof_queues() == 0) + qid1.reset(); + qid2.reset(); + TESTASSERT(multiqueue.nof_queues() == 0); // check that adding a queue of different capacity works { - int qid1 = multiqueue.add_queue(); - int qid2 = multiqueue.add_queue(); + qid1 = multiqueue.add_queue(); + qid2 = multiqueue.add_queue(); // remove first queue again - multiqueue.erase_queue(qid1); - TESTASSERT(multiqueue.nof_queues() == 1) + qid1.reset(); + TESTASSERT(multiqueue.nof_queues() == 1); // add queue with non-default capacity - int qid3 = multiqueue.add_queue(10); + auto qid3 = multiqueue.add_queue(10); + TESTASSERT(qid3.capacity() == 10); // make sure neither a new queue index is returned - TESTASSERT(qid1 != qid3) - TESTASSERT(qid2 != qid3) + TESTASSERT(qid1 != qid3); + TESTASSERT(qid2 != qid3); } std::cout << "outcome: Success\n"; @@ -122,10 +116,10 @@ int test_multiqueue_threading() int capacity = 4, number = 0, start_number = 2, nof_pushes = capacity + 1; multiqueue_handler multiqueue(capacity); - int qid1 = multiqueue.add_queue(); - auto push_blocking_func = [&multiqueue](int qid, int start_value, int nof_pushes, bool* is_running) { + auto qid1 = multiqueue.add_queue(); + auto push_blocking_func = [](queue_handle* qid, int start_value, int nof_pushes, bool* is_running) { for (int i = 0; i < nof_pushes; ++i) { - multiqueue.push(qid, start_value + i); + qid->push(start_value + i); std::cout << "t1: pushed item " << i << std::endl; } std::cout << "t1: pushed all items\n"; @@ -133,17 +127,17 @@ int test_multiqueue_threading() }; bool t1_running = true; - std::thread t1(push_blocking_func, qid1, start_number, nof_pushes, &t1_running); + std::thread t1(push_blocking_func, &qid1, start_number, nof_pushes, &t1_running); // Wait for queue to fill - while ((int)multiqueue.size(qid1) != capacity) { + while ((int)qid1.size() != capacity) { usleep(1000); - TESTASSERT(t1_running) + TESTASSERT(t1_running); } for (int i = 0; i < nof_pushes; ++i) { - TESTASSERT(multiqueue.wait_pop(&number) == qid1) - TESTASSERT(number == start_number + i) + TESTASSERT(multiqueue.wait_pop(&number)); + TESTASSERT(number == start_number + i); std::cout << "main: popped item " << i << "\n"; } std::cout << "main: popped all items\n"; @@ -152,7 +146,7 @@ int test_multiqueue_threading() while (t1_running) { usleep(1000); } - TESTASSERT(multiqueue.size(qid1) == 0) + TESTASSERT(qid1.size() == 0); multiqueue.reset(); t1.join(); @@ -170,22 +164,22 @@ int test_multiqueue_threading2() int capacity = 4, start_number = 2, nof_pushes = capacity + 1; multiqueue_handler multiqueue(capacity); - int qid1 = multiqueue.add_queue(); - auto push_blocking_func = [&multiqueue](int qid, int start_value, int nof_pushes, bool* is_running) { + auto qid1 = multiqueue.add_queue(); + auto push_blocking_func = [](queue_handle* qid, int start_value, int nof_pushes, bool* is_running) { for (int i = 0; i < nof_pushes; ++i) { - multiqueue.push(qid, start_value + i); + qid->push(start_value + i); } std::cout << "t1: pushed all items\n"; *is_running = false; }; bool t1_running = true; - std::thread t1(push_blocking_func, qid1, start_number, nof_pushes, &t1_running); + std::thread t1(push_blocking_func, &qid1, start_number, nof_pushes, &t1_running); // Wait for queue to fill - while ((int)multiqueue.size(qid1) != capacity) { + while ((int)qid1.size() != capacity) { usleep(1000); - TESTASSERT(t1_running) + TESTASSERT(t1_running); } multiqueue.reset(); @@ -204,23 +198,25 @@ int test_multiqueue_threading3() int capacity = 4; multiqueue_handler multiqueue(capacity); - int qid1 = multiqueue.add_queue(); - auto pop_blocking_func = [&multiqueue](int qid, bool* success) { - int number = 0; - int id = multiqueue.wait_pop(&number); - *success = id < 0; + auto qid1 = multiqueue.add_queue(); + auto pop_blocking_func = [&multiqueue](bool* success) { + int number = 0; + bool ret = multiqueue.wait_pop(&number); + *success = not ret; }; bool t1_success = false; - std::thread t1(pop_blocking_func, qid1, &t1_success); + std::thread t1(pop_blocking_func, &t1_success); - TESTASSERT(not t1_success) + TESTASSERT(not t1_success); usleep(1000); - TESTASSERT(not t1_success) - TESTASSERT((int)multiqueue.size(qid1) == 0) + TESTASSERT(not t1_success); + TESTASSERT((int)qid1.size() == 0); // Should be able to unlock all multiqueue.reset(); + TESTASSERT(multiqueue.nof_queues() == 0); + TESTASSERT(not qid1.active()); t1.join(); TESTASSERT(t1_success); diff --git a/srsenb/src/stack/enb_stack_lte.cc b/srsenb/src/stack/enb_stack_lte.cc index 66fa3915a..4f38c942c 100644 --- a/srsenb/src/stack/enb_stack_lte.cc +++ b/srsenb/src/stack/enb_stack_lte.cc @@ -218,7 +218,7 @@ bool enb_stack_lte::get_metrics(stack_metrics_t* metrics) } }); - if (ret.first) { + if (ret.has_value()) { // wait for result *metrics = pending_stack_metrics.pop_blocking(); return true; diff --git a/srsue/src/stack/mac/mac.cc b/srsue/src/stack/mac/mac.cc index 4be6a1026..dcd504949 100644 --- a/srsue/src/stack/mac/mac.cc +++ b/srsue/src/stack/mac/mac.cc @@ -338,7 +338,7 @@ void mac::bch_decoded_ok(uint32_t cc_idx, uint8_t* payload, uint32_t len) buf->set_timestamp(); auto p = stack_task_dispatch_queue.try_push(std::bind( [this](srsran::unique_byte_buffer_t& buf) { rlc_h->write_pdu_bcch_bch(std::move(buf)); }, std::move(buf))); - if (not p.first) { + if (p.is_error()) { Warning("Failed to dispatch rlc::write_pdu_bcch_bch task to stack"); } } else { @@ -399,7 +399,7 @@ void mac::tb_decoded(uint32_t cc_idx, mac_grant_dl_t grant, bool ack[SRSRAN_MAX_ auto ret = stack_task_dispatch_queue.try_push(std::bind( [this](srsran::unique_byte_buffer_t& pdu) { rlc_h->write_pdu_pcch(std::move(pdu)); }, std::move(pdu))); - if (not ret.first) { + if (ret.is_error()) { Warning("Failed to dispatch rlc::write_pdu_pcch task to stack"); } } else { @@ -477,7 +477,7 @@ void mac::process_pdus() have_data = demux_unit.process_pdus(); } }); - if (not ret.first) { + if (ret.is_error()) { Warning("Failed to dispatch mac::%s task to stack thread", __func__); } } diff --git a/srsue/src/stack/ue_stack_lte.cc b/srsue/src/stack/ue_stack_lte.cc index 2b44b71b8..e903a4a27 100644 --- a/srsue/src/stack/ue_stack_lte.cc +++ b/srsue/src/stack/ue_stack_lte.cc @@ -119,7 +119,7 @@ int ue_stack_lte::init(const stack_args_t& args_) mac_nr_logger.set_hex_dump_max_size(args.log.mac_hex_limit); rrc_nr_logger.set_level(srslog::str_to_basic_level(args.log.rrc_level)); rrc_nr_logger.set_hex_dump_max_size(args.log.rrc_hex_limit); - + // Set up pcap // parse pcap trace list std::vector pcap_list; @@ -341,7 +341,7 @@ void ue_stack_lte::run_thread() void ue_stack_lte::write_sdu(uint32_t lcid, srsran::unique_byte_buffer_t sdu) { auto task = [this, lcid](srsran::unique_byte_buffer_t& sdu) { pdcp.write_sdu(lcid, std::move(sdu)); }; - bool ret = gw_queue_id.try_push(std::bind(task, std::move(sdu))).first; + bool ret = gw_queue_id.try_push(std::bind(task, std::move(sdu))).has_value(); if (not ret) { pdcp_logger.info("GW SDU with lcid=%d was discarded.", lcid); ul_dropped_sdus++; diff --git a/srsue/src/stack/ue_stack_nr.cc b/srsue/src/stack/ue_stack_nr.cc index c670acc14..33ed31c5f 100644 --- a/srsue/src/stack/ue_stack_nr.cc +++ b/srsue/src/stack/ue_stack_nr.cc @@ -154,9 +154,9 @@ void ue_stack_nr::run_thread() void ue_stack_nr::write_sdu(uint32_t lcid, srsran::unique_byte_buffer_t sdu) { if (pdcp != nullptr) { - std::pair ret = gw_task_queue.try_push(std::bind( + auto ret = gw_task_queue.try_push(std::bind( [this, lcid](srsran::unique_byte_buffer_t& sdu) { pdcp->write_sdu(lcid, std::move(sdu)); }, std::move(sdu))); - if (not ret.first) { + if (ret.is_error()) { pdcp_logger.warning("GW SDU with lcid=%d was discarded.", lcid); } }