Refactored worker_end mechanism for concurrent workers

master
Xavier Arteaga 4 years ago committed by Xavier Arteaga
parent e833751031
commit b57df4db10

@ -18,19 +18,40 @@
namespace srsran {
/**
* @brief Descibes a physical layer common interface
*/
class phy_common_interface
{
public:
/**
* @brief Describes a worker context
*/
struct worker_context_t {
uint32_t sf_idx = 0; ///< Subframe index
void* worker_ptr = nullptr; ///< Worker pointer for wait/release semaphore
bool last = false; ///< Indicates this worker is the last one in the sub-frame processing
srsran::rf_timestamp_t tx_time = {}; ///< Transmit time, used only by last worker
void copy(const worker_context_t& other)
{
sf_idx = other.sf_idx;
worker_ptr = other.worker_ptr;
last = other.last;
tx_time.copy(other.tx_time);
}
worker_context_t() = default;
worker_context_t(const worker_context_t& other) { copy(other); }
};
/**
* @brief Common PHY interface for workers to indicate they ended
* @param h Worker pointer used as unique identifier for synchronising Tx
* @param w_ctx Worker context
* @param tx_enable Indicates whether the buffer has baseband samples to transmit
* @param buffer Baseband buffer
* @param tx_time Transmit timestamp
* @param is_nr Indicates whether the worker is NR or not
*/
virtual void
worker_end(void* h, bool tx_enable, srsran::rf_buffer_t& buffer, srsran::rf_timestamp_t& tx_time, bool is_nr) = 0;
virtual void worker_end(const worker_context_t& w_ctx, const bool& tx_enable, srsran::rf_buffer_t& buffer) = 0;
};
} // namespace srsran

@ -112,6 +112,26 @@ public:
{
sample_buffer.at(logical_ch * nof_antennas + port_idx) = ptr;
}
void set_combine(const uint32_t& channel_idx, cf_t* ptr)
{
if (sample_buffer.at(channel_idx) == nullptr) {
sample_buffer.at(channel_idx) = ptr;
} else if (ptr != nullptr) {
srsran_vec_sum_ccc(ptr, sample_buffer.at(channel_idx), sample_buffer.at(channel_idx), nof_samples);
}
}
void set_combine(const uint32_t& logical_ch, const uint32_t& port_idx, const uint32_t& nof_antennas, cf_t* ptr)
{
set_combine(logical_ch * nof_antennas + port_idx, ptr);
}
void set_combine(const rf_buffer_interface& other)
{
// Take the other number of samples always
set_nof_samples(other.get_nof_samples());
for (uint32_t ch = 0; ch < SRSRAN_MAX_CHANNELS; ch++) {
set_combine(ch, other.get(ch));
}
}
void** to_void() override { return (void**)sample_buffer.data(); }
cf_t** to_cf_t() override { return sample_buffer.data(); }
uint32_t size() override { return nof_subframes * SRSRAN_SF_LEN_MAX; }

@ -32,7 +32,7 @@ public:
void init(phy_common* phy);
cf_t* get_buffer_rx(uint32_t cc_idx, uint32_t antenna_idx);
void set_time(uint32_t tti_, const srsran::rf_timestamp_t& tx_time_);
void set_context(const srsran::phy_common_interface::worker_context_t& w_ctx);
int add_rnti(uint16_t rnti, uint32_t cc_idx);
void rem_rnti(uint16_t rnti);
@ -59,10 +59,9 @@ private:
bool running = false;
std::mutex work_mutex;
uint32_t tti_rx = 0, tti_tx_dl = 0, tti_tx_ul = 0;
srsran::rf_timestamp_t tx_time = {};
std::vector<std::unique_ptr<cc_worker> > cc_workers;
uint32_t tti_rx = 0, tti_tx_dl = 0, tti_tx_ul = 0;
std::vector<std::unique_ptr<cc_worker> > cc_workers;
srsran::phy_common_interface::worker_context_t context = {};
srsran_softbuffer_tx_t temp_mbsfn_softbuffer = {};
};

