RLC discard (#2515)

* Added ability to discard to dyn_block_queue

* Change way of keeping track of SDUs

* Check nullptr in poping callback. Starting to check for nullptr in RLC read_pdu.

* Adding RLC discard tests

* Clearing PDCP info when RLC discard happens

* Read SDUs until they are no longer nullptr

* Changed discard_if to use template argument
master
Pedro Alvarez 4 years ago committed by GitHub
parent 0d5038dd34
commit d91119baf6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -18,6 +18,7 @@
#include <array>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <type_traits>
@ -156,6 +157,19 @@ public:
buffer.resize(size);
}
template <typename F>
T discard_if(const F& func)
{
for (auto it = buffer.begin(); it != buffer.end(); it++) {
if (*it != nullptr && func(*it)) {
T tmp = std::move(*it);
*it = nullptr;
return tmp;
}
}
return nullptr;
}
private:
Container buffer;
size_t rpos = 0;
@ -264,6 +278,18 @@ public:
return false;
}
template <typename F>
bool discard_if(const F& func)
{
std::lock_guard<std::mutex> lock(mutex);
T tmp = circ_buffer.discard_if(func);
if (tmp == nullptr) {
return false;
}
pop_func(tmp);
return true;
}
protected:
bool active = true;
uint8_t nof_waiting = 0;
@ -443,6 +469,12 @@ public:
base_t(push_callback, pop_callback, size)
{}
void set_size(size_t size) { base_t::circ_buffer.set_size(size); }
template <typename F>
bool discard_if(const F& func)
{
return base_t::discard_if(func);
}
};
} // namespace srsran

