sched,nr: fix sched_nr_test, resolve data race issues, and add new config structs used across the scheduler.

master
Francisco (3 years ago), committed by Francisco Paisana
parent 407da794e1
commit 35a236b1b9

@@ -16,7 +16,6 @@
#include "sched_nr_common.h"
#include "sched_nr_interface.h"
#include "sched_nr_ue.h"
#include "sched_nr_worker.h"
#include "srsran/adt/pool/cached_alloc.h"
#include "srsran/common/tti_point.h"
#include <array>
@@ -26,13 +25,18 @@ extern "C" {
namespace srsenb {
namespace sched_nr_impl {
class sched_worker_manager;
}
class ue_event_manager;
class sched_nr final : public sched_nr_interface
{
public:
sched_nr(const sched_nr_cfg& cfg);
explicit sched_nr(const sched_nr_cfg& sched_cfg);
~sched_nr() override;
int cell_cfg(const std::vector<sched_nr_cell_cfg>& cell_list);
void ue_cfg(uint16_t rnti, const sched_nr_ue_cfg& cfg) override;
void new_tti(tti_point tti_rx) override;
@@ -45,10 +49,11 @@ private:
void ue_cfg_impl(uint16_t rnti, const sched_nr_ue_cfg& cfg);
void run_tti(tti_point tti_rx, uint32_t cc);
sched_nr_cfg cfg;
// args
sched_nr_impl::sched_params cfg;
using sched_worker_manager = sched_nr_impl::sched_worker_manager;
sched_worker_manager sched_workers;
std::unique_ptr<sched_worker_manager> sched_workers;
std::array<std::array<sched_nr_res_t, SCHED_NR_MAX_CARRIERS>, SCHED_NR_NOF_SUBFRAMES> sched_results;

@@ -25,6 +25,23 @@ const static size_t SCHED_NR_NOF_HARQS = 16;
namespace sched_nr_impl {
struct sched_cell_params {
const uint32_t cc;
const sched_nr_cell_cfg cell_cfg;
const sched_nr_cfg& sched_cfg;
sched_cell_params(uint32_t cc_, const sched_nr_cell_cfg& cell, const sched_nr_cfg& sched_cfg_) :
cc(cc_), cell_cfg(cell), sched_cfg(sched_cfg_)
{}
};
struct sched_params {
const sched_nr_cfg sched_cfg;
std::vector<sched_cell_params> cells;
explicit sched_params(const sched_nr_cfg& sched_cfg_) : sched_cfg(sched_cfg_) {}
};
using rbgmask_t = srsran::bounded_bitset<SCHED_NR_MAX_NOF_RBGS, true>;
} // namespace sched_nr_impl

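For orientation, here is a minimal self-contained sketch (simplified stand-in types, not part of the commit) of how the new config layers compose: the user-facing sched_nr_cfg is copied into a scheduler-owned sched_params, and each sched_cell_params is a per-carrier view that references that owned copy, so the stored references stay valid for the scheduler's lifetime. The real population loop is in sched_nr::cell_cfg() further down.

#include <cstdint>
#include <vector>

struct sched_nr_cell_cfg {
  uint32_t nof_prb = 100, nof_rbg = 25;
  uint32_t K0 = 0, K1 = 4, K2 = 4;
};
struct sched_nr_cfg {
  uint32_t nof_concurrent_subframes = 1;
};
struct sched_cell_params {
  const uint32_t          cc;        // carrier index
  const sched_nr_cell_cfg cell_cfg;  // per-cell copy
  const sched_nr_cfg&     sched_cfg; // reference into the owning sched_params
  sched_cell_params(uint32_t cc_, const sched_nr_cell_cfg& cell, const sched_nr_cfg& s) :
    cc(cc_), cell_cfg(cell), sched_cfg(s)
  {}
};
struct sched_params {
  const sched_nr_cfg             sched_cfg; // stable, owned copy of the global config
  std::vector<sched_cell_params> cells;
  explicit sched_params(const sched_nr_cfg& s) : sched_cfg(s) {}
};

int main()
{
  sched_nr_cfg cfg{};
  sched_params params(cfg);                    // 1) copy the global config
  std::vector<sched_nr_cell_cfg> cell_list(2); // 2) cell list arrives at cell_cfg() time
  params.cells.reserve(cell_list.size());
  for (uint32_t cc = 0; cc < cell_list.size(); ++cc) {
    params.cells.emplace_back(cc, cell_list[cc], params.sched_cfg); // 3) per-carrier views
  }
  return 0;
}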
@@ -25,14 +25,15 @@ const static size_t SCHED_NR_MAX_PUSCH_DATA = 16;
const static size_t SCHED_NR_MAX_TB = 2;
struct sched_nr_cell_cfg {
uint32_t nof_prb;
uint32_t nof_rbg;
uint32_t nof_prb = 100;
uint32_t nof_rbg = 25;
uint32_t K0 = 0;
uint32_t K1 = 4;
uint32_t K2 = 4;
};
struct sched_nr_cfg {
uint32_t nof_concurrent_subframes = 1;
srsran::bounded_vector<sched_nr_cell_cfg, SCHED_NR_MAX_CARRIERS> cells;
};
struct sched_nr_ue_cc_cfg {

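A side note on the new defaults: they are mutually consistent under an assumed RBG size of 4 (the LTE value for a 100-PRB carrier; the commit itself does not state the RBG size):

#include <cstdint>

// Assumption: RBG size 4, so nof_rbg = ceil(nof_prb / 4) = 25 for 100 PRBs.
constexpr uint32_t rbg_count(uint32_t nof_prb, uint32_t rbg_size = 4)
{
  return (nof_prb + rbg_size - 1) / rbg_size;
}
static_assert(rbg_count(100) == 25, "matches the sched_nr_cell_cfg defaults");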
@@ -22,7 +22,7 @@ namespace sched_nr_impl {
class slot_grid
{
public:
explicit slot_grid(uint32_t cc, const sched_nr_cfg& cfg_);
explicit slot_grid(const sched_cell_params& cfg_);
void new_tti(tti_point tti_rx_, sched_nr_res_t& sched_res_);
bool alloc_pdsch(slot_ue& ue, const rbgmask_t& dl_mask);
bool alloc_pusch(slot_ue& ue, const rbgmask_t& dl_mask);
@@ -30,13 +30,11 @@ public:
void generate_dcis();
tti_point tti_tx_dl() const { return tti_rx + TX_ENB_DELAY; }
tti_point tti_tx_ul() const { return tti_tx_dl() + K2; }
tti_point tti_tx_ul() const { return tti_tx_dl() + cfg.cell_cfg.K2; }
private:
static const size_t K0 = 0, K1 = 4, K2 = 4;
const uint32_t cc;
const sched_nr_cfg& cfg;
const sched_cell_params& cfg;
private:
tti_point tti_rx;
rbgmask_t pdsch_mask;
rbgmask_t pusch_mask;

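With K1/K2 now read from the per-cell config instead of class-level constants, the slot arithmetic above works out as in this standalone sketch (plain integers instead of tti_point; the TX_ENB_DELAY value of 4 is an assumption, as its definition is not part of this diff):

#include <cstdint>
#include <cstdio>

int main()
{
  const uint32_t TX_ENB_DELAY = 4; // assumed value, defined elsewhere in srsRAN
  const uint32_t K2 = 4;           // now taken from sched_cell_params::cell_cfg
  uint32_t tti_rx    = 100;
  uint32_t tti_tx_dl = tti_rx + TX_ENB_DELAY; // slot carrying the PDSCH and its DCI
  uint32_t tti_tx_ul = tti_tx_dl + K2;        // slot carrying the granted PUSCH
  printf("rx=%u tx_dl=%u tx_ul=%u\n", tti_rx, tti_tx_dl, tti_tx_ul); // rx=100 tx_dl=104 tx_ul=108
  return 0;
}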
@@ -27,10 +27,10 @@
namespace srsenb {
namespace sched_nr_impl {
class carrier_slot_worker
class slot_cc_worker
{
public:
explicit carrier_slot_worker(uint32_t cc_, const sched_nr_cfg& cfg_) : cc(cc_), cfg(cfg_), res_grid(cc, cfg) {}
explicit slot_cc_worker(const sched_cell_params& cell_params) : cfg(cell_params), res_grid(cfg) {}
void start(tti_point tti_rx_, sched_nr_res_t& bwp_result, ue_map_t& ue_db_);
void run();
@@ -41,8 +41,7 @@ private:
void alloc_dl_ues();
void alloc_ul_ues();
const uint32_t cc;
const sched_nr_cfg& cfg;
const sched_cell_params& cfg;
tti_point tti_rx;
slot_grid res_grid;
@@ -53,7 +52,7 @@ private:
class sched_worker_manager
{
public:
explicit sched_worker_manager(ue_map_t& ue_db_, const sched_nr_cfg& cfg_);
explicit sched_worker_manager(ue_map_t& ue_db_, const sched_params& cfg_);
sched_worker_manager(const sched_worker_manager&) = delete;
sched_worker_manager(sched_worker_manager&&) = delete;
~sched_worker_manager();
@@ -64,15 +63,15 @@ public:
void end_tti(tti_point tti_rx);
private:
const sched_nr_cfg& cfg;
const sched_params& cfg;
ue_map_t& ue_db;
struct slot_worker_ctxt {
sem_t sf_sem;
sem_t sf_sem; // lock of all workers of the same slot. unlocked by last slot_cc_worker
tti_point tti_rx;
srsran::span<sched_nr_res_t> sf_result;
int worker_count = 0;
std::vector<carrier_slot_worker> workers;
std::atomic<int> worker_count{0}; // variable shared across slot_cc_workers
std::vector<slot_cc_worker> workers;
};
std::vector<std::unique_ptr<slot_worker_ctxt> > slot_ctxts;

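The comments above outline the synchronization scheme this commit tightens: sf_sem acts as a lock on a slot context, and worker_count becomes atomic because every slot_cc_worker of the slot decrements it concurrently, outside any mutex. A reduced sketch of that scheme (illustrative names and control flow; in the commit, run_tti() merely reports the last finisher and end_tti() performs the release):

#include <semaphore.h>
#include <atomic>

struct slot_ctxt_sketch {
  sem_t            sf_sem;          // behaves as a per-slot lock (initial value 1)
  std::atomic<int> worker_count{0}; // shared across slot_cc_workers
};

void reserve_slot(slot_ctxt_sketch& c, int nof_workers)
{
  sem_wait(&c.sf_sem); // blocks until the previous use of this slot index has ended
  c.worker_count.store(nof_workers, std::memory_order_relaxed);
}

bool run_one(slot_ctxt_sketch& c) // called once per carrier, possibly concurrently
{
  // A plain --worker_count from several threads is a data race; fetch_sub is not.
  int rem = c.worker_count.fetch_sub(1, std::memory_order_release) - 1;
  return rem == 0; // true only for the last finishing worker
}

void end_slot(slot_ctxt_sketch& c)
{
  sem_post(&c.sf_sem); // run by whichever thread run_one() reported as last
}

int main()
{
  slot_ctxt_sketch c;
  sem_init(&c.sf_sem, 0, 1);
  reserve_slot(c, 2);
  if (run_one(c)) { end_slot(c); } // first worker: not last, no release
  if (run_one(c)) { end_slot(c); } // second worker: last, releases the slot
  sem_destroy(&c.sf_sem);
  return 0;
}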
@@ -11,6 +11,7 @@
*/
#include "srsenb/hdr/stack/mac/nr/sched_nr.h"
#include "srsenb/hdr/stack/mac/nr/sched_nr_worker.h"
#include "srsran/common/thread_pool.h"
namespace srsenb {
@@ -77,12 +78,21 @@ private:
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
sched_nr::sched_nr(const sched_nr_cfg& cfg_) :
cfg(cfg_), pending_events(new ue_event_manager(ue_db)), sched_workers(ue_db, cfg)
{}
sched_nr::sched_nr(const sched_nr_cfg& sched_cfg) : cfg(sched_cfg), pending_events(new ue_event_manager(ue_db)) {}
sched_nr::~sched_nr() {}
int sched_nr::cell_cfg(const std::vector<sched_nr_cell_cfg>& cell_list)
{
cfg.cells.reserve(cell_list.size());
for (uint32_t cc = 0; cc < cell_list.size(); ++cc) {
cfg.cells.emplace_back(cc, cell_list[cc], cfg.sched_cfg);
}
sched_workers.reset(new sched_nr_impl::sched_worker_manager(ue_db, cfg));
return SRSRAN_SUCCESS;
}
void sched_nr::ue_cfg(uint16_t rnti, const sched_nr_ue_cfg& uecfg)
{
pending_events->push_event([this, rnti, uecfg]() { ue_cfg_impl(rnti, uecfg); });
@@ -100,7 +110,7 @@ void sched_nr::ue_cfg_impl(uint16_t rnti, const sched_nr_ue_cfg& uecfg)
void sched_nr::new_tti(tti_point tti_rx)
{
// Lock slot workers for provided tti_rx
sched_workers.reserve_workers(tti_rx, sched_results[tti_rx.sf_idx()]);
sched_workers->reserve_workers(tti_rx, sched_results[tti_rx.sf_idx()]);
{
// synchronize {tti,cc} state. e.g. reserve UE resources for {tti,cc} decision, process feedback
@@ -108,7 +118,7 @@ void sched_nr::new_tti(tti_point tti_rx)
// Process pending events
pending_events->new_tti();
sched_workers.start_tti(tti_rx);
sched_workers->start_tti(tti_rx);
}
}
@@ -116,12 +126,12 @@ void sched_nr::new_tti(tti_point tti_rx)
int sched_nr::generate_sched_result(tti_point tti_rx, uint32_t cc, sched_nr_res_t& result)
{
// unlocked, parallel region
bool all_workers_finished = sched_workers.run_tti(tti_rx, cc, result);
bool all_workers_finished = sched_workers->run_tti(tti_rx, cc, result);
if (all_workers_finished) {
// once all workers of the same subframe finished, synchronize sched outcome with ue_db
std::lock_guard<std::mutex> lock(ue_db_mutex);
sched_workers.end_tti(tti_rx);
sched_workers->end_tti(tti_rx);
}
return SRSRAN_SUCCESS;

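sched_workers moved behind a unique_ptr because the worker manager needs the per-cell parameters, which only become known in cell_cfg(); a reduced sketch of this two-phase initialization (stand-in types, not the real classes):

#include <memory>
#include <vector>

struct worker_manager_sketch {
  explicit worker_manager_sketch(const std::vector<int>& cells) : nof_cells(cells.size()) {}
  size_t nof_cells;
};

class sched_sketch
{
public:
  int cell_cfg(const std::vector<int>& cell_list)
  {
    cells = cell_list;                               // derive the cell config first...
    workers.reset(new worker_manager_sketch(cells)); // ...then construct the workers
    return 0;
  }

private:
  std::vector<int>                       cells;
  std::unique_ptr<worker_manager_sketch> workers; // stays empty until cell_cfg() runs
};

int main()
{
  sched_sketch s;
  s.cell_cfg({1, 2});
  return 0;
}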
@@ -15,7 +15,9 @@
namespace srsenb {
namespace sched_nr_impl {
slot_grid::slot_grid(uint32_t cc_, const sched_nr_cfg& cfg_) : cc(cc_), cfg(cfg_) {}
slot_grid::slot_grid(const sched_cell_params& cfg_) :
cfg(cfg_), pdsch_mask(cfg.cell_cfg.nof_rbg), pusch_mask(cfg.cell_cfg.nof_rbg)
{}
void slot_grid::new_tti(tti_point tti_rx_, sched_nr_res_t& sched_res_)
{
@@ -39,7 +41,7 @@ bool slot_grid::alloc_pdsch(slot_ue& ue, const rbgmask_t& dl_mask)
if (sched_res->dl_res.data.full()) {
return false;
}
if (not ue.h_dl->new_tx(tti_tx_dl(), dl_mask, mcs, K1)) {
if (not ue.h_dl->new_tx(tti_tx_dl(), dl_mask, mcs, cfg.cell_cfg.K1)) {
return false;
}

@@ -16,7 +16,7 @@ namespace srsenb {
namespace sched_nr_impl {
/// Called at the beginning of TTI in a locked context, to reserve available UE resources
void carrier_slot_worker::start(tti_point tti_rx_, sched_nr_res_t& bwp_result_, ue_map_t& ue_db)
void slot_cc_worker::start(tti_point tti_rx_, sched_nr_res_t& bwp_result_, ue_map_t& ue_db)
{
srsran_assert(not running(), "scheduler worker::start() called for active worker");
// Try reserve UE cells for this worker
@@ -24,7 +24,7 @@ void carrier_slot_worker::start(tti_point tti_rx_, sched_nr_res_t& bwp_result_,
uint16_t rnti = ue_pair.first;
ue& u = *ue_pair.second;
slot_ues.insert(rnti, u.try_reserve(tti_rx, cc));
slot_ues.insert(rnti, u.try_reserve(tti_rx, cfg.cc));
if (slot_ues[rnti].empty()) {
// Failed to synchronize because UE is being used by another worker
slot_ues.erase(rnti);
@@ -37,7 +37,7 @@ void carrier_slot_worker::start(tti_point tti_rx_, sched_nr_res_t& bwp_result_,
tti_rx = tti_rx_;
}
void carrier_slot_worker::run()
void slot_cc_worker::run()
{
srsran_assert(running(), "scheduler worker::run() called for non-active worker");
@@ -54,7 +54,7 @@ void carrier_slot_worker::run()
res_grid.generate_dcis();
}
void carrier_slot_worker::end_tti()
void slot_cc_worker::end_tti()
{
srsran_assert(running(), "scheduler worker::end() called for non-active worker");
@@ -64,7 +64,7 @@ void carrier_slot_worker::end_tti()
tti_rx = {};
}
void carrier_slot_worker::alloc_dl_ues()
void slot_cc_worker::alloc_dl_ues()
{
if (slot_ues.empty()) {
return;
@@ -74,11 +74,11 @@ void carrier_slot_worker::alloc_dl_ues()
return;
}
rbgmask_t dlmask(cfg.cells[cc].nof_rbg);
rbgmask_t dlmask(cfg.cell_cfg.nof_rbg);
dlmask.fill(0, dlmask.size(), true);
res_grid.alloc_pdsch(ue, dlmask);
}
void carrier_slot_worker::alloc_ul_ues()
void slot_cc_worker::alloc_ul_ues()
{
if (slot_ues.empty()) {
return;
@@ -88,23 +88,23 @@ void carrier_slot_worker::alloc_ul_ues()
return;
}
rbgmask_t ulmask(cfg.cells[cc].nof_rbg);
rbgmask_t ulmask(cfg.cell_cfg.nof_rbg);
ulmask.fill(0, ulmask.size(), true);
res_grid.alloc_pusch(ue, ulmask);
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
sched_worker_manager::sched_worker_manager(ue_map_t& ue_db_, const sched_nr_cfg& cfg_) : cfg(cfg_), ue_db(ue_db_)
sched_worker_manager::sched_worker_manager(ue_map_t& ue_db_, const sched_params& cfg_) : cfg(cfg_), ue_db(ue_db_)
{
// Note: For now, we only allow parallelism at the sector level
slot_ctxts.resize(cfg.nof_concurrent_subframes);
for (size_t i = 0; i < cfg.nof_concurrent_subframes; ++i) {
slot_ctxts.resize(cfg.sched_cfg.nof_concurrent_subframes);
for (size_t i = 0; i < cfg.sched_cfg.nof_concurrent_subframes; ++i) {
slot_ctxts[i].reset(new slot_worker_ctxt());
sem_init(&slot_ctxts[i]->sf_sem, 0, 1);
slot_ctxts[i]->workers.reserve(cfg.cells.size());
for (uint32_t cc = 0; cc < cfg.cells.size(); ++cc) {
slot_ctxts[i]->workers.emplace_back(cc, cfg);
slot_ctxts[i]->workers.emplace_back(cfg.cells[cc]);
}
}
}
@@ -129,7 +129,7 @@ void sched_worker_manager::reserve_workers(tti_point tti_rx_, srsran::span<sched
sf_worker_ctxt.sf_result = sf_result_;
sf_worker_ctxt.tti_rx = tti_rx_;
sf_worker_ctxt.worker_count = static_cast<int>(sf_worker_ctxt.workers.size());
sf_worker_ctxt.worker_count.store(static_cast<int>(sf_worker_ctxt.workers.size()), std::memory_order_relaxed);
}
void sched_worker_manager::start_tti(tti_point tti_rx_)
@@ -146,10 +146,6 @@ bool sched_worker_manager::run_tti(tti_point tti_rx_, uint32_t cc, sched_nr_res_
{
auto& sf_worker_ctxt = get_sf(tti_rx_);
srsran_assert(sf_worker_ctxt.tti_rx == tti_rx_, "invalid run_tti(tti, cc) arguments");
if (not sf_worker_ctxt.workers[cc].running()) {
// run for this tti and cc was already called
return false;
}
// Get {tti, cc} scheduling decision
sf_worker_ctxt.workers[cc].run();
@@ -158,9 +154,9 @@ bool sched_worker_manager::run_tti(tti_point tti_rx_, uint32_t cc, sched_nr_res_
result = sf_worker_ctxt.sf_result[cc];
// decrement the number of active workers
--sf_worker_ctxt.worker_count;
srsran_assert(sf_worker_ctxt.worker_count >= 0, "invalid number of calls to run_tti(tti, cc)");
return sf_worker_ctxt.worker_count == 0;
int rem_workers = sf_worker_ctxt.worker_count.fetch_sub(1, std::memory_order_release) - 1;
srsran_assert(rem_workers >= 0, "invalid number of calls to run_tti(tti, cc)");
return rem_workers == 0;
}
void sched_worker_manager::end_tti(tti_point tti_rx_)

@@ -16,12 +16,50 @@
namespace srsenb {
struct task_job_manager {
std::mutex mutex;
std::condition_variable cond_var;
int tasks = 0;
int pdsch_count = 0;
int max_tasks = std::numeric_limits<int>::max() / 2;
void start_task()
{
std::unique_lock<std::mutex> lock(mutex);
while (tasks >= max_tasks) {
cond_var.wait(lock);
}
tasks++;
}
void finish_task(const sched_nr_res_t& res)
{
std::unique_lock<std::mutex> lock(mutex);
TESTASSERT(res.dl_res.data.size() <= 1);
pdsch_count += res.dl_res.data.size();
if (tasks-- >= max_tasks or tasks == 0) {
cond_var.notify_one();
}
}
void wait_task_finish()
{
std::unique_lock<std::mutex> lock(mutex);
while (tasks > 0) {
cond_var.wait(lock);
}
}
};
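// Usage sketch (not part of the commit) of the task_job_manager pattern that
// the tests below follow: start_task() before enqueueing, finish_task() in the
// worker, wait_task_finish() to drain. std::thread (requires <thread>) stands
// in for srsran::get_background_workers(), and the default-constructed result
// is a stand-in for a real scheduling outcome.
void task_job_manager_usage_sketch()
{
  task_job_manager tasks;
  std::vector<std::thread> pool;
  for (int i = 0; i < 4; ++i) {
    tasks.start_task(); // applies backpressure once max_tasks are in flight
    pool.emplace_back([&tasks]() {
      sched_nr_res_t res; // a real test fills this via generate_sched_result()
      tasks.finish_task(res); // wakes wait_task_finish() when the count hits 0
    });
  }
  tasks.wait_task_finish(); // returns only after every finish_task() has run
  for (auto& t : pool) {
    t.join();
  }
}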
void sched_nr_cfg_serialized_test()
{
uint32_t max_nof_ttis = 1000;
task_job_manager tasks;
sched_nr_cfg cfg;
cfg.cells.resize(1);
std::vector<sched_nr_cell_cfg> cells_cfg;
cells_cfg.resize(1);
sched_nr sched(cfg);
sched.cell_cfg(cells_cfg);
sched_nr_ue_cfg uecfg;
uecfg.carriers.resize(1);
@@ -31,90 +69,105 @@ void sched_nr_cfg_serialized_test()
for (uint32_t nof_ttis = 0; nof_ttis < 1000; ++nof_ttis) {
for (uint32_t nof_ttis = 0; nof_ttis < max_nof_ttis; ++nof_ttis) {
tti_point tti(nof_ttis % 10240);
sched.new_tti(tti);
for (uint32_t cc = 0; cc < cfg.cells.size(); ++cc) {
for (uint32_t cc = 0; cc < cells_cfg.size(); ++cc) {
tasks.start_task();
sched_nr_res_t res;
TESTASSERT(sched.generate_sched_result(tti, cc, res) == SRSRAN_SUCCESS);
tasks.finish_task(res);
}
}
printf("TESTER: %f PDSCH/slot were allocated\n", tasks.pdsch_count / (double)max_nof_ttis);
}
void sched_nr_cfg_parallel_cc_test()
{
std::atomic<int> tasks{0};
uint32_t max_nof_ttis = 1000;
task_job_manager tasks;
sched_nr_cfg cfg;
cfg.cells.resize(4);
std::vector<sched_nr_cell_cfg> cells_cfg;
cells_cfg.resize(4);
sched_nr sched(cfg);
sched.cell_cfg(cells_cfg);
sched_nr_ue_cfg uecfg;
uecfg.carriers.resize(cfg.cells.size());
for (uint32_t cc = 0; cc < cfg.cells.size(); ++cc) {
uecfg.carriers.resize(cells_cfg.size());
for (uint32_t cc = 0; cc < cells_cfg.size(); ++cc) {
uecfg.carriers[cc].active = true;
}
sched.ue_cfg(0x46, uecfg);
for (uint32_t nof_ttis = 0; nof_ttis < 1000; ++nof_ttis) {
for (uint32_t nof_ttis = 0; nof_ttis < max_nof_ttis; ++nof_ttis) {
tti_point tti(nof_ttis % 10240);
sched.new_tti(tti);
++tasks;
srsran::get_background_workers().push_task([&cfg, &sched, tti, &tasks]() {
for (uint32_t cc = 0; cc < cfg.cells.size(); ++cc) {
for (uint32_t cc = 0; cc < cells_cfg.size(); ++cc) {
tasks.start_task();
srsran::get_background_workers().push_task([cc, &sched, tti, &tasks]() {
sched_nr_res_t res;
TESTASSERT(sched.generate_sched_result(tti, cc, res) == SRSRAN_SUCCESS);
}
--tasks;
tasks.finish_task(res);
});
}
while (tasks > 0) {
usleep(100);
}
tasks.wait_task_finish();
printf("TESTER: %f PDSCH/slot were allocated\n", tasks.pdsch_count / (double)max_nof_ttis);
}
void sched_nr_cfg_parallel_sf_test()
{
uint32_t max_nof_ttis = 1000;
uint32_t nof_sectors = 2;
std::atomic<int> tasks{0};
task_job_manager tasks;
sched_nr_cfg cfg;
cfg.nof_concurrent_subframes = 2;
cfg.cells.resize(nof_sectors);
std::vector<sched_nr_cell_cfg> cells_cfg;
cells_cfg.resize(nof_sectors);
sched_nr sched(cfg);
sched.cell_cfg(cells_cfg);
sched_nr_ue_cfg uecfg;
uecfg.carriers.resize(cfg.cells.size());
for (uint32_t cc = 0; cc < cfg.cells.size(); ++cc) {
uecfg.carriers.resize(cells_cfg.size());
for (uint32_t cc = 0; cc < cells_cfg.size(); ++cc) {
uecfg.carriers[cc].active = true;
}
sched.ue_cfg(0x46, uecfg);
for (uint32_t nof_ttis = 0; nof_ttis < 1000; ++nof_ttis) {
for (uint32_t nof_ttis = 0; nof_ttis < max_nof_ttis; ++nof_ttis) {
tti_point tti(nof_ttis % 10240);
sched.new_tti(tti);
++tasks;
srsran::get_background_workers().push_task([&cfg, &sched, tti, &tasks]() {
for (uint32_t cc = 0; cc < cfg.cells.size(); ++cc) {
tasks.start_task();
for (uint32_t cc = 0; cc < cells_cfg.size(); ++cc) {
srsran::get_background_workers().push_task([cc, &sched, tti, &tasks]() {
sched_nr_res_t res;
TESTASSERT(sched.generate_sched_result(tti, cc, res) == SRSRAN_SUCCESS);
}
--tasks;
tasks.finish_task(res);
});
}
while (tasks > 0) {
usleep(100);
}
tasks.wait_task_finish();
printf("TESTER: %f PDSCH/slot were allocated\n", tasks.pdsch_count / (double)max_nof_ttis);
}
} // namespace srsenb
int main()
{
auto& mac_logger = srslog::fetch_basic_logger("MAC");
mac_logger.set_level(srslog::basic_levels::debug);
auto& pool_logger = srslog::fetch_basic_logger("POOL");
pool_logger.set_level(srslog::basic_levels::debug);
srsran::get_background_workers().set_nof_workers(8);
srsenb::sched_nr_cfg_serialized_test();
srsenb::sched_nr_cfg_parallel_cc_test();
srsenb::sched_nr_cfg_parallel_sf_test();
// srsenb::sched_nr_cfg_parallel_sf_test();
}