@ -36,6 +36,7 @@ public:
uint32_t nof_max_prb = SRSRAN_MAX_PRB_NR;
uint32_t nof_tx_ports = 1;
uint32_t nof_rx_ports = 1;
uint32_t rf_port = 0;
uint32_t pusch_max_nof_iter = 10;
};
@ -50,7 +51,7 @@ public:
cf_t* get_buffer_rx(uint32_t antenna_idx);
cf_t* get_buffer_tx(uint32_t antenna_idx);
uint32_t get_buffer_len();
void set_time(const uint32_t& tti, const srsran::rf_timestamp_t& timestamp);
void set_context(const srsran::phy_common_interface::worker_context_t& w_ctx);
private:
/**
@ -74,16 +75,17 @@ private:
stack_interface_phy_nr& stack;
srslog::basic_logger& logger;
uint32_t sf_len = 0;
uint32_t cell_index = 0;
srsran_slot_cfg_t dl_slot_cfg = {};
srsran_slot_cfg_t ul_slot_cfg = {};
srsran_pdcch_cfg_nr_t pdcch_cfg = {};
srsran::rf_timestamp_t tx_time = {};
srsran_gnb_dl_t gnb_dl = {};
srsran_gnb_ul_t gnb_ul = {};
std::vector<cf_t*> tx_buffer; ///< Baseband transmit buffers
std::vector<cf_t*> rx_buffer; ///< Baseband receive buffers
uint32_t sf_len = 0;
uint32_t cell_index = 0;
uint32_t rf_port = 0;
srsran_slot_cfg_t dl_slot_cfg = {};
srsran_slot_cfg_t ul_slot_cfg = {};
srsran::phy_common_interface::worker_context_t context = {};
srsran_pdcch_cfg_nr_t pdcch_cfg = {};
srsran_gnb_dl_t gnb_dl = {};
srsran_gnb_ul_t gnb_ul = {};
std::vector<cf_t*> tx_buffer; ///< Baseband transmit buffers
std::vector<cf_t*> rx_buffer; ///< Baseband receive buffers
};
} // namespace nr

@ -57,11 +57,7 @@ public:
* @param tx_time timestamp to transmit samples
* @param is_nr flag is true if it is called from NR
*/
void worker_end(void* tx_sem_id,
bool tx_enable,
srsran::rf_buffer_t& buffer,
srsran::rf_timestamp_t& tx_time,
bool is_nr) override;
void worker_end(const worker_context_t& w_ctx, const bool& tx_enable, srsran::rf_buffer_t& buffer) override;
// Common objects
phy_args_t params = {};
@ -236,10 +232,9 @@ private:
uint8_t mch_table[40] = {};
uint8_t mcch_table[10] = {};
uint32_t mch_period_stop = 0;
srsran::rf_buffer_t tx_buffer = {};
bool is_mch_subframe(srsran_mbsfn_cfg_t* cfg, uint32_t phy_tti);
bool is_mcch_subframe(srsran_mbsfn_cfg_t* cfg, uint32_t phy_tti);
srsran::rf_buffer_t nr_tx_buffer;
bool nr_tx_buffer_ready = false;
};
} // namespace srsenb

