stack,optimization - replaced previous block_queue design for new bounded queue in several places in the enb

master
Francisco 4 years ago committed by Francisco Paisana
parent 28ef5833a2
commit d1236fd62f

@ -13,7 +13,7 @@
#ifndef SRSLTE_MAC_PCAP_BASE_H
#define SRSLTE_MAC_PCAP_BASE_H
#include "srslte/common/block_queue.h"
#include "srslte/adt/circular_buffer.h"
#include "srslte/common/buffer_pool.h"
#include "srslte/common/common.h"
#include "srslte/common/pcap.h"
@ -93,11 +93,11 @@ protected:
virtual void write_pdu(pcap_pdu_t& pdu) = 0;
void run_thread() final;
std::mutex mutex;
srslog::basic_logger& logger;
bool running = false;
block_queue<pcap_pdu_t> queue;
uint16_t ue_id = 0;
std::mutex mutex;
srslog::basic_logger& logger;
bool running = false;
static_block_queue<pcap_pdu_t, 256> queue;
uint16_t ue_id = 0;
private:
void pack_and_queue(uint8_t* payload,

@ -13,6 +13,7 @@
#ifndef SRSLTE_PDU_QUEUE_H
#define SRSLTE_PDU_QUEUE_H
#include "srslte/adt/circular_buffer.h"
#include "srslte/common/block_queue.h"
#include "srslte/common/buffer_pool.h"
#include "srslte/common/log.h"
@ -34,7 +35,7 @@ public:
};
pdu_queue(srslog::basic_logger& logger, uint32_t pool_size = DEFAULT_POOL_SIZE) :
pool(pool_size), callback(NULL), logger(logger)
pool(pool_size), callback(NULL), logger(logger), pdu_q(pool_size)
{}
void init(process_callback* callback);
@ -60,8 +61,8 @@ private:
} pdu_t;
block_queue<pdu_t*> pdu_q;
buffer_pool<pdu_t> pool;
dyn_block_queue<pdu_t*> pdu_q;
buffer_pool<pdu_t> pool;
process_callback* callback;
srslog::basic_logger& logger;

@ -13,7 +13,7 @@
#ifndef SRSLTE_RLC_COMMON_H
#define SRSLTE_RLC_COMMON_H
#include "srslte/common/block_queue.h"
#include "srslte/adt/circular_buffer.h"
#include "srslte/common/logmap.h"
#include "srslte/interfaces/rlc_interface_types.h"
#include "srslte/upper/rlc_metrics.h"
@ -219,13 +219,13 @@ public:
}
pdu_t p;
// Do not block
while (rx_pdu_resume_queue.try_pop(&p)) {
while (rx_pdu_resume_queue.try_pop(p)) {
write_pdu(p.payload, p.nof_bytes);
free(p.payload);
}
unique_byte_buffer_t s;
while (tx_sdu_resume_queue.try_pop(&s)) {
while (tx_sdu_resume_queue.try_pop(s)) {
write_sdu(std::move(s));
}
suspended = false;
@ -303,8 +303,8 @@ private:
uint32_t nof_bytes;
} pdu_t;
block_queue<pdu_t> rx_pdu_resume_queue;
block_queue<unique_byte_buffer_t> tx_sdu_resume_queue{256};
static_block_queue<pdu_t, 256> rx_pdu_resume_queue;
static_block_queue<unique_byte_buffer_t, 256> tx_sdu_resume_queue;
};
} // namespace srslte

@ -58,7 +58,7 @@ uint32_t mac_pcap::close()
// tell writer thread to stop
running = false;
pcap_pdu_t pdu = {};
queue.push(std::move(pdu));
queue.push_blocking(std::move(pdu));
}
wait_thread_finish();

