SRSENB: moved phy workers to lte worker pool

master
Xavier Arteaga 4 years ago committed by Andre Puschmann
parent eed9405e40
commit 85afdf8ce3

@ -15,11 +15,12 @@
#include <string.h>
#include "phy_common.h"
#include "../phy_common.h"
#define LOG_EXECTIME
namespace srsenb {
namespace lte {
class cc_worker
{
@ -114,6 +115,7 @@ private:
std::mutex mutex;
};
} // namespace lte
} // namespace srsenb
#endif // SRSENB_CC_WORKER_H

@ -16,11 +16,12 @@
#include <mutex>
#include <string.h>
#include "../phy_common.h"
#include "cc_worker.h"
#include "phy_common.h"
#include "srslte/srslte.h"
namespace srsenb {
namespace lte {
class sf_worker : public srslte::thread_pool::worker
{
@ -68,6 +69,7 @@ private:
srslte_softbuffer_tx_t temp_mbsfn_softbuffer = {};
};
} // namespace lte
} // namespace srsenb
#endif // SRSENB_PHCH_WORKER_H

@ -0,0 +1,55 @@
/*
* Copyright 2013-2020 Software Radio Systems Limited
*
* This file is part of srsLTE.
*
* srsLTE is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* srsLTE is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* A copy of the GNU Affero General Public License can be found in
* the LICENSE file in the top-level directory of this distribution
* and at http://www.gnu.org/licenses/.
*
*/
#ifndef SRSENB_LTE_WORKER_POOL_H
#define SRSENB_LTE_WORKER_POOL_H
#include "sf_worker.h"
#include "srslte/common/thread_pool.h"
namespace srsenb {
namespace lte {
class worker_pool
{
private:
std::vector<std::unique_ptr<srslte::log_filter> > log_vec;
srslte::thread_pool pool;
std::vector<std::unique_ptr<sf_worker> > workers;
public:
sf_worker* operator[](std::size_t pos) { return workers.at(pos).get(); }
uint32_t get_nof_workers() { return (uint32_t)workers.size(); }
worker_pool(uint32_t max_workers);
bool init(const phy_args_t& args, phy_common* common, srslte::logger* logger, int prio);
sf_worker* wait_worker(uint32_t tti);
sf_worker* wait_worker_id(uint32_t id);
void start_worker(sf_worker* w);
void stop();
};
} // namespace lte
} // namespace srsenb
#endif // SRSENB_LTE_WORKER_POOL_H

@ -14,7 +14,7 @@
#define SRSENB_PHY_H
#include "phy_common.h"
#include "sf_worker.h"
#include "lte/sf_worker.h"
#include "srsenb/hdr/phy/enb_phy_base.h"
#include "srslte/common/log.h"
#include "srslte/common/log_filter.h"
@ -62,6 +62,8 @@ public:
void radio_overflow() override{};
void radio_failure() override{};
void srslte_phy_logger(phy_logger_level_t log_level, char* str);
private:
srslte::phy_cfg_mbsfn_t mbsfn_config = {};
uint32_t nof_workers = 0;
@ -75,11 +77,10 @@ private:
srslte::radio_interface_phy* radio = nullptr;
srslte::logger* logger = nullptr;
std::vector<std::unique_ptr<srslte::log_filter> > log_vec;
srslte::log* log_h = nullptr;
std::unique_ptr<srslte::log_filter> log_h = nullptr;
std::unique_ptr<srslte::log_filter> log_phy_lib_h = nullptr;
srslte::thread_pool workers_pool;
std::vector<sf_worker> workers;
lte::worker_pool lte_workers;
phy_common workers_common;
prach_worker_pool prach;
txrx tx_rx;

