SRSUE: fix phy workers concurrency issue

master
Xavier Arteaga 5 years ago committed by Xavier Arteaga
parent 173defd676
commit 384c420c7c

@ -0,0 +1,90 @@
/*
* Copyright 2013-2019 Software Radio Systems Limited
*
* This file is part of srsLTE.
*
* srsLTE is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* srsLTE is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* A copy of the GNU Affero General Public License can be found in
* the LICENSE file in the top-level directory of this distribution
* and at http://www.gnu.org/licenses/.
*
*/
#include <assert.h>
#include <condition_variable>
#include <deque>
#include <inttypes.h>
#include <mutex>
#ifndef SRSLTE_TTI_SEMPAHORE_H_
#define SRSLTE_TTI_SEMPAHORE_H_
namespace srslte {
template <class T>
class tti_semaphore
{
private:
const uint32_t max_timeout_ms = 60000; // 1 minute
std::mutex mutex;
std::condition_variable cvar;
std::deque<T> fifo;
public:
void reset()
{
// Do something here
}
void wait(T id)
{
bool expired = false;
std::unique_lock<std::mutex> lock(mutex);
std::chrono::system_clock::time_point expire_time = std::chrono::system_clock::now();
expire_time += std::chrono::milliseconds(max_timeout_ms);
while (fifo.front() != id && !expired) {
expired = (cvar.wait_until(lock, expire_time) == std::cv_status::timeout);
}
assert(!expired);
}
void push(T id)
{
std::unique_lock<std::mutex> lock(mutex);
fifo.push_back(id);
}
void release(T id)
{
std::unique_lock<std::mutex> lock(mutex);
fifo.pop_front();
cvar.notify_all();
}
void wait_all()
{
bool expired = false;
std::unique_lock<std::mutex> lock(mutex);
std::chrono::system_clock::time_point expire_time = std::chrono::system_clock::now();
expire_time += std::chrono::milliseconds(max_timeout_ms);
while (!fifo.empty() && !expired) {
expired = (cvar.wait_until(lock, expire_time) == std::cv_status::timeout);
}
assert(!expired);
}
};
} // namespace srslte
#endif // SRSLTE_TTI_SEMPAHORE_H_

