rlc_am_lte: fix header reconstruction

* optimize processing of status PDU (SN is removed from window immediately)
* fix maxRetx signaling for segments
* make tx_window_t a template class, rename and use for rx_window as well
master
Andre Puschmann 4 years ago
parent 5e345df439
commit 62b2327178

@ -34,6 +34,7 @@ namespace srslte {
struct rlc_amd_rx_pdu_t {
rlc_amd_pdu_header_t header;
unique_byte_buffer_t buf;
uint32_t rlc_sn;
};
struct rlc_amd_rx_pdu_segments_t {
@ -69,14 +70,16 @@ struct pdcp_sdu_info_t {
std::vector<rlc_sn_info_t> rlc_sn_info_list; // List of RLC PDUs in transit and whether they have been acked or not.
};
struct tx_window_t {
tx_window_t() { clear(); }
void add_pdu(size_t sn)
template <class T>
struct rlc_ringbuffer_t {
rlc_ringbuffer_t() { clear(); }
T& add_pdu(size_t sn)
{
assert(not active_flag[sn]);
assert(not has_sn(sn));
window[sn].rlc_sn = sn;
active_flag[sn] = true;
count++;
return window[sn];
}
void remove_pdu(size_t sn)
{
@ -85,7 +88,7 @@ struct tx_window_t {
active_flag[sn] = false;
count--;
}
rlc_amd_tx_pdu_t& operator[](size_t sn)
T& operator[](size_t sn)
{
assert(has_sn(sn));
return window[sn];
@ -95,30 +98,27 @@ struct tx_window_t {
void clear()
{
std::fill(active_flag.begin(), active_flag.end(), false);
for (size_t i = 0; i < window.size(); ++i) {
window[i].pdcp_sns.clear();
}
count = 0;
}
bool has_sn(uint32_t sn) const { return active_flag[sn] and window[sn].rlc_sn == sn; }
rlc_amd_tx_pdu_t& front()
bool has_sn(uint32_t sn) const { return active_flag[sn] and (window[sn].rlc_sn == sn); }
// Return the sum data bytes of all active PDUs (check PDU is non-null)
uint32_t get_buffered_bytes()
{
assert(not empty());
uint32_t min_rlc_sn = std::numeric_limits<uint32_t>::max(), min_idx = std::numeric_limits<uint32_t>::max();
for (uint32_t i = 0; i < window.size(); ++i) {
if (active_flag[i] and window[i].rlc_sn < min_rlc_sn) {
min_idx = i;
min_rlc_sn = window[i].rlc_sn;
uint32_t buff_size = 0;
for (const auto& pdu : window) {
if (pdu.buf != nullptr) {
buff_size += pdu.buf->N_bytes;
}
}
assert(has_sn(min_rlc_sn));
return window[min_idx];
return buff_size;
}
private:
size_t count = 0;
srslte::circular_array<bool, RLC_AM_WINDOW_SIZE> active_flag = {};
srslte::circular_array<rlc_amd_tx_pdu_t, RLC_AM_WINDOW_SIZE> window;
size_t count = 0;
srslte::circular_array<bool, RLC_AM_WINDOW_SIZE> active_flag = {};
srslte::circular_array<T, RLC_AM_WINDOW_SIZE> window;
};
struct buffered_pdcp_pdu_list {
@ -293,6 +293,7 @@ private:
// Helpers
bool poll_required();
bool do_status();
bool sn_reached_max_retx(uint32_t sn);
rlc_am_lte* parent = nullptr;
byte_buffer_pool* pool = nullptr;
@ -343,7 +344,7 @@ private:
bsr_callback_t bsr_callback;
// Tx windows
tx_window_t tx_window;
rlc_ringbuffer_t<rlc_amd_tx_pdu_t> tx_window;
pdu_retx_queue retx_queue;
std::vector<uint32_t> notify_info_vec;
@ -414,7 +415,7 @@ private:
pthread_mutex_t mutex;
// Rx windows
std::map<uint32_t, rlc_amd_rx_pdu_t> rx_window;
rlc_ringbuffer_t<rlc_amd_rx_pdu_t> rx_window;
std::map<uint32_t, rlc_amd_rx_pdu_segments_t> rx_segments;
bool poll_received = false;

@ -286,6 +286,47 @@ bool rlc_am_lte::rlc_am_lte_tx::has_data()
(not tx_sdu_queue.is_empty())); // or if there is a SDU queued up for transmission
}
/**
* Helper to check if a SN has reached the max reTx threshold
*
* Caller _must_ hold the mutex when calling the function.
* If the retx has been reached for a SN. The SN is removed from the Tx window
* and the RLC am state variables are advanced.
*
* @param sn The SN of the PDU to check
* @return True if the max_retx counter has been reached and the SN has been removed, false otherwise
*/
bool rlc_am_lte::rlc_am_lte_tx::sn_reached_max_retx(uint32_t sn)
{
if (tx_window[sn].retx_count >= cfg.max_retx_thresh) {
logger.warning("%s Signaling max number of reTx=%d for for SN=%d", RB_NAME, tx_window[sn].retx_count, sn);
parent->rrc->max_retx_attempted();
parent->pdcp->notify_failure(parent->lcid, tx_window[sn].pdcp_sns);
parent->metrics.num_lost_pdus++;
// remove SN from Tx window
tx_window.remove_pdu(sn);
// advance window if this is was the lowest SN we've been waiting for
if (sn == vt_a) {
vt_a = (vt_a + 1) % MOD;
vt_ms = (vt_ms + 1) % MOD;
// Advance vt_a to the smallest SN for which ACK has not been received yet (Sec 5.1.3.1.1)
while (TX_MOD_BASE(vt_a) < TX_MOD_BASE(vt_s) && !tx_window.has_sn(vt_a)) {
logger.warning("SN=%d has already been removed, advance window vt_s=%d", vt_a, vt_s);
vt_a = (vt_a + 1) % MOD;
vt_ms = (vt_ms + 1) % MOD;
}
} else {
logger.warning("Don't advance window sn=%d not vt_a=%d", sn, vt_a);
}
return true;
}
return false;
}
uint32_t rlc_am_lte::rlc_am_lte_tx::get_buffer_state()
{
pthread_mutex_lock(&mutex);
@ -482,8 +523,8 @@ void rlc_am_lte::rlc_am_lte_tx::timer_expired(uint32_t timeout_id)
void rlc_am_lte::rlc_am_lte_tx::retransmit_pdu()
{
if (not tx_window.empty()) {
// select PDU in tx window for retransmission
rlc_amd_tx_pdu_t& pdu = tx_window.front();
// select first PDU in tx window for retransmission
rlc_amd_tx_pdu_t& pdu = tx_window[vt_a];
logger.info("%s Schedule SN=%d for reTx.", RB_NAME, pdu.rlc_sn);
rlc_amd_retx_t& retx = retx_queue.push();
retx.is_segment = false;
@ -623,15 +664,7 @@ int rlc_am_lte::rlc_am_lte_tx::build_retx_pdu(uint8_t* payload, uint32_t nof_byt
retx_queue.pop();
tx_window[retx.sn].retx_count++;
if (tx_window[retx.sn].retx_count >= cfg.max_retx_thresh) {
logger.warning("%s Signaling max number of reTx=%d for for SN=%d", RB_NAME, tx_window[retx.sn].retx_count, retx.sn);
parent->rrc->max_retx_attempted();
parent->pdcp->notify_failure(parent->lcid, tx_window[retx.sn].pdcp_sns);
// remove SN from Tx window, advance window
tx_window.remove_pdu(retx.sn);
vt_a = (vt_a + 1) % MOD;
vt_ms = (vt_ms + 1) % MOD;
if (sn_reached_max_retx(retx.sn)) {
return 0;
}
@ -724,18 +757,12 @@ int rlc_am_lte::rlc_am_lte_tx::build_segment(uint8_t* payload, uint32_t nof_byte
break;
}
if (pdu_space <= 2) {
break;
}
upper += old_header.li[i];
head_len = rlc_am_packed_length(&new_header);
// Accomodate some extra space for for LIs if old header contained segments too
head_len += old_header.N_li;
pdu_space = nof_bytes - head_len;
if (pdu_space < (retx.so_end - retx.so_start)) {
retx.so_end = retx.so_start + pdu_space;
}
@ -756,15 +783,34 @@ int rlc_am_lte::rlc_am_lte_tx::build_segment(uint8_t* payload, uint32_t nof_byte
}
new_header.li[new_header.N_li] = li;
// only increment N_li if more SDU (segments) are being added
// only increment N_li if more SDU (segments) are/can being added
if (retx.so_end > upper) {
new_header.N_li++;
// Calculate header space for possible segment addition
rlc_amd_pdu_header_t tmp_header = new_header;
tmp_header.N_li++;
uint32_t tmp_header_len = rlc_am_packed_length(&tmp_header);
uint32_t tmp_data_len = retx.so_end - retx.so_start;
if (tmp_header_len + tmp_data_len <= nof_bytes) {
// Space is sufficiant to fit at least 1 B of yet another segment
new_header.N_li++;
} else {
// can't add new SDU, calculate total data length
uint32_t data_len = 0;
for (uint32_t k = 0; k <= new_header.N_li; ++k) {
data_len += new_header.li[k];
}
retx.so_end = retx.so_start + data_len;
new_header.fi &= RLC_FI_FIELD_NOT_START_ALIGNED; // segment end is aligned with this SDU
}
}
}
lower += old_header.li[i];
}
// Santity check we don't pack beyond the provided buffer
assert(head_len + (retx.so_end - retx.so_start) <= nof_bytes);
// Update retx_queue
if (tx_window[retx.sn].buf->N_bytes == retx.so_end) {
retx_queue.pop();
@ -784,6 +830,11 @@ int rlc_am_lte::rlc_am_lte_tx::build_segment(uint8_t* payload, uint32_t nof_byte
tx_window[retx.sn].retx_count++;
}
// Check max reTx counter and abort building segment if it passed the threshold
if (sn_reached_max_retx(retx.sn)) {
return 0;
}
// Write header and pdu
uint8_t* ptr = payload;
rlc_am_write_data_pdu_header(&new_header, &ptr);
@ -796,7 +847,7 @@ int rlc_am_lte::rlc_am_lte_tx::build_segment(uint8_t* payload, uint32_t nof_byte
if (pdu_len > static_cast<int>(nof_bytes)) {
logger.error("%s Retx PDU segment length error. Available: %d, Used: %d", RB_NAME, nof_bytes, pdu_len);
int header_len = (ptr - payload);
logger.debug("%s Retx PDU segment length error. Header len: %d, Payload len: %d, N_li: %d",
logger.debug("%s Retx PDU segment length error. Actual header len: %d, Payload len: %d, N_li: %d",
RB_NAME,
header_len,
len,
@ -867,7 +918,7 @@ int rlc_am_lte::rlc_am_lte_tx::build_data_pdu(uint8_t* payload, uint32_t nof_byt
// Check for SDU segment
std::vector<uint32_t> pdcp_sns;
if (tx_sdu != NULL) {
if (tx_sdu != nullptr) {
to_move = ((pdu_space - head_len) >= tx_sdu->N_bytes) ? tx_sdu->N_bytes : pdu_space - head_len;
memcpy(pdu_ptr, tx_sdu->msg, to_move);
last_li = to_move;
@ -875,16 +926,20 @@ int rlc_am_lte::rlc_am_lte_tx::build_data_pdu(uint8_t* payload, uint32_t nof_byt
pdu->N_bytes += to_move;
tx_sdu->N_bytes -= to_move;
tx_sdu->msg += to_move;
if (not undelivered_sdu_info_queue.has_pdcp_sn(tx_sdu->md.pdcp_sn)) {
logger.error("Could not find PDCP SN in SDU info queue (segment). PDCP_SN=%d", tx_sdu->md.pdcp_sn);
return 0;
if (undelivered_sdu_info_queue.has_pdcp_sn(tx_sdu->md.pdcp_sn)) {
pdcp_sdu_info_t& pdcp_sdu = undelivered_sdu_info_queue[tx_sdu->md.pdcp_sn];
pdcp_sdu.rlc_sn_info_list.push_back({header.sn, false});
pdcp_sns.push_back(tx_sdu->md.pdcp_sn);
if (tx_sdu->N_bytes == 0) {
pdcp_sdu.fully_txed = true;
}
} else {
// PDCP SNs for the RLC SDU has been removed from the queue
logger.warning("Couldn't find PDCP_SN=%d in SDU info queue (segment)", tx_sdu->md.pdcp_sn);
}
pdcp_sdu_info_t& pdcp_sdu = undelivered_sdu_info_queue[tx_sdu->md.pdcp_sn];
pdcp_sdu.rlc_sn_info_list.push_back({header.sn, false});
pdcp_sns.push_back(tx_sdu->md.pdcp_sn);
if (tx_sdu->N_bytes == 0) {
logger.debug("%s Complete SDU scheduled for tx.", RB_NAME);
pdcp_sdu.fully_txed = true;
tx_sdu.reset();
}
if (pdu_space > to_move) {
@ -894,11 +949,13 @@ int rlc_am_lte::rlc_am_lte_tx::build_data_pdu(uint8_t* payload, uint32_t nof_byt
}
header.fi |= RLC_FI_FIELD_NOT_START_ALIGNED; // First byte does not correspond to first byte of SDU
logger.debug("%s Building PDU - added SDU segment (len:%d) - pdu_space: %d, head_len: %d ",
RB_NAME,
to_move,
pdu_space,
head_len);
logger.debug(
"%s Building PDU - added SDU segment from previous PDU (len:%d) - pdu_space: %d, head_len: %d header_sn=%d",
RB_NAME,
to_move,
pdu_space,
head_len,
header.sn);
}
// Pull SDUs from queue
@ -922,16 +979,20 @@ int rlc_am_lte::rlc_am_lte_tx::build_data_pdu(uint8_t* payload, uint32_t nof_byt
pdu->N_bytes += to_move;
tx_sdu->N_bytes -= to_move;
tx_sdu->msg += to_move;
if (not undelivered_sdu_info_queue.has_pdcp_sn(tx_sdu->md.pdcp_sn)) {
logger.error("Could not find PDCP SN in SDU info queue. PDCP_SN=%d", tx_sdu->md.pdcp_sn);
return 0;
if (undelivered_sdu_info_queue.has_pdcp_sn(tx_sdu->md.pdcp_sn)) {
pdcp_sdu_info_t& pdcp_sdu = undelivered_sdu_info_queue[tx_sdu->md.pdcp_sn];
pdcp_sdu.rlc_sn_info_list.push_back({header.sn, false});
pdcp_sns.push_back(tx_sdu->md.pdcp_sn);
if (tx_sdu->N_bytes == 0) {
pdcp_sdu.fully_txed = true;
}
} else {
// PDCP SNs for the RLC SDU has been removed from the queue
logger.warning("Couldn't find PDCP_SN=%d in SDU info queue.", tx_sdu->md.pdcp_sn);
}
pdcp_sdu_info_t& pdcp_sdu = undelivered_sdu_info_queue[tx_sdu->md.pdcp_sn];
pdcp_sdu.rlc_sn_info_list.push_back({header.sn, false});
pdcp_sns.push_back(tx_sdu->md.pdcp_sn);
if (tx_sdu->N_bytes == 0) {
logger.debug("%s Complete SDU scheduled for tx. PDCP SN=%d", RB_NAME, tx_sdu->md.pdcp_sn);
pdcp_sdu.fully_txed = true;
tx_sdu.reset();
}
if (pdu_space > to_move) {
@ -1039,6 +1100,7 @@ void rlc_am_lte::rlc_am_lte_tx::handle_control_pdu(uint8_t* payload, uint32_t no
auto& pdu = tx_window[i];
if (!retx_queue.has_sn(i)) {
rlc_amd_retx_t& retx = retx_queue.push();
assert(tx_window[i].rlc_sn == i);
retx.sn = i;
retx.is_segment = false;
retx.so_start = 0;
@ -1073,20 +1135,23 @@ void rlc_am_lte::rlc_am_lte_tx::handle_control_pdu(uint8_t* payload, uint32_t no
}
}
}
} else {
logger.warning("%s NACKed SN=%d already removed from Tx window", RB_NAME, i);
}
}
}
if (!nack) {
// ACKed SNs get marked and removed from tx_window if possible
// ACKed SNs get marked and removed from tx_window so PDCP get's only notified once
if (tx_window.has_sn(i)) {
auto& pdu = tx_window[i];
update_notification_ack_info(pdu);
if (update_vt_a) {
tx_window.remove_pdu(i);
vt_a = (vt_a + 1) % MOD;
vt_ms = (vt_ms + 1) % MOD;
}
tx_window.remove_pdu(i);
}
// Advance window if possible
if (update_vt_a) {
vt_a = (vt_a + 1) % MOD;
vt_ms = (vt_ms + 1) % MOD;
}
}
i = (i + 1) % MOD;
@ -1335,8 +1400,7 @@ void rlc_am_lte::rlc_am_lte_rx::handle_data_pdu(uint8_t* payload, uint32_t nof_b
return;
}
it = rx_window.find(header.sn);
if (rx_window.end() != it) {
if (rx_window.has_sn(header.sn)) {
if (header.p) {
logger.info("%s Status packet requested through polling bit", RB_NAME);
do_status = true;
@ -1346,7 +1410,7 @@ void rlc_am_lte::rlc_am_lte_rx::handle_data_pdu(uint8_t* payload, uint32_t nof_b
}
// Write to rx window
rlc_amd_rx_pdu_t pdu;
rlc_amd_rx_pdu_t& pdu = rx_window.add_pdu(header.sn);
pdu.buf = srslte::make_byte_buffer();
if (pdu.buf == NULL) {
#ifdef RLC_AM_BUFFER_DEBUG
@ -1354,6 +1418,7 @@ void rlc_am_lte::rlc_am_lte_rx::handle_data_pdu(uint8_t* payload, uint32_t nof_b
exit(-1);
#else
logger.error("Fatal Error: Couldn't allocate PDU in handle_data_pdu().");
rx_window.remove_pdu(header.sn);
return;
#endif
}
@ -1372,18 +1437,14 @@ void rlc_am_lte::rlc_am_lte_rx::handle_data_pdu(uint8_t* payload, uint32_t nof_b
pdu.buf->N_bytes = nof_bytes;
pdu.header = header;
rx_window[header.sn] = std::move(pdu);
// Update vr_h
if (RX_MOD_BASE(header.sn) >= RX_MOD_BASE(vr_h)) {
vr_h = (header.sn + 1) % MOD;
}
// Update vr_ms
it = rx_window.find(vr_ms);
while (rx_window.end() != it) {
while (rx_window.has_sn(vr_ms)) {
vr_ms = (vr_ms + 1) % MOD;
it = rx_window.find(vr_ms);
}
// Check poll bit
@ -1535,7 +1596,7 @@ void rlc_am_lte::rlc_am_lte_rx::reassemble_rx_sdus()
}
// Iterate through rx_window, assembling and delivering SDUs
while (rx_window.end() != rx_window.find(vr_r)) {
while (rx_window.has_sn(vr_r)) {
// Handle any SDU segments
for (uint32_t i = 0; i < rx_window[vr_r].header.N_li; i++) {
len = rx_window[vr_r].header.li[i];
@ -1657,7 +1718,7 @@ void rlc_am_lte::rlc_am_lte_rx::reassemble_rx_sdus()
}
it->second.segments.clear();
}
rx_window.erase(vr_r);
rx_window.remove_pdu(vr_r);
vr_r = (vr_r + 1) % MOD;
vr_mr = (vr_mr + 1) % MOD;
}
@ -1709,9 +1770,7 @@ uint32_t rlc_am_lte::rlc_am_lte_rx::get_rx_buffered_bytes()
{
uint32_t buff_size = 0;
pthread_mutex_lock(&mutex);
for (const auto& pdu : rx_window) {
buff_size += pdu.second.buf->N_bytes;
}
buff_size = rx_window.get_buffered_bytes();
pthread_mutex_unlock(&mutex);
return buff_size;
}
@ -1737,11 +1796,9 @@ void rlc_am_lte::rlc_am_lte_rx::timer_expired(uint32_t timeout_id)
logger.debug("%s reordering timeout expiry - updating vr_ms (was %d)", RB_NAME, vr_ms);
// 36.322 v10 Section 5.1.3.2.4
vr_ms = vr_x;
std::map<uint32_t, rlc_amd_rx_pdu_t>::iterator it = rx_window.find(vr_ms);
while (rx_window.end() != it) {
vr_ms = vr_x;
while (rx_window.has_sn(vr_ms)) {
vr_ms = (vr_ms + 1) % MOD;
it = rx_window.find(vr_ms);
}
if (poll_received) {
@ -1768,7 +1825,7 @@ int rlc_am_lte::rlc_am_lte_rx::get_status_pdu(rlc_status_pdu_t* status, const ui
// We don't use segment NACKs - just NACK the full PDU
uint32_t i = vr_r;
while (RX_MOD_BASE(i) <= RX_MOD_BASE(vr_ms) && status->N_nack < RLC_AM_WINDOW_SIZE) {
if (rx_window.find(i) != rx_window.end() || i == vr_ms) {
if (rx_window.has_sn(i) || i == vr_ms) {
// only update ACK_SN if this SN has been received, or if we reached the maximum possible SN
status->ack_sn = i;
} else {
@ -1811,7 +1868,7 @@ int rlc_am_lte::rlc_am_lte_rx::get_status_pdu_length()
status.ack_sn = vr_ms;
uint32_t i = vr_r;
while (RX_MOD_BASE(i) < RX_MOD_BASE(vr_ms) && status.N_nack < RLC_AM_WINDOW_SIZE) {
if (rx_window.find(i) == rx_window.end()) {
if (not rx_window.has_sn(i)) {
status.N_nack++;
}
i = (i + 1) % MOD;
@ -1911,27 +1968,45 @@ bool rlc_am_lte::rlc_am_lte_rx::add_segment_and_check(rlc_amd_rx_pdu_segments_t*
// Reconstruct li fields
uint16_t count = 0;
uint16_t carryover = 0;
for (it = pdu->segments.begin(); it != pdu->segments.end(); it++) {
uint16_t consumed_bytes = 0; // rolling sum of all allocated LIs during segment reconstruction
for (it = pdu->segments.begin(); it != pdu->segments.end(); ++it) {
logger.debug(" Handling %d PDU segments", it->header.N_li);
for (uint32_t i = 0; i < it->header.N_li; i++) {
header.li[header.N_li] = it->header.li[i];
if (i == 0) {
header.li[header.N_li] += carryover;
// variable marks total offset of each _processed_ LI of this segment
uint32_t total_pdu_offset = it->header.so;
for (uint32_t k = 0; k <= i; k++) {
total_pdu_offset += it->header.li[k];
}
logger.debug(" - (total_pdu_offset=%d, consumed_bytes=%d, header.li[i]=%d)",total_pdu_offset, consumed_bytes, header.li[i]);
if (total_pdu_offset > header.li[i] && total_pdu_offset > consumed_bytes) {
header.li[header.N_li] = total_pdu_offset - consumed_bytes;
consumed_bytes = total_pdu_offset;
logger.debug(" - adding segment %d/%d (%d B, SO=%d, carryover=%d, count=%d)",
i + 1,
it->header.N_li,
header.li[header.N_li],
header.so,
carryover,
count);
header.N_li++;
count += it->header.li[i];
carryover = 0;
} else {
logger.debug(" - Skipping segment in reTx PDU segment which is already included (%d B, SO=%d)",
it->header.li[i],
header.so);
}
logger.debug(" - adding segment %d/%d (%d B, SO=%d, carryover=%d, count=%d)",
i + 1,
it->header.N_li,
header.li[header.N_li],
header.so,
carryover,
count);
header.N_li++;
count += it->header.li[i];
carryover = 0;
}
if (count <= it->buf->N_bytes) {
carryover += it->buf->N_bytes - count;
carryover = it->header.so + it->buf->N_bytes;
// substract all previous LIs
for (uint32_t k = 0; k < header.N_li; ++k) {
carryover -= header.li[k];
}
logger.debug("Incremented carryover (it->buf->N_bytes=%d, count=%d). New carryover=%d",
it->buf->N_bytes,
count,
@ -1952,6 +2027,7 @@ bool rlc_am_lte::rlc_am_lte_rx::add_segment_and_check(rlc_amd_rx_pdu_segments_t*
logger.debug("Header is end-aligned, overwrite header.li[%d]=%d", header.N_li, carryover);
header.li[header.N_li] = carryover;
header.N_li++;
consumed_bytes += carryover;
carryover = 0;
}
count = 0;

Loading…
Cancel
Save