@ -23,7 +23,9 @@
#include "srsran/adt/circular_buffer.h"
#include "srsran/common/block_queue.h"
#include "srsran/common/byte_buffer.h"
#include "srsran/common/common.h"
#include <functional>
#include <pthread.h>
namespace srsran {
@ -31,7 +33,9 @@ namespace srsran {
class byte_buffer_queue
{
public:
byte_buffer_queue(int capacity = 128) : queue(capacity, push_callback(unread_bytes), pop_callback(unread_bytes)) {}
byte_buffer_queue(int capacity = 128) :
queue(capacity, push_callback(unread_bytes, n_sdus), pop_callback(unread_bytes, n_sdus))
{}
void write(unique_byte_buffer_t msg) { queue.push_blocking(std::move(msg)); }
@ -46,6 +50,7 @@ public:
void resize(uint32_t capacity) { queue.set_size(capacity); }
uint32_t size() { return (uint32_t)queue.size(); }
uint32_t get_n_sdus() { return n_sdus; }
uint32_t size_bytes() { return unread_bytes; }
@ -67,20 +72,42 @@ public:
bool is_full() { return queue.full(); }
template <typename F>
bool discard_if(const F& func)
{
return queue.discard_if(func);
}
private:
struct push_callback {
explicit push_callback(uint32_t& unread_bytes_) : unread_bytes(&unread_bytes_) {}
void operator()(const unique_byte_buffer_t& msg) { *unread_bytes += msg->N_bytes; }
explicit push_callback(uint32_t& unread_bytes_, uint32_t& n_sdus_) : unread_bytes(&unread_bytes_), n_sdus(&n_sdus_)
{}
void operator()(const unique_byte_buffer_t& msg)
{
*unread_bytes += msg->N_bytes;
(*n_sdus)++;
}
uint32_t* unread_bytes;
uint32_t* n_sdus;
};
struct pop_callback {
explicit pop_callback(uint32_t& unread_bytes_) : unread_bytes(&unread_bytes_) {}
void operator()(const unique_byte_buffer_t& msg) { *unread_bytes -= std::min(msg->N_bytes, *unread_bytes); }
explicit pop_callback(uint32_t& unread_bytes_, uint32_t& n_sdus_) : unread_bytes(&unread_bytes_), n_sdus(&n_sdus_)
{}
void operator()(const unique_byte_buffer_t& msg)
{
if (msg == nullptr) {
return;
}
*unread_bytes -= std::min(msg->N_bytes, *unread_bytes);
*n_sdus = std::max(0, (int32_t)(*n_sdus) - 1);
}
uint32_t* unread_bytes;
uint32_t* n_sdus;
};
dyn_blocking_queue<unique_byte_buffer_t, push_callback, pop_callback> queue;
uint32_t unread_bytes = 0;
uint32_t n_sdus = 0;
};
} // namespace srsran

@ -209,12 +209,9 @@ rlc_am_lte::rlc_am_lte_tx::rlc_am_lte_tx(rlc_am_lte* parent_) :
pool(byte_buffer_pool::get_instance()),
poll_retx_timer(parent_->timers->get_unique_timer()),
status_prohibit_timer(parent_->timers->get_unique_timer())
{
}
{}
rlc_am_lte::rlc_am_lte_tx::~rlc_am_lte_tx()
{
}
rlc_am_lte::rlc_am_lte_tx::~rlc_am_lte_tx() {}
void rlc_am_lte::rlc_am_lte_tx::set_bsr_callback(bsr_callback_t callback)
{
@ -320,7 +317,7 @@ bool rlc_am_lte::rlc_am_lte_tx::has_data()
return (((do_status() && not status_prohibit_timer.is_running())) || // if we have a status PDU to transmit
(not retx_queue.empty()) || // if we have a retransmission
(tx_sdu != NULL) || // if we are currently transmitting a SDU
(not tx_sdu_queue.is_empty())); // or if there is a SDU queued up for transmission
(tx_sdu_queue.get_n_sdus() != 0)); // or if there is a SDU queued up for transmission
}
/**
@ -345,8 +342,8 @@ void rlc_am_lte::rlc_am_lte_tx::check_sn_reached_max_retx(uint32_t sn)
uint32_t rlc_am_lte::rlc_am_lte_tx::get_buffer_state()
{
std::lock_guard<std::mutex> lock(mutex);
uint32_t n_bytes = 0;
uint32_t n_sdus = 0;
uint32_t n_bytes = 0;
uint32_t n_sdus = 0;
logger.debug("%s Buffer state - do_status=%s, status_prohibit_running=%s (%d/%d)",
RB_NAME,
@ -384,7 +381,7 @@ uint32_t rlc_am_lte::rlc_am_lte_tx::get_buffer_state()
// Bytes needed for tx SDUs
if (tx_window.size() < 1024) {
n_sdus = tx_sdu_queue.size();
n_sdus = tx_sdu_queue.get_n_sdus();
n_bytes += tx_sdu_queue.size_bytes();
if (tx_sdu != NULL) {
n_sdus++;
@ -456,7 +453,21 @@ void rlc_am_lte::rlc_am_lte_tx::discard_sdu(uint32_t discard_sn)
if (!tx_enabled) {
return;
}
logger.warning("Discard SDU not implemented yet");
bool discarded =
tx_sdu_queue.discard_if([&discard_sn](const unique_byte_buffer_t& sdu) { return sdu->md.pdcp_sn == discard_sn; });
if (discarded) {
// remove also from undelivered SDUs queue
logger.info("Discarding SDU with PDCP_SN=%d", discard_sn);
if (not undelivered_sdu_info_queue.has_pdcp_sn(discard_sn)) {
logger.info("PDCP SDU info does not exists for discarded SDU. PDCP_SN=%d", discard_sn);
} else {
undelivered_sdu_info_queue.clear_pdcp_sdu(discard_sn);
}
} else {
logger.info("Could not find SDU to discard. PDCP_SN=%d", discard_sn);
}
}
bool rlc_am_lte::rlc_am_lte_tx::sdu_queue_is_full()
@ -966,7 +977,7 @@ int rlc_am_lte::rlc_am_lte_tx::build_data_pdu(uint8_t* payload, uint32_t nof_byt
}
// Pull SDUs from queue
while (pdu_space > head_len && tx_sdu_queue.size() > 0 && header.N_li < RLC_AM_WINDOW_SIZE) {
while (pdu_space > head_len && tx_sdu_queue.get_n_sdus() > 0 && header.N_li < RLC_AM_WINDOW_SIZE) {
if (last_li > 0) {
header.li[header.N_li] = last_li;
header.N_li++;
@ -978,7 +989,18 @@ int rlc_am_lte::rlc_am_lte_tx::build_data_pdu(uint8_t* payload, uint32_t nof_byt
}
break;
}
tx_sdu = tx_sdu_queue.read();
do {
tx_sdu = tx_sdu_queue.read();
} while (tx_sdu == nullptr && tx_sdu_queue.size() != 0);
if (tx_sdu == nullptr) {
if (header.N_li > 0) {
header.N_li--;
}
break;
}
to_move = ((pdu_space - head_len) >= tx_sdu->N_bytes) ? tx_sdu->N_bytes : pdu_space - head_len;
memcpy(pdu_ptr, tx_sdu->msg, to_move);
last_li = to_move;
@ -1045,7 +1067,7 @@ int rlc_am_lte::rlc_am_lte_tx::build_data_pdu(uint8_t* payload, uint32_t nof_byt
}
// Update Tx window
vt_s = (vt_s + 1) % MOD;
vt_s = (vt_s + 1) % MOD;
// Write final header and TX
tx_pdu.buf = std::move(pdu);
@ -1318,9 +1340,7 @@ rlc_am_lte::rlc_am_lte_rx::rlc_am_lte_rx(rlc_am_lte* parent_) :
reordering_timer(parent_->timers->get_unique_timer())
{}
rlc_am_lte::rlc_am_lte_rx::~rlc_am_lte_rx()
{
}
rlc_am_lte::rlc_am_lte_rx::~rlc_am_lte_rx() {}
bool rlc_am_lte::rlc_am_lte_rx::configure(rlc_am_config_t cfg_)
{
@ -1752,8 +1772,8 @@ void rlc_am_lte::rlc_am_lte_rx::write_pdu(uint8_t* payload, const uint32_t nof_b
parent->tx.handle_control_pdu(payload, nof_bytes);
} else {
std::lock_guard<std::mutex> lock(mutex);
rlc_amd_pdu_header_t header = {};
uint32_t payload_len = nof_bytes;
rlc_amd_pdu_header_t header = {};
uint32_t payload_len = nof_bytes;
rlc_am_read_data_pdu_header(&payload, &payload_len, &header);
if (payload_len > nof_bytes) {
logger.info("Dropping corrupted PDU (%d B). Remaining length after header %d B.", nof_bytes, payload_len);
@ -1858,9 +1878,9 @@ int rlc_am_lte::rlc_am_lte_rx::get_status_pdu(rlc_status_pdu_t* status, const ui
int rlc_am_lte::rlc_am_lte_rx::get_status_pdu_length()
{
std::lock_guard<std::mutex> lock(mutex);
rlc_status_pdu_t status = {};
status.ack_sn = vr_ms;
uint32_t i = vr_r;
rlc_status_pdu_t status = {};
status.ack_sn = vr_ms;
uint32_t i = vr_r;
while (RX_MOD_BASE(i) < RX_MOD_BASE(vr_ms) && status.N_nack < RLC_AM_WINDOW_SIZE) {
if (not rx_window.has_sn(i)) {
status.N_nack++;

@ -3413,6 +3413,85 @@ bool reestablish_test()
return SRSRAN_SUCCESS;
}
// This test checks the correct functioning of RLC discard functionality
bool discard_test()
{
const rlc_config_t config = rlc_config_t::default_rlc_am_config();
#if HAVE_PCAP
rlc_pcap pcap;
pcap.open("rlc_am_reestablish_test.pcap", config);
rlc_am_tester tester(&pcap);
#else
rlc_am_tester tester(NULL);
#endif
srsran::timer_handler timers(8);
rlc_am_lte rlc1(srslog::fetch_basic_logger("RLC_AM_1"), 1, &tester, &tester, &timers);
rlc_am_lte rlc2(srslog::fetch_basic_logger("RLC_AM_2"), 1, &tester, &tester, &timers);
srslog::fetch_basic_logger("RLC_AM_1").set_hex_dump_max_size(100);
srslog::fetch_basic_logger("RLC_AM_2").set_hex_dump_max_size(100);
srslog::fetch_basic_logger("RLC").set_hex_dump_max_size(100);
if (not rlc1.configure(config)) {
return -1;
}
if (not rlc2.configure(config)) {
return -1;
}
// Check has_data() after a SDU discard
{
uint32_t num_tx_pdus = 1;
for (uint32_t i = 0; i < num_tx_pdus; ++i) {
// Write SDU
unique_byte_buffer_t sdu = srsran::make_byte_buffer();
TESTASSERT(sdu != nullptr);
sdu->N_bytes = 5;
for (uint32_t k = 0; k < sdu->N_bytes; ++k) {
sdu->msg[k] = i; // Write the index into the buffer
}
sdu->md.pdcp_sn = i;
rlc1.write_sdu(std::move(sdu));
}
}
rlc1.discard_sdu(0); // Try to discard PDCP_SN=1
TESTASSERT(rlc1.has_data() == false);
// Discard an SDU in the midle of the queue and read PDUs after
{
uint32_t num_tx_pdus = 10;
for (uint32_t i = 0; i < num_tx_pdus; ++i) {
// Write SDU
unique_byte_buffer_t sdu = srsran::make_byte_buffer();
TESTASSERT(sdu != nullptr);
sdu->N_bytes = 1;
for (uint32_t k = 0; k < sdu->N_bytes; ++k) {
sdu->msg[k] = i; // Write the index into the buffer
}
sdu->md.pdcp_sn = i;
rlc1.write_sdu(std::move(sdu));
}
}
rlc1.discard_sdu(3); // Try to discard PDCP_SN=1
TESTASSERT(rlc1.has_data() == true);
TESTASSERT(rlc1.get_buffer_state() == 23); // 2 bytes fixed header, 12 , 9 bytes of data,
unique_byte_buffer_t pdu = srsran::make_byte_buffer();
uint32_t len = rlc1.read_pdu(pdu->msg, 50); // enough for all PDUs
pdu->N_bytes = len;
TESTASSERT(23 == len);
srslog::fetch_basic_logger("TEST").info("Received %zd SDUs", tester.sdus.size());
#if HAVE_PCAP
pcap.close();
#endif
return SRSRAN_SUCCESS;
}
int main(int argc, char** argv)
{
// Setup the log message spy to intercept error and warning log entries from RLC
@ -3603,5 +3682,10 @@ int main(int argc, char** argv)
exit(-1);
};
if (discard_test()) {
printf("discard_test failed\n");
exit(-1);
};
return SRSRAN_SUCCESS;
}

Loading…
Cancel
Save