From 2d6a8788263534c66ff4f275e646c80169b5ae1c Mon Sep 17 00:00:00 2001 From: Francisco Date: Mon, 17 May 2021 17:43:00 +0100 Subject: [PATCH] change interface to create multiqueue queues --- lib/include/srsran/common/multiqueue.h | 34 ++++++++++------------ lib/include/srsran/common/task_scheduler.h | 13 +++++---- lib/test/common/multiqueue_test.cc | 26 ++++++++--------- lib/test/common/network_utils_test.cc | 2 +- srsenb/src/stack/enb_stack_lte.cc | 8 ++--- srsenb/src/stack/gnb_stack_nr.cc | 10 +++---- srsue/src/stack/ue_stack_lte.cc | 10 +++---- srsue/src/stack/ue_stack_nr.cc | 8 ++--- 8 files changed, 52 insertions(+), 59 deletions(-) diff --git a/lib/include/srsran/common/multiqueue.h b/lib/include/srsran/common/multiqueue.h index e1830deae..6883fbdc7 100644 --- a/lib/include/srsran/common/multiqueue.h +++ b/lib/include/srsran/common/multiqueue.h @@ -47,7 +47,9 @@ class multiqueue_handler class input_port_impl { public: - input_port_impl(uint32_t cap, multiqueue_handler* parent_) : buffer(cap), parent(parent_) {} + input_port_impl(uint32_t cap, bool notify_flag_, multiqueue_handler* parent_) : + buffer(cap), notify_flag(notify_flag_), consumer_notify_needed(notify_flag_), parent(parent_) + {} input_port_impl(const input_port_impl&) = delete; input_port_impl(input_port_impl&&) = delete; input_port_impl& operator=(const input_port_impl&) = delete; @@ -55,6 +57,7 @@ class multiqueue_handler ~input_port_impl() { deactivate_blocking(); } size_t capacity() const { return buffer.max_size(); } + bool get_notify_mode() const { return notify_flag; } size_t size() const { std::lock_guard lock(q_mutex); @@ -65,12 +68,6 @@ class multiqueue_handler std::lock_guard lock(q_mutex); return active_; } - - void set_notify_mode() - { - std::unique_lock lock(q_mutex); - notify_mode = true; - } void set_active(bool val) { std::unique_lock lock(q_mutex); @@ -79,7 +76,7 @@ class multiqueue_handler return; } active_ = val; - consumer_notify_needed = true; + consumer_notify_needed = notify_flag; if (not active_) { buffer.clear(); @@ -120,7 +117,7 @@ class multiqueue_handler { std::unique_lock lock(q_mutex); if (buffer.empty()) { - consumer_notify_needed = true; + consumer_notify_needed = notify_flag; return false; } obj = std::move(buffer.top()); @@ -157,7 +154,7 @@ class multiqueue_handler } } buffer.push(std::forward(*o)); - if (consumer_notify_needed and notify_mode) { + if (consumer_notify_needed) { // Note: The consumer thread only needs to be notified and awaken when queues transition from empty to non-empty // To ensure that the consumer noticed that the queue was empty before a push, we store the last // try_pop() return in a member variable. @@ -174,8 +171,8 @@ class multiqueue_handler srsran::dyn_circular_buffer buffer; std::condition_variable cv_full, cv_exit; bool active_ = true; - bool consumer_notify_needed = true; - bool notify_mode = false; + bool consumer_notify_needed = false; + bool notify_flag = false; int nof_waiting = 0; }; @@ -198,7 +195,6 @@ public: impl = nullptr; } } - void set_notify_mode() { impl->set_notify_mode(); } size_t size() { return impl->size(); } size_t capacity() { return impl->capacity(); } @@ -249,20 +245,22 @@ public: * @param capacity_ The capacity of the queue. * @return The index of the newly created (or reused) queue within the vector of queues. */ - queue_handle add_queue(uint32_t capacity_) + queue_handle add_queue(uint32_t capacity_, bool notify_flag = false) { uint32_t qidx = 0; std::lock_guard lock(mutex); if (not running) { return queue_handle(); } - for (; qidx < queues.size() and (queues[qidx].active() or (queues[qidx].capacity() != capacity_)); ++qidx) - ; + while (qidx < queues.size() and (queues[qidx].active() or (queues[qidx].capacity() != capacity_) or + (queues[qidx].get_notify_mode() == notify_flag))) { + ++qidx; + } // check if there is a free queue of the required size if (qidx == queues.size()) { // create new queue - queues.emplace_back(capacity_, this); + queues.emplace_back(capacity_, notify_flag, this); qidx = queues.size() - 1; // update qidx to the last element } else { queues[qidx].set_active(true); @@ -274,7 +272,7 @@ public: * Add queue using the default capacity of the underlying multiqueue * @return The queue index */ - queue_handle add_queue() { return add_queue(default_capacity); } + queue_handle add_queue(bool notify_flag) { return add_queue(default_capacity, notify_flag); } uint32_t nof_queues() const { diff --git a/lib/include/srsran/common/task_scheduler.h b/lib/include/srsran/common/task_scheduler.h index fc0b5d3c5..8a73cc7c5 100644 --- a/lib/include/srsran/common/task_scheduler.h +++ b/lib/include/srsran/common/task_scheduler.h @@ -26,7 +26,7 @@ public: explicit task_scheduler(uint32_t default_extern_tasks_size = 512, uint32_t nof_timers_prealloc = 100) : external_tasks{default_extern_tasks_size}, timers{nof_timers_prealloc} { - background_queue = external_tasks.add_queue(); + background_queue = external_tasks.add_queue(false); } task_scheduler(const task_scheduler&) = delete; task_scheduler(task_scheduler&&) = delete; @@ -38,8 +38,11 @@ public: srsran::unique_timer get_unique_timer() { return timers.get_unique_timer(); } //! Creates new queue for tasks coming from external thread - srsran::task_queue_handle make_task_queue() { return external_tasks.add_queue(); } - srsran::task_queue_handle make_task_queue(uint32_t qsize) { return external_tasks.add_queue(qsize); } + srsran::task_queue_handle make_task_queue(bool notify_mode) { return external_tasks.add_queue(notify_mode); } + srsran::task_queue_handle make_task_queue(uint32_t qsize, bool notify_mode) + { + return external_tasks.add_queue(qsize, notify_mode); + } //! Delays a task processing by duration_ms template @@ -124,7 +127,7 @@ public: sched->defer_callback(duration_ms, std::forward(func)); } void defer_task(srsran::move_task_t func) { sched->defer_task(std::move(func)); } - srsran::task_queue_handle make_task_queue() { return sched->make_task_queue(); } + srsran::task_queue_handle make_task_queue() { return sched->make_task_queue(false); } private: task_scheduler* sched; @@ -141,7 +144,7 @@ public: { sched->notify_background_task_result(std::move(task)); } - srsran::task_queue_handle make_task_queue() { return sched->make_task_queue(); } + srsran::task_queue_handle make_task_queue() { return sched->make_task_queue(false); } template void defer_callback(uint32_t duration_ms, F&& func) { diff --git a/lib/test/common/multiqueue_test.cc b/lib/test/common/multiqueue_test.cc index bb44f461d..d906b5571 100644 --- a/lib/test/common/multiqueue_test.cc +++ b/lib/test/common/multiqueue_test.cc @@ -32,7 +32,7 @@ int test_multiqueue() TESTASSERT(multiqueue.nof_queues() == 0); // test push/pop and size for one queue - queue_handle qid1 = multiqueue.add_queue(); + queue_handle qid1 = multiqueue.add_queue(true); TESTASSERT(qid1.active()); TESTASSERT(qid1.size() == 0 and qid1.empty()); TESTASSERT(multiqueue.nof_queues() == 1); @@ -45,7 +45,7 @@ int test_multiqueue() TESTASSERT(number == 2 and qid1.empty()); // test push/pop and size for two queues - queue_handle qid2 = multiqueue.add_queue(); + queue_handle qid2 = multiqueue.add_queue(true); TESTASSERT(qid2.active()); TESTASSERT(multiqueue.nof_queues() == 2 and qid1.active()); TESTASSERT(qid2.try_push(3).has_value()); @@ -55,7 +55,7 @@ int test_multiqueue() // check if erasing a queue breaks anything qid1.reset(); TESTASSERT(multiqueue.nof_queues() == 1 and not qid1.active()); - qid1 = multiqueue.add_queue(); + qid1 = multiqueue.add_queue(true); TESTASSERT(qid1.empty() and qid1.active()); TESTASSERT(qid2.size() == 1 and not qid2.empty()); multiqueue.wait_pop(&number); @@ -89,15 +89,15 @@ int test_multiqueue() // check that adding a queue of different capacity works { - qid1 = multiqueue.add_queue(); - qid2 = multiqueue.add_queue(); + qid1 = multiqueue.add_queue(true); + qid2 = multiqueue.add_queue(true); // remove first queue again qid1.reset(); TESTASSERT(multiqueue.nof_queues() == 1); // add queue with non-default capacity - auto qid3 = multiqueue.add_queue(10); + auto qid3 = multiqueue.add_queue(10, true); TESTASSERT(qid3.capacity() == 10); // make sure neither a new queue index is returned @@ -117,7 +117,7 @@ int test_multiqueue_threading() int capacity = 4, number = 0, start_number = 2, nof_pushes = capacity + 1; multiqueue_handler multiqueue(capacity); - auto qid1 = multiqueue.add_queue(); + auto qid1 = multiqueue.add_queue(true); auto push_blocking_func = [](queue_handle* qid, int start_value, int nof_pushes, bool* is_running) { for (int i = 0; i < nof_pushes; ++i) { qid->push(start_value + i); @@ -165,7 +165,7 @@ int test_multiqueue_threading2() int capacity = 4, start_number = 2, nof_pushes = capacity + 1; multiqueue_handler multiqueue(capacity); - auto qid1 = multiqueue.add_queue(); + auto qid1 = multiqueue.add_queue(true); auto push_blocking_func = [](queue_handle* qid, int start_value, int nof_pushes, bool* is_running) { for (int i = 0; i < nof_pushes; ++i) { qid->push(start_value + i); @@ -199,7 +199,7 @@ int test_multiqueue_threading3() int capacity = 4; multiqueue_handler multiqueue(capacity); - auto qid1 = multiqueue.add_queue(); + auto qid1 = multiqueue.add_queue(true); auto pop_blocking_func = [&multiqueue](bool* success) { int number = 0; bool ret = multiqueue.wait_pop(&number); @@ -235,10 +235,10 @@ int test_multiqueue_threading4() int capacity = 4; multiqueue_handler multiqueue(capacity); - auto qid1 = multiqueue.add_queue(); - auto qid2 = multiqueue.add_queue(); - auto qid3 = multiqueue.add_queue(); - auto qid4 = multiqueue.add_queue(); + auto qid1 = multiqueue.add_queue(true); + auto qid2 = multiqueue.add_queue(true); + auto qid3 = multiqueue.add_queue(true); + auto qid4 = multiqueue.add_queue(true); std::mutex mutex; int last_number = -1; auto pop_blocking_func = [&multiqueue, &last_number, &mutex](bool* success) { diff --git a/lib/test/common/network_utils_test.cc b/lib/test/common/network_utils_test.cc index 2a9290777..fcaf6dd12 100644 --- a/lib/test/common/network_utils_test.cc +++ b/lib/test/common/network_utils_test.cc @@ -23,7 +23,7 @@ struct rx_thread_tester { std::thread t; rx_thread_tester() : - task_queue(task_sched.make_task_queue()), + task_queue(task_sched.make_task_queue(true)), t([this]() { stop_token.store(false); while (not stop_token.load(std::memory_order_relaxed)) { diff --git a/srsenb/src/stack/enb_stack_lte.cc b/srsenb/src/stack/enb_stack_lte.cc index 91a3a0bf3..ac299ddcc 100644 --- a/srsenb/src/stack/enb_stack_lte.cc +++ b/srsenb/src/stack/enb_stack_lte.cc @@ -41,9 +41,8 @@ enb_stack_lte::enb_stack_lte(srslog::sink& log_sink) : pending_stack_metrics(64) { get_background_workers().set_nof_workers(2); - enb_task_queue = task_sched.make_task_queue(); - enb_task_queue.set_notify_mode(); - metrics_task_queue = task_sched.make_task_queue(); + enb_task_queue = task_sched.make_task_queue(true); + metrics_task_queue = task_sched.make_task_queue(false); // sync_queue is added in init() } @@ -116,8 +115,7 @@ int enb_stack_lte::init(const stack_args_t& args_, const rrc_cfg_t& rrc_cfg_) } // add sync queue - sync_task_queue = task_sched.make_task_queue(args.sync_queue_size); - sync_task_queue.set_notify_mode(); + sync_task_queue = task_sched.make_task_queue(args.sync_queue_size, true); // Init all layers if (!mac.init(args.mac, rrc_cfg.cell_list, phy, &rlc, &rrc)) { diff --git a/srsenb/src/stack/gnb_stack_nr.cc b/srsenb/src/stack/gnb_stack_nr.cc index dec7e9d0a..a8ba3b3e4 100644 --- a/srsenb/src/stack/gnb_stack_nr.cc +++ b/srsenb/src/stack/gnb_stack_nr.cc @@ -26,12 +26,10 @@ gnb_stack_nr::gnb_stack_nr() : task_sched{512, 128}, thread("gNB"), rlc_logger(s m_gw.reset(new srsue::gw()); // m_gtpu.reset(new srsenb::gtpu()); - ue_task_queue = task_sched.make_task_queue(); - ue_task_queue.set_notify_mode(); - sync_task_queue = task_sched.make_task_queue(); - sync_task_queue.set_notify_mode(); - gw_task_queue = task_sched.make_task_queue(); - mac_task_queue = task_sched.make_task_queue(); + ue_task_queue = task_sched.make_task_queue(true); + sync_task_queue = task_sched.make_task_queue(true); + gw_task_queue = task_sched.make_task_queue(false); + mac_task_queue = task_sched.make_task_queue(false); } gnb_stack_nr::~gnb_stack_nr() diff --git a/srsue/src/stack/ue_stack_lte.cc b/srsue/src/stack/ue_stack_lte.cc index b3d330d8e..867b7b9b0 100644 --- a/srsue/src/stack/ue_stack_lte.cc +++ b/srsue/src/stack/ue_stack_lte.cc @@ -52,10 +52,9 @@ ue_stack_lte::ue_stack_lte() : tti_tprof("tti_tprof", "STCK", TTI_STAT_PERIOD) { get_background_workers().set_nof_workers(2); - ue_task_queue = task_sched.make_task_queue(); - ue_task_queue.set_notify_mode(); - gw_queue_id = task_sched.make_task_queue(); - cfg_task_queue = task_sched.make_task_queue(); + ue_task_queue = task_sched.make_task_queue(true); + gw_queue_id = task_sched.make_task_queue(false); + cfg_task_queue = task_sched.make_task_queue(false); // sync_queue is added in init() } @@ -199,8 +198,7 @@ int ue_stack_lte::init(const stack_args_t& args_) } // add sync queue - sync_task_queue = task_sched.make_task_queue(args.sync_queue_size); - sync_task_queue.set_notify_mode(); + sync_task_queue = task_sched.make_task_queue(args.sync_queue_size, true); mac.init(phy, &rlc, &rrc); rlc.init(&pdcp, &rrc, &rrc_nr, task_sched.get_timer_handler(), 0 /* RB_ID_SRB0 */); diff --git a/srsue/src/stack/ue_stack_nr.cc b/srsue/src/stack/ue_stack_nr.cc index 59a54d3fc..ffbf784b8 100644 --- a/srsue/src/stack/ue_stack_nr.cc +++ b/srsue/src/stack/ue_stack_nr.cc @@ -33,11 +33,9 @@ ue_stack_nr::ue_stack_nr() : // setup logging for pool, RLC and PDCP byte_buffer_pool::get_instance()->enable_logger(true); - ue_task_queue = task_sched.make_task_queue(); - ue_task_queue.set_notify_mode(); - sync_task_queue = task_sched.make_task_queue(); - sync_task_queue.set_notify_mode(); - gw_task_queue = task_sched.make_task_queue(); + ue_task_queue = task_sched.make_task_queue(true); + sync_task_queue = task_sched.make_task_queue(true); + gw_task_queue = task_sched.make_task_queue(false); } ue_stack_nr::~ue_stack_nr()