From 4723dd0aa9c568e209c3205d3e80cf74602f6310 Mon Sep 17 00:00:00 2001 From: Pedro Alvarez Date: Mon, 22 Feb 2021 17:39:54 +0000 Subject: [PATCH] Converted undelivered sdus queue in the PDCP to use a vector. This was done to improve the performance of the notify_delivery(). --- lib/include/srslte/upper/pdcp_entity_lte.h | 155 ++++++++++++++++++++- lib/src/upper/pdcp_entity_lte.cc | 123 ++++++++-------- 2 files changed, 207 insertions(+), 71 deletions(-) diff --git a/lib/include/srslte/upper/pdcp_entity_lte.h b/lib/include/srslte/upper/pdcp_entity_lte.h index df60a0a1a..bcb92724e 100644 --- a/lib/include/srslte/upper/pdcp_entity_lte.h +++ b/lib/include/srslte/upper/pdcp_entity_lte.h @@ -99,6 +99,12 @@ private: uint32_t reordering_window = 0; uint32_t maximum_pdcp_sn = 0; + // PDU handlers + void handle_control_pdu(srslte::unique_byte_buffer_t pdu); + void handle_srb_pdu(srslte::unique_byte_buffer_t pdu); + void handle_um_drb_pdu(srslte::unique_byte_buffer_t pdu); + void handle_am_drb_pdu(srslte::unique_byte_buffer_t pdu); + // Discard callback (discardTimer) class discard_callback; std::vector discard_timers; @@ -106,13 +112,150 @@ private: void stop_discard_timer(uint32_t sn); // TX Queue - uint32_t maximum_allocated_sns_window = 2048; - std::map undelivered_sdus_queue; + uint32_t maximum_allocated_sns_window = 2048; + class undelivered_sdus_queue_t + { + public: + undelivered_sdus_queue_t() : sdus(capacity) {} - void handle_control_pdu(srslte::unique_byte_buffer_t pdu); - void handle_srb_pdu(srslte::unique_byte_buffer_t pdu); - void handle_um_drb_pdu(srslte::unique_byte_buffer_t pdu); - void handle_am_drb_pdu(srslte::unique_byte_buffer_t pdu); + bool empty() { return count == 0; } + + bool is_full() { return count >= capacity; } + + uint32_t size() { return count; } + + uint32_t get_capacity() { return capacity; } + + bool has_sdu(uint32_t sn) + { + if (sn >= capacity) { + return false; + } + return sdus[sn] != nullptr; + } + + bool add_sdu(uint32_t sn, const srslte::unique_byte_buffer_t& sdu) + { + assert(not has_sdu(sn)); + + if (is_full()) { + return false; + } + + // Make sure we don't associate more than half of the PDCP SN space of contiguous PDCP SDUs + if (not empty()) { + int32_t diff = sn - fms; + if (diff > (int32_t)(capacity / 2)) { + return false; + } + if (diff <= 0 && diff > -((int32_t)(capacity / 2))) { + return false; + } + } + + // Allocate buffer and exit on error + srslte::unique_byte_buffer_t tmp = make_byte_buffer(); + if (tmp == nullptr) { + return false; + } + + // Update FMS and LMS if necessary + if (empty()) { + fms = sn; + lms = sn; + } else { + update_lms(sn); + } + // Add SDU + count++; + sdus[sn] = std::move(tmp); + memcpy(sdus[sn]->msg, sdu->msg, sdu->N_bytes); + sdus[sn]->N_bytes = sdu->N_bytes; + bytes += sdu->N_bytes; + sdus[sn]->set_timestamp(); // Metrics + return true; + } + + unique_byte_buffer_t& operator[](uint32_t sn) + { + assert(has_sdu(sn)); + return sdus[sn]; + } + + void clear_sdu(uint32_t sn) + { + assert(has_sdu(sn)); + if (has_sdu(sn)) { + count--; + bytes -= sdus[sn]->N_bytes; + sdus[sn] = nullptr; + } + // Find next FMS, + update_fms(); + } + + void clear() + { + count = 0; + bytes = 0; + fms = 0; + for (uint32_t sn = 0; sn < capacity; sn++) { + sdus[sn] = nullptr; + } + } + + uint32_t get_bytes() { return bytes; } + + uint32_t get_fms() { return fms; } + + void set_fms(uint32_t fms_) { fms = fms_; } + + void update_fms() + { + if (empty()) { + fms = increment_sn(fms); + return; + } + + for (uint32_t i = 0; i < capacity; ++i) { + uint32_t sn = increment_sn(fms + i); + if (has_sdu(sn)) { + fms = sn; + return; + } + } + + fms = increment_sn(fms); + } + + void update_lms(uint32_t sn) + { + if (empty()) { + lms = fms; + return; + } + + int32_t diff = sn - lms; + if (diff > 0 && sn > lms) { + lms = sn; + } else if (diff < 0 && sn < lms) { + lms = sn; + } + } + + uint32_t get_lms() { return lms; } + + private: + uint32_t increment_sn(uint32_t sn) { return (sn + 1) % capacity; } + uint32_t decrement_sn(uint32_t sn) { return (sn - 1) % capacity; } + + const static uint32_t capacity = 4096; + uint32_t count = 0; + uint32_t bytes = 0; + uint32_t fms = 0; + uint32_t lms = 0; + std::vector sdus; + } undelivered_sdus_queue; }; // Discard callback (discardTimer) diff --git a/lib/src/upper/pdcp_entity_lte.cc b/lib/src/upper/pdcp_entity_lte.cc index f03a7c82a..34eccb0e0 100644 --- a/lib/src/upper/pdcp_entity_lte.cc +++ b/lib/src/upper/pdcp_entity_lte.cc @@ -107,21 +107,7 @@ void pdcp_entity_lte::reestablish() st.rx_hfn = 0; st.next_pdcp_rx_sn = 0; } else { - // Send status report if required on reestablishment in RLC AM - // send_status_report(); - - // Re-transmit unacknowledged SDUs - /* - send_status_report(); - - // Re-transmit unacknowledged SDUs - std::map undelivered_sdus = std::move(undelivered_sdus_queue); - undelivered_sdus_queue.clear(); - - for (std::map::iterator it = undelivered_sdus.begin(); it != undelivered_sdus.end(); - ++it) { - write_sdu(std::move(it->second), it->first); - }*/ + // Sending the status report will be triggered by the RRC if required } } @@ -160,6 +146,7 @@ void pdcp_entity_lte::write_sdu(unique_byte_buffer_t sdu, int upper_sn) if (!rlc->rb_is_um(lcid)) { if (not store_sdu(used_sn, sdu)) { // Could not store the SDU, discarding + logger.error("Could not store SDU. Discarding %d\n", used_sn); return; } @@ -458,10 +445,11 @@ void pdcp_entity_lte::send_status_report() if (undelivered_sdus_queue.empty()) { fms = st.next_pdcp_tx_sn; } else { - fms = undelivered_sdus_queue.begin()->first; + fms = undelivered_sdus_queue.get_fms(); } - logger.debug("Status report: FMS=%d", fms); + // Get Last Missing Segment + uint32_t lms = undelivered_sdus_queue.get_lms(); // Allocate Status Report PDU unique_byte_buffer_t pdu = make_byte_buffer(); @@ -470,6 +458,7 @@ void pdcp_entity_lte::send_status_report() return; } + logger.debug("Status report: FMS=%d, LMS=%d", fms, lms); // Set control bit and type of PDU pdu->msg[0] = ((uint8_t)PDCP_DC_FIELD_CONTROL_PDU << 7) | ((uint8_t)PDCP_PDU_TYPE_STATUS_REPORT << 4); @@ -494,19 +483,20 @@ void pdcp_entity_lte::send_status_report() // Add bitmap of missing PDUs, if necessary if (not undelivered_sdus_queue.empty()) { // First check size of bitmap - uint32_t last_sn = undelivered_sdus_queue.rbegin()->first; - uint32_t bitmap_sz = std::ceil((float)(last_sn - (fms - 1)) / 8); + uint32_t bitmap_sz = std::ceil((float)(lms - (fms - 1)) / 8); memset(&pdu->msg[pdu->N_bytes], 0, bitmap_sz); logger.debug( - "Setting status report bitmap. Last SN acked=%d, Last SN acked in sequence=%d, Bitmap size in bytes=%d", - last_sn, + "Setting status report bitmap. Last missing SN=%d, Last SN acked in sequence=%d, Bitmap size in bytes=%d", + lms, fms - 1, bitmap_sz); - for (auto it = undelivered_sdus_queue.begin(); it != undelivered_sdus_queue.end(); it++) { - uint32_t offset = it->first - fms; - uint32_t bit_offset = offset % 8; - uint32_t byte_offset = offset / 8; - pdu->msg[pdu->N_bytes + byte_offset] |= 1 << (7 - bit_offset); + for (uint32_t sn = fms; sn <= lms; sn++) { + if (undelivered_sdus_queue.has_sdu(sn)) { + uint32_t offset = sn - fms; + uint32_t bit_offset = offset % 8; + uint32_t byte_offset = offset / 8; + pdu->msg[pdu->N_bytes + byte_offset] |= 1 << (7 - bit_offset); + } } pdu->N_bytes += bitmap_sz; } @@ -548,12 +538,10 @@ void pdcp_entity_lte::handle_status_report_pdu(unique_byte_buffer_t pdu) } // Remove all SDUs with SN smaller than FMS - for (auto it = undelivered_sdus_queue.begin(); it != undelivered_sdus_queue.end();) { - if (it->first < fms) { - stop_discard_timer(it->first); - it = undelivered_sdus_queue.erase(it); - } else { - ++it; + for (uint32_t sn = 0; sn < fms; sn++) { + if (sn < fms && undelivered_sdus_queue.has_sdu(sn)) { + stop_discard_timer(sn); + undelivered_sdus_queue.clear_sdu(sn); } } @@ -571,27 +559,32 @@ void pdcp_entity_lte::handle_status_report_pdu(unique_byte_buffer_t pdu) // Discard ACK'ed SDUs for (uint32_t sn : acked_sns) { logger.debug("Status report ACKed SN=%d.", sn); - undelivered_sdus_queue.erase(sn); + undelivered_sdus_queue.clear_sdu(sn); stop_discard_timer(sn); } } /**************************************************************************** * TX PDUs Queue Helper ***************************************************************************/ + bool pdcp_entity_lte::store_sdu(uint32_t sn, const unique_byte_buffer_t& sdu) { logger.debug("Storing SDU in undelivered SDUs queue. SN=%d, Queue size=%ld", sn, undelivered_sdus_queue.size()); // Check wether PDU is already in the queue - if (undelivered_sdus_queue.find(sn) != undelivered_sdus_queue.end()) { + if (undelivered_sdus_queue.has_sdu(sn)) { logger.error("PDU already exists in the queue. TX_COUNT=%d", sn); return false; } + if (undelivered_sdus_queue.is_full()) { + logger.error("Undelivered SDUs queue is full. TX_COUNT=%d", sn); + return false; + } + // Make sure we don't associate more than half of the PDCP SN space of contiguous PDCP SDUs if (not undelivered_sdus_queue.empty()) { - auto fms_it = undelivered_sdus_queue.begin(); - uint32_t fms_sn = fms_it->first; + uint32_t fms_sn = undelivered_sdus_queue.get_fms(); int32_t diff = sn - fms_sn; if (diff > (int32_t)maximum_allocated_sns_window) { // This SN is too large to assign, it may cause HFN de-synchronization. @@ -616,14 +609,7 @@ bool pdcp_entity_lte::store_sdu(uint32_t sn, const unique_byte_buffer_t& sdu) } // Copy PDU contents into queue - unique_byte_buffer_t sdu_copy = make_byte_buffer(); - memcpy(sdu_copy->msg, sdu->msg, sdu->N_bytes); - sdu_copy->N_bytes = sdu->N_bytes; - - // Metrics - sdu_copy->set_timestamp(); - - undelivered_sdus_queue.insert(std::make_pair(sn, std::move(sdu_copy))); + undelivered_sdus_queue.add_sdu(sn, sdu); return true; } @@ -636,8 +622,8 @@ void pdcp_entity_lte::discard_callback::operator()(uint32_t timer_id) parent->logger.debug("Discard timer expired for PDU with SN = %d", discard_sn); // Discard PDU if unacknowledged - if (parent->undelivered_sdus_queue.find(discard_sn) != parent->undelivered_sdus_queue.end()) { - parent->undelivered_sdus_queue.erase(discard_sn); + if (parent->undelivered_sdus_queue.has_sdu(discard_sn)) { + parent->undelivered_sdus_queue.clear_sdu(discard_sn); parent->logger.debug("Removed undelivered PDU with TX_COUNT=%d", discard_sn); } else { parent->logger.debug("Could not find PDU to discard. TX_COUNT=%d", discard_sn); @@ -659,20 +645,23 @@ void pdcp_entity_lte::notify_delivery(const std::vector& pdcp_sns) for (uint32_t sn : pdcp_sns) { logger.debug("Delivery notification received for PDU with SN=%d", sn); + if (sn == UINT32_MAX) { + continue; + } // Find undelivered PDU info - std::map::iterator it = undelivered_sdus_queue.find(sn); - if (it == undelivered_sdus_queue.end()) { + if (not undelivered_sdus_queue.has_sdu(sn)) { logger.warning("Could not find PDU for delivery notification. Notified SN=%d", sn); } else { // Metrics - tx_pdu_ack_latency_ms.push(std::chrono::duration_cast( - std::chrono::high_resolution_clock::now() - it->second->get_timestamp()) - .count()); - metrics.num_tx_acked_bytes += it->second->N_bytes; - metrics.num_tx_buffered_pdus_bytes -= it->second->N_bytes; + tx_pdu_ack_latency_ms.push( + std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - + undelivered_sdus_queue[sn]->get_timestamp()) + .count()); + metrics.num_tx_acked_bytes += undelivered_sdus_queue[sn]->N_bytes; + metrics.num_tx_buffered_pdus_bytes -= undelivered_sdus_queue[sn]->N_bytes; // Remove PDU and disarm timer. - undelivered_sdus_queue.erase(sn); + undelivered_sdus_queue.clear_sdu(sn); stop_discard_timer(sn); } } @@ -684,13 +673,15 @@ void pdcp_entity_lte::notify_failure(const std::vector& pdcp_sns) for (uint32_t sn : pdcp_sns) { logger.info("Failure notification received for PDU with SN=%d", sn); + if (sn == UINT32_MAX) { + continue; + } // Find undelivered PDU info - std::map::iterator it = undelivered_sdus_queue.find(sn); - if (it == undelivered_sdus_queue.end()) { + if (not undelivered_sdus_queue.has_sdu(sn)) { logger.info("Could not find PDU for failure notification. Notified SN=%d", sn); } else { // Remove PDU and disarm timer. - undelivered_sdus_queue.erase(sn); + undelivered_sdus_queue.clear_sdu(sn); stop_discard_timer(sn); } } @@ -741,10 +732,15 @@ std::map pdcp_entity_lte::get_buffered_p // Deep copy undelivered SDUs // TODO: investigate wheter the deep copy can be avoided by moving the undelivered SDU queue. // That can only be done just before the PDCP is disabled though. - for (auto it = undelivered_sdus_queue.begin(); it != undelivered_sdus_queue.end(); it++) { - cpy[it->first] = make_byte_buffer(); - (*cpy[it->first]) = *(it->second); - logger.debug(it->second->msg, it->second->N_bytes, "Forwarding buffered PDU with SN=%d", it->first); + for (uint32_t sn = 0; sn < undelivered_sdus_queue.get_capacity(); sn++) { + if (undelivered_sdus_queue.has_sdu(sn)) { + logger.debug(undelivered_sdus_queue[sn]->msg, + undelivered_sdus_queue[sn]->N_bytes, + "Forwarding buffered PDU with SN=%d", + sn); + cpy[sn] = make_byte_buffer(); + (*cpy[sn]) = *(undelivered_sdus_queue[sn]); + } } return cpy; } @@ -779,10 +775,7 @@ void pdcp_entity_lte::stop_discard_timer(uint32_t sn) pdcp_bearer_metrics_t pdcp_entity_lte::get_metrics() { metrics.num_tx_buffered_pdus = undelivered_sdus_queue.size(); - metrics.num_tx_buffered_pdus_bytes = 0; - for (auto sdu_it = undelivered_sdus_queue.begin(); sdu_it != undelivered_sdus_queue.end(); ++sdu_it) { - metrics.num_tx_buffered_pdus_bytes += sdu_it->second->N_bytes; //< Number of bytes of PDUs waiting for ACK - } + metrics.num_tx_buffered_pdus_bytes = undelivered_sdus_queue.get_bytes(); //< Number of bytes of PDUs waiting for ACK metrics.tx_notification_latency_ms = tx_pdu_ack_latency_ms.value(); //< Average time in ms from PDU delivery to RLC to ACK notification from RLC return metrics;