diff --git a/lib/include/srslte/common/interfaces_common.h b/lib/include/srslte/common/interfaces_common.h index de49a659c..5afd183d0 100644 --- a/lib/include/srslte/common/interfaces_common.h +++ b/lib/include/srslte/common/interfaces_common.h @@ -85,6 +85,7 @@ public: virtual srslte::timer_handler::unique_timer get_unique_timer() = 0; virtual srslte::task_multiqueue::queue_handler make_task_queue() = 0; virtual void defer_callback(uint32_t duration_ms, std::function func) = 0; + virtual void defer_task(srslte::move_task_t func) = 0; virtual void enqueue_background_task(std::function task) = 0; virtual void notify_background_task_result(srslte::move_task_t task) = 0; }; diff --git a/lib/include/srslte/common/multiqueue.h b/lib/include/srslte/common/multiqueue.h index fb8db8923..1e28e6317 100644 --- a/lib/include/srslte/common/multiqueue.h +++ b/lib/include/srslte/common/multiqueue.h @@ -174,20 +174,12 @@ public: { std::unique_lock lock(mutex); while (running) { - // Round-robin for all queues - for (const queue_wrapper& q : queues) { - spin_idx = (spin_idx + 1) % queues.size(); - if (is_queue_active_(spin_idx) and not queues[spin_idx].empty()) { - if (value) { - *value = std::move(queues[spin_idx].front()); - } - queues[spin_idx].pop(); - if (nof_threads_waiting > 0) { - lock.unlock(); - queues[spin_idx].cv_full.notify_one(); - } - return spin_idx; + if (round_robin_pop_(value)) { + if (nof_threads_waiting > 0) { + lock.unlock(); + queues[spin_idx].cv_full.notify_one(); } + return spin_idx; } nof_threads_waiting++; cv_empty.wait(lock); @@ -197,6 +189,24 @@ public: return -1; } + int try_pop(myobj* value) + { + std::unique_lock lock(mutex); + if (running) { + if (round_robin_pop_(value)) { + if (nof_threads_waiting > 0) { + lock.unlock(); + queues[spin_idx].cv_full.notify_one(); + } + return spin_idx; + } + // didn't find any task + return -1; + } + cv_exit.notify_one(); + return -1; + } + bool empty(int qidx) { std::lock_guard lck(mutex); @@ -237,6 +247,22 @@ public: private: bool is_queue_active_(int qidx) const { return running and queues[qidx].active; } + bool round_robin_pop_(myobj* value) + { + // Round-robin for all queues + for (const queue_wrapper& q : queues) { + spin_idx = (spin_idx + 1) % queues.size(); + if (is_queue_active_(spin_idx) and not queues[spin_idx].empty()) { + if (value) { + *value = std::move(queues[spin_idx].front()); + } + queues[spin_idx].pop(); + return true; + } + } + return false; + } + std::mutex mutex; std::condition_variable cv_empty, cv_exit; uint32_t spin_idx = 0; diff --git a/lib/include/srslte/test/ue_test_interfaces.h b/lib/include/srslte/test/ue_test_interfaces.h index 6f52898fc..d049897d8 100644 --- a/lib/include/srslte/test/ue_test_interfaces.h +++ b/lib/include/srslte/test/ue_test_interfaces.h @@ -29,6 +29,8 @@ namespace srsue { class stack_test_dummy : public stack_interface_rrc { public: + stack_test_dummy() { stack_queue_id = pending_tasks.add_queue(); } + srslte::timer_handler::unique_timer get_unique_timer() override { return timers.get_unique_timer(); } void start_cell_search() override {} void start_cell_select(const phy_interface_rrc_lte::phy_cell_t* cell) override {} @@ -40,9 +42,31 @@ public: { timers.defer_callback(duration_ms, func); } + void defer_task(srslte::move_task_t task) final { pending_tasks.push(stack_queue_id, std::move(task)); } + + // Testing utility functions + void call_on_every_tti(srslte::move_task_t t) { tti_callbacks.push_back(std::move(t)); } + void process_tasks() + { + // Make sure to process any stack pending tasks + srslte::move_task_t task; + while (pending_tasks.try_pop(&task) >= 0) { + task(); + } + } + void run_tti() + { + process_tasks(); + for (auto& t : tti_callbacks) { + t(); + } + timers.step_all(); + } - srslte::timer_handler timers{100}; - srslte::task_multiqueue pending_tasks; + srslte::timer_handler timers{100}; + srslte::task_multiqueue pending_tasks; + std::vector tti_callbacks; + int stack_queue_id = -1; }; class rlc_dummy_interface : public rlc_interface_mac diff --git a/lib/include/srslte/upper/pdcp.h b/lib/include/srslte/upper/pdcp.h index 12b3af2fd..d3477d1a3 100644 --- a/lib/include/srslte/upper/pdcp.h +++ b/lib/include/srslte/upper/pdcp.h @@ -65,16 +65,13 @@ public: void write_pdu_pcch(unique_byte_buffer_t sdu); private: - srsue::rlc_interface_pdcp* rlc = nullptr; - srsue::rrc_interface_pdcp* rrc = nullptr; - srsue::gw_interface_pdcp* gw = nullptr; - - typedef std::map pdcp_map_t; - typedef std::pair pdcp_map_pair_t; - + srsue::rlc_interface_pdcp* rlc = nullptr; + srsue::rrc_interface_pdcp* rrc = nullptr; + srsue::gw_interface_pdcp* gw = nullptr; srslte::task_handler_interface* task_executor = nullptr; srslte::log_ref pdcp_log; - pdcp_map_t pdcp_array, pdcp_array_mrb; + + std::map > pdcp_array, pdcp_array_mrb; // cache valid lcids to be checked from separate thread std::mutex cache_mutex; diff --git a/lib/src/upper/pdcp.cc b/lib/src/upper/pdcp.cc index 365295335..0b2a94740 100644 --- a/lib/src/upper/pdcp.cc +++ b/lib/src/upper/pdcp.cc @@ -36,14 +36,7 @@ pdcp::~pdcp() valid_lcids_cached.clear(); } // destroy all remaining entities - for (pdcp_map_t::iterator it = pdcp_array.begin(); it != pdcp_array.end(); ++it) { - delete (it->second); - } pdcp_array.clear(); - - for (pdcp_map_t::iterator it = pdcp_array_mrb.begin(); it != pdcp_array_mrb.end(); ++it) { - delete (it->second); - } pdcp_array_mrb.clear(); } @@ -58,8 +51,8 @@ void pdcp::stop() {} void pdcp::reestablish() { - for (pdcp_map_t::iterator it = pdcp_array.begin(); it != pdcp_array.end(); ++it) { - it->second->reestablish(); + for (auto& lcid_it : pdcp_array) { + lcid_it.second->reestablish(); } } @@ -77,11 +70,7 @@ void pdcp::reset() valid_lcids_cached.clear(); } // destroy all bearers - for (pdcp_map_t::iterator it = pdcp_array.begin(); it != pdcp_array.end(); /* post increment in erase */) { - it->second->reset(); - delete (it->second); - pdcp_array.erase(it++); - } + pdcp_array.clear(); } /******************************************************************************* @@ -114,7 +103,7 @@ void pdcp::write_sdu_mch(uint32_t lcid, unique_byte_buffer_t sdu) void pdcp::add_bearer(uint32_t lcid, pdcp_config_t cfg) { if (not valid_lcid(lcid)) { - if (not pdcp_array.insert(pdcp_map_pair_t(lcid, new pdcp_entity_lte(rlc, rrc, gw, task_executor, pdcp_log))) + if (not pdcp_array.insert(std::make_pair(lcid, new pdcp_entity_lte(rlc, rrc, gw, task_executor, pdcp_log))) .second) { pdcp_log->error("Error inserting PDCP entity in to array\n."); return; @@ -137,7 +126,7 @@ void pdcp::add_bearer(uint32_t lcid, pdcp_config_t cfg) void pdcp::add_bearer_mrb(uint32_t lcid, pdcp_config_t cfg) { if (not valid_mch_lcid(lcid)) { - if (not pdcp_array_mrb.insert(pdcp_map_pair_t(lcid, new pdcp_entity_lte(rlc, rrc, gw, task_executor, pdcp_log))) + if (not pdcp_array_mrb.insert(std::make_pair(lcid, new pdcp_entity_lte(rlc, rrc, gw, task_executor, pdcp_log))) .second) { pdcp_log->error("Error inserting PDCP entity in to array\n."); return; @@ -160,8 +149,6 @@ void pdcp::del_bearer(uint32_t lcid) valid_lcids_cached.erase(lcid); } if (valid_lcid(lcid)) { - pdcp_map_t::iterator it = pdcp_array.find(lcid); - delete (it->second); pdcp_array.erase(it); pdcp_log->warning("Deleted PDCP bearer %s\n", rrc->get_rb_name(lcid).c_str()); } else { @@ -174,19 +161,17 @@ void pdcp::change_lcid(uint32_t old_lcid, uint32_t new_lcid) // make sure old LCID exists and new LCID is still free if (valid_lcid(old_lcid) && not valid_lcid(new_lcid)) { // insert old PDCP entity into new LCID - pdcp_map_t::iterator it = pdcp_array.find(old_lcid); - pdcp_entity_lte* pdcp_entity = it->second; - if (not pdcp_array.insert(pdcp_map_pair_t(new_lcid, pdcp_entity)).second) { + std::lock_guard lock(cache_mutex); + auto it = pdcp_array.find(old_lcid); + std::unique_ptr pdcp_entity = std::move(it->second); + if (not pdcp_array.insert(std::make_pair(new_lcid, std::move(pdcp_entity))).second) { pdcp_log->error("Error inserting PDCP entity into array\n."); return; } - { - std::lock_guard lock(cache_mutex); - valid_lcids_cached.erase(old_lcid); - valid_lcids_cached.insert(new_lcid); - } // erase from old position pdcp_array.erase(it); + valid_lcids_cached.erase(old_lcid); + valid_lcids_cached.insert(new_lcid); pdcp_log->warning("Changed LCID of PDCP bearer from %d to %d\n", old_lcid, new_lcid); } else { pdcp_log->error( @@ -206,8 +191,8 @@ void pdcp::config_security(uint32_t lcid, as_security_config_t sec_cfg) void pdcp::config_security_all(as_security_config_t sec_cfg) { - for (pdcp_map_t::iterator it = pdcp_array.begin(); it != pdcp_array.end(); ++it) { - it->second->config_security(sec_cfg); + for (auto& it : pdcp_array) { + it.second->config_security(sec_cfg); } } @@ -284,11 +269,7 @@ bool pdcp::valid_lcid(uint32_t lcid) return false; } - if (pdcp_array.find(lcid) == pdcp_array.end()) { - return false; - } - - return true; + return pdcp_array.find(lcid) != pdcp_array.end(); } bool pdcp::valid_mch_lcid(uint32_t lcid) @@ -298,11 +279,7 @@ bool pdcp::valid_mch_lcid(uint32_t lcid) return false; } - if (pdcp_array_mrb.find(lcid) == pdcp_array_mrb.end()) { - return false; - } - - return true; + return pdcp_array_mrb.find(lcid) != pdcp_array_mrb.end(); } } // namespace srslte diff --git a/lib/src/upper/pdcp_entity_lte.cc b/lib/src/upper/pdcp_entity_lte.cc index 2f64df338..40ab6d29c 100644 --- a/lib/src/upper/pdcp_entity_lte.cc +++ b/lib/src/upper/pdcp_entity_lte.cc @@ -36,7 +36,10 @@ pdcp_entity_lte::pdcp_entity_lte(srsue::rlc_interface_pdcp* rlc_, { } -pdcp_entity_lte::~pdcp_entity_lte() {} +pdcp_entity_lte::~pdcp_entity_lte() +{ + reset(); +} void pdcp_entity_lte::init(uint32_t lcid_, pdcp_config_t cfg_) { @@ -87,10 +90,10 @@ void pdcp_entity_lte::reestablish() // Used to stop/pause the entity (called on RRC conn release) void pdcp_entity_lte::reset() { - active = false; - if (log) { + if (active and log) { log->debug("Reset %s\n", rrc->get_rb_name(lcid).c_str()); } + active = false; } // GW/RRC interface diff --git a/srsenb/hdr/stack/enb_stack_lte.h b/srsenb/hdr/stack/enb_stack_lte.h index 2a2eb00f5..b92ab2413 100644 --- a/srsenb/hdr/stack/enb_stack_lte.h +++ b/srsenb/hdr/stack/enb_stack_lte.h @@ -117,6 +117,7 @@ public: void defer_callback(uint32_t duration_ms, std::function func) final; void enqueue_background_task(std::function task) final; void notify_background_task_result(srslte::move_task_t task) final; + void defer_task(srslte::move_task_t task) final; private: static const int STACK_MAIN_THREAD_PRIO = -1; // Use default high-priority below UHD @@ -164,7 +165,9 @@ private: // state bool started = false; srslte::task_multiqueue pending_tasks; - int enb_queue_id = -1, sync_queue_id = -1, mme_queue_id = -1, gtpu_queue_id = -1, mac_queue_id = -1; + int enb_queue_id = -1, sync_queue_id = -1, mme_queue_id = -1, gtpu_queue_id = -1, mac_queue_id = -1, + stack_queue_id = -1; + std::vector deferred_stack_tasks; ///< enqueues stack tasks from within. Avoids locking srslte::block_queue pending_stack_metrics; }; diff --git a/srsenb/src/stack/enb_stack_lte.cc b/srsenb/src/stack/enb_stack_lte.cc index 0544f5c6a..c3d6e66bd 100644 --- a/srsenb/src/stack/enb_stack_lte.cc +++ b/srsenb/src/stack/enb_stack_lte.cc @@ -35,11 +35,12 @@ enb_stack_lte::enb_stack_lte(srslte::logger* logger_) : pdcp(this, "PDCP"), thread("STACK") { - enb_queue_id = pending_tasks.add_queue(); - sync_queue_id = pending_tasks.add_queue(); - mme_queue_id = pending_tasks.add_queue(); - gtpu_queue_id = pending_tasks.add_queue(); - mac_queue_id = pending_tasks.add_queue(); + enb_queue_id = pending_tasks.add_queue(); + sync_queue_id = pending_tasks.add_queue(); + mme_queue_id = pending_tasks.add_queue(); + gtpu_queue_id = pending_tasks.add_queue(); + mac_queue_id = pending_tasks.add_queue(); + stack_queue_id = pending_tasks.add_queue(); pool = byte_buffer_pool::get_instance(); } @@ -135,6 +136,10 @@ void enb_stack_lte::tti_clock() void enb_stack_lte::tti_clock_impl() { + for (auto& t : deferred_stack_tasks) { + t(); + } + deferred_stack_tasks.clear(); timers.step_all(); rrc.tti_clock(); } @@ -276,4 +281,9 @@ void enb_stack_lte::notify_background_task_result(srslte::move_task_t task) task(); } +void enb_stack_lte::defer_task(srslte::move_task_t task) +{ + deferred_stack_tasks.push_back(std::move(task)); +} + } // namespace srsenb diff --git a/srsue/hdr/stack/ue_stack_lte.h b/srsue/hdr/stack/ue_stack_lte.h index f0e56dfe9..cc3b287c0 100644 --- a/srsue/hdr/stack/ue_stack_lte.h +++ b/srsue/hdr/stack/ue_stack_lte.h @@ -127,6 +127,7 @@ public: void enqueue_background_task(std::function f) final; void notify_background_task_result(srslte::move_task_t task) final; void defer_callback(uint32_t duration_ms, std::function func) final; + void defer_task(srslte::move_task_t task) final; private: void run_thread() final; @@ -156,16 +157,6 @@ private: srslte::log_ref nas_log{"NAS"}; srslte::log_ref pool_log{"POOL"}; - // stack components - srsue::mac mac; - srslte::mac_pcap mac_pcap; - srslte::nas_pcap nas_pcap; - srslte::rlc rlc; - srslte::pdcp pdcp; - srsue::rrc rrc; - srsue::nas nas; - std::unique_ptr usim; - // RAT-specific interfaces phy_interface_stack_lte* phy = nullptr; gw_interface_stack* gw = nullptr; @@ -173,12 +164,23 @@ private: // Thread static const int STACK_MAIN_THREAD_PRIO = -1; // Use default high-priority below UHD srslte::task_multiqueue pending_tasks; - int sync_queue_id = -1, ue_queue_id = -1, gw_queue_id = -1, mac_queue_id = -1, background_queue_id = -1; - srslte::task_thread_pool background_tasks; ///< Thread pool used for long, low-priority tasks + int sync_queue_id = -1, ue_queue_id = -1, gw_queue_id = -1, stack_queue_id = -1, background_queue_id = -1; + srslte::task_thread_pool background_tasks; ///< Thread pool used for long, low-priority tasks + std::vector deferred_stack_tasks; ///< enqueues stack tasks from within. Avoids locking srslte::block_queue pending_stack_metrics; // TTI stats srslte::tprof tti_tprof; + + // stack components + srsue::mac mac; + srslte::mac_pcap mac_pcap; + srslte::nas_pcap nas_pcap; + srslte::rlc rlc; + srslte::pdcp pdcp; + srsue::rrc rrc; + srsue::nas nas; + std::unique_ptr usim; }; } // namespace srsue diff --git a/srsue/src/stack/ue_stack_lte.cc b/srsue/src/stack/ue_stack_lte.cc index 4cd8a3326..7591408a2 100644 --- a/srsue/src/stack/ue_stack_lte.cc +++ b/srsue/src/stack/ue_stack_lte.cc @@ -50,7 +50,7 @@ ue_stack_lte::ue_stack_lte() : ue_queue_id = pending_tasks.add_queue(); sync_queue_id = pending_tasks.add_queue(); gw_queue_id = pending_tasks.add_queue(); - mac_queue_id = pending_tasks.add_queue(); + stack_queue_id = pending_tasks.add_queue(); background_queue_id = pending_tasks.add_queue(); background_tasks.start(); @@ -288,6 +288,12 @@ void ue_stack_lte::run_tti_impl(uint32_t tti, uint32_t tti_jump) } current_tti = tti_point{tti}; + // Perform pending stack deferred tasks + for (auto& task : deferred_stack_tasks) { + task(); + } + deferred_stack_tasks.clear(); + // perform tasks for the received TTI range for (uint32_t i = 0; i < tti_jump; ++i) { uint32_t next_tti = TTI_SUB(tti, (tti_jump - i - 1)); @@ -332,6 +338,11 @@ void ue_stack_lte::defer_callback(uint32_t duration_ms, std::function fu timers.defer_callback(duration_ms, func); } +void ue_stack_lte::defer_task(srslte::move_task_t task) +{ + deferred_stack_tasks.push_back(std::move(task)); +} + /******************** * RRC Interface *******************/ diff --git a/srsue/test/ttcn3/hdr/ttcn3_syssim.h b/srsue/test/ttcn3/hdr/ttcn3_syssim.h index 892b6e501..cf36c1a2c 100644 --- a/srsue/test/ttcn3/hdr/ttcn3_syssim.h +++ b/srsue/test/ttcn3/hdr/ttcn3_syssim.h @@ -212,8 +212,8 @@ public: log->step(tti); log->debug("Start TTI\n"); - // Make sure to step SS timers - step_timer(); + // Make sure to step SS + step_stack(); // inform UE about new TTI ue->set_current_tti(tti); @@ -907,7 +907,7 @@ public: ue->new_tb(dl_grant, (const uint8_t*)pdu->msg); } - void step_timer() { stack.timers.step_all(); } + void step_stack() { stack.run_tti(); } void add_srb(const ttcn3_helpers::timing_info_t timing, const uint32_t lcid, const pdcp_config_t pdcp_config) {