@ -42,7 +42,7 @@ struct phy_args_t {
int pusch_max_its = 10;
bool pusch_8bit_decoder = false;
float tx_amplitude = 1.0f;
int nof_phy_threads = 1;
uint32_t nof_phy_threads = 1;
std::string equalizer_mode = "mmse";
float estimator_fil_w = 1.0f;
bool pusch_meas_epre = true;

@ -15,9 +15,8 @@
#include "phy_common.h"
#include "prach_worker.h"
#include "srsenb/hdr/phy/lte/worker_pool.h"
#include "srslte/common/log.h"
#include "srslte/common/thread_pool.h"
#include "srslte/common/threads.h"
#include "srslte/config.h"
#include "srslte/phy/channel/channel.h"
#include "srslte/radio/radio.h"
@ -30,7 +29,7 @@ public:
txrx();
bool init(stack_interface_phy_lte* stack_,
srslte::radio_interface_phy* radio_handler,
srslte::thread_pool* _workers_pool,
lte::worker_pool* _workers_pool,
phy_common* worker_com,
prach_worker_pool* prach_,
srslte::log* log_h,
@ -43,7 +42,7 @@ private:
stack_interface_phy_lte* stack = nullptr;
srslte::radio_interface_phy* radio_h = nullptr;
srslte::log* log_h = nullptr;
srslte::thread_pool* workers_pool = nullptr;
lte::worker_pool* workers_pool = nullptr;
prach_worker_pool* prach = nullptr;
phy_common* worker_com = nullptr;
srslte::channel_ptr ul_channel = nullptr;

@ -186,7 +186,7 @@ void parse_args(all_args_t* args, int argc, char* argv[])
("expert.pusch_8bit_decoder", bpo::value<bool>(&args->phy.pusch_8bit_decoder)->default_value(false), "Use 8-bit for LLR representation and turbo decoder trellis computation (Experimental)")
("expert.pusch_meas_evm", bpo::value<bool>(&args->phy.pusch_meas_evm)->default_value(false), "Enable/Disable PUSCH EVM measure")
("expert.tx_amplitude", bpo::value<float>(&args->phy.tx_amplitude)->default_value(0.6), "Transmit amplitude factor")
("expert.nof_phy_threads", bpo::value<int>(&args->phy.nof_phy_threads)->default_value(3), "Number of PHY threads")
("expert.nof_phy_threads", bpo::value<uint32_t>(&args->phy.nof_phy_threads)->default_value(3), "Number of PHY threads")
("expert.max_prach_offset_us", bpo::value<float>(&args->phy.max_prach_offset_us)->default_value(30), "Maximum allowed RACH offset (in us)")
("expert.equalizer_mode", bpo::value<string>(&args->phy.equalizer_mode)->default_value("mmse"), "Equalizer mode")
("expert.estimator_fil_w", bpo::value<float>(&args->phy.estimator_fil_w)->default_value(0.1), "Chooses the coefficients for the 3-tap channel estimator centered filter.")

@ -6,7 +6,7 @@
# the distribution.
#
set(SOURCES cc_worker.cc phy.cc phy_common.cc phy_ue_db.cc prach_worker.cc sf_worker.cc txrx.cc)
set(SOURCES lte/cc_worker.cc lte/sf_worker.cc lte/worker_pool.cc phy.cc phy_common.cc phy_ue_db.cc prach_worker.cc txrx.cc)
add_library(srsenb_phy STATIC ${SOURCES})
if(ENABLE_GUI AND SRSGUI_FOUND)

@ -14,7 +14,7 @@
#include "srslte/common/threads.h"
#include "srslte/srslte.h"
#include "srsenb/hdr/phy/cc_worker.h"
#include "srsenb/hdr/phy/lte/cc_worker.h"
#define Error(fmt, ...) \
if (SRSLTE_DEBUG_ENABLED) \
@ -44,6 +44,7 @@ using namespace asn1::rrc;
//#define DEBUG_WRITE_FILE
namespace srsenb {
namespace lte {
cc_worker::cc_worker()
{
@ -651,4 +652,5 @@ int cc_worker::read_pucch_d(cf_t* pdsch_d)
return nof_re;
}
} // namespace lte
} // namespace srsenb

@ -14,7 +14,7 @@
#include "srslte/common/threads.h"
#include "srslte/srslte.h"
#include "srsenb/hdr/phy/sf_worker.h"
#include "srsenb/hdr/phy/lte/sf_worker.h"
#define Error(fmt, ...) \
if (SRSLTE_DEBUG_ENABLED) \
@ -58,6 +58,7 @@ using namespace asn1::rrc;
//#define DEBUG_WRITE_FILE
namespace srsenb {
namespace lte {
#ifdef DEBUG_WRITE_FILE
FILE* f;
@ -342,6 +343,7 @@ sf_worker::~sf_worker()
srslte_softbuffer_tx_free(&temp_mbsfn_softbuffer);
}
} // namespace lte
} // namespace srsenb
/***********************************************************

@ -0,0 +1,95 @@
/*
* Copyright 2013-2020 Software Radio Systems Limited
*
* This file is part of srsLTE.
*
* srsLTE is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* srsLTE is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* A copy of the GNU Affero General Public License can be found in
* the LICENSE file in the top-level directory of this distribution
* and at http://www.gnu.org/licenses/.
*
*/
/*
* Copyright 2013-2020 Software Radio Systems Limited
*
* This file is part of srsLTE.
*
* srsLTE is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* srsLTE is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* A copy of the GNU Affero General Public License can be found in
* the LICENSE file in the top-level directory of this distribution
* and at http://www.gnu.org/licenses/.
*
*/
#include "srsenb/hdr/phy/lte/worker_pool.h"
namespace srsenb {
namespace lte {
worker_pool::worker_pool(uint32_t max_workers) : pool(max_workers) {}
bool worker_pool::init(const phy_args_t& args, phy_common* common, srslte::logger* logger, int prio)
{
// Create logs
// Create array of pointers to phy_logs
for (uint32_t i = 0; i < args.nof_phy_threads; i++) {
auto* mylog = new srslte::log_filter;
char tmp[16];
sprintf(tmp, "PHY%d", i);
mylog->init(tmp, logger, true);
mylog->set_level(args.log.phy_level);
mylog->set_hex_limit(args.log.phy_hex_limit);
log_vec.push_back(std::unique_ptr<srslte::log_filter>(mylog));
}
// Add workers to workers pool and start threads
for (uint32_t i = 0; i < args.nof_phy_threads; i++) {
auto w = std::unique_ptr<lte::sf_worker>(new sf_worker());
w->init(common, (srslte::log*)log_vec[i].get());
pool.init_worker(i, w.get(), prio);
workers.push_back(std::move(w));
}
return true;
}
void worker_pool::start_worker(sf_worker* w)
{
pool.start_worker(w);
}
sf_worker* worker_pool::wait_worker(uint32_t tti)
{
return (sf_worker*)pool.wait_worker(tti);
}
sf_worker* worker_pool::wait_worker_id(uint32_t id)
{
return (sf_worker*)pool.wait_worker_id(id);
}
void worker_pool::stop()
{
pool.stop();
}
}; // namespace lte
}; // namespace srsenb

@ -40,9 +40,34 @@ using namespace asn1::rrc;
namespace srsenb {
phy::phy(srslte::logger* logger_) :
logger(logger_), workers_pool(MAX_WORKERS), workers(MAX_WORKERS), workers_common(), nof_workers(0)
{}
static void srslte_phy_handler(phy_logger_level_t log_level, void* ctx, char* str)
{
phy* r = (phy*)ctx;
r->srslte_phy_logger(log_level, str);
}
void phy::srslte_phy_logger(phy_logger_level_t log_level, char* str)
{
if (log_phy_lib_h) {
switch (log_level) {
case LOG_LEVEL_INFO_S:
log_phy_lib_h->info(" %s", str);
break;
case LOG_LEVEL_DEBUG_S:
log_phy_lib_h->debug(" %s", str);
break;
case LOG_LEVEL_ERROR_S:
log_phy_lib_h->error(" %s", str);
break;
default:
break;
}
} else {
printf("[PHY_LIB]: %s\n", str);
}
}
phy::phy(srslte::logger* logger_) : logger(logger_), lte_workers(MAX_WORKERS), workers_common(), nof_workers(0) {}
phy::~phy()
{
@ -72,29 +97,21 @@ int phy::init(const phy_args_t& args,
{
mlockall((uint32_t)MCL_CURRENT | (uint32_t)MCL_FUTURE);
// Create array of pointers to phy_logs
for (int i = 0; i < args.nof_phy_threads; i++) {
auto mylog = std::unique_ptr<srslte::log_filter>(new srslte::log_filter);
char tmp[16] = {};
sprintf(tmp, "PHY%d", i);
mylog->init(tmp, logger, true);
mylog->set_level(args.log.phy_level);
mylog->set_hex_limit(args.log.phy_hex_limit);
log_vec.push_back(std::move(mylog));
// Add PHY lib log
if (srslte::log::get_level_from_string(args.log.phy_lib_level) != srslte::LOG_LEVEL_NONE) {
log_phy_lib_h = std::unique_ptr<srslte::log_filter>(new srslte::log_filter);
log_phy_lib_h->init("PHY_LIB", logger, true);
log_phy_lib_h->set_level(args.log.phy_lib_level);
log_phy_lib_h->set_hex_limit(args.log.phy_hex_limit);
srslte_phy_log_register_handler(this, srslte_phy_handler);
}
log_h = log_vec[0].get();
// Add PHY lib log
if (log_vec.at(0)->get_level_from_string(args.log.phy_lib_level) != srslte::LOG_LEVEL_NONE) {
auto lib_log = std::unique_ptr<srslte::log_filter>(new srslte::log_filter);
char tmp[16] = {};
sprintf(tmp, "PHY_LIB");
lib_log->init(tmp, logger, true);
lib_log->set_level(args.log.phy_lib_level);
lib_log->set_hex_limit(args.log.phy_hex_limit);
log_vec.push_back(std::move(lib_log));
} else {
log_vec.push_back(nullptr);
// Create default log
{
log_h = std::unique_ptr<srslte::log_filter>(new srslte::log_filter);
log_h->init("PHY", logger, true);
log_h->set_level(args.log.phy_lib_level);
log_h->set_hex_limit(args.log.phy_hex_limit);
}
radio = radio_;
@ -107,20 +124,17 @@ int phy::init(const phy_args_t& args,
parse_common_config(cfg);
// Add workers to workers pool and start threads
for (uint32_t i = 0; i < nof_workers; i++) {
workers[i].init(&workers_common, log_vec.at(i).get());
workers_pool.init_worker(i, &workers[i], WORKERS_THREAD_PRIO);
}
lte_workers.init(args, &workers_common, logger, WORKERS_THREAD_PRIO);
// For each carrier, initialise PRACH worker
for (uint32_t cc = 0; cc < cfg.phy_cell_cfg.size(); cc++) {
prach_cfg.root_seq_idx = cfg.phy_cell_cfg[cc].root_seq_idx;
prach.init(cc, cfg.phy_cell_cfg[cc].cell, prach_cfg, stack_, log_vec.at(0).get(), PRACH_WORKER_THREAD_PRIO);
prach.init(cc, cfg.phy_cell_cfg[cc].cell, prach_cfg, stack_, log_h.get(), PRACH_WORKER_THREAD_PRIO);
}
prach.set_max_prach_offset_us(args.max_prach_offset_us);
// Warning this must be initialized after all workers have been added to the pool
tx_rx.init(stack_, radio, &workers_pool, &workers_common, &prach, log_vec.at(0).get(), SF_RECV_THREAD_PRIO);
tx_rx.init(stack_, radio, &lte_workers, &workers_common, &prach, log_h.get(), SF_RECV_THREAD_PRIO);
initialized = true;
@ -132,7 +146,7 @@ void phy::stop()
if (initialized) {
tx_rx.stop();
workers_common.stop();
workers_pool.stop();
lte_workers.stop();
prach.stop();
initialized = false;
@ -145,7 +159,7 @@ void phy::rem_rnti(uint16_t rnti)
{
// Remove the RNTI when the TTI finishes, this has a delay up to the pipeline length (3 ms)
for (uint32_t i = 0; i < nof_workers; i++) {
sf_worker* w = (sf_worker*)workers_pool.wait_worker_id(i);
lte::sf_worker* w = lte_workers.wait_worker_id(i);
if (w) {
w->rem_rnti(rnti);
w->release();
@ -160,7 +174,7 @@ void phy::rem_rnti(uint16_t rnti)
int phy::pregen_sequences(uint16_t rnti)
{
for (uint32_t i = 0; i < nof_workers; i++) {
if (workers[i].pregen_sequences(rnti) != SRSLTE_SUCCESS) {
if (lte_workers[i]->pregen_sequences(rnti) != SRSLTE_SUCCESS) {
return SRSLTE_ERROR;
}
}
@ -184,7 +198,7 @@ void phy::get_metrics(std::vector<phy_metrics_t>& metrics)
{
std::vector<phy_metrics_t> metrics_tmp;
for (uint32_t i = 0; i < nof_workers; i++) {
workers[i].get_metrics(metrics_tmp);
lte_workers[i]->get_metrics(metrics_tmp);
metrics.resize(std::max(metrics_tmp.size(), metrics.size()));
for (uint32_t j = 0; j < metrics_tmp.size(); j++) {
metrics[j].dl.n_samples += metrics_tmp[j].dl.n_samples;
@ -228,7 +242,7 @@ void phy::set_config(uint16_t rnti, const phy_rrc_cfg_list_t& phy_cfg_list)
if (config.configured) {
// Add RNTI to all SF workers
for (uint32_t w = 0; w < nof_workers; w++) {
workers[w].add_rnti(rnti, config.enb_cc_idx);
lte_workers[w]->add_rnti(rnti, config.enb_cc_idx);
}
}
}
@ -272,7 +286,7 @@ void phy::configure_mbsfn(srslte::sib2_mbms_t* sib2, srslte::sib13_t* sib13, con
// Start GUI
void phy::start_plot()
{
workers[0].start_plot();
lte_workers[0]->start_plot();
}
} // namespace srsenb

@ -16,7 +16,7 @@
#include "srslte/common/threads.h"
#include "srslte/srslte.h"
#include "srsenb/hdr/phy/sf_worker.h"
#include "srsenb/hdr/phy/lte/sf_worker.h"
#include "srsenb/hdr/phy/txrx.h"
#define Error(fmt, ...) \
@ -43,7 +43,7 @@ txrx::txrx() : thread("TXRX")
bool txrx::init(stack_interface_phy_lte* stack_,
srslte::radio_interface_phy* radio_h_,
srslte::thread_pool* workers_pool_,
lte::worker_pool* workers_pool_,
phy_common* worker_com_,
prach_worker_pool* prach_,
srslte::log* log_h_,
@ -80,7 +80,7 @@ void txrx::stop()
void txrx::run_thread()
{
sf_worker* worker = nullptr;
lte::sf_worker* worker = nullptr;
srslte::rf_buffer_t buffer = {};
srslte::rf_timestamp_t timestamp = {};
uint32_t sf_len = SRSLTE_SF_LEN_PRB(worker_com->get_nof_prb(0));
@ -118,7 +118,12 @@ void txrx::run_thread()
// Main loop
while (running) {
tti = TTI_ADD(tti, 1);
worker = (sf_worker*)workers_pool->wait_worker(tti);
if (log_h) {
log_h->step(tti);
}
worker = workers_pool->wait_worker(tti);
if (worker) {
// Multiple cell buffer mapping
for (uint32_t cc = 0; cc < worker_com->get_nof_carriers(); cc++) {

@ -125,19 +125,16 @@ int phy::init(const phy_args_t& args_)
// Add PHY lib log
if (srslte::log::get_level_from_string(args.log.phy_lib_level) != srslte::LOG_LEVEL_NONE) {
log_phy_lib_h = std::unique_ptr<srslte::log_filter>(new srslte::log_filter);
char tmp[16];
sprintf(tmp, "PHY_LIB");
log_phy_lib_h->init(tmp, logger, true);
log_phy_lib_h->init("PHY_LIB", logger, true);
log_phy_lib_h->set_level(args.log.phy_lib_level);
log_phy_lib_h->set_hex_limit(args.log.phy_hex_limit);
srslte_phy_log_register_handler(this, srslte_phy_handler);
}
// set default logger
{
log_h = std::unique_ptr<srslte::log_filter>(new srslte::log_filter);
char tmp[16];
sprintf(tmp, "PHY_COM");
log_h->init(tmp, logger, true);
log_h->init("PHY", logger, true);
log_h->set_level(args.log.phy_lib_level);
log_h->set_hex_limit(args.log.phy_hex_limit);
}
@ -146,10 +143,6 @@ int phy::init(const phy_args_t& args_)
return false;
}
if (log_phy_lib_h) {
srslte_phy_log_register_handler(this, srslte_phy_handler);
}
is_configured = false;
start();
return true;

Loading…
Cancel
Save