From f0ed1e06a826c58650c14947c07a5fe042487c3a Mon Sep 17 00:00:00 2001 From: Francisco Date: Mon, 8 Mar 2021 11:37:12 +0000 Subject: [PATCH] documentation,bugfix - added documentation to new blocking queues, changed class names, and removed blocking pushes from the critical path --- lib/include/srslte/adt/circular_buffer.h | 90 +++++++++++++++++--- lib/include/srslte/common/mac_pcap_base.h | 10 +-- lib/include/srslte/mac/pdu_queue.h | 4 +- lib/include/srslte/upper/byte_buffer_queue.h | 4 +- lib/include/srslte/upper/rlc_common.h | 4 +- lib/src/common/mac_pcap_base.cc | 8 +- lib/test/adt/circular_buffer_test.cc | 4 +- srsenb/hdr/stack/enb_stack_lte.h | 2 +- srsenb/hdr/stack/mac/mac.h | 4 +- srsenb/hdr/stack/rrc/rrc.h | 4 +- 10 files changed, 102 insertions(+), 32 deletions(-) diff --git a/lib/include/srslte/adt/circular_buffer.h b/lib/include/srslte/adt/circular_buffer.h index fa29655f4..7c4076a15 100644 --- a/lib/include/srslte/adt/circular_buffer.h +++ b/lib/include/srslte/adt/circular_buffer.h @@ -39,6 +39,14 @@ size_t get_max_size(const std::vector& a) return a.capacity(); } +/** + * Base common class for definition of circular buffer data structures with the following features: + * - no allocations while pushing/popping new elements. Just an internal index update + * - it provides helper methods to add/remove objects + * - it provides an iterator interface to iterate over added elements in the buffer + * - not thread-safe + * @tparam Container underlying container type used as buffer (e.g. std::array or std::vector) + */ template class base_circular_buffer { @@ -85,9 +93,16 @@ public: }; template - base_circular_buffer(Args&&... args) : buffer(std::forward(args)...) + explicit base_circular_buffer(Args&&... args) : buffer(std::forward(args)...) {} + bool try_push(T&& t) + { + if (full()) { + return false; + } + push(std::move(t)); + } void push(T&& t) { assert(not full()); @@ -95,6 +110,13 @@ public: buffer[wpos] = std::move(t); count++; } + bool try_push(const T& t) + { + if (full()) { + return false; + } + push(t); + } void push(const T& t) { assert(not full()); @@ -148,17 +170,26 @@ struct noop_operator { } }; +/** + * Base common class for definition of blocking queue data structures with the following features: + * - it stores pushed/popped samples in an internal circular buffer + * - provides blocking and non-blocking push/pop APIs + * - thread-safe + * @tparam CircBuffer underlying circular buffer data type (e.g. static_circular_buffer or dyn_circular_buffer) + * @tparam PushingFunc function void(const T&) called while pushing an element to the queue + * @tparam PoppingFunc function void(const T&) called while popping an element from the queue + */ template -class base_block_queue +class base_blocking_queue { using T = typename CircBuffer::value_type; public: template - base_block_queue(PushingFunc push_func_, PoppingFunc pop_func_, Args&&... args) : + base_blocking_queue(PushingFunc push_func_, PoppingFunc pop_func_, Args&&... args) : circ_buffer(std::forward(args)...), push_func(push_func_), pop_func(pop_func_) {} - ~base_block_queue() { stop(); } + ~base_blocking_queue() { stop(); } void stop() { @@ -323,10 +354,24 @@ protected: } // namespace detail +/** + * Circular buffer with fixed, embedded buffer storage via a std::array. + * - Single allocation at object creation for std::array. Given that the buffer size is known at compile-time, the + * circular iteration over the buffer may be more optimized (e.g. when N is a power of 2, % operator can be avoided) + * - not thread-safe + * @tparam T value type stored by buffer + * @tparam N size of the queue + */ template class static_circular_buffer : public detail::base_circular_buffer > {}; +/** + * Circular buffer with buffer storage via a std::vector. + * - size can be defined at run-time. + * - not thread-safe + * @tparam T value type stored by buffer + */ template class dyn_circular_buffer : public detail::base_circular_buffer > { @@ -344,31 +389,52 @@ public: } }; +/** + * Blocking queue with fixed, embedded buffer storage via a std::array. + * - Blocking push/pop API via push_blocking(...) and pop_blocking(...) methods + * - Non-blocking push/pop API via try_push(...) and try_pop(...) methods + * - Only one initial allocation for the std::array + * - thread-safe + * @tparam T value type stored by buffer + * @tparam N size of queue + * @tparam PushingCallback function void(const T&) called while pushing an element to the queue + * @tparam PoppingCallback function void(const T&) called while popping an element from the queue + */ template -class static_block_queue - : public detail::base_block_queue, PushingCallback, PoppingCallback> +class static_blocking_queue + : public detail::base_blocking_queue, PushingCallback, PoppingCallback> { - using base_t = detail::base_block_queue, PushingCallback, PoppingCallback>; + using base_t = detail::base_blocking_queue, PushingCallback, PoppingCallback>; public: - explicit static_block_queue(PushingCallback push_callback = {}, PoppingCallback pop_callback = {}) : + explicit static_blocking_queue(PushingCallback push_callback = {}, PoppingCallback pop_callback = {}) : base_t(push_callback, pop_callback) {} }; +/** + * Blocking queue with buffer storage represented via a std::vector. Features: + * - Blocking push/pop API via push_blocking(...) and pop_blocking(...) methods + * - Non-blocking push/pop API via try_push(...) and try_pop(...) methods + * - Size can be defined at runtime. + * - thread-safe + * @tparam T value type stored by buffer + * @tparam PushingCallback function void(const T&) called while pushing an element to the queue + * @tparam PoppingCallback function void(const T&) called while popping an element from the queue + */ template -class dyn_block_queue : public detail::base_block_queue, PushingCallback, PoppingCallback> +class dyn_blocking_queue : public detail::base_blocking_queue, PushingCallback, PoppingCallback> { - using base_t = detail::base_block_queue, PushingCallback, PoppingCallback>; + using base_t = detail::base_blocking_queue, PushingCallback, PoppingCallback>; public: - dyn_block_queue() = default; - explicit dyn_block_queue(size_t size, PushingCallback push_callback = {}, PoppingCallback pop_callback = {}) : + dyn_blocking_queue() = default; + explicit dyn_blocking_queue(size_t size, PushingCallback push_callback = {}, PoppingCallback pop_callback = {}) : base_t(push_callback, pop_callback, size) {} void set_size(size_t size) { base_t::circ_buffer.set_size(size); } diff --git a/lib/include/srslte/common/mac_pcap_base.h b/lib/include/srslte/common/mac_pcap_base.h index 35eb3b748..fb321d7df 100644 --- a/lib/include/srslte/common/mac_pcap_base.h +++ b/lib/include/srslte/common/mac_pcap_base.h @@ -93,11 +93,11 @@ protected: virtual void write_pdu(pcap_pdu_t& pdu) = 0; void run_thread() final; - std::mutex mutex; - srslog::basic_logger& logger; - bool running = false; - static_block_queue queue; - uint16_t ue_id = 0; + std::mutex mutex; + srslog::basic_logger& logger; + bool running = false; + static_blocking_queue queue; + uint16_t ue_id = 0; private: void pack_and_queue(uint8_t* payload, diff --git a/lib/include/srslte/mac/pdu_queue.h b/lib/include/srslte/mac/pdu_queue.h index 5b968f239..9a26ae1e5 100644 --- a/lib/include/srslte/mac/pdu_queue.h +++ b/lib/include/srslte/mac/pdu_queue.h @@ -61,8 +61,8 @@ private: } pdu_t; - dyn_block_queue pdu_q; - buffer_pool pool; + dyn_blocking_queue pdu_q; + buffer_pool pool; process_callback* callback; srslog::basic_logger& logger; diff --git a/lib/include/srslte/upper/byte_buffer_queue.h b/lib/include/srslte/upper/byte_buffer_queue.h index db8b050ed..f061f2c31 100644 --- a/lib/include/srslte/upper/byte_buffer_queue.h +++ b/lib/include/srslte/upper/byte_buffer_queue.h @@ -79,8 +79,8 @@ private: uint32_t* unread_bytes; }; - dyn_block_queue queue; - uint32_t unread_bytes = 0; + dyn_blocking_queue queue; + uint32_t unread_bytes = 0; }; } // namespace srslte diff --git a/lib/include/srslte/upper/rlc_common.h b/lib/include/srslte/upper/rlc_common.h index 1b33ac886..ab32faa53 100644 --- a/lib/include/srslte/upper/rlc_common.h +++ b/lib/include/srslte/upper/rlc_common.h @@ -303,8 +303,8 @@ private: uint32_t nof_bytes; } pdu_t; - static_block_queue rx_pdu_resume_queue; - static_block_queue tx_sdu_resume_queue; + static_blocking_queue rx_pdu_resume_queue; + static_blocking_queue tx_sdu_resume_queue; }; } // namespace srslte diff --git a/lib/src/common/mac_pcap_base.cc b/lib/src/common/mac_pcap_base.cc index 2cd2483e4..f89705e8d 100644 --- a/lib/src/common/mac_pcap_base.cc +++ b/lib/src/common/mac_pcap_base.cc @@ -84,7 +84,9 @@ void mac_pcap_base::pack_and_queue(uint8_t* payload, // copy payload into PDU buffer memcpy(pdu.pdu->msg, payload, payload_len); pdu.pdu->N_bytes = payload_len; - queue.push_blocking(std::move(pdu)); + if (not queue.try_push(std::move(pdu))) { + logger.error("Failed to push message to pcap writer queue"); + } } else { logger.info("Dropping PDU in PCAP. No buffer available or not enough space (pdu_len=%d).", payload_len); } @@ -119,7 +121,9 @@ void mac_pcap_base::pack_and_queue_nr(uint8_t* payload, // copy payload into PDU buffer memcpy(pdu.pdu->msg, payload, payload_len); pdu.pdu->N_bytes = payload_len; - queue.push_blocking(std::move(pdu)); + if (not queue.try_push(std::move(pdu))) { + logger.error("Failed to push message to pcap writer queue"); + } } else { logger.info("Dropping PDU in NR PCAP. No buffer available or not enough space (pdu_len=%d).", payload_len); } diff --git a/lib/test/adt/circular_buffer_test.cc b/lib/test/adt/circular_buffer_test.cc index cc7b4d3c7..a2a7e1387 100644 --- a/lib/test/adt/circular_buffer_test.cc +++ b/lib/test/adt/circular_buffer_test.cc @@ -66,7 +66,7 @@ int test_static_circular_buffer() int test_queue_block_api() { - dyn_block_queue queue(100); + dyn_blocking_queue queue(100); std::thread t([&queue]() { int count = 0; @@ -94,7 +94,7 @@ int test_queue_block_api_2() std::thread t; { - dyn_block_queue queue(100); + dyn_blocking_queue queue(100); t = std::thread([&queue]() { int count = 0; diff --git a/srsenb/hdr/stack/enb_stack_lte.h b/srsenb/hdr/stack/enb_stack_lte.h index a67a67eb2..f0ca26a6b 100644 --- a/srsenb/hdr/stack/enb_stack_lte.h +++ b/srsenb/hdr/stack/enb_stack_lte.h @@ -162,7 +162,7 @@ private: // state bool started = false; - srslte::dyn_block_queue pending_stack_metrics; + srslte::dyn_blocking_queue pending_stack_metrics; }; } // namespace srsenb diff --git a/srsenb/hdr/stack/mac/mac.h b/srsenb/hdr/stack/mac/mac.h index 8782d7705..82d9060c7 100644 --- a/srsenb/hdr/stack/mac/mac.h +++ b/srsenb/hdr/stack/mac/mac.h @@ -140,8 +140,8 @@ private: std::map > ue_db, ues_to_rem; uint16_t last_rnti = 70; - srslte::static_block_queue, 32> ue_pool; ///< Pool of pre-allocated UE objects - void prealloc_ue(uint32_t nof_ue); + srslte::static_blocking_queue, 32> ue_pool; ///< Pool of pre-allocated UE objects + void prealloc_ue(uint32_t nof_ue); uint8_t* assemble_rar(sched_interface::dl_sched_rar_grant_t* grants, uint32_t enb_cc_idx, diff --git a/srsenb/hdr/stack/rrc/rrc.h b/srsenb/hdr/stack/rrc/rrc.h index 3cd429ef7..b9c93c39c 100644 --- a/srsenb/hdr/stack/rrc/rrc.h +++ b/srsenb/hdr/stack/rrc/rrc.h @@ -189,8 +189,8 @@ private: const static uint32_t LCID_ACT_USER = 0xffff0004; const static uint32_t LCID_RTX_USER = 0xffff0005; - bool running = false; - srslte::dyn_block_queue rx_pdu_queue; + bool running = false; + srslte::dyn_blocking_queue rx_pdu_queue; asn1::rrc::mcch_msg_s mcch; bool enable_mbms = false;