@ -101,16 +101,16 @@ cf_t* sf_worker::get_buffer_rx(uint32_t cc_idx, uint32_t antenna_idx)
return cc_workers[cc_idx]->get_buffer_rx(antenna_idx);
}
void sf_worker::set_time(uint32_t tti_, const srsran::rf_timestamp_t& tx_time_)
void sf_worker::set_context(const srsran::phy_common_interface::worker_context_t& w_ctx)
{
tti_rx = tti_;
tti_rx = w_ctx.sf_idx;
tti_tx_dl = TTI_ADD(tti_rx, FDD_HARQ_DELAY_UL_MS);
tti_tx_ul = TTI_RX_ACK(tti_rx);
tx_time.copy(tx_time_);
context.copy(w_ctx);
for (auto& w : cc_workers) {
w->set_tti(tti_);
w->set_tti(w_ctx.sf_idx);
}
}
@ -147,14 +147,10 @@ void sf_worker::work_imp()
// Get Transmission buffers
srsran::rf_buffer_t tx_buffer = {};
for (uint32_t cc = 0; cc < phy->get_nof_carriers_lte(); cc++) {
for (uint32_t ant = 0; ant < phy->get_nof_ports(0); ant++) {
tx_buffer.set(cc, ant, phy->get_nof_ports(0), cc_workers[cc]->get_buffer_tx(ant));
}
}
tx_buffer.set_nof_samples(SRSRAN_SF_LEN_PRB(phy->get_nof_prb(0)));
if (!running) {
phy->worker_end(this, true, tx_buffer, tx_time, false);
phy->worker_end(context, true, tx_buffer);
return;
}
@ -192,14 +188,14 @@ void sf_worker::work_imp()
if (sf_type == SRSRAN_SF_NORM) {
if (stack->get_dl_sched(tti_tx_dl, dl_grants) < 0) {
Error("Getting DL scheduling from MAC");
phy->worker_end(this, false, tx_buffer, tx_time, false);
phy->worker_end(context, true, tx_buffer);
return;
}
} else {
dl_grants[0].cfi = mbsfn_cfg.non_mbsfn_region_length;
if (stack->get_mch_sched(tti_tx_dl, mbsfn_cfg.is_mcch, dl_grants)) {
Error("Getting MCH packets from MAC");
phy->worker_end(this, false, tx_buffer, tx_time, false);
phy->worker_end(context, true, tx_buffer);
return;
}
}
@ -207,7 +203,7 @@ void sf_worker::work_imp()
// Get UL scheduling for the TX TTI from MAC
if (stack->get_ul_sched(tti_tx_ul, ul_grants_tx) < 0) {
Error("Getting UL scheduling from MAC");
phy->worker_end(this, false, tx_buffer, tx_time, false);
phy->worker_end(context, true, tx_buffer);
return;
}
@ -232,9 +228,15 @@ void sf_worker::work_imp()
// Save grants
phy->set_ul_grants(tti_tx_ul, ul_grants_tx);
// Set or combine RF ports
for (uint32_t cc = 0; cc < phy->get_nof_carriers_lte(); cc++) {
for (uint32_t ant = 0; ant < phy->get_nof_ports(0); ant++) {
tx_buffer.set_combine(phy->get_rf_port(cc), ant, phy->get_nof_ports(0), cc_workers[cc]->get_buffer_tx(ant));
}
}
Debug("Sending to radio");
tx_buffer.set_nof_samples(SRSRAN_SF_LEN_PRB(phy->get_nof_prb(0)));
phy->worker_end(this, true, tx_buffer, tx_time, false);
phy->worker_end(context, true, tx_buffer);
#ifdef DEBUG_WRITE_FILE
fwrite(signal_buffer_tx, SRSRAN_SF_LEN_PRB(phy->cell.nof_prb) * sizeof(cf_t), 1, f);

