created special class to manage mac::ue currently allocated rx UL buffers. This class avoids mallocs

master
Francisco 4 years ago committed by Francisco Paisana
parent c6fff54f9e
commit d27e0be609

@ -31,11 +31,11 @@ public:
bounded_vector() = default;
template <typename std::enable_if<std::is_default_constructible<T>::value, int>::type = 0>
bounded_vector(size_type N)
explicit bounded_vector(size_type N)
{
append(N);
}
template <typename U, typename std::enable_if<std::is_constructible<T, U>::value, int>::type = 0>
template <typename U, typename std::enable_if<std::is_constructible<T, const U&>::value, int>::type = 0>
bounded_vector(size_type N, const U& init_val)
{
append(N, T(init_val));

@ -18,10 +18,12 @@
#include "srslte/common/block_queue.h"
#include "srslte/common/mac_pcap.h"
#include "srslte/common/mac_pcap_net.h"
#include "srslte/common/tti_point.h"
#include "srslte/interfaces/sched_interface.h"
#include "srslte/mac/pdu.h"
#include "srslte/mac/pdu_queue.h"
#include "srslte/srslog/srslog.h"
#include "ta.h"
#include <pthread.h>
#include <vector>
@ -32,6 +34,34 @@ class rrc_interface_mac;
class rlc_interface_mac;
class phy_interface_stack_lte;
class cc_used_buffers_map
{
public:
explicit cc_used_buffers_map(srslte::pdu_queue& shared_pdu_queue_);
uint8_t* request_pdu(tti_point tti, uint32_t len);
bool push_pdu(tti_point tti, uint32_t len);
void clear_old_pdus(tti_point current_tti);
void remove_pdu(tti_point tti);
bool try_deallocate_pdu(tti_point tti);
void clear();
uint8_t*& operator[](tti_point tti);
bool has_tti(tti_point tti) const;
private:
srslog::basic_logger* logger;
srslte::pdu_queue* shared_pdu_queue;
srslte::circular_array<std::pair<tti_point, uint8_t*>, SRSLTE_FDD_NOF_HARQ * 2> pdu_map;
};
class cc_buffer_handler
{
public:
@ -40,7 +70,9 @@ public:
// List of Rx softbuffers for all HARQ processes of one carrier
using cc_softbuffer_rx_list_t = std::vector<srslte_softbuffer_rx_t>;
cc_buffer_handler();
explicit cc_buffer_handler(srslte::pdu_queue& shared_pdu_queue_);
cc_buffer_handler(cc_buffer_handler&&) noexcept = default;
cc_buffer_handler& operator=(cc_buffer_handler&&) noexcept = default;
~cc_buffer_handler();
void reset();
@ -57,7 +89,7 @@ public:
{
return tx_payload_buffer[harq_pid][tb].get();
}
std::map<uint32_t, uint8_t*>& get_rx_used_buffers() { return rx_used_buffers; }
cc_used_buffers_map& get_rx_used_buffers() { return rx_used_buffers; }
private:
// args
@ -66,9 +98,9 @@ private:
uint32_t nof_tx_harq_proc;
// buffers
cc_softbuffer_tx_list_t softbuffer_tx_list; ///< List of softbuffer lists for Tx
cc_softbuffer_rx_list_t softbuffer_rx_list; ///< List of softbuffer lists for Rx
std::map<uint32_t, uint8_t*> rx_used_buffers;
cc_softbuffer_tx_list_t softbuffer_tx_list; ///< List of softbuffer lists for Tx
cc_softbuffer_rx_list_t softbuffer_rx_list; ///< List of softbuffer lists for Rx
cc_used_buffers_map rx_used_buffers;
// One buffer per TB per HARQ process and per carrier is needed for each UE.
std::array<std::array<srslte::unique_byte_buffer_t, SRSLTE_MAX_TB>, SRSLTE_FDD_NOF_HARQ> tx_payload_buffer;

@ -23,7 +23,101 @@
namespace srsenb {
cc_buffer_handler::cc_buffer_handler()
cc_used_buffers_map::cc_used_buffers_map(srslte::pdu_queue& shared_pdu_queue_) :
pdu_map(), shared_pdu_queue(&shared_pdu_queue_), logger(&srslog::fetch_basic_logger("MAC"))
{}
bool cc_used_buffers_map::push_pdu(tti_point tti, uint32_t len)
{
if (not has_tti(tti)) {
return false;
}
auto& pdu_pair = pdu_map[tti.to_uint()];
if (len > 0) {
shared_pdu_queue->push(pdu_pair.second, len);
} else {
logger->error("Error pushing PDU: null length");
}
// clear entry in map
pdu_pair.first = tti_point();
pdu_pair.second = nullptr;
return len > 0;
}
uint8_t* cc_used_buffers_map::request_pdu(tti_point tti, uint32_t len)
{
if (pdu_map[tti.to_uint()].second != nullptr) {
logger->error("UE buffers: buffer for tti=%d already allocated", tti.to_uint());
return nullptr;
}
uint8_t* pdu = shared_pdu_queue->request(len);
if (pdu == nullptr) {
logger->error("UE buffers: Requesting buffer from pool");
return nullptr;
}
pdu_map[tti.to_uint()].first = tti;
pdu_map[tti.to_uint()].second = pdu;
return pdu;
}
void cc_used_buffers_map::clear_old_pdus(tti_point current_tti)
{
static const uint32_t old_tti_threshold = SRSLTE_FDD_NOF_HARQ + 4;
tti_point max_tti{current_tti - old_tti_threshold};
for (auto& pdu_pair : pdu_map) {
if (pdu_pair.second != nullptr and pdu_pair.first < max_tti) {
logger->warning("UE buffers: Removing old buffer tti=%d, interval=%d",
pdu_pair.first.to_uint(),
current_tti - pdu_pair.first);
remove_pdu(pdu_pair.first);
}
}
}
void cc_used_buffers_map::remove_pdu(tti_point tti)
{
auto& pdu_pair = pdu_map[tti.to_uint()];
assert(pdu_pair.second != nullptr && "cannot remove inexistent PDU");
// return pdus back to the queue
shared_pdu_queue->deallocate(pdu_pair.second);
// clear entry in map
pdu_pair.first = tti_point();
pdu_pair.second = nullptr;
}
bool cc_used_buffers_map::try_deallocate_pdu(tti_point tti)
{
if (pdu_map[tti.to_uint()].second == nullptr) {
remove_pdu(tti);
return true;
}
return false;
}
void cc_used_buffers_map::clear()
{
for (auto& pdu : pdu_map) {
remove_pdu(pdu.first);
}
}
uint8_t*& cc_used_buffers_map::operator[](tti_point tti)
{
assert(has_tti(tti) && "Trying to access buffer that does not exist");
return pdu_map[tti.to_uint()].second;
}
bool cc_used_buffers_map::has_tti(tti_point tti) const
{
return pdu_map[tti.to_uint()].first == tti and pdu_map[tti.to_uint()].second != nullptr;
}
////////////////
cc_buffer_handler::cc_buffer_handler(srslte::pdu_queue& shared_pdu_queue_) : rx_used_buffers(shared_pdu_queue_)
{
for (auto& harq_buffers : tx_payload_buffer) {
for (srslte::unique_byte_buffer_t& tb_buffer : harq_buffers) {
@ -114,9 +208,12 @@ ue::ue(uint16_t rnti_,
pdus(logger),
nof_rx_harq_proc(nof_rx_harq_proc_),
nof_tx_harq_proc(nof_tx_harq_proc_),
ta_fsm(this),
cc_buffers(nof_cells_)
ta_fsm(this)
{
cc_buffers.reserve(nof_cells_);
for (size_t i = 0; i < nof_cells_; ++i) {
cc_buffers.emplace_back(pdus);
}
pdus.init(this);
// Allocate buffer for PCell
@ -125,14 +222,9 @@ ue::ue(uint16_t rnti_,
ue::~ue()
{
{
std::unique_lock<std::mutex> lock(rx_buffers_mutex);
for (auto& cc : cc_buffers) {
for (auto& q : cc.get_rx_used_buffers()) {
pdus.deallocate(q.second);
}
cc.get_rx_used_buffers().clear();
}
std::unique_lock<std::mutex> lock(rx_buffers_mutex);
for (auto& cc : cc_buffers) {
cc.get_rx_used_buffers().clear();
}
}
@ -182,17 +274,7 @@ uint8_t* ue::request_buffer(uint32_t tti, uint32_t ue_cc_idx, const uint32_t len
std::unique_lock<std::mutex> lock(rx_buffers_mutex);
uint8_t* pdu = nullptr;
if (len > 0) {
// Deallocate oldest buffer if we didn't deallocate it
if (!cc_buffers.at(ue_cc_idx).get_rx_used_buffers().count(tti)) {
pdu = pdus.request(len);
if (pdu) {
cc_buffers.at(ue_cc_idx).get_rx_used_buffers().emplace(tti, pdu);
} else {
logger.error("UE buffers: Requesting buffer from pool");
}
} else {
logger.error("UE buffers: buffer for tti=%d already allocated", tti);
}
pdu = cc_buffers.at(ue_cc_idx).get_rx_used_buffers().request_pdu(tti_point(tti), len);
} else {
logger.error("UE buffers: Requesting buffer for zero bytes");
}
@ -205,20 +287,7 @@ void ue::clear_old_buffers(uint32_t tti)
// remove old buffers
for (auto& cc : cc_buffers) {
auto& rx_buffer_cc = cc.get_rx_used_buffers();
for (auto it = rx_buffer_cc.begin(); it != rx_buffer_cc.end();) {
if (srslte_tti_interval(tti, it->first) > 20 && srslte_tti_interval(tti, it->first) < 500) {
logger.warning("UE buffers: Removing old buffer tti=%d, rnti=%d, now is %d, interval=%d",
it->first,
rnti,
tti,
srslte_tti_interval(tti, it->first));
pdus.deallocate(it->second);
it = rx_buffer_cc.erase(it);
} else {
++it;
}
}
cc.get_rx_used_buffers().clear_old_pdus(tti_point{tti});
}
}
@ -353,11 +422,7 @@ void ue::process_pdu(uint8_t* pdu, uint32_t nof_bytes, srslte::pdu_queue::channe
void ue::deallocate_pdu(uint32_t tti, uint32_t ue_cc_idx)
{
std::unique_lock<std::mutex> lock(rx_buffers_mutex);
if (cc_buffers.at(ue_cc_idx).get_rx_used_buffers().count(tti)) {
pdus.deallocate(cc_buffers.at(ue_cc_idx).get_rx_used_buffers().at(tti));
cc_buffers.at(ue_cc_idx).get_rx_used_buffers().erase(tti);
} else {
if (not cc_buffers.at(ue_cc_idx).get_rx_used_buffers().try_deallocate_pdu(tti_point(tti))) {
logger.warning("UE buffers: Null RX PDU pointer in deallocate_pdu for rnti=0x%x pid=%d cc_idx=%d",
rnti,
tti % nof_rx_harq_proc,
@ -368,18 +433,9 @@ void ue::deallocate_pdu(uint32_t tti, uint32_t ue_cc_idx)
void ue::push_pdu(uint32_t tti, uint32_t ue_cc_idx, uint32_t len)
{
std::unique_lock<std::mutex> lock(rx_buffers_mutex);
if (cc_buffers.at(ue_cc_idx).get_rx_used_buffers().count(tti)) {
if (len > 0) {
pdus.push(cc_buffers.at(ue_cc_idx).get_rx_used_buffers().at(tti), len);
} else {
logger.error("Error pushing PDU: null length");
}
cc_buffers.at(ue_cc_idx).get_rx_used_buffers().erase(tti);
} else {
logger.warning("UE buffers: Null RX PDU pointer in push_pdu for rnti=0x%x pid=%d cc_idx=%d",
rnti,
tti % nof_rx_harq_proc,
ue_cc_idx);
if (not cc_buffers.at(ue_cc_idx).get_rx_used_buffers().push_pdu(tti_point(tti), len)) {
logger.warning(
"UE buffers: Failed to push RX PDU for rnti=0x%x pid=%d cc_idx=%d", rnti, tti % nof_rx_harq_proc, ue_cc_idx);
}
}

Loading…
Cancel
Save