From d1236fd62f78b555cf61690d96a983fae1dbe09d Mon Sep 17 00:00:00 2001 From: Francisco Date: Mon, 8 Mar 2021 09:50:39 +0000 Subject: [PATCH] stack,optimization - replaced previous block_queue design for new bounded queue in several places in the enb --- lib/include/srslte/common/mac_pcap_base.h | 12 +++++----- lib/include/srslte/mac/pdu_queue.h | 7 +++--- lib/include/srslte/upper/rlc_common.h | 10 ++++----- lib/src/common/mac_pcap.cc | 2 +- lib/src/common/mac_pcap_base.cc | 8 +++---- lib/src/common/mac_pcap_net.cc | 2 +- lib/src/mac/pdu_queue.cc | 6 ++--- lib/test/upper/rlc_stress_test.cc | 3 ++- srsenb/hdr/stack/enb_stack_lte.h | 2 +- srsenb/hdr/stack/mac/mac.h | 4 ++-- srsenb/hdr/stack/rrc/rrc.h | 7 +++--- srsenb/src/stack/enb_stack_lte.cc | 9 +++++--- srsenb/src/stack/mac/mac.cc | 7 ++++-- srsenb/src/stack/rrc/rrc.cc | 27 ++++++++++++++++------- 14 files changed, 62 insertions(+), 44 deletions(-) diff --git a/lib/include/srslte/common/mac_pcap_base.h b/lib/include/srslte/common/mac_pcap_base.h index 67fd706b0..35eb3b748 100644 --- a/lib/include/srslte/common/mac_pcap_base.h +++ b/lib/include/srslte/common/mac_pcap_base.h @@ -13,7 +13,7 @@ #ifndef SRSLTE_MAC_PCAP_BASE_H #define SRSLTE_MAC_PCAP_BASE_H -#include "srslte/common/block_queue.h" +#include "srslte/adt/circular_buffer.h" #include "srslte/common/buffer_pool.h" #include "srslte/common/common.h" #include "srslte/common/pcap.h" @@ -93,11 +93,11 @@ protected: virtual void write_pdu(pcap_pdu_t& pdu) = 0; void run_thread() final; - std::mutex mutex; - srslog::basic_logger& logger; - bool running = false; - block_queue queue; - uint16_t ue_id = 0; + std::mutex mutex; + srslog::basic_logger& logger; + bool running = false; + static_block_queue queue; + uint16_t ue_id = 0; private: void pack_and_queue(uint8_t* payload, diff --git a/lib/include/srslte/mac/pdu_queue.h b/lib/include/srslte/mac/pdu_queue.h index 1e3ceca47..5b968f239 100644 --- a/lib/include/srslte/mac/pdu_queue.h +++ b/lib/include/srslte/mac/pdu_queue.h @@ -13,6 +13,7 @@ #ifndef SRSLTE_PDU_QUEUE_H #define SRSLTE_PDU_QUEUE_H +#include "srslte/adt/circular_buffer.h" #include "srslte/common/block_queue.h" #include "srslte/common/buffer_pool.h" #include "srslte/common/log.h" @@ -34,7 +35,7 @@ public: }; pdu_queue(srslog::basic_logger& logger, uint32_t pool_size = DEFAULT_POOL_SIZE) : - pool(pool_size), callback(NULL), logger(logger) + pool(pool_size), callback(NULL), logger(logger), pdu_q(pool_size) {} void init(process_callback* callback); @@ -60,8 +61,8 @@ private: } pdu_t; - block_queue pdu_q; - buffer_pool pool; + dyn_block_queue pdu_q; + buffer_pool pool; process_callback* callback; srslog::basic_logger& logger; diff --git a/lib/include/srslte/upper/rlc_common.h b/lib/include/srslte/upper/rlc_common.h index 54eca35f3..1b33ac886 100644 --- a/lib/include/srslte/upper/rlc_common.h +++ b/lib/include/srslte/upper/rlc_common.h @@ -13,7 +13,7 @@ #ifndef SRSLTE_RLC_COMMON_H #define SRSLTE_RLC_COMMON_H -#include "srslte/common/block_queue.h" +#include "srslte/adt/circular_buffer.h" #include "srslte/common/logmap.h" #include "srslte/interfaces/rlc_interface_types.h" #include "srslte/upper/rlc_metrics.h" @@ -219,13 +219,13 @@ public: } pdu_t p; // Do not block - while (rx_pdu_resume_queue.try_pop(&p)) { + while (rx_pdu_resume_queue.try_pop(p)) { write_pdu(p.payload, p.nof_bytes); free(p.payload); } unique_byte_buffer_t s; - while (tx_sdu_resume_queue.try_pop(&s)) { + while (tx_sdu_resume_queue.try_pop(s)) { write_sdu(std::move(s)); } suspended = false; @@ -303,8 +303,8 @@ private: uint32_t nof_bytes; } pdu_t; - block_queue rx_pdu_resume_queue; - block_queue tx_sdu_resume_queue{256}; + static_block_queue rx_pdu_resume_queue; + static_block_queue tx_sdu_resume_queue; }; } // namespace srslte diff --git a/lib/src/common/mac_pcap.cc b/lib/src/common/mac_pcap.cc index 650542ab9..01d96e8d6 100644 --- a/lib/src/common/mac_pcap.cc +++ b/lib/src/common/mac_pcap.cc @@ -58,7 +58,7 @@ uint32_t mac_pcap::close() // tell writer thread to stop running = false; pcap_pdu_t pdu = {}; - queue.push(std::move(pdu)); + queue.push_blocking(std::move(pdu)); } wait_thread_finish(); diff --git a/lib/src/common/mac_pcap_base.cc b/lib/src/common/mac_pcap_base.cc index 979cfaab8..2cd2483e4 100644 --- a/lib/src/common/mac_pcap_base.cc +++ b/lib/src/common/mac_pcap_base.cc @@ -37,7 +37,7 @@ void mac_pcap_base::run_thread() { // blocking write until stopped while (running) { - pcap_pdu_t pdu = queue.wait_pop(); + pcap_pdu_t pdu = queue.pop_blocking(); { std::lock_guard lock(mutex); write_pdu(pdu); @@ -47,7 +47,7 @@ void mac_pcap_base::run_thread() // write remainder of queue std::lock_guard lock(mutex); pcap_pdu_t pdu = {}; - while (queue.try_pop(&pdu)) { + while (queue.try_pop(pdu)) { write_pdu(pdu); } } @@ -84,7 +84,7 @@ void mac_pcap_base::pack_and_queue(uint8_t* payload, // copy payload into PDU buffer memcpy(pdu.pdu->msg, payload, payload_len); pdu.pdu->N_bytes = payload_len; - queue.push(std::move(pdu)); + queue.push_blocking(std::move(pdu)); } else { logger.info("Dropping PDU in PCAP. No buffer available or not enough space (pdu_len=%d).", payload_len); } @@ -119,7 +119,7 @@ void mac_pcap_base::pack_and_queue_nr(uint8_t* payload, // copy payload into PDU buffer memcpy(pdu.pdu->msg, payload, payload_len); pdu.pdu->N_bytes = payload_len; - queue.push(std::move(pdu)); + queue.push_blocking(std::move(pdu)); } else { logger.info("Dropping PDU in NR PCAP. No buffer available or not enough space (pdu_len=%d).", payload_len); } diff --git a/lib/src/common/mac_pcap_net.cc b/lib/src/common/mac_pcap_net.cc index 3b098be7b..08352e25d 100644 --- a/lib/src/common/mac_pcap_net.cc +++ b/lib/src/common/mac_pcap_net.cc @@ -71,7 +71,7 @@ uint32_t mac_pcap_net::close() // tell writer thread to stop running = false; pcap_pdu_t pdu = {}; - queue.push(std::move(pdu)); + queue.push_blocking(std::move(pdu)); } wait_thread_finish(); diff --git a/lib/src/mac/pdu_queue.cc b/lib/src/mac/pdu_queue.cc index 089852669..77b790cb2 100644 --- a/lib/src/mac/pdu_queue.cc +++ b/lib/src/mac/pdu_queue.cc @@ -58,7 +58,7 @@ void pdu_queue::push(const uint8_t* ptr, uint32_t len, channel_t channel) pdu_t* pdu = (pdu_t*)ptr; pdu->len = len; pdu->channel = channel; - pdu_q.push(pdu); + pdu_q.push_blocking(pdu); } else { logger.warning("Error pushing pdu: ptr is empty"); } @@ -69,7 +69,7 @@ bool pdu_queue::process_pdus() bool have_data = false; uint32_t cnt = 0; pdu_t* pdu; - while (pdu_q.try_pop(&pdu)) { + while (pdu_q.try_pop(pdu)) { if (callback) { callback->process_pdu(pdu->ptr, pdu->len, pdu->channel); } @@ -86,7 +86,7 @@ bool pdu_queue::process_pdus() void pdu_queue::reset() { pdu_t* pdu; - while (pdu_q.try_pop(&pdu)) { + while (pdu_q.try_pop(pdu)) { // nop } } diff --git a/lib/test/upper/rlc_stress_test.cc b/lib/test/upper/rlc_stress_test.cc index 3e6d39765..4313f327e 100644 --- a/lib/test/upper/rlc_stress_test.cc +++ b/lib/test/upper/rlc_stress_test.cc @@ -10,6 +10,7 @@ * */ +#include "srslte/common/block_queue.h" #include "srslte/common/crash_handler.h" #include "srslte/common/log_filter.h" #include "srslte/common/rlc_pcap.h" @@ -459,7 +460,7 @@ void stress_test(stress_test_args_t args) if (args.rat == "LTE") { if (args.mode == "AM") { // config RLC AM bearer - cnfg_ = rlc_config_t::default_rlc_am_config(); + cnfg_ = rlc_config_t::default_rlc_am_config(); cnfg_.am.max_retx_thresh = args.max_retx; } else if (args.mode == "UM") { // config UM bearer diff --git a/srsenb/hdr/stack/enb_stack_lte.h b/srsenb/hdr/stack/enb_stack_lte.h index 93c1a6650..a67a67eb2 100644 --- a/srsenb/hdr/stack/enb_stack_lte.h +++ b/srsenb/hdr/stack/enb_stack_lte.h @@ -162,7 +162,7 @@ private: // state bool started = false; - srslte::block_queue pending_stack_metrics; + srslte::dyn_block_queue pending_stack_metrics; }; } // namespace srsenb diff --git a/srsenb/hdr/stack/mac/mac.h b/srsenb/hdr/stack/mac/mac.h index e6c2ee4a2..8782d7705 100644 --- a/srsenb/hdr/stack/mac/mac.h +++ b/srsenb/hdr/stack/mac/mac.h @@ -140,8 +140,8 @@ private: std::map > ue_db, ues_to_rem; uint16_t last_rnti = 70; - srslte::block_queue > ue_pool; ///< Pool of pre-allocated UE objects - void prealloc_ue(uint32_t nof_ue); + srslte::static_block_queue, 32> ue_pool; ///< Pool of pre-allocated UE objects + void prealloc_ue(uint32_t nof_ue); uint8_t* assemble_rar(sched_interface::dl_sched_rar_grant_t* grants, uint32_t enb_cc_idx, diff --git a/srsenb/hdr/stack/rrc/rrc.h b/srsenb/hdr/stack/rrc/rrc.h index 75328750b..3cd429ef7 100644 --- a/srsenb/hdr/stack/rrc/rrc.h +++ b/srsenb/hdr/stack/rrc/rrc.h @@ -17,8 +17,8 @@ #include "rrc_cell_cfg.h" #include "rrc_metrics.h" #include "srsenb/hdr/stack/upper/common_enb.h" +#include "srslte/adt/circular_buffer.h" #include "srslte/adt/mem_pool.h" -#include "srslte/common/block_queue.h" #include "srslte/common/buffer_pool.h" #include "srslte/common/common.h" #include "srslte/common/logmap.h" @@ -28,7 +28,6 @@ #include "srslte/interfaces/enb_rrc_interfaces.h" #include "srslte/srslog/srslog.h" #include -#include namespace srsenb { @@ -190,8 +189,8 @@ private: const static uint32_t LCID_ACT_USER = 0xffff0004; const static uint32_t LCID_RTX_USER = 0xffff0005; - bool running = false; - srslte::block_queue rx_pdu_queue; + bool running = false; + srslte::dyn_block_queue rx_pdu_queue; asn1::rrc::mcch_msg_s mcch; bool enable_mbms = false; diff --git a/srsenb/src/stack/enb_stack_lte.cc b/srsenb/src/stack/enb_stack_lte.cc index 237e2d1df..3acaa65af 100644 --- a/srsenb/src/stack/enb_stack_lte.cc +++ b/srsenb/src/stack/enb_stack_lte.cc @@ -36,7 +36,8 @@ enb_stack_lte::enb_stack_lte(srslte::logger* logger_, srslog::sink& log_sink) : s1ap(&task_sched, s1ap_logger), rrc(&task_sched), logger(logger_), - mac_pcap() + mac_pcap(), + pending_stack_metrics(64) { get_background_workers().set_nof_workers(2); enb_task_queue = task_sched.make_task_queue(); @@ -218,12 +219,14 @@ bool enb_stack_lte::get_metrics(stack_metrics_t* metrics) } rrc.get_metrics(metrics.rrc); s1ap.get_metrics(metrics.s1ap); - pending_stack_metrics.push(metrics); + if (not pending_stack_metrics.try_push(metrics)) { + stack_logger.error("Unable to push metrics to queue"); + } }); if (ret.first) { // wait for result - *metrics = pending_stack_metrics.wait_pop(); + *metrics = pending_stack_metrics.pop_blocking(); return true; } return false; diff --git a/srsenb/src/stack/mac/mac.cc b/srsenb/src/stack/mac/mac.cc index f7482950b..fc55aab1e 100644 --- a/srsenb/src/stack/mac/mac.cc +++ b/srsenb/src/stack/mac/mac.cc @@ -461,7 +461,7 @@ uint16_t mac::allocate_ue() logger.error("Ignoring RACH attempt. UE pool empty."); return SRSLTE_INVALID_RNTI; } - std::unique_ptr ue_ptr = ue_pool.wait_pop(); + std::unique_ptr ue_ptr = ue_pool.pop_blocking(); uint16_t rnti = ue_ptr->get_rnti(); // Set PCAP if available @@ -562,7 +562,10 @@ void mac::prealloc_ue(uint32_t nof_ue) for (uint32_t i = 0; i < nof_ue; i++) { std::unique_ptr ptr = std::unique_ptr( new ue(allocate_rnti(), args.nof_prb, &scheduler, rrc_h, rlc_h, phy_h, log_h, logger, cells.size())); - ue_pool.push(std::move(ptr)); + if (not ue_pool.try_push(std::move(ptr))) { + logger.info("Cannot preallocate more UEs as pool is full"); + return; + } } } diff --git a/srsenb/src/stack/rrc/rrc.cc b/srsenb/src/stack/rrc/rrc.cc index 938618813..37233748a 100644 --- a/srsenb/src/stack/rrc/rrc.cc +++ b/srsenb/src/stack/rrc/rrc.cc @@ -30,7 +30,8 @@ using namespace asn1::rrc; namespace srsenb { -rrc::rrc(srslte::task_sched_handle task_sched_) : logger(srslog::fetch_basic_logger("RRC")), task_sched(task_sched_) +rrc::rrc(srslte::task_sched_handle task_sched_) : + logger(srslog::fetch_basic_logger("RRC")), task_sched(task_sched_), rx_pdu_queue(64) { pending_paging.clear(); ue_pool.reserve(16); @@ -89,7 +90,7 @@ void rrc::stop() if (running) { running = false; rrc_pdu p = {0, LCID_EXIT, nullptr}; - rx_pdu_queue.push(std::move(p)); + rx_pdu_queue.push_blocking(std::move(p)); } users.clear(); } @@ -127,13 +128,17 @@ uint8_t* rrc::read_pdu_bcch_dlsch(const uint8_t cc_idx, const uint32_t sib_index void rrc::set_activity_user(uint16_t rnti) { rrc_pdu p = {rnti, LCID_ACT_USER, nullptr}; - rx_pdu_queue.push(std::move(p)); + if (not rx_pdu_queue.try_push(std::move(p))) { + logger.error("Failed to push UE activity command to RRC queue"); + } } void rrc::rem_user_thread(uint16_t rnti) { rrc_pdu p = {rnti, LCID_REM_USER, nullptr}; - rx_pdu_queue.push(std::move(p)); + if (not rx_pdu_queue.try_push(std::move(p))) { + logger.error("Failed to push UE remove command to RRC queue"); + } } uint32_t rrc::get_nof_users() @@ -144,7 +149,9 @@ uint32_t rrc::get_nof_users() void rrc::max_retx_attempted(uint16_t rnti) { rrc_pdu p = {rnti, LCID_RTX_USER, nullptr}; - rx_pdu_queue.push(std::move(p)); + if (not rx_pdu_queue.try_push(std::move(p))) { + logger.error("Failed to push max Retx event to RRC queue"); + } } // This function is called from PRACH worker (can wait) @@ -241,7 +248,9 @@ void rrc::send_rrc_connection_reject(uint16_t rnti) void rrc::write_pdu(uint16_t rnti, uint32_t lcid, srslte::unique_byte_buffer_t pdu) { rrc_pdu p = {rnti, lcid, std::move(pdu)}; - rx_pdu_queue.push(std::move(p)); + if (not rx_pdu_queue.try_push(std::move(p))) { + logger.error("Failed to push Release command to RRC queue"); + } } /******************************************************************************* @@ -276,7 +285,9 @@ void rrc::write_dl_info(uint16_t rnti, srslte::unique_byte_buffer_t sdu) void rrc::release_complete(uint16_t rnti) { rrc_pdu p = {rnti, LCID_REL_USER, nullptr}; - rx_pdu_queue.push(std::move(p)); + if (not rx_pdu_queue.try_push(std::move(p))) { + logger.error("Failed to push Release command to RRC queue"); + } } bool rrc::setup_ue_ctxt(uint16_t rnti, const asn1::s1ap::init_context_setup_request_s& msg) @@ -963,7 +974,7 @@ void rrc::tti_clock() { // pop cmds from queue rrc_pdu p; - while (rx_pdu_queue.try_pop(&p)) { + while (rx_pdu_queue.try_pop(p)) { // print Rx PDU if (p.pdu != nullptr) { logger.info(p.pdu->msg, p.pdu->N_bytes, "Rx %s PDU", to_string((rb_id_t)p.lcid));