@ -43,8 +43,8 @@ typedef _Complex float cf_t;
class phy : public ue_lte_phy_base, public thread
{
public:
phy(srslte::logger* logger_) : logger(logger_), workers_pool(MAX_WORKERS), common(MAX_WORKERS), thread("PHY"){};
~phy() { stop(); }
explicit phy(srslte::logger* logger_) : logger(logger_), workers_pool(MAX_WORKERS), common(), thread("PHY"){};
~phy() final { stop(); }
// Init defined in base class
int init(const phy_args_t& args_) final;

@ -25,13 +25,13 @@
#include "phy_metrics.h"
#include "srslte/common/gen_mch_tables.h"
#include "srslte/common/log.h"
#include "srslte/common/tti_sempahore.h"
#include "srslte/interfaces/common_interfaces.h"
#include "srslte/interfaces/ue_interfaces.h"
#include "srslte/radio/radio.h"
#include "srslte/srslte.h"
#include <condition_variable>
#include <mutex>
#include <semaphore.h>
#include <string.h>
#include <vector>
@ -85,7 +85,9 @@ public:
// Save last TBS for DL (Format1C)
int last_dl_tbs[SRSLTE_MAX_HARQ_PROC][SRSLTE_MAX_CARRIERS][SRSLTE_MAX_CODEWORDS] = {};
explicit phy_common(uint32_t max_workers);
srslte::tti_semaphore<void*> semaphore;
phy_common();
~phy_common();
@ -131,7 +133,7 @@ public:
srslte_pdsch_ack_resource_t resource);
bool get_dl_pending_ack(srslte_ul_sf_cfg_t* sf, uint32_t cc_idx, srslte_pdsch_ack_cc_t* ack);
void worker_end(uint32_t tti,
void worker_end(void* h,
bool tx_enable,
cf_t* buffer[SRSLTE_MAX_RADIOS][SRSLTE_MAX_PORTS],
uint32_t nof_samples[SRSLTE_MAX_RADIOS],
@ -169,9 +171,7 @@ private:
std::mutex mtch_mutex;
std::condition_variable mtch_cvar;
std::vector<sem_t> tx_sem;
uint32_t nof_workers = 0;
uint32_t max_workers = 0;
bool is_pending_tx_end = false;
@ -223,8 +223,6 @@ private:
} pending_dl_grant_t;
pending_dl_grant_t pending_dl_grant[FDD_HARQ_DELAY_MS][SRSLTE_MAX_CARRIERS] = {};
bool is_first_tx = true;
srslte_cell_t cell = {};
dl_metrics_t dl_metrics[SRSLTE_MAX_CARRIERS] = {};

@ -53,7 +53,7 @@ public:
/* Functions used by main PHY thread */
cf_t* get_buffer(uint32_t cc_idx, uint32_t antenna_idx);
void set_tti(uint32_t tti, uint32_t tx_worker_cnt);
void set_tti(uint32_t tti);
void set_tx_time(uint32_t radio_idx, srslte_timestamp_t tx_time, int next_offset);
void set_prach(cf_t* prach_ptr, float prach_power);
void set_cfo(const uint32_t& cc_idx, float cfo);
@ -109,7 +109,6 @@ private:
float prach_power = 0;
uint32_t tti = 0;
uint32_t tx_sem_id = 0;
srslte_timestamp_t tx_time[SRSLTE_MAX_RADIOS] = {};
int next_offset[SRSLTE_MAX_RADIOS] = {};

@ -352,7 +352,6 @@ private:
srslte_timestamp_t radio_ts = {};
std::array<uint8_t, SRSLTE_BCH_PAYLOAD_LEN> mib;
uint32_t tx_worker_cnt = 0;
uint32_t nof_workers = 0;
float ul_dl_factor = NAN;

@ -45,25 +45,14 @@ namespace srsue {
static cf_t zeros[50000] = {};
static cf_t* zeros_multi[SRSLTE_MAX_PORTS] = {zeros, zeros, zeros, zeros};
phy_common::phy_common(uint32_t max_workers_) : tx_sem(max_workers_)
phy_common::phy_common()
{
max_workers = max_workers_;
for (uint32_t i = 0; i < max_workers; i++) {
sem_init(&tx_sem[i], 0, 0); // All semaphores start blocked
}
reset();
}
phy_common::~phy_common()
{
for (uint32_t i = 0; i < max_workers; i++) {
sem_post(&tx_sem[i]);
}
for (uint32_t i = 0; i < max_workers; i++) {
sem_destroy(&tx_sem[i]);
}
}
void phy_common::set_nof_workers(uint32_t nof_workers_)
@ -80,7 +69,6 @@ void phy_common::init(phy_args_t* _args,
radio_h = _radio;
stack = _stack;
args = _args;
is_first_tx = true;
sr_last_tx_tti = -1;
// Instantiate UL channel emulator
@ -533,23 +521,14 @@ bool phy_common::get_dl_pending_ack(srslte_ul_sf_cfg_t* sf, uint32_t cc_idx, srs
* Each worker uses this function to indicate that all processing is done and data is ready for transmission or
* there is no transmission at all (tx_enable). In that case, the end of burst message will be sent to the radio
*/
void phy_common::worker_end(uint32_t tti,
void phy_common::worker_end(void* tx_sem_id,
bool tx_enable,
cf_t* buffer[SRSLTE_MAX_RADIOS][SRSLTE_MAX_PORTS],
uint32_t nof_samples[SRSLTE_MAX_RADIOS],
srslte_timestamp_t tx_time[SRSLTE_MAX_RADIOS])
{
// This variable is not protected but it is very unlikely that 2 threads arrive here simultaneously since at the
// beginning there is no workload and threads are separated by 1 ms
if (is_first_tx) {
is_first_tx = false;
// Allow my own transmission if I'm the first to transmit
sem_post(&tx_sem[tti % nof_workers]);
}
// Wait for the green light to transmit in the current TTI
sem_wait(&tx_sem[tti % nof_workers]);
semaphore.wait(tx_sem_id);
// For each radio, transmit
for (uint32_t i = 0; i < args->nof_radios; i++) {
@ -585,7 +564,7 @@ void phy_common::worker_end(uint32_t tti,
}
// Allow next TTI to transmit
sem_post(&tx_sem[(tti + 1) % nof_workers]);
semaphore.release(tx_sem_id);
}
void phy_common::set_cell(const srslte_cell_t& c)
@ -670,7 +649,7 @@ void phy_common::get_sync_metrics(sync_metrics_t m[SRSLTE_MAX_CARRIERS])
void phy_common::reset_radio()
{
is_first_tx = true;
semaphore.reset();
// End Tx streams even if they are continuous
// Since is_first_of_burst is set to true, the radio need to send

@ -115,7 +115,7 @@ cf_t* sf_worker::get_buffer(uint32_t carrier_idx, uint32_t antenna_idx)
return cc_workers[carrier_idx]->get_rx_buffer(antenna_idx);
}
void sf_worker::set_tti(uint32_t tti_, uint32_t tx_worker_cnt)
void sf_worker::set_tti(uint32_t tti_)
{
tti = tti_;
@ -123,7 +123,6 @@ void sf_worker::set_tti(uint32_t tti_, uint32_t tx_worker_cnt)
cc_worker->set_tti(tti);
}
tx_sem_id = tx_worker_cnt;
log_h->step(tti);
if (log_phy_lib_h) {
@ -258,7 +257,7 @@ void sf_worker::work_imp()
}
// Call worker_end to transmit the signal
phy->worker_end(tx_sem_id, tx_signal_ready, tx_signal_ptr, nof_samples, tx_time);
phy->worker_end(this, tx_signal_ready, tx_signal_ptr, nof_samples, tx_time);
if (rx_signal_ok) {
update_measurements();

@ -135,6 +135,7 @@ sync::~sync()
void sync::stop()
{
worker_com->semaphore.wait_all();
intra_freq_meas.stop();
running = false;
wait_thread_finish();
@ -146,11 +147,10 @@ void sync::reset()
radio_overflow_return = false;
in_sync_cnt = 0;
out_of_sync_cnt = 0;
tx_worker_cnt = 0;
time_adv_sec = 0;
next_offset = 0;
srate_mode = SRATE_NONE;
ZERO_OBJECT(next_radio_offset);
srate_mode = SRATE_NONE;
current_earfcn = -1;
sfn_p.reset();
search_p.reset();
@ -535,7 +535,7 @@ void sync::run_thread()
worker->set_cfo(cc, cfo);
}
worker->set_tti(tti, tx_worker_cnt);
worker->set_tti(tti);
worker->set_tx_time(0, tx_time, next_radio_offset[0] + next_offset);
next_offset = 0;
ZERO_OBJECT(next_radio_offset);
@ -544,7 +544,6 @@ void sync::run_thread()
if (next_time_adv_sec != time_adv_sec) {
time_adv_sec = next_time_adv_sec;
}
tx_worker_cnt = (tx_worker_cnt + 1) % nof_workers;
// Advance/reset prach subframe pointer
if (prach_ptr) {
@ -558,6 +557,7 @@ void sync::run_thread()
is_end_of_burst = true;
// Start worker
worker_com->semaphore.push(worker);
workers_pool->start_worker(worker);
// Save signal for Intra-frequency measurement

@ -45,6 +45,12 @@ target_link_libraries(phy_worker_test
${Boost_LIBRARIES})
add_test(phy_worker_test phy_worker_test)
add_executable(phy_concurrency_test phy_concurrency_test.cc)
target_link_libraries(phy_concurrency_test
srsue_phy
srslte_common)
add_test(phy_concurrency_test phy_concurrency_test)
add_executable(scell_search_test scell_search_test.cc)
target_link_libraries(scell_search_test

@ -0,0 +1,181 @@
/*
* Copyright 2013-2019 Software Radio Systems Limited
*
* This file is part of srsLTE.
*
* srsLTE is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* srsLTE is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* A copy of the GNU Affero General Public License can be found in
* the LICENSE file in the top-level directory of this distribution
* and at http://www.gnu.org/licenses/.
*
*/
#include <srslte/common/thread_pool.h>
#include <srslte/common/tti_sempahore.h>
#include <srslte/phy/utils/random.h>
#include <srslte/srslte.h>
#include <srsue/hdr/phy/phy.h>
class dummy_radio
{
private:
static const int radio_delay_us = 200;
srslte::log_filter* log_h = nullptr;
std::mutex mutex;
uint32_t last_tti = 0;
bool first = true;
bool late = false;
public:
dummy_radio(srslte::log_filter& log_h_) : log_h(&log_h_) { log_h->info("Dummy radio created\n"); }
void tx(uint32_t tti)
{
std::lock_guard<std::mutex> lock(mutex);
log_h->info("Transmitting TTI %d\n", tti);
// Exit if TTI was advanced
if (!first && tti <= last_tti) {
late = true;
}
// Save TTI
last_tti = tti;
first = false;
// Simulate radio delay
usleep(radio_delay_us);
}
bool is_late() { return late; }
};
class dummy_worker : public srslte::thread_pool::worker
{
private:
static const int sleep_time_min_us = 50;
static const int sleep_time_max_us = 2000;
srslte::tti_semaphore<uint32_t>* tti_semaphore = nullptr;
srslte::log_filter* log_h = nullptr;
dummy_radio* radio = nullptr;
srslte_random_t random_gen = nullptr;
uint32_t tti = 0;
public:
dummy_worker(uint32_t id,
srslte::tti_semaphore<uint32_t>* tti_semaphore_,
srslte::log_filter* log_h_,
dummy_radio* radio_)
{
tti_semaphore = tti_semaphore_;
log_h = log_h_;
radio = radio_;
random_gen = srslte_random_init(id);
log_h->info("Dummy worker created\n");
}
~dummy_worker() { srslte_random_free(random_gen); }
void set_tti(uint32_t tti_) { tti = tti_; }
protected:
void work_imp() override
{
// Choose a random time to work
int sleep_time_us = srslte_random_uniform_int_dist(random_gen, sleep_time_min_us, sleep_time_max_us);
// Inform
// Actual work ;)
log_h->info("Start working for %d us.\n", sleep_time_us);
usleep(sleep_time_us);
log_h->info("Stopped working\n");
// Wait for green light
tti_semaphore->wait(tti);
// Simulate radio delay
radio->tx(tti);
// Release semaphore
tti_semaphore->release(tti);
}
};
int main(int argc, char** argv)
{
int ret = SRSLTE_SUCCESS;
// Simulation Constants
const uint32_t nof_workers = FDD_HARQ_DELAY_MS;
const uint32_t nof_tti = 10240;
const float enable_probability = 0.9f;
srslte_random_t random_gen = srslte_random_init(1234);
// Pools and workers
srslte::thread_pool pool(nof_workers);
std::vector<std::unique_ptr<dummy_worker> > workers;
srslte::tti_semaphore<uint32_t> tti_semaphore;
// Loggers
srslte::logger_stdout logger;
srslte::log_filter radio_log("radio", &logger);
std::vector<std::unique_ptr<srslte::log_filter> > worker_logs;
radio_log.set_level("info");
// Radio
dummy_radio radio(radio_log);
// Create workers
for (uint32_t i = 0; i < nof_workers; i++) {
// Create logging name
char log_name[32] = {};
snprintf(log_name, sizeof(log_name), "PHY%d", i);
// Create log filter
srslte::log_filter* log_filter = new srslte::log_filter(log_name, &logger);
log_filter->set_level("info");
// Create worker
auto* worker = new dummy_worker(i, &tti_semaphore, log_filter, &radio);
// Push back objects
worker_logs.push_back(std::unique_ptr<srslte::log_filter>(log_filter));
workers.push_back(std::unique_ptr<dummy_worker>(worker));
// Init worker in pool
pool.init_worker(i, worker);
}
for (uint32_t tti = 0; tti < nof_tti && radio.is_late(); tti++) {
if (enable_probability > srslte_random_uniform_real_dist(random_gen, 0.0f, 1.0f)) {
// Wait worker
auto worker = (dummy_worker*)pool.wait_worker(tti);
// Set tti
worker->set_tti(tti);
// Launch
tti_semaphore.push(tti);
pool.start_worker(worker);
}
}
tti_semaphore.wait_all();
pool.stop();
srslte_random_free(random_gen);
return ret;
}

@ -378,7 +378,7 @@ int main(int argc, char** argv)
srsue::scell::intra_measure intra_measure;
srslte::log_filter logger("intra_measure");
dummy_rrc rrc;
srsue::phy_common common(1);
srsue::phy_common common;
// Simulation only
std::vector<std::unique_ptr<test_enb> > test_enb_v;

Loading…
Cancel
Save