Converted undelivered sdus queue in the PDCP to use a vector.

This was done to improve the performance of the notify_delivery().
master
Pedro Alvarez 4 years ago committed by Francisco Paisana
parent 12f998cea3
commit 4723dd0aa9

@ -99,6 +99,12 @@ private:
uint32_t reordering_window = 0; uint32_t reordering_window = 0;
uint32_t maximum_pdcp_sn = 0; uint32_t maximum_pdcp_sn = 0;
// PDU handlers
void handle_control_pdu(srslte::unique_byte_buffer_t pdu);
void handle_srb_pdu(srslte::unique_byte_buffer_t pdu);
void handle_um_drb_pdu(srslte::unique_byte_buffer_t pdu);
void handle_am_drb_pdu(srslte::unique_byte_buffer_t pdu);
// Discard callback (discardTimer) // Discard callback (discardTimer)
class discard_callback; class discard_callback;
std::vector<unique_timer> discard_timers; std::vector<unique_timer> discard_timers;
@ -107,12 +113,149 @@ private:
// TX Queue // TX Queue
uint32_t maximum_allocated_sns_window = 2048; uint32_t maximum_allocated_sns_window = 2048;
std::map<uint32_t, unique_byte_buffer_t> undelivered_sdus_queue; class undelivered_sdus_queue_t
{
public:
undelivered_sdus_queue_t() : sdus(capacity) {}
void handle_control_pdu(srslte::unique_byte_buffer_t pdu); bool empty() { return count == 0; }
void handle_srb_pdu(srslte::unique_byte_buffer_t pdu);
void handle_um_drb_pdu(srslte::unique_byte_buffer_t pdu); bool is_full() { return count >= capacity; }
void handle_am_drb_pdu(srslte::unique_byte_buffer_t pdu);
uint32_t size() { return count; }
uint32_t get_capacity() { return capacity; }
bool has_sdu(uint32_t sn)
{
if (sn >= capacity) {
return false;
}
return sdus[sn] != nullptr;
}
bool add_sdu(uint32_t sn, const srslte::unique_byte_buffer_t& sdu)
{
assert(not has_sdu(sn));
if (is_full()) {
return false;
}
// Make sure we don't associate more than half of the PDCP SN space of contiguous PDCP SDUs
if (not empty()) {
int32_t diff = sn - fms;
if (diff > (int32_t)(capacity / 2)) {
return false;
}
if (diff <= 0 && diff > -((int32_t)(capacity / 2))) {
return false;
}
}
// Allocate buffer and exit on error
srslte::unique_byte_buffer_t tmp = make_byte_buffer();
if (tmp == nullptr) {
return false;
}
// Update FMS and LMS if necessary
if (empty()) {
fms = sn;
lms = sn;
} else {
update_lms(sn);
}
// Add SDU
count++;
sdus[sn] = std::move(tmp);
memcpy(sdus[sn]->msg, sdu->msg, sdu->N_bytes);
sdus[sn]->N_bytes = sdu->N_bytes;
bytes += sdu->N_bytes;
sdus[sn]->set_timestamp(); // Metrics
return true;
}
unique_byte_buffer_t& operator[](uint32_t sn)
{
assert(has_sdu(sn));
return sdus[sn];
}
void clear_sdu(uint32_t sn)
{
assert(has_sdu(sn));
if (has_sdu(sn)) {
count--;
bytes -= sdus[sn]->N_bytes;
sdus[sn] = nullptr;
}
// Find next FMS,
update_fms();
}
void clear()
{
count = 0;
bytes = 0;
fms = 0;
for (uint32_t sn = 0; sn < capacity; sn++) {
sdus[sn] = nullptr;
}
}
uint32_t get_bytes() { return bytes; }
uint32_t get_fms() { return fms; }
void set_fms(uint32_t fms_) { fms = fms_; }
void update_fms()
{
if (empty()) {
fms = increment_sn(fms);
return;
}
for (uint32_t i = 0; i < capacity; ++i) {
uint32_t sn = increment_sn(fms + i);
if (has_sdu(sn)) {
fms = sn;
return;
}
}
fms = increment_sn(fms);
}
void update_lms(uint32_t sn)
{
if (empty()) {
lms = fms;
return;
}
int32_t diff = sn - lms;
if (diff > 0 && sn > lms) {
lms = sn;
} else if (diff < 0 && sn < lms) {
lms = sn;
}
}
uint32_t get_lms() { return lms; }
private:
uint32_t increment_sn(uint32_t sn) { return (sn + 1) % capacity; }
uint32_t decrement_sn(uint32_t sn) { return (sn - 1) % capacity; }
const static uint32_t capacity = 4096;
uint32_t count = 0;
uint32_t bytes = 0;
uint32_t fms = 0;
uint32_t lms = 0;
std::vector<srslte::unique_byte_buffer_t> sdus;
} undelivered_sdus_queue;
}; };
// Discard callback (discardTimer) // Discard callback (discardTimer)

