From 62b23271784c089e12ded1f19164ed288c2e74fe Mon Sep 17 00:00:00 2001 From: Andre Puschmann Date: Thu, 4 Mar 2021 13:27:26 +0100 Subject: [PATCH] rlc_am_lte: fix header reconstruction * optimize processing of status PDU (SN is removed from window immediately) * fix maxRetx signaling for segments * make tx_window_t a template class, rename and use for rx_window as well --- lib/include/srslte/upper/rlc_am_lte.h | 47 ++--- lib/src/upper/rlc_am_lte.cc | 238 +++++++++++++++++--------- 2 files changed, 181 insertions(+), 104 deletions(-) diff --git a/lib/include/srslte/upper/rlc_am_lte.h b/lib/include/srslte/upper/rlc_am_lte.h index c6e24c5b1..5fb46a419 100644 --- a/lib/include/srslte/upper/rlc_am_lte.h +++ b/lib/include/srslte/upper/rlc_am_lte.h @@ -34,6 +34,7 @@ namespace srslte { struct rlc_amd_rx_pdu_t { rlc_amd_pdu_header_t header; unique_byte_buffer_t buf; + uint32_t rlc_sn; }; struct rlc_amd_rx_pdu_segments_t { @@ -69,14 +70,16 @@ struct pdcp_sdu_info_t { std::vector rlc_sn_info_list; // List of RLC PDUs in transit and whether they have been acked or not. }; -struct tx_window_t { - tx_window_t() { clear(); } - void add_pdu(size_t sn) +template +struct rlc_ringbuffer_t { + rlc_ringbuffer_t() { clear(); } + T& add_pdu(size_t sn) { - assert(not active_flag[sn]); + assert(not has_sn(sn)); window[sn].rlc_sn = sn; active_flag[sn] = true; count++; + return window[sn]; } void remove_pdu(size_t sn) { @@ -85,7 +88,7 @@ struct tx_window_t { active_flag[sn] = false; count--; } - rlc_amd_tx_pdu_t& operator[](size_t sn) + T& operator[](size_t sn) { assert(has_sn(sn)); return window[sn]; @@ -95,30 +98,27 @@ struct tx_window_t { void clear() { std::fill(active_flag.begin(), active_flag.end(), false); - for (size_t i = 0; i < window.size(); ++i) { - window[i].pdcp_sns.clear(); - } count = 0; } - bool has_sn(uint32_t sn) const { return active_flag[sn] and window[sn].rlc_sn == sn; } - rlc_amd_tx_pdu_t& front() + + bool has_sn(uint32_t sn) const { return active_flag[sn] and (window[sn].rlc_sn == sn); } + + // Return the sum data bytes of all active PDUs (check PDU is non-null) + uint32_t get_buffered_bytes() { - assert(not empty()); - uint32_t min_rlc_sn = std::numeric_limits::max(), min_idx = std::numeric_limits::max(); - for (uint32_t i = 0; i < window.size(); ++i) { - if (active_flag[i] and window[i].rlc_sn < min_rlc_sn) { - min_idx = i; - min_rlc_sn = window[i].rlc_sn; + uint32_t buff_size = 0; + for (const auto& pdu : window) { + if (pdu.buf != nullptr) { + buff_size += pdu.buf->N_bytes; } } - assert(has_sn(min_rlc_sn)); - return window[min_idx]; + return buff_size; } private: - size_t count = 0; - srslte::circular_array active_flag = {}; - srslte::circular_array window; + size_t count = 0; + srslte::circular_array active_flag = {}; + srslte::circular_array window; }; struct buffered_pdcp_pdu_list { @@ -293,6 +293,7 @@ private: // Helpers bool poll_required(); bool do_status(); + bool sn_reached_max_retx(uint32_t sn); rlc_am_lte* parent = nullptr; byte_buffer_pool* pool = nullptr; @@ -343,7 +344,7 @@ private: bsr_callback_t bsr_callback; // Tx windows - tx_window_t tx_window; + rlc_ringbuffer_t tx_window; pdu_retx_queue retx_queue; std::vector notify_info_vec; @@ -414,7 +415,7 @@ private: pthread_mutex_t mutex; // Rx windows - std::map rx_window; + rlc_ringbuffer_t rx_window; std::map rx_segments; bool poll_received = false; diff --git a/lib/src/upper/rlc_am_lte.cc b/lib/src/upper/rlc_am_lte.cc index fea3f5eae..ed0376241 100644 --- a/lib/src/upper/rlc_am_lte.cc +++ b/lib/src/upper/rlc_am_lte.cc @@ -286,6 +286,47 @@ bool rlc_am_lte::rlc_am_lte_tx::has_data() (not tx_sdu_queue.is_empty())); // or if there is a SDU queued up for transmission } +/** + * Helper to check if a SN has reached the max reTx threshold + * + * Caller _must_ hold the mutex when calling the function. + * If the retx has been reached for a SN. The SN is removed from the Tx window + * and the RLC am state variables are advanced. + * + * @param sn The SN of the PDU to check + * @return True if the max_retx counter has been reached and the SN has been removed, false otherwise + */ +bool rlc_am_lte::rlc_am_lte_tx::sn_reached_max_retx(uint32_t sn) +{ + if (tx_window[sn].retx_count >= cfg.max_retx_thresh) { + logger.warning("%s Signaling max number of reTx=%d for for SN=%d", RB_NAME, tx_window[sn].retx_count, sn); + parent->rrc->max_retx_attempted(); + parent->pdcp->notify_failure(parent->lcid, tx_window[sn].pdcp_sns); + parent->metrics.num_lost_pdus++; + + // remove SN from Tx window + tx_window.remove_pdu(sn); + + // advance window if this is was the lowest SN we've been waiting for + if (sn == vt_a) { + vt_a = (vt_a + 1) % MOD; + vt_ms = (vt_ms + 1) % MOD; + + // Advance vt_a to the smallest SN for which ACK has not been received yet (Sec 5.1.3.1.1) + while (TX_MOD_BASE(vt_a) < TX_MOD_BASE(vt_s) && !tx_window.has_sn(vt_a)) { + logger.warning("SN=%d has already been removed, advance window vt_s=%d", vt_a, vt_s); + vt_a = (vt_a + 1) % MOD; + vt_ms = (vt_ms + 1) % MOD; + } + } else { + logger.warning("Don't advance window sn=%d not vt_a=%d", sn, vt_a); + } + + return true; + } + return false; +} + uint32_t rlc_am_lte::rlc_am_lte_tx::get_buffer_state() { pthread_mutex_lock(&mutex); @@ -482,8 +523,8 @@ void rlc_am_lte::rlc_am_lte_tx::timer_expired(uint32_t timeout_id) void rlc_am_lte::rlc_am_lte_tx::retransmit_pdu() { if (not tx_window.empty()) { - // select PDU in tx window for retransmission - rlc_amd_tx_pdu_t& pdu = tx_window.front(); + // select first PDU in tx window for retransmission + rlc_amd_tx_pdu_t& pdu = tx_window[vt_a]; logger.info("%s Schedule SN=%d for reTx.", RB_NAME, pdu.rlc_sn); rlc_amd_retx_t& retx = retx_queue.push(); retx.is_segment = false; @@ -623,15 +664,7 @@ int rlc_am_lte::rlc_am_lte_tx::build_retx_pdu(uint8_t* payload, uint32_t nof_byt retx_queue.pop(); tx_window[retx.sn].retx_count++; - if (tx_window[retx.sn].retx_count >= cfg.max_retx_thresh) { - logger.warning("%s Signaling max number of reTx=%d for for SN=%d", RB_NAME, tx_window[retx.sn].retx_count, retx.sn); - parent->rrc->max_retx_attempted(); - parent->pdcp->notify_failure(parent->lcid, tx_window[retx.sn].pdcp_sns); - - // remove SN from Tx window, advance window - tx_window.remove_pdu(retx.sn); - vt_a = (vt_a + 1) % MOD; - vt_ms = (vt_ms + 1) % MOD; + if (sn_reached_max_retx(retx.sn)) { return 0; } @@ -724,18 +757,12 @@ int rlc_am_lte::rlc_am_lte_tx::build_segment(uint8_t* payload, uint32_t nof_byte break; } - if (pdu_space <= 2) { - break; - } - upper += old_header.li[i]; head_len = rlc_am_packed_length(&new_header); - // Accomodate some extra space for for LIs if old header contained segments too - head_len += old_header.N_li; - pdu_space = nof_bytes - head_len; + if (pdu_space < (retx.so_end - retx.so_start)) { retx.so_end = retx.so_start + pdu_space; } @@ -756,15 +783,34 @@ int rlc_am_lte::rlc_am_lte_tx::build_segment(uint8_t* payload, uint32_t nof_byte } new_header.li[new_header.N_li] = li; - // only increment N_li if more SDU (segments) are being added + // only increment N_li if more SDU (segments) are/can being added if (retx.so_end > upper) { - new_header.N_li++; + // Calculate header space for possible segment addition + rlc_amd_pdu_header_t tmp_header = new_header; + tmp_header.N_li++; + uint32_t tmp_header_len = rlc_am_packed_length(&tmp_header); + uint32_t tmp_data_len = retx.so_end - retx.so_start; + if (tmp_header_len + tmp_data_len <= nof_bytes) { + // Space is sufficiant to fit at least 1 B of yet another segment + new_header.N_li++; + } else { + // can't add new SDU, calculate total data length + uint32_t data_len = 0; + for (uint32_t k = 0; k <= new_header.N_li; ++k) { + data_len += new_header.li[k]; + } + retx.so_end = retx.so_start + data_len; + new_header.fi &= RLC_FI_FIELD_NOT_START_ALIGNED; // segment end is aligned with this SDU + } } } lower += old_header.li[i]; } + // Santity check we don't pack beyond the provided buffer + assert(head_len + (retx.so_end - retx.so_start) <= nof_bytes); + // Update retx_queue if (tx_window[retx.sn].buf->N_bytes == retx.so_end) { retx_queue.pop(); @@ -784,6 +830,11 @@ int rlc_am_lte::rlc_am_lte_tx::build_segment(uint8_t* payload, uint32_t nof_byte tx_window[retx.sn].retx_count++; } + // Check max reTx counter and abort building segment if it passed the threshold + if (sn_reached_max_retx(retx.sn)) { + return 0; + } + // Write header and pdu uint8_t* ptr = payload; rlc_am_write_data_pdu_header(&new_header, &ptr); @@ -796,7 +847,7 @@ int rlc_am_lte::rlc_am_lte_tx::build_segment(uint8_t* payload, uint32_t nof_byte if (pdu_len > static_cast(nof_bytes)) { logger.error("%s Retx PDU segment length error. Available: %d, Used: %d", RB_NAME, nof_bytes, pdu_len); int header_len = (ptr - payload); - logger.debug("%s Retx PDU segment length error. Header len: %d, Payload len: %d, N_li: %d", + logger.debug("%s Retx PDU segment length error. Actual header len: %d, Payload len: %d, N_li: %d", RB_NAME, header_len, len, @@ -867,7 +918,7 @@ int rlc_am_lte::rlc_am_lte_tx::build_data_pdu(uint8_t* payload, uint32_t nof_byt // Check for SDU segment std::vector pdcp_sns; - if (tx_sdu != NULL) { + if (tx_sdu != nullptr) { to_move = ((pdu_space - head_len) >= tx_sdu->N_bytes) ? tx_sdu->N_bytes : pdu_space - head_len; memcpy(pdu_ptr, tx_sdu->msg, to_move); last_li = to_move; @@ -875,16 +926,20 @@ int rlc_am_lte::rlc_am_lte_tx::build_data_pdu(uint8_t* payload, uint32_t nof_byt pdu->N_bytes += to_move; tx_sdu->N_bytes -= to_move; tx_sdu->msg += to_move; - if (not undelivered_sdu_info_queue.has_pdcp_sn(tx_sdu->md.pdcp_sn)) { - logger.error("Could not find PDCP SN in SDU info queue (segment). PDCP_SN=%d", tx_sdu->md.pdcp_sn); - return 0; + if (undelivered_sdu_info_queue.has_pdcp_sn(tx_sdu->md.pdcp_sn)) { + pdcp_sdu_info_t& pdcp_sdu = undelivered_sdu_info_queue[tx_sdu->md.pdcp_sn]; + pdcp_sdu.rlc_sn_info_list.push_back({header.sn, false}); + pdcp_sns.push_back(tx_sdu->md.pdcp_sn); + if (tx_sdu->N_bytes == 0) { + pdcp_sdu.fully_txed = true; + } + } else { + // PDCP SNs for the RLC SDU has been removed from the queue + logger.warning("Couldn't find PDCP_SN=%d in SDU info queue (segment)", tx_sdu->md.pdcp_sn); } - pdcp_sdu_info_t& pdcp_sdu = undelivered_sdu_info_queue[tx_sdu->md.pdcp_sn]; - pdcp_sdu.rlc_sn_info_list.push_back({header.sn, false}); - pdcp_sns.push_back(tx_sdu->md.pdcp_sn); + if (tx_sdu->N_bytes == 0) { logger.debug("%s Complete SDU scheduled for tx.", RB_NAME); - pdcp_sdu.fully_txed = true; tx_sdu.reset(); } if (pdu_space > to_move) { @@ -894,11 +949,13 @@ int rlc_am_lte::rlc_am_lte_tx::build_data_pdu(uint8_t* payload, uint32_t nof_byt } header.fi |= RLC_FI_FIELD_NOT_START_ALIGNED; // First byte does not correspond to first byte of SDU - logger.debug("%s Building PDU - added SDU segment (len:%d) - pdu_space: %d, head_len: %d ", - RB_NAME, - to_move, - pdu_space, - head_len); + logger.debug( + "%s Building PDU - added SDU segment from previous PDU (len:%d) - pdu_space: %d, head_len: %d header_sn=%d", + RB_NAME, + to_move, + pdu_space, + head_len, + header.sn); } // Pull SDUs from queue @@ -922,16 +979,20 @@ int rlc_am_lte::rlc_am_lte_tx::build_data_pdu(uint8_t* payload, uint32_t nof_byt pdu->N_bytes += to_move; tx_sdu->N_bytes -= to_move; tx_sdu->msg += to_move; - if (not undelivered_sdu_info_queue.has_pdcp_sn(tx_sdu->md.pdcp_sn)) { - logger.error("Could not find PDCP SN in SDU info queue. PDCP_SN=%d", tx_sdu->md.pdcp_sn); - return 0; + if (undelivered_sdu_info_queue.has_pdcp_sn(tx_sdu->md.pdcp_sn)) { + pdcp_sdu_info_t& pdcp_sdu = undelivered_sdu_info_queue[tx_sdu->md.pdcp_sn]; + pdcp_sdu.rlc_sn_info_list.push_back({header.sn, false}); + pdcp_sns.push_back(tx_sdu->md.pdcp_sn); + if (tx_sdu->N_bytes == 0) { + pdcp_sdu.fully_txed = true; + } + } else { + // PDCP SNs for the RLC SDU has been removed from the queue + logger.warning("Couldn't find PDCP_SN=%d in SDU info queue.", tx_sdu->md.pdcp_sn); } - pdcp_sdu_info_t& pdcp_sdu = undelivered_sdu_info_queue[tx_sdu->md.pdcp_sn]; - pdcp_sdu.rlc_sn_info_list.push_back({header.sn, false}); - pdcp_sns.push_back(tx_sdu->md.pdcp_sn); + if (tx_sdu->N_bytes == 0) { logger.debug("%s Complete SDU scheduled for tx. PDCP SN=%d", RB_NAME, tx_sdu->md.pdcp_sn); - pdcp_sdu.fully_txed = true; tx_sdu.reset(); } if (pdu_space > to_move) { @@ -1039,6 +1100,7 @@ void rlc_am_lte::rlc_am_lte_tx::handle_control_pdu(uint8_t* payload, uint32_t no auto& pdu = tx_window[i]; if (!retx_queue.has_sn(i)) { rlc_amd_retx_t& retx = retx_queue.push(); + assert(tx_window[i].rlc_sn == i); retx.sn = i; retx.is_segment = false; retx.so_start = 0; @@ -1073,20 +1135,23 @@ void rlc_am_lte::rlc_am_lte_tx::handle_control_pdu(uint8_t* payload, uint32_t no } } } + } else { + logger.warning("%s NACKed SN=%d already removed from Tx window", RB_NAME, i); } } } if (!nack) { - // ACKed SNs get marked and removed from tx_window if possible + // ACKed SNs get marked and removed from tx_window so PDCP get's only notified once if (tx_window.has_sn(i)) { auto& pdu = tx_window[i]; update_notification_ack_info(pdu); - if (update_vt_a) { - tx_window.remove_pdu(i); - vt_a = (vt_a + 1) % MOD; - vt_ms = (vt_ms + 1) % MOD; - } + tx_window.remove_pdu(i); + } + // Advance window if possible + if (update_vt_a) { + vt_a = (vt_a + 1) % MOD; + vt_ms = (vt_ms + 1) % MOD; } } i = (i + 1) % MOD; @@ -1335,8 +1400,7 @@ void rlc_am_lte::rlc_am_lte_rx::handle_data_pdu(uint8_t* payload, uint32_t nof_b return; } - it = rx_window.find(header.sn); - if (rx_window.end() != it) { + if (rx_window.has_sn(header.sn)) { if (header.p) { logger.info("%s Status packet requested through polling bit", RB_NAME); do_status = true; @@ -1346,7 +1410,7 @@ void rlc_am_lte::rlc_am_lte_rx::handle_data_pdu(uint8_t* payload, uint32_t nof_b } // Write to rx window - rlc_amd_rx_pdu_t pdu; + rlc_amd_rx_pdu_t& pdu = rx_window.add_pdu(header.sn); pdu.buf = srslte::make_byte_buffer(); if (pdu.buf == NULL) { #ifdef RLC_AM_BUFFER_DEBUG @@ -1354,6 +1418,7 @@ void rlc_am_lte::rlc_am_lte_rx::handle_data_pdu(uint8_t* payload, uint32_t nof_b exit(-1); #else logger.error("Fatal Error: Couldn't allocate PDU in handle_data_pdu()."); + rx_window.remove_pdu(header.sn); return; #endif } @@ -1372,18 +1437,14 @@ void rlc_am_lte::rlc_am_lte_rx::handle_data_pdu(uint8_t* payload, uint32_t nof_b pdu.buf->N_bytes = nof_bytes; pdu.header = header; - rx_window[header.sn] = std::move(pdu); - // Update vr_h if (RX_MOD_BASE(header.sn) >= RX_MOD_BASE(vr_h)) { vr_h = (header.sn + 1) % MOD; } // Update vr_ms - it = rx_window.find(vr_ms); - while (rx_window.end() != it) { + while (rx_window.has_sn(vr_ms)) { vr_ms = (vr_ms + 1) % MOD; - it = rx_window.find(vr_ms); } // Check poll bit @@ -1535,7 +1596,7 @@ void rlc_am_lte::rlc_am_lte_rx::reassemble_rx_sdus() } // Iterate through rx_window, assembling and delivering SDUs - while (rx_window.end() != rx_window.find(vr_r)) { + while (rx_window.has_sn(vr_r)) { // Handle any SDU segments for (uint32_t i = 0; i < rx_window[vr_r].header.N_li; i++) { len = rx_window[vr_r].header.li[i]; @@ -1657,7 +1718,7 @@ void rlc_am_lte::rlc_am_lte_rx::reassemble_rx_sdus() } it->second.segments.clear(); } - rx_window.erase(vr_r); + rx_window.remove_pdu(vr_r); vr_r = (vr_r + 1) % MOD; vr_mr = (vr_mr + 1) % MOD; } @@ -1709,9 +1770,7 @@ uint32_t rlc_am_lte::rlc_am_lte_rx::get_rx_buffered_bytes() { uint32_t buff_size = 0; pthread_mutex_lock(&mutex); - for (const auto& pdu : rx_window) { - buff_size += pdu.second.buf->N_bytes; - } + buff_size = rx_window.get_buffered_bytes(); pthread_mutex_unlock(&mutex); return buff_size; } @@ -1737,11 +1796,9 @@ void rlc_am_lte::rlc_am_lte_rx::timer_expired(uint32_t timeout_id) logger.debug("%s reordering timeout expiry - updating vr_ms (was %d)", RB_NAME, vr_ms); // 36.322 v10 Section 5.1.3.2.4 - vr_ms = vr_x; - std::map::iterator it = rx_window.find(vr_ms); - while (rx_window.end() != it) { + vr_ms = vr_x; + while (rx_window.has_sn(vr_ms)) { vr_ms = (vr_ms + 1) % MOD; - it = rx_window.find(vr_ms); } if (poll_received) { @@ -1768,7 +1825,7 @@ int rlc_am_lte::rlc_am_lte_rx::get_status_pdu(rlc_status_pdu_t* status, const ui // We don't use segment NACKs - just NACK the full PDU uint32_t i = vr_r; while (RX_MOD_BASE(i) <= RX_MOD_BASE(vr_ms) && status->N_nack < RLC_AM_WINDOW_SIZE) { - if (rx_window.find(i) != rx_window.end() || i == vr_ms) { + if (rx_window.has_sn(i) || i == vr_ms) { // only update ACK_SN if this SN has been received, or if we reached the maximum possible SN status->ack_sn = i; } else { @@ -1811,7 +1868,7 @@ int rlc_am_lte::rlc_am_lte_rx::get_status_pdu_length() status.ack_sn = vr_ms; uint32_t i = vr_r; while (RX_MOD_BASE(i) < RX_MOD_BASE(vr_ms) && status.N_nack < RLC_AM_WINDOW_SIZE) { - if (rx_window.find(i) == rx_window.end()) { + if (not rx_window.has_sn(i)) { status.N_nack++; } i = (i + 1) % MOD; @@ -1911,27 +1968,45 @@ bool rlc_am_lte::rlc_am_lte_rx::add_segment_and_check(rlc_amd_rx_pdu_segments_t* // Reconstruct li fields uint16_t count = 0; uint16_t carryover = 0; - for (it = pdu->segments.begin(); it != pdu->segments.end(); it++) { + uint16_t consumed_bytes = 0; // rolling sum of all allocated LIs during segment reconstruction + + for (it = pdu->segments.begin(); it != pdu->segments.end(); ++it) { logger.debug(" Handling %d PDU segments", it->header.N_li); for (uint32_t i = 0; i < it->header.N_li; i++) { - header.li[header.N_li] = it->header.li[i]; - if (i == 0) { - header.li[header.N_li] += carryover; + // variable marks total offset of each _processed_ LI of this segment + uint32_t total_pdu_offset = it->header.so; + for (uint32_t k = 0; k <= i; k++) { + total_pdu_offset += it->header.li[k]; + } + + logger.debug(" - (total_pdu_offset=%d, consumed_bytes=%d, header.li[i]=%d)",total_pdu_offset, consumed_bytes, header.li[i]); + if (total_pdu_offset > header.li[i] && total_pdu_offset > consumed_bytes) { + header.li[header.N_li] = total_pdu_offset - consumed_bytes; + consumed_bytes = total_pdu_offset; + + logger.debug(" - adding segment %d/%d (%d B, SO=%d, carryover=%d, count=%d)", + i + 1, + it->header.N_li, + header.li[header.N_li], + header.so, + carryover, + count); + header.N_li++; + count += it->header.li[i]; + carryover = 0; + } else { + logger.debug(" - Skipping segment in reTx PDU segment which is already included (%d B, SO=%d)", + it->header.li[i], + header.so); } - logger.debug(" - adding segment %d/%d (%d B, SO=%d, carryover=%d, count=%d)", - i + 1, - it->header.N_li, - header.li[header.N_li], - header.so, - carryover, - count); - header.N_li++; - count += it->header.li[i]; - carryover = 0; } if (count <= it->buf->N_bytes) { - carryover += it->buf->N_bytes - count; + carryover = it->header.so + it->buf->N_bytes; + // substract all previous LIs + for (uint32_t k = 0; k < header.N_li; ++k) { + carryover -= header.li[k]; + } logger.debug("Incremented carryover (it->buf->N_bytes=%d, count=%d). New carryover=%d", it->buf->N_bytes, count, @@ -1952,6 +2027,7 @@ bool rlc_am_lte::rlc_am_lte_rx::add_segment_and_check(rlc_amd_rx_pdu_segments_t* logger.debug("Header is end-aligned, overwrite header.li[%d]=%d", header.N_li, carryover); header.li[header.N_li] = carryover; header.N_li++; + consumed_bytes += carryover; carryover = 0; } count = 0;