@ -18,7 +18,9 @@ namespace nr {
slot_worker::slot_worker(srsran::phy_common_interface& common_,
stack_interface_phy_nr& stack_,
srslog::basic_logger& logger_) :
common(common_), stack(stack_), logger(logger_)
common(common_),
stack(stack_),
logger(logger_)
{
// Do nothing
}
@ -30,8 +32,7 @@ bool slot_worker::init(const args_t& args)
// Copy common configurations
cell_index = args.cell_index;
// FIXME:
// pdcch_cfg = args.pdcch_cfg;
rf_port = args.rf_port;
// Allocate Tx buffers
tx_buffer.resize(args.nof_tx_ports);
@ -124,12 +125,12 @@ uint32_t slot_worker::get_buffer_len()
return sf_len;
}
void slot_worker::set_time(const uint32_t& tti, const srsran::rf_timestamp_t& timestamp)
void slot_worker::set_context(const srsran::phy_common_interface::worker_context_t& w_ctx)
{
logger.set_context(tti);
ul_slot_cfg.idx = tti;
dl_slot_cfg.idx = TTI_ADD(tti, FDD_HARQ_DELAY_UL_MS);
tx_time.copy(timestamp);
logger.set_context(w_ctx.sf_idx);
ul_slot_cfg.idx = w_ctx.sf_idx;
dl_slot_cfg.idx = TTI_ADD(w_ctx.sf_idx, FDD_HARQ_DELAY_UL_MS);
context.copy(w_ctx);
}
bool slot_worker::work_ul()
@ -326,27 +327,26 @@ void slot_worker::work_imp()
stack.slot_indication(dl_slot_cfg);
// Get Transmission buffers
uint32_t nof_ant = (uint32_t)tx_buffer.size();
srsran::rf_buffer_t tx_rf_buffer = {};
for (uint32_t i = 0; i < (uint32_t)tx_buffer.size(); i++) {
tx_rf_buffer.set(i, tx_buffer[i]);
}
// Set number of samples
tx_rf_buffer.set_nof_samples(sf_len);
for (uint32_t a = 0; a < nof_ant; a++) {
tx_rf_buffer.set(rf_port, a, nof_ant, tx_buffer[a]);
}
// Process uplink
if (not work_ul()) {
common.worker_end(this, false, tx_rf_buffer, tx_time, true);
common.worker_end(context, false, tx_rf_buffer);
return;
}
// Process downlink
if (not work_dl()) {
common.worker_end(this, false, tx_rf_buffer, tx_time, true);
common.worker_end(context, false, tx_rf_buffer);
return;
}
common.worker_end(this, true, tx_rf_buffer, tx_time, true);
common.worker_end(context, true, tx_rf_buffer);
}
bool slot_worker::set_common_cfg(const srsran_carrier_nr_t& carrier, const srsran_pdcch_cfg_nr_t& pdcch_cfg_)
{

@ -52,6 +52,7 @@ bool worker_pool::init(const args_t& args, const phy_cell_cfg_list_nr_t& cell_li
w_args.nof_max_prb = cell_list[cell_index].carrier.nof_prb;
w_args.nof_tx_ports = cell_list[cell_index].carrier.max_mimo_layers;
w_args.nof_rx_ports = cell_list[cell_index].carrier.max_mimo_layers;
w_args.rf_port = cell_list[cell_index].rf_port;
w_args.pusch_max_nof_iter = args.pusch_max_nof_iter;
if (not w->init(w_args)) {

@ -104,42 +104,37 @@ void phy_common::set_ul_grants(uint32_t tti, const stack_interface_phy_lte::ul_s
* Each worker uses this function to indicate that all processing is done and data is ready for transmission or
* there is no transmission at all (tx_enable). In that case, the end of burst message will be sent to the radio
*/
void phy_common::worker_end(void* tx_sem_id,
bool tx_enable,
srsran::rf_buffer_t& buffer,
srsran::rf_timestamp_t& tx_time,
bool is_nr)
void phy_common::worker_end(const worker_context_t& w_ctx, const bool& tx_enable, srsran::rf_buffer_t& buffer)
{
// Wait for the green light to transmit in the current TTI
semaphore.wait(tx_sem_id);
semaphore.wait(w_ctx.worker_ptr);
// If this is for NR, save Tx buffers...
if (is_nr) {
nr_tx_buffer = buffer;
nr_tx_buffer_ready = true;
// For combine buffer with previous buffers
if (tx_enable) {
tx_buffer.set_nof_samples(buffer.get_nof_samples());
tx_buffer.set_combine(buffer);
}
// If the current worker is not the last one, skip transmission
if (not w_ctx.last) {
// Release semaphore and let next worker to get in
semaphore.release();
return;
}
// ... otherwise, append NR base-band from saved buffer if available
if (nr_tx_buffer_ready) {
uint32_t j = 0;
for (uint32_t i = 0; i < SRSRAN_MAX_CHANNELS; i++) {
if (buffer.get(i) == nullptr) {
buffer.set(i, nr_tx_buffer.get(j));
j++;
}
}
nr_tx_buffer_ready = false;
}
// Add current time alignment
srsran::rf_timestamp_t tx_time = w_ctx.tx_time; // get transmit time from the last worker
// Use last buffer number of samples
tx_buffer.set_nof_samples(buffer.get_nof_samples());
// Run DL channel emulator if created
if (dl_channel) {
dl_channel->run(buffer.to_cf_t(), buffer.to_cf_t(), buffer.get_nof_samples(), tx_time.get(0));
dl_channel->run(tx_buffer.to_cf_t(), tx_buffer.to_cf_t(), tx_buffer.get_nof_samples(), tx_time.get(0));
}
// Always transmit on single radio
radio->tx(buffer, tx_time);
radio->tx(tx_buffer, tx_time);
// Allow next TTI to transmit
semaphore.release();

@ -172,20 +172,41 @@ void txrx::run_thread()
timestamp.get(0).frac_secs,
lte_worker->get_id());
lte_worker->set_time(tti, timestamp);
// Trigger prach worker execution
for (uint32_t cc = 0; cc < worker_com->get_nof_carriers_lte(); cc++) {
prach->new_tti(cc, tti, buffer.get(worker_com->get_rf_port(cc), 0, worker_com->get_nof_ports(0)));
}
// Launch NR worker only if available
// Set NR worker context and start
if (nr_worker != nullptr) {
nr_worker->set_time(tti, timestamp);
srsran::phy_common_interface::worker_context_t context;
context.sf_idx = tti;
context.worker_ptr = nr_worker;
context.last = (lte_worker == nullptr); // Set last if standalone
context.tx_time.copy(timestamp);
nr_worker->set_context(context);
// NR worker needs to be launched first, phy_common::worker_end expects first the NR worker and the LTE worker.
worker_com->semaphore.push(nr_worker);
nr_workers->start_worker(nr_worker);
}
// Set LTE worker context and start
if (lte_worker != nullptr) {
srsran::phy_common_interface::worker_context_t context;
context.sf_idx = tti;
context.worker_ptr = lte_worker;
context.last = true;
context.tx_time.copy(timestamp);
lte_worker->set_context(context);
// NR worker needs to be launched first, phy_common::worker_end expects first the NR worker and the LTE worker.
worker_com->semaphore.push(lte_worker);
lte_workers->start_worker(lte_worker);
}
// Trigger phy worker execution
worker_com->semaphore.push(lte_worker);
lte_workers->start_worker(lte_worker);

@ -42,8 +42,7 @@ public:
/* Functions used by main PHY thread */
cf_t* get_buffer(uint32_t cc_idx, uint32_t antenna_idx);
uint32_t get_buffer_len();
void set_tti(uint32_t tti);
void set_tx_time(const srsran::rf_timestamp_t& tx_time);
void set_context(const srsran::phy_common_interface::worker_context_t& w_ctx);
void set_prach(cf_t* prach_ptr, float prach_power);
void set_cfo_unlocked(const uint32_t& cc_idx, float cfo);
@ -89,8 +88,7 @@ private:
cf_t* prach_ptr = nullptr;
float prach_power = 0;
uint32_t tti = 0;
srsran::rf_timestamp_t tx_time = {};
srsran::phy_common_interface::worker_context_t context = {};
};
} // namespace lte

@ -39,8 +39,7 @@ public:
/* Functions used by main PHY thread */
cf_t* get_buffer(uint32_t cc_idx, uint32_t antenna_idx);
uint32_t get_buffer_len();
void set_tti(uint32_t tti);
void set_tx_time(const srsran::rf_timestamp_t& tx_time_);
void set_context(const srsran::phy_common_interface::worker_context_t& w_ctx);
int read_pdsch_d(cf_t* pdsch_d);
void start_plot();
@ -52,13 +51,14 @@ private:
std::vector<std::unique_ptr<cc_worker> > cc_workers;
srsran::phy_common_interface& common;
state& phy_state;
srslog::basic_logger& logger;
srsran::rf_timestamp_t tx_time = {};
uint32_t tti_rx = 0;
cf_t* prach_ptr = nullptr;
float prach_power = 0;
srsran::phy_common_interface& common;
state& phy_state;
srslog::basic_logger& logger;
srsran::rf_timestamp_t tx_time = {};
uint32_t tti_rx = 0;
cf_t* prach_ptr = nullptr;
float prach_power = 0;
srsran::phy_common_interface::worker_context_t context = {};
};
} // namespace nr

@ -130,11 +130,7 @@ public:
srsran_pdsch_ack_resource_t resource);
bool get_dl_pending_ack(srsran_ul_sf_cfg_t* sf, uint32_t cc_idx, srsran_pdsch_ack_cc_t* ack);
void worker_end(void* h,
bool tx_enable,
srsran::rf_buffer_t& buffer,
srsran::rf_timestamp_t& tx_time,
bool is_nr) override;
void worker_end(const worker_context_t& w_ctx, const bool& tx_enable, srsran::rf_buffer_t& buffer) override;
void set_cell(const srsran_cell_t& c);
@ -371,8 +367,8 @@ private:
bool is_mcch_subframe(srsran_mbsfn_cfg_t* cfg, uint32_t phy_tti);
// NR carriers buffering synchronization, LTE workers are in charge of transmitting
srsran::rf_buffer_t nr_tx_buffer;
bool nr_tx_buffer_ready = false;
bool tx_enabled = false;
srsran::rf_buffer_t tx_buffer = {};
};
} // namespace srsue