@ -107,21 +107,7 @@ void pdcp_entity_lte::reestablish()
st.rx_hfn = 0; st.rx_hfn = 0;
st.next_pdcp_rx_sn = 0; st.next_pdcp_rx_sn = 0;
} else { } else {
// Send status report if required on reestablishment in RLC AM // Sending the status report will be triggered by the RRC if required
// send_status_report();
// Re-transmit unacknowledged SDUs
/*
send_status_report();
// Re-transmit unacknowledged SDUs
std::map<uint32_t, unique_byte_buffer_t> undelivered_sdus = std::move(undelivered_sdus_queue);
undelivered_sdus_queue.clear();
for (std::map<uint32_t, unique_byte_buffer_t>::iterator it = undelivered_sdus.begin(); it != undelivered_sdus.end();
++it) {
write_sdu(std::move(it->second), it->first);
}*/
} }
} }
@ -160,6 +146,7 @@ void pdcp_entity_lte::write_sdu(unique_byte_buffer_t sdu, int upper_sn)
if (!rlc->rb_is_um(lcid)) { if (!rlc->rb_is_um(lcid)) {
if (not store_sdu(used_sn, sdu)) { if (not store_sdu(used_sn, sdu)) {
// Could not store the SDU, discarding // Could not store the SDU, discarding
logger.error("Could not store SDU. Discarding %d\n", used_sn);
return; return;
} }
@ -458,10 +445,11 @@ void pdcp_entity_lte::send_status_report()
if (undelivered_sdus_queue.empty()) { if (undelivered_sdus_queue.empty()) {
fms = st.next_pdcp_tx_sn; fms = st.next_pdcp_tx_sn;
} else { } else {
fms = undelivered_sdus_queue.begin()->first; fms = undelivered_sdus_queue.get_fms();
} }
logger.debug("Status report: FMS=%d", fms); // Get Last Missing Segment
uint32_t lms = undelivered_sdus_queue.get_lms();
// Allocate Status Report PDU // Allocate Status Report PDU
unique_byte_buffer_t pdu = make_byte_buffer(); unique_byte_buffer_t pdu = make_byte_buffer();
@ -470,6 +458,7 @@ void pdcp_entity_lte::send_status_report()
return; return;
} }
logger.debug("Status report: FMS=%d, LMS=%d", fms, lms);
// Set control bit and type of PDU // Set control bit and type of PDU
pdu->msg[0] = ((uint8_t)PDCP_DC_FIELD_CONTROL_PDU << 7) | ((uint8_t)PDCP_PDU_TYPE_STATUS_REPORT << 4); pdu->msg[0] = ((uint8_t)PDCP_DC_FIELD_CONTROL_PDU << 7) | ((uint8_t)PDCP_PDU_TYPE_STATUS_REPORT << 4);
@ -494,20 +483,21 @@ void pdcp_entity_lte::send_status_report()
// Add bitmap of missing PDUs, if necessary // Add bitmap of missing PDUs, if necessary
if (not undelivered_sdus_queue.empty()) { if (not undelivered_sdus_queue.empty()) {
// First check size of bitmap // First check size of bitmap
uint32_t last_sn = undelivered_sdus_queue.rbegin()->first; uint32_t bitmap_sz = std::ceil((float)(lms - (fms - 1)) / 8);
uint32_t bitmap_sz = std::ceil((float)(last_sn - (fms - 1)) / 8);
memset(&pdu->msg[pdu->N_bytes], 0, bitmap_sz); memset(&pdu->msg[pdu->N_bytes], 0, bitmap_sz);
logger.debug( logger.debug(
"Setting status report bitmap. Last SN acked=%d, Last SN acked in sequence=%d, Bitmap size in bytes=%d", "Setting status report bitmap. Last missing SN=%d, Last SN acked in sequence=%d, Bitmap size in bytes=%d",
last_sn, lms,
fms - 1, fms - 1,
bitmap_sz); bitmap_sz);
for (auto it = undelivered_sdus_queue.begin(); it != undelivered_sdus_queue.end(); it++) { for (uint32_t sn = fms; sn <= lms; sn++) {
uint32_t offset = it->first - fms; if (undelivered_sdus_queue.has_sdu(sn)) {
uint32_t offset = sn - fms;
uint32_t bit_offset = offset % 8; uint32_t bit_offset = offset % 8;
uint32_t byte_offset = offset / 8; uint32_t byte_offset = offset / 8;
pdu->msg[pdu->N_bytes + byte_offset] |= 1 << (7 - bit_offset); pdu->msg[pdu->N_bytes + byte_offset] |= 1 << (7 - bit_offset);
} }
}
pdu->N_bytes += bitmap_sz; pdu->N_bytes += bitmap_sz;
} }
pdu->md.pdcp_sn = -1; pdu->md.pdcp_sn = -1;
@ -548,12 +538,10 @@ void pdcp_entity_lte::handle_status_report_pdu(unique_byte_buffer_t pdu)
} }
// Remove all SDUs with SN smaller than FMS // Remove all SDUs with SN smaller than FMS
for (auto it = undelivered_sdus_queue.begin(); it != undelivered_sdus_queue.end();) { for (uint32_t sn = 0; sn < fms; sn++) {
if (it->first < fms) { if (sn < fms && undelivered_sdus_queue.has_sdu(sn)) {
stop_discard_timer(it->first); stop_discard_timer(sn);
it = undelivered_sdus_queue.erase(it); undelivered_sdus_queue.clear_sdu(sn);
} else {
++it;
} }
} }
@ -571,27 +559,32 @@ void pdcp_entity_lte::handle_status_report_pdu(unique_byte_buffer_t pdu)
// Discard ACK'ed SDUs // Discard ACK'ed SDUs
for (uint32_t sn : acked_sns) { for (uint32_t sn : acked_sns) {
logger.debug("Status report ACKed SN=%d.", sn); logger.debug("Status report ACKed SN=%d.", sn);
undelivered_sdus_queue.erase(sn); undelivered_sdus_queue.clear_sdu(sn);
stop_discard_timer(sn); stop_discard_timer(sn);
} }
} }
/**************************************************************************** /****************************************************************************
* TX PDUs Queue Helper * TX PDUs Queue Helper
***************************************************************************/ ***************************************************************************/
bool pdcp_entity_lte::store_sdu(uint32_t sn, const unique_byte_buffer_t& sdu) bool pdcp_entity_lte::store_sdu(uint32_t sn, const unique_byte_buffer_t& sdu)
{ {
logger.debug("Storing SDU in undelivered SDUs queue. SN=%d, Queue size=%ld", sn, undelivered_sdus_queue.size()); logger.debug("Storing SDU in undelivered SDUs queue. SN=%d, Queue size=%ld", sn, undelivered_sdus_queue.size());
// Check wether PDU is already in the queue // Check wether PDU is already in the queue
if (undelivered_sdus_queue.find(sn) != undelivered_sdus_queue.end()) { if (undelivered_sdus_queue.has_sdu(sn)) {
logger.error("PDU already exists in the queue. TX_COUNT=%d", sn); logger.error("PDU already exists in the queue. TX_COUNT=%d", sn);
return false; return false;
} }
if (undelivered_sdus_queue.is_full()) {
logger.error("Undelivered SDUs queue is full. TX_COUNT=%d", sn);
return false;
}
// Make sure we don't associate more than half of the PDCP SN space of contiguous PDCP SDUs // Make sure we don't associate more than half of the PDCP SN space of contiguous PDCP SDUs
if (not undelivered_sdus_queue.empty()) { if (not undelivered_sdus_queue.empty()) {
auto fms_it = undelivered_sdus_queue.begin(); uint32_t fms_sn = undelivered_sdus_queue.get_fms();
uint32_t fms_sn = fms_it->first;
int32_t diff = sn - fms_sn; int32_t diff = sn - fms_sn;
if (diff > (int32_t)maximum_allocated_sns_window) { if (diff > (int32_t)maximum_allocated_sns_window) {
// This SN is too large to assign, it may cause HFN de-synchronization. // This SN is too large to assign, it may cause HFN de-synchronization.
@ -616,14 +609,7 @@ bool pdcp_entity_lte::store_sdu(uint32_t sn, const unique_byte_buffer_t& sdu)
} }
// Copy PDU contents into queue // Copy PDU contents into queue
unique_byte_buffer_t sdu_copy = make_byte_buffer(); undelivered_sdus_queue.add_sdu(sn, sdu);
memcpy(sdu_copy->msg, sdu->msg, sdu->N_bytes);
sdu_copy->N_bytes = sdu->N_bytes;
// Metrics
sdu_copy->set_timestamp();
undelivered_sdus_queue.insert(std::make_pair(sn, std::move(sdu_copy)));
return true; return true;
} }
@ -636,8 +622,8 @@ void pdcp_entity_lte::discard_callback::operator()(uint32_t timer_id)
parent->logger.debug("Discard timer expired for PDU with SN = %d", discard_sn); parent->logger.debug("Discard timer expired for PDU with SN = %d", discard_sn);
// Discard PDU if unacknowledged // Discard PDU if unacknowledged
if (parent->undelivered_sdus_queue.find(discard_sn) != parent->undelivered_sdus_queue.end()) { if (parent->undelivered_sdus_queue.has_sdu(discard_sn)) {
parent->undelivered_sdus_queue.erase(discard_sn); parent->undelivered_sdus_queue.clear_sdu(discard_sn);
parent->logger.debug("Removed undelivered PDU with TX_COUNT=%d", discard_sn); parent->logger.debug("Removed undelivered PDU with TX_COUNT=%d", discard_sn);
} else { } else {
parent->logger.debug("Could not find PDU to discard. TX_COUNT=%d", discard_sn); parent->logger.debug("Could not find PDU to discard. TX_COUNT=%d", discard_sn);
@ -659,20 +645,23 @@ void pdcp_entity_lte::notify_delivery(const std::vector<uint32_t>& pdcp_sns)
for (uint32_t sn : pdcp_sns) { for (uint32_t sn : pdcp_sns) {
logger.debug("Delivery notification received for PDU with SN=%d", sn); logger.debug("Delivery notification received for PDU with SN=%d", sn);
if (sn == UINT32_MAX) {
continue;
}
// Find undelivered PDU info // Find undelivered PDU info
std::map<uint32_t, unique_byte_buffer_t>::iterator it = undelivered_sdus_queue.find(sn); if (not undelivered_sdus_queue.has_sdu(sn)) {
if (it == undelivered_sdus_queue.end()) {
logger.warning("Could not find PDU for delivery notification. Notified SN=%d", sn); logger.warning("Could not find PDU for delivery notification. Notified SN=%d", sn);
} else { } else {
// Metrics // Metrics
tx_pdu_ack_latency_ms.push(std::chrono::duration_cast<std::chrono::milliseconds>( tx_pdu_ack_latency_ms.push(
std::chrono::high_resolution_clock::now() - it->second->get_timestamp()) std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() -
undelivered_sdus_queue[sn]->get_timestamp())
.count()); .count());
metrics.num_tx_acked_bytes += it->second->N_bytes; metrics.num_tx_acked_bytes += undelivered_sdus_queue[sn]->N_bytes;
metrics.num_tx_buffered_pdus_bytes -= it->second->N_bytes; metrics.num_tx_buffered_pdus_bytes -= undelivered_sdus_queue[sn]->N_bytes;
// Remove PDU and disarm timer. // Remove PDU and disarm timer.
undelivered_sdus_queue.erase(sn); undelivered_sdus_queue.clear_sdu(sn);
stop_discard_timer(sn); stop_discard_timer(sn);
} }
} }
@ -684,13 +673,15 @@ void pdcp_entity_lte::notify_failure(const std::vector<uint32_t>& pdcp_sns)
for (uint32_t sn : pdcp_sns) { for (uint32_t sn : pdcp_sns) {
logger.info("Failure notification received for PDU with SN=%d", sn); logger.info("Failure notification received for PDU with SN=%d", sn);
if (sn == UINT32_MAX) {
continue;
}
// Find undelivered PDU info // Find undelivered PDU info
std::map<uint32_t, unique_byte_buffer_t>::iterator it = undelivered_sdus_queue.find(sn); if (not undelivered_sdus_queue.has_sdu(sn)) {
if (it == undelivered_sdus_queue.end()) {
logger.info("Could not find PDU for failure notification. Notified SN=%d", sn); logger.info("Could not find PDU for failure notification. Notified SN=%d", sn);
} else { } else {
// Remove PDU and disarm timer. // Remove PDU and disarm timer.
undelivered_sdus_queue.erase(sn); undelivered_sdus_queue.clear_sdu(sn);
stop_discard_timer(sn); stop_discard_timer(sn);
} }
} }
@ -741,10 +732,15 @@ std::map<uint32_t, srslte::unique_byte_buffer_t> pdcp_entity_lte::get_buffered_p
// Deep copy undelivered SDUs // Deep copy undelivered SDUs
// TODO: investigate wheter the deep copy can be avoided by moving the undelivered SDU queue. // TODO: investigate wheter the deep copy can be avoided by moving the undelivered SDU queue.
// That can only be done just before the PDCP is disabled though. // That can only be done just before the PDCP is disabled though.
for (auto it = undelivered_sdus_queue.begin(); it != undelivered_sdus_queue.end(); it++) { for (uint32_t sn = 0; sn < undelivered_sdus_queue.get_capacity(); sn++) {
cpy[it->first] = make_byte_buffer(); if (undelivered_sdus_queue.has_sdu(sn)) {
(*cpy[it->first]) = *(it->second); logger.debug(undelivered_sdus_queue[sn]->msg,
logger.debug(it->second->msg, it->second->N_bytes, "Forwarding buffered PDU with SN=%d", it->first); undelivered_sdus_queue[sn]->N_bytes,
"Forwarding buffered PDU with SN=%d",
sn);
cpy[sn] = make_byte_buffer();
(*cpy[sn]) = *(undelivered_sdus_queue[sn]);
}
} }
return cpy; return cpy;
} }
@ -779,10 +775,7 @@ void pdcp_entity_lte::stop_discard_timer(uint32_t sn)
pdcp_bearer_metrics_t pdcp_entity_lte::get_metrics() pdcp_bearer_metrics_t pdcp_entity_lte::get_metrics()
{ {
metrics.num_tx_buffered_pdus = undelivered_sdus_queue.size(); metrics.num_tx_buffered_pdus = undelivered_sdus_queue.size();
metrics.num_tx_buffered_pdus_bytes = 0; metrics.num_tx_buffered_pdus_bytes = undelivered_sdus_queue.get_bytes(); //< Number of bytes of PDUs waiting for ACK
for (auto sdu_it = undelivered_sdus_queue.begin(); sdu_it != undelivered_sdus_queue.end(); ++sdu_it) {
metrics.num_tx_buffered_pdus_bytes += sdu_it->second->N_bytes; //< Number of bytes of PDUs waiting for ACK
}
metrics.tx_notification_latency_ms = metrics.tx_notification_latency_ms =
tx_pdu_ack_latency_ms.value(); //< Average time in ms from PDU delivery to RLC to ACK notification from RLC tx_pdu_ack_latency_ms.value(); //< Average time in ms from PDU delivery to RLC to ACK notification from RLC
return metrics; return metrics;

Loading…
Cancel
Save