@ -37,7 +37,7 @@ void mac_pcap_base::run_thread()
{
// blocking write until stopped
while (running) {
pcap_pdu_t pdu = queue.wait_pop();
pcap_pdu_t pdu = queue.pop_blocking();
{
std::lock_guard<std::mutex> lock(mutex);
write_pdu(pdu);
@ -47,7 +47,7 @@ void mac_pcap_base::run_thread()
// write remainder of queue
std::lock_guard<std::mutex> lock(mutex);
pcap_pdu_t pdu = {};
while (queue.try_pop(&pdu)) {
while (queue.try_pop(pdu)) {
write_pdu(pdu);
}
}
@ -84,7 +84,7 @@ void mac_pcap_base::pack_and_queue(uint8_t* payload,
// copy payload into PDU buffer
memcpy(pdu.pdu->msg, payload, payload_len);
pdu.pdu->N_bytes = payload_len;
queue.push(std::move(pdu));
queue.push_blocking(std::move(pdu));
} else {
logger.info("Dropping PDU in PCAP. No buffer available or not enough space (pdu_len=%d).", payload_len);
}
@ -119,7 +119,7 @@ void mac_pcap_base::pack_and_queue_nr(uint8_t* payload,
// copy payload into PDU buffer
memcpy(pdu.pdu->msg, payload, payload_len);
pdu.pdu->N_bytes = payload_len;
queue.push(std::move(pdu));
queue.push_blocking(std::move(pdu));
} else {
logger.info("Dropping PDU in NR PCAP. No buffer available or not enough space (pdu_len=%d).", payload_len);
}

@ -71,7 +71,7 @@ uint32_t mac_pcap_net::close()
// tell writer thread to stop
running = false;
pcap_pdu_t pdu = {};
queue.push(std::move(pdu));
queue.push_blocking(std::move(pdu));
}
wait_thread_finish();

@ -58,7 +58,7 @@ void pdu_queue::push(const uint8_t* ptr, uint32_t len, channel_t channel)
pdu_t* pdu = (pdu_t*)ptr;
pdu->len = len;
pdu->channel = channel;
pdu_q.push(pdu);
pdu_q.push_blocking(pdu);
} else {
logger.warning("Error pushing pdu: ptr is empty");
}
@ -69,7 +69,7 @@ bool pdu_queue::process_pdus()
bool have_data = false;
uint32_t cnt = 0;
pdu_t* pdu;
while (pdu_q.try_pop(&pdu)) {
while (pdu_q.try_pop(pdu)) {
if (callback) {
callback->process_pdu(pdu->ptr, pdu->len, pdu->channel);
}
@ -86,7 +86,7 @@ bool pdu_queue::process_pdus()
void pdu_queue::reset()
{
pdu_t* pdu;
while (pdu_q.try_pop(&pdu)) {
while (pdu_q.try_pop(pdu)) {
// nop
}
}

@ -10,6 +10,7 @@
*
*/
#include "srslte/common/block_queue.h"
#include "srslte/common/crash_handler.h"
#include "srslte/common/log_filter.h"
#include "srslte/common/rlc_pcap.h"
@ -459,7 +460,7 @@ void stress_test(stress_test_args_t args)
if (args.rat == "LTE") {
if (args.mode == "AM") {
// config RLC AM bearer
cnfg_ = rlc_config_t::default_rlc_am_config();
cnfg_ = rlc_config_t::default_rlc_am_config();
cnfg_.am.max_retx_thresh = args.max_retx;
} else if (args.mode == "UM") {
// config UM bearer

@ -162,7 +162,7 @@ private:
// state
bool started = false;
srslte::block_queue<stack_metrics_t> pending_stack_metrics;
srslte::dyn_block_queue<stack_metrics_t> pending_stack_metrics;
};
} // namespace srsenb

@ -140,8 +140,8 @@ private:
std::map<uint16_t, std::unique_ptr<ue> > ue_db, ues_to_rem;
uint16_t last_rnti = 70;
srslte::block_queue<std::unique_ptr<ue> > ue_pool; ///< Pool of pre-allocated UE objects
void prealloc_ue(uint32_t nof_ue);
srslte::static_block_queue<std::unique_ptr<ue>, 32> ue_pool; ///< Pool of pre-allocated UE objects
void prealloc_ue(uint32_t nof_ue);
uint8_t* assemble_rar(sched_interface::dl_sched_rar_grant_t* grants,
uint32_t enb_cc_idx,

@ -17,8 +17,8 @@
#include "rrc_cell_cfg.h"
#include "rrc_metrics.h"
#include "srsenb/hdr/stack/upper/common_enb.h"
#include "srslte/adt/circular_buffer.h"
#include "srslte/adt/mem_pool.h"
#include "srslte/common/block_queue.h"
#include "srslte/common/buffer_pool.h"
#include "srslte/common/common.h"
#include "srslte/common/logmap.h"
@ -28,7 +28,6 @@
#include "srslte/interfaces/enb_rrc_interfaces.h"
#include "srslte/srslog/srslog.h"
#include <map>
#include <queue>
namespace srsenb {
@ -190,8 +189,8 @@ private:
const static uint32_t LCID_ACT_USER = 0xffff0004;
const static uint32_t LCID_RTX_USER = 0xffff0005;
bool running = false;
srslte::block_queue<rrc_pdu> rx_pdu_queue;
bool running = false;
srslte::dyn_block_queue<rrc_pdu> rx_pdu_queue;
asn1::rrc::mcch_msg_s mcch;
bool enable_mbms = false;

@ -36,7 +36,8 @@ enb_stack_lte::enb_stack_lte(srslte::logger* logger_, srslog::sink& log_sink) :
s1ap(&task_sched, s1ap_logger),
rrc(&task_sched),
logger(logger_),
mac_pcap()
mac_pcap(),
pending_stack_metrics(64)
{
get_background_workers().set_nof_workers(2);
enb_task_queue = task_sched.make_task_queue();
@ -218,12 +219,14 @@ bool enb_stack_lte::get_metrics(stack_metrics_t* metrics)
}
rrc.get_metrics(metrics.rrc);
s1ap.get_metrics(metrics.s1ap);
pending_stack_metrics.push(metrics);
if (not pending_stack_metrics.try_push(metrics)) {
stack_logger.error("Unable to push metrics to queue");
}
});
if (ret.first) {
// wait for result
*metrics = pending_stack_metrics.wait_pop();
*metrics = pending_stack_metrics.pop_blocking();
return true;
}
return false;

@ -461,7 +461,7 @@ uint16_t mac::allocate_ue()
logger.error("Ignoring RACH attempt. UE pool empty.");
return SRSLTE_INVALID_RNTI;
}
std::unique_ptr<ue> ue_ptr = ue_pool.wait_pop();
std::unique_ptr<ue> ue_ptr = ue_pool.pop_blocking();
uint16_t rnti = ue_ptr->get_rnti();
// Set PCAP if available
@ -562,7 +562,10 @@ void mac::prealloc_ue(uint32_t nof_ue)
for (uint32_t i = 0; i < nof_ue; i++) {
std::unique_ptr<ue> ptr = std::unique_ptr<ue>(
new ue(allocate_rnti(), args.nof_prb, &scheduler, rrc_h, rlc_h, phy_h, log_h, logger, cells.size()));
ue_pool.push(std::move(ptr));
if (not ue_pool.try_push(std::move(ptr))) {
logger.info("Cannot preallocate more UEs as pool is full");
return;
}
}
}

@ -30,7 +30,8 @@ using namespace asn1::rrc;
namespace srsenb {
rrc::rrc(srslte::task_sched_handle task_sched_) : logger(srslog::fetch_basic_logger("RRC")), task_sched(task_sched_)
rrc::rrc(srslte::task_sched_handle task_sched_) :
logger(srslog::fetch_basic_logger("RRC")), task_sched(task_sched_), rx_pdu_queue(64)
{
pending_paging.clear();
ue_pool.reserve(16);
@ -89,7 +90,7 @@ void rrc::stop()
if (running) {
running = false;
rrc_pdu p = {0, LCID_EXIT, nullptr};
rx_pdu_queue.push(std::move(p));
rx_pdu_queue.push_blocking(std::move(p));
}
users.clear();
}
@ -127,13 +128,17 @@ uint8_t* rrc::read_pdu_bcch_dlsch(const uint8_t cc_idx, const uint32_t sib_index
void rrc::set_activity_user(uint16_t rnti)
{
rrc_pdu p = {rnti, LCID_ACT_USER, nullptr};
rx_pdu_queue.push(std::move(p));
if (not rx_pdu_queue.try_push(std::move(p))) {
logger.error("Failed to push UE activity command to RRC queue");
}
}
void rrc::rem_user_thread(uint16_t rnti)
{
rrc_pdu p = {rnti, LCID_REM_USER, nullptr};
rx_pdu_queue.push(std::move(p));
if (not rx_pdu_queue.try_push(std::move(p))) {
logger.error("Failed to push UE remove command to RRC queue");
}
}
uint32_t rrc::get_nof_users()
@ -144,7 +149,9 @@ uint32_t rrc::get_nof_users()
void rrc::max_retx_attempted(uint16_t rnti)
{
rrc_pdu p = {rnti, LCID_RTX_USER, nullptr};
rx_pdu_queue.push(std::move(p));
if (not rx_pdu_queue.try_push(std::move(p))) {
logger.error("Failed to push max Retx event to RRC queue");
}
}
// This function is called from PRACH worker (can wait)
@ -241,7 +248,9 @@ void rrc::send_rrc_connection_reject(uint16_t rnti)
void rrc::write_pdu(uint16_t rnti, uint32_t lcid, srslte::unique_byte_buffer_t pdu)
{
rrc_pdu p = {rnti, lcid, std::move(pdu)};
rx_pdu_queue.push(std::move(p));
if (not rx_pdu_queue.try_push(std::move(p))) {
logger.error("Failed to push Release command to RRC queue");
}
}
/*******************************************************************************
@ -276,7 +285,9 @@ void rrc::write_dl_info(uint16_t rnti, srslte::unique_byte_buffer_t sdu)
void rrc::release_complete(uint16_t rnti)
{
rrc_pdu p = {rnti, LCID_REL_USER, nullptr};
rx_pdu_queue.push(std::move(p));
if (not rx_pdu_queue.try_push(std::move(p))) {
logger.error("Failed to push Release command to RRC queue");
}
}
bool rrc::setup_ue_ctxt(uint16_t rnti, const asn1::s1ap::init_context_setup_request_s& msg)
@ -963,7 +974,7 @@ void rrc::tti_clock()
{
// pop cmds from queue
rrc_pdu p;
while (rx_pdu_queue.try_pop(&p)) {
while (rx_pdu_queue.try_pop(p)) {
// print Rx PDU
if (p.pdu != nullptr) {
logger.info(p.pdu->msg, p.pdu->N_bytes, "Rx %s PDU", to_string((rb_id_t)p.lcid));

Loading…
Cancel
Save