@ -103,20 +103,15 @@ uint32_t sf_worker::get_buffer_len()
return cc_workers.at(0)->get_buffer_len();
}
void sf_worker::set_tti(uint32_t tti_)
void sf_worker::set_context(const srsran::phy_common_interface::worker_context_t& w_ctx)
{
tti = tti_;
context.copy(w_ctx);
for (auto& cc_worker : cc_workers) {
cc_worker->set_tti(tti);
cc_worker->set_tti(w_ctx.sf_idx);
}
logger.set_context(tti);
}
void sf_worker::set_tx_time(const srsran::rf_timestamp_t& tx_time_)
{
tx_time.copy(tx_time_);
logger.set_context(w_ctx.sf_idx);
}
void sf_worker::set_prach(cf_t* prach_ptr_, float prach_power_)
@ -153,9 +148,10 @@ void sf_worker::set_config_unlocked(uint32_t cc_idx, const srsran::phy_cfg_t& ph
void sf_worker::work_imp()
{
uint32_t tti = context.sf_idx;
srsran::rf_buffer_t tx_signal_ptr = {};
if (!cell_initiated) {
phy->worker_end(this, false, tx_signal_ptr, tx_time, false);
phy->worker_end(context, false, tx_signal_ptr);
return;
}
@ -226,7 +222,7 @@ void sf_worker::work_imp()
}
// Call worker_end to transmit the signal
phy->worker_end(this, tx_signal_ready, tx_signal_ptr, tx_time, false);
phy->worker_end(context, tx_signal_ready, tx_signal_ptr);
if (rx_signal_ok) {
update_measurements();

@ -28,7 +28,9 @@ static int plot_worker_id = -1;
namespace srsue {
namespace nr {
sf_worker::sf_worker(srsran::phy_common_interface& common_, state& phy_state_, srslog::basic_logger& log) :
phy_state(phy_state_), common(common_), logger(log)
phy_state(phy_state_),
common(common_),
logger(log)
{
for (uint32_t i = 0; i < phy_state.args.nof_carriers; i++) {
cc_worker* w = new cc_worker(i, log, phy_state);
@ -59,18 +61,14 @@ uint32_t sf_worker::get_buffer_len()
return cc_workers.at(0)->get_buffer_len();
}
void sf_worker::set_tti(uint32_t tti)
void sf_worker::set_context(const srsran::phy_common_interface::worker_context_t& w_ctx)
{
tti_rx = tti;
logger.set_context(tti);
tti_rx = w_ctx.sf_idx;
logger.set_context(w_ctx.sf_idx);
for (auto& w : cc_workers) {
w->set_tti(tti);
w->set_tti(w_ctx.sf_idx);
}
}
void sf_worker::set_tx_time(const srsran::rf_timestamp_t& tx_time_)
{
tx_time.copy(tx_time_);
context.copy(w_ctx);
}
void sf_worker::work_imp()
@ -100,7 +98,7 @@ void sf_worker::work_imp()
0);
// Transmit NR PRACH
common.worker_end(this, true, tx_buffer, tx_time, true);
common.worker_end(context, true, tx_buffer);
// Reset PRACH pointer
prach_ptr = nullptr;
@ -120,7 +118,7 @@ void sf_worker::work_imp()
tx_buffer.set_nof_samples(SRSRAN_SF_LEN_PRB_NR(phy_state.cfg.carrier.nof_prb));
// Always call worker_end before returning
common.worker_end(this, true, tx_buffer, tx_time, true);
common.worker_end(context, true, tx_buffer);
// Tell the plotting thread to draw the plots
#ifdef ENABLE_GUI

@ -45,10 +45,10 @@ void phy_common::init(phy_args_t* _args,
stack_interface_phy_lte* _stack,
rsrp_insync_itf* _chest_loop)
{
radio_h = _radio;
stack = _stack;
args = _args;
insync_itf = _chest_loop;
radio_h = _radio;
stack = _stack;
args = _args;
insync_itf = _chest_loop;
sr.reset();
// Instantiate UL channel emulator
@ -531,48 +531,48 @@ bool phy_common::get_dl_pending_ack(srsran_ul_sf_cfg_t* sf, uint32_t cc_idx, srs
* Each worker uses this function to indicate that all processing is done and data is ready for transmission or
* there is no transmission at all (tx_enable). In that case, the end of burst message will be sent to the radio
*/
void phy_common::worker_end(void* tx_sem_id,
bool tx_enable,
srsran::rf_buffer_t& buffer,
srsran::rf_timestamp_t& tx_time,
bool is_nr)
void phy_common::worker_end(const worker_context_t& w_ctx, const bool& tx_enable, srsran::rf_buffer_t& buffer)
{
// Wait for the green light to transmit in the current TTI
semaphore.wait(tx_sem_id);
semaphore.wait(w_ctx.worker_ptr);
// If this is for NR, save Tx buffers...
if (is_nr) {
nr_tx_buffer = buffer;
nr_tx_buffer_ready = true;
semaphore.release();
return;
// For each channel set or combine baseband
if (tx_enable) {
tx_buffer.set_combine(buffer);
}
// ... otherwise, append NR base-band from saved buffer if available
if (nr_tx_buffer_ready) {
// Load NR carrier base-band
for (uint32_t i = 0; i < args->nof_nr_carriers * args->nof_rx_ant; i++) {
uint32_t channel_idx = args->nof_lte_carriers * args->nof_rx_ant + i;
buffer.set(channel_idx, nr_tx_buffer.get(i));
}
// Remove NR buffer flag
nr_tx_buffer_ready = false;
// Flag transmit enabled
tx_enabled = true;
// Make sure it transmits in this TTI
tx_enable = true;
// If the current worker is not the last one, skip transmission
if (not w_ctx.last) {
// Release semaphore and let next worker to get in
semaphore.release();
return;
}
// Add Time Alignment
// Add current time alignment
srsran::rf_timestamp_t tx_time = w_ctx.tx_time; // get transmit time from the last worker
tx_time.sub((double)ta.get_sec());
// For each radio, transmit
if (tx_enable) {
// Check if any worker had a transmission
if (tx_enabled) {
// Set number of samples to the latest transmit buffer
tx_buffer.set_nof_samples(buffer.get_nof_samples());
// Run uplink channel emulator
if (ul_channel) {
ul_channel->run(buffer.to_cf_t(), buffer.to_cf_t(), buffer.get_nof_samples(), tx_time.get(0));
ul_channel->run(tx_buffer.to_cf_t(), tx_buffer.to_cf_t(), tx_buffer.get_nof_samples(), tx_time.get(0));
}
radio_h->tx(buffer, tx_time);
// Actual baseband transmission
radio_h->tx(tx_buffer, tx_time);
// Reset tx buffer
tx_enabled = false;
for (uint32_t ch = 0; ch < SRSRAN_MAX_CHANNELS; ch++) {
tx_buffer.set(ch, nullptr);
}
} else {
if (radio_h->is_continuous_tx()) {
if (is_pending_tx_end) {

@ -519,11 +519,19 @@ void sync::run_camping_in_sync_state(lte::sf_worker* lte_worker,
worker_com->update_cfo_measurement(cc, cfo);
}
lte_worker->set_tti(tti);
// Compute TX time: Any transmission happens in TTI+4 thus advance 4 ms the reception time
last_rx_time.add(FDD_HARQ_DELAY_DL_MS * 1e-3);
lte_worker->set_tx_time(last_rx_time);
// Set LTE worker context
if (lte_worker != nullptr) {
srsran::phy_common_interface::worker_context_t context;
context.sf_idx = tti;
context.worker_ptr = lte_worker;
context.last = true;
context.tx_time.copy(last_rx_time);
lte_worker->set_context(context);
}
// Advance/reset prach subframe pointer
if (prach_ptr) {
@ -534,17 +542,35 @@ void sync::run_camping_in_sync_state(lte::sf_worker* lte_worker,
}
}
// Start NR worker only if present
// Set NR worker context and start
if (nr_worker != nullptr) {
srsran::phy_common_interface::worker_context_t context;
context.sf_idx = tti;
context.worker_ptr = nr_worker;
context.last = (lte_worker == nullptr); // Set last if standalone
context.tx_time.copy(last_rx_time);
nr_worker->set_context(context);
// NR worker needs to be launched first, phy_common::worker_end expects first the NR worker and the LTE worker.
nr_worker->set_tti(tti);
worker_com->semaphore.push(nr_worker);
nr_worker_pool->start_worker(nr_worker);
}
// Start LTE worker
worker_com->semaphore.push(lte_worker);
lte_worker_pool->start_worker(lte_worker);
// Set LTE worker context and start
if (lte_worker != nullptr) {
srsran::phy_common_interface::worker_context_t context;
context.sf_idx = tti;
context.worker_ptr = lte_worker;
context.last = true;
context.tx_time.copy(last_rx_time);
lte_worker->set_context(context);
// NR worker needs to be launched first, phy_common::worker_end expects first the NR worker and the LTE worker.
worker_com->semaphore.push(lte_worker);
lte_worker_pool->start_worker(lte_worker);
}
}
void sync::run_camping_state()
{
@ -1042,7 +1068,7 @@ void sync::set_inter_frequency_measurement(uint32_t cc_idx, uint32_t earfcn_, sr
void sync::set_cells_to_meas(uint32_t earfcn_, const std::set<uint32_t>& pci)
{
std::lock_guard<std::mutex> lock(intra_freq_cfg_mutex);
bool found = false;
bool found = false;
for (size_t i = 0; i < intra_freq_meas.size() and not found; i++) {
if (earfcn_ == intra_freq_meas[i]->get_earfcn()) {
intra_freq_meas[i]->set_cells_to_meas(pci);

@ -146,7 +146,9 @@ public:
uint32_t nof_channels = 1;
args_t(double srate_hz_, uint32_t buffer_sz_ms_, uint32_t nof_channels_) :
srate_hz(srate_hz_), buffer_sz_ms(buffer_sz_ms_), nof_channels(nof_channels_)
srate_hz(srate_hz_),
buffer_sz_ms(buffer_sz_ms_),
nof_channels(nof_channels_)
{}
};
@ -179,21 +181,20 @@ public:
void push_semaphore(void* worker_ptr) { semaphore.push(worker_ptr); }
void
worker_end(void* h, bool tx_enable, srsran::rf_buffer_t& buffer, srsran::rf_timestamp_t& tx_time, bool is_nr) override
void worker_end(const worker_context_t& w_ctx, const bool& tx_enable, srsran::rf_buffer_t& buffer) override
{
// Synchronize worker
semaphore.wait(h);
semaphore.wait(w_ctx.worker_ptr);
// Protect internal buffers and states
std::unique_lock<std::mutex> lock(ringbuffers_mutex);
uint64_t tx_ts = srsran_timestamp_uint64(&tx_time.get(0), srate_hz);
uint64_t tx_ts = srsran_timestamp_uint64(&w_ctx.tx_time.get(0), srate_hz);
// Check transmit timestamp is not in the past
if (tx_ts < write_ts) {
logger.error("Tx time (%f) is %d samples in the past",
srsran_timestamp_real(tx_time.get_ptr(0)),
srsran_timestamp_real(&w_ctx.tx_time.get(0)),
(uint32_t)(write_ts - tx_ts));
semaphore.release();
return;

@ -138,7 +138,14 @@ public:
// Set gNb time
gnb_time.add(TX_ENB_DELAY * 1e-3);
gnb_worker->set_time(slot_idx, gnb_time);
// Set gnb context
srsran::phy_common_interface::worker_context_t gnb_context;
gnb_context.sf_idx = slot_idx;
gnb_context.worker_ptr = gnb_worker;
gnb_context.last = true; // Set last if standalone
gnb_context.tx_time.copy(gnb_time);
gnb_worker->set_context(gnb_context);
// Start gNb work
gnb_phy_com.push_semaphore(gnb_worker);
@ -158,8 +165,14 @@ public:
// Set UE time
ue_time.add(TX_ENB_DELAY * 1e-3);
ue_worker->set_tti(slot_idx);
ue_worker->set_tx_time(ue_time);
// Set gnb context
srsran::phy_common_interface::worker_context_t ue_context;
ue_context.sf_idx = slot_idx;
ue_context.worker_ptr = ue_worker;
ue_context.last = true; // Set last if standalone
ue_context.tx_time.copy(gnb_time);
ue_worker->set_context(ue_context);
// Run UE stack
ue_stack.run_tti(slot_idx);

Loading…
Cancel
Save