sched,nr: simplify scheduler threading architecture to reflect the new cc-based parallelism model

Branch: master
Francisco Paisana, 3 years ago; committed by Ismael Gomez
parent 74d0a8adb8
commit e6683b7387

@@ -27,7 +27,7 @@ namespace srsenb {
namespace sched_nr_impl {
class sched_worker_manager;
class serv_cell_ctxt;
class serv_cell_manager;
} // namespace sched_nr_impl
class ue_event_manager;
@@ -62,14 +62,11 @@ private:
std::mutex ue_db_mutex;
ue_map_t ue_db;
// management of UE feedback
std::unique_ptr<ue_event_manager> pending_events;
// management of Sched Result buffering
std::unique_ptr<sched_result_manager> pending_results;
// management of cell resources
std::vector<std::unique_ptr<sched_nr_impl::serv_cell_ctxt> > cells;
std::vector<std::unique_ptr<sched_nr_impl::serv_cell_manager> > cells;
};
} // namespace srsenb
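With the event and result managers gone from sched_nr, the concurrency contract moves to the call site: the MAC issues one get_dl_sched()/get_ul_sched() pair per {slot, carrier}, and sched_worker_manager serializes slots internally. A minimal sketch of that driving pattern, using std::thread in place of the project's background worker pool (drive_slot is a hypothetical helper, not part of this commit):

#include "srsenb/hdr/stack/mac/nr/sched_nr.h"
#include <thread>
#include <vector>

namespace srsenb {

// One task per {slot, carrier}; sched_nr synchronizes slots internally,
// so the callers need no external locking.
void drive_slot(sched_nr& sched, tti_point tti_tx, uint32_t nof_carriers)
{
  std::vector<std::thread> pool;
  for (uint32_t cc = 0; cc < nof_carriers; ++cc) {
    pool.emplace_back([&sched, tti_tx, cc]() {
      sched_nr_interface::dl_sched_t dl_res;
      sched_nr_interface::ul_sched_t ul_res;
      // May block until all carriers have released the previous slot.
      sched.get_dl_sched(tti_tx, cc, dl_res);
      sched.get_ul_sched(tti_tx, cc, ul_res);
    });
  }
  for (auto& t : pool) {
    t.join();
  }
}

} // namespace srsenb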

@@ -10,8 +10,8 @@
*
*/
#ifndef SRSRAN_SCHED_NR_BWP_H
#define SRSRAN_SCHED_NR_BWP_H
#ifndef SRSRAN_SCHED_NR_CELL_H
#define SRSRAN_SCHED_NR_CELL_H
#include "sched_nr_cfg.h"
#include "sched_nr_rb_grid.h"
@@ -62,17 +62,26 @@ public:
bwp_res_grid grid;
};
class serv_cell_ctxt
class serv_cell_manager
{
public:
using feedback_callback_t = srsran::move_callback<void(ue_carrier&)>;
explicit serv_cell_manager(const sched_cell_params& cell_cfg_);
void add_user(uint16_t rnti, ue_carrier* ue);
void rem_user(uint16_t rnti);
srsran::bounded_vector<bwp_ctxt, SCHED_NR_MAX_BWP_PER_CELL> bwps;
const sched_cell_params& cfg;
explicit serv_cell_ctxt(const sched_cell_params& cell_cfg_);
srsran::static_circular_map<uint16_t, ue_carrier*, SCHED_NR_MAX_USERS> ues;
const sched_cell_params* cfg;
private:
srslog::basic_logger& logger;
};
} // namespace sched_nr_impl
} // namespace srsenb
#endif // SRSRAN_SCHED_NR_BWP_H
#endif // SRSRAN_SCHED_NR_CELL_H

@@ -63,9 +63,7 @@ public:
srsran::bounded_vector<bwp_cfg_t, SCHED_NR_MAX_BWP_PER_CELL> bwps{1}; // idx0 for BWP-common
};
struct sched_cfg_t {
uint32_t nof_concurrent_subframes = 1;
};
struct sched_cfg_t {};
struct ue_cc_cfg_t {
bool active = false;

@@ -30,41 +30,37 @@ class slot_ue
{
public:
slot_ue() = default;
explicit slot_ue(resource_guard::token ue_token, uint16_t rnti_, tti_point tti_rx_, uint32_t cc);
explicit slot_ue(uint16_t rnti_, tti_point tti_rx_, uint32_t cc);
slot_ue(slot_ue&&) noexcept = default;
slot_ue& operator=(slot_ue&&) noexcept = default;
bool empty() const { return ue_token.empty(); }
void release() { ue_token.release(); }
bool empty() const { return rnti == SCHED_NR_INVALID_RNTI; }
void release() { rnti = SCHED_NR_INVALID_RNTI; }
uint16_t rnti = SCHED_NR_INVALID_RNTI;
tti_point tti_rx;
uint32_t cc = SCHED_NR_MAX_CARRIERS;
// UE parameters common to all sectors
const bwp_ue_cfg* cfg = nullptr;
bool pending_sr;
// UE parameters that are sector specific
const ue_cc_cfg_t* cc_cfg = nullptr;
tti_point pdcch_tti;
tti_point pdsch_tti;
tti_point pusch_tti;
tti_point uci_tti;
uint32_t dl_cqi;
uint32_t ul_cqi;
dl_harq_proc* h_dl = nullptr;
harq_proc* h_ul = nullptr;
private:
resource_guard::token ue_token;
const bwp_ue_cfg* cfg = nullptr;
tti_point pdcch_tti;
tti_point pdsch_tti;
tti_point pusch_tti;
tti_point uci_tti;
uint32_t dl_cqi;
uint32_t ul_cqi;
dl_harq_proc* h_dl = nullptr;
harq_proc* h_ul = nullptr;
};
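The reworked slot_ue above is now a plain movable value: validity is encoded in the rnti sentinel rather than in a resource_guard token, because under cc-based parallelism each ue_carrier is only ever touched by its own carrier worker within a slot, so there is nothing left to lock. A toy model of the sentinel lifecycle (INVALID_RNTI stands in for SCHED_NR_INVALID_RNTI):

#include <cstdint>

constexpr uint16_t INVALID_RNTI = 0; // stands in for SCHED_NR_INVALID_RNTI

struct slot_handle {
  uint16_t rnti = INVALID_RNTI;
  bool     empty() const { return rnti == INVALID_RNTI; }
  void     release() { rnti = INVALID_RNTI; }
};

int main()
{
  slot_handle h;
  h.rnti = 0x46; // reserved for this slot
  if (not h.empty()) {
    h.release(); // nothing to unlock; just reset the sentinel
  }
  return h.empty() ? 0 : 1;
}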
class ue_carrier
{
public:
ue_carrier(uint16_t rnti, const ue_cfg_t& cfg, const sched_cell_params& cell_params_);
slot_ue try_reserve(tti_point pdcch_tti, const ue_cfg_t& cfg);
void push_feedback(srsran::move_callback<void(ue_carrier&)> callback);
void new_tti(tti_point pdcch_tti, const ue_cfg_t& uecfg_);
slot_ue try_reserve(tti_point pdcch_tti);
const uint16_t rnti;
const uint32_t cc;
@@ -78,11 +74,6 @@ public:
private:
bwp_ue_cfg bwp_cfg;
const sched_cell_params& cell_params;
resource_guard busy;
tti_point last_tti_rx;
srsran::deque<srsran::move_callback<void(ue_carrier&)> > pending_feedback;
};
class ue
@@ -90,12 +81,16 @@ class ue
public:
ue(uint16_t rnti, const ue_cfg_t& cfg, const sched_params& sched_cfg_);
slot_ue try_reserve(tti_point tti_rx, uint32_t cc);
slot_ue try_reserve(tti_point pdcch_tti, uint32_t cc);
void set_cfg(const ue_cfg_t& cfg);
const ue_cfg_t& cfg() const { return ue_cfg; }
void ul_sr_info(tti_point tti_rx) { pending_sr = true; }
bool has_ca() const { return ue_cfg.carriers.size() > 1; }
uint32_t pcell_cc() const { return ue_cfg.carriers[0].cc; }
std::array<std::unique_ptr<ue_carrier>, SCHED_NR_MAX_CARRIERS> carriers;
private:
@@ -104,8 +99,7 @@ private:
bool pending_sr = false;
int current_idx = 0;
std::array<ue_cfg_t, 4> ue_cfgs;
ue_cfg_t ue_cfg;
};
using ue_map_t = srsran::static_circular_map<uint16_t, std::unique_ptr<ue>, SCHED_NR_MAX_USERS>;

@@ -13,7 +13,7 @@
#ifndef SRSRAN_SCHED_NR_WORKER_H
#define SRSRAN_SCHED_NR_WORKER_H
#include "sched_nr_bwp.h"
#include "sched_nr_cell.h"
#include "sched_nr_cfg.h"
#include "sched_nr_rb_grid.h"
#include "sched_nr_ue.h"
@@ -33,25 +33,41 @@ using ul_sched_t = sched_nr_interface::ul_sched_t;
class slot_cc_worker
{
public:
explicit slot_cc_worker(serv_cell_ctxt& sched);
using feedback_callback_t = srsran::move_callback<void(ue_carrier&)>;
void start(tti_point tti_rx_, ue_map_t& ue_db_);
explicit slot_cc_worker(serv_cell_manager& sched);
void start(tti_point pdcch_tti, ue_map_t& ue_db_);
void run();
void end_tti();
void finish();
bool running() const { return tti_rx.is_valid(); }
/// Enqueue feedback directed at a given UE in a given cell
void enqueue_cc_feedback(uint16_t rnti, feedback_callback_t fdbk);
private:
/// Run all pending feedback. This should be called at the beginning of a TTI
void run_feedback(ue_map_t& ue_db);
void alloc_dl_ues();
void alloc_ul_ues();
void log_result() const;
const sched_cell_params& cfg;
serv_cell_ctxt& cell;
serv_cell_manager& cell;
srslog::basic_logger& logger;
tti_point tti_rx;
bwp_slot_allocator bwp_alloc;
// Process of UE cell-specific feedback
struct feedback_t {
uint16_t rnti;
feedback_callback_t fdbk;
};
std::mutex feedback_mutex;
srsran::deque<feedback_t> pending_feedback, tmp_feedback_to_run;
srsran::static_circular_map<uint16_t, slot_ue, SCHED_NR_MAX_USERS> slot_ues;
};
@@ -67,28 +83,49 @@ class sched_worker_manager
};
public:
explicit sched_worker_manager(ue_map_t& ue_db_, const sched_params& cfg_);
explicit sched_worker_manager(ue_map_t& ue_db_,
const sched_params& cfg_,
srsran::span<std::unique_ptr<serv_cell_manager> > cells_);
sched_worker_manager(const sched_worker_manager&) = delete;
sched_worker_manager(sched_worker_manager&&) = delete;
~sched_worker_manager();
void start_slot(tti_point tti_rx, srsran::move_callback<void()> process_feedback);
bool run_slot(tti_point tti_rx, uint32_t cc);
void release_slot(tti_point tti_rx);
void run_slot(tti_point tti_tx, uint32_t cc);
bool save_sched_result(tti_point pdcch_tti, uint32_t cc, dl_sched_t& dl_res, ul_sched_t& ul_res);
private:
const sched_params& cfg;
ue_map_t& ue_db;
srslog::basic_logger& logger;
void enqueue_event(uint16_t rnti, srsran::move_callback<void()> ev);
void enqueue_cc_feedback(uint16_t rnti, uint32_t cc, slot_cc_worker::feedback_callback_t fdbk)
{
cc_worker_list[cc]->worker.enqueue_cc_feedback(rnti, std::move(fdbk));
}
std::mutex ue_db_mutex;
private:
const sched_params& cfg;
ue_map_t& ue_db;
srsran::span<std::unique_ptr<serv_cell_manager> > cells;
srslog::basic_logger& logger;
struct ue_event_t {
uint16_t rnti;
srsran::move_callback<void()> callback;
};
std::mutex event_mutex;
srsran::deque<ue_event_t> next_slot_events, slot_events;
std::vector<std::unique_ptr<slot_worker_ctxt> > slot_worker_ctxts;
struct cc_context {
std::condition_variable cvar;
bool waiting = false;
slot_cc_worker worker;
srsran::bounded_vector<serv_cell_ctxt, SCHED_NR_MAX_CARRIERS> cell_grid_list;
cc_context(serv_cell_manager& sched) : worker(sched) {}
};
slot_worker_ctxt& get_sf(tti_point tti_rx);
std::mutex slot_mutex;
std::condition_variable cvar;
tti_point current_tti;
std::atomic<int> worker_count{0}; // variable shared across slot_cc_workers
std::vector<std::unique_ptr<cc_context> > cc_worker_list;
};
} // namespace sched_nr_impl

@@ -16,7 +16,7 @@ set(SOURCES mac_nr.cc
sched_nr_pdcch.cc
sched_nr_cfg.cc
sched_nr_helpers.cc
sched_nr_bwp.cc
sched_nr_cell.cc
sched_nr_rb.cc
harq_softbuffer.cc)

@@ -12,7 +12,7 @@
#include "srsenb/hdr/stack/mac/nr/sched_nr.h"
#include "srsenb/hdr/stack/mac/nr/harq_softbuffer.h"
#include "srsenb/hdr/stack/mac/nr/sched_nr_bwp.h"
#include "srsenb/hdr/stack/mac/nr/sched_nr_cell.h"
#include "srsenb/hdr/stack/mac/nr/sched_nr_worker.h"
#include "srsran/common/thread_pool.h"
@@ -22,62 +22,6 @@ using namespace sched_nr_impl;
static int assert_ue_cfg_valid(uint16_t rnti, const sched_nr_interface::ue_cfg_t& uecfg);
class ue_event_manager
{
using callback_t = srsran::move_callback<void()>;
using callback_list = srsran::deque<callback_t>;
public:
explicit ue_event_manager(ue_map_t& ue_db_) : ue_db(ue_db_) {}
void push_event(srsran::move_callback<void()> event)
{
std::lock_guard<std::mutex> lock(common_mutex);
common_events.push_back(std::move(event));
}
void push_cc_feedback(uint16_t rnti, uint32_t cc, srsran::move_callback<void(ue_carrier&)> event)
{
std::lock_guard<std::mutex> lock(common_mutex);
feedback_list.emplace_back();
feedback_list.back().rnti = rnti;
feedback_list.back().cc = cc;
feedback_list.back().callback = std::move(event);
}
void new_slot()
{
{
std::lock_guard<std::mutex> lock(common_mutex);
common_events.swap(common_events_tmp); // reuse memory
feedback_list.swap(feedback_list_tmp);
}
while (not common_events_tmp.empty()) {
common_events_tmp.front()();
common_events_tmp.pop_front();
}
while (not feedback_list_tmp.empty()) {
auto& e = feedback_list_tmp.front();
if (ue_db.contains(e.rnti) and ue_db[e.rnti]->carriers[e.cc] != nullptr) {
ue_db[e.rnti]->carriers[e.cc]->push_feedback(std::move(e.callback));
}
feedback_list_tmp.pop_front();
}
}
private:
ue_map_t& ue_db;
std::mutex common_mutex;
callback_list common_events;
struct ue_feedback {
uint16_t rnti = SCHED_NR_INVALID_RNTI;
uint32_t cc = SCHED_NR_MAX_CARRIERS;
srsran::move_callback<void(ue_carrier&)> callback;
};
srsran::deque<ue_feedback> feedback_list;
callback_list common_events_tmp;
srsran::deque<ue_feedback> feedback_list_tmp;
};
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class sched_result_manager
@@ -142,21 +86,26 @@ private:
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
sched_nr::sched_nr(const sched_cfg_t& sched_cfg) :
cfg(sched_cfg), pending_events(new ue_event_manager(ue_db)), logger(srslog::fetch_basic_logger("MAC"))
{}
sched_nr::sched_nr(const sched_cfg_t& sched_cfg) : cfg(sched_cfg), logger(srslog::fetch_basic_logger("MAC")) {}
sched_nr::~sched_nr() {}
int sched_nr::cell_cfg(srsran::const_span<cell_cfg_t> cell_list)
{
// Initiate Common Sched Configuration
cfg.cells.reserve(cell_list.size());
for (uint32_t cc = 0; cc < cell_list.size(); ++cc) {
cfg.cells.emplace_back(cc, cell_list[cc], cfg.sched_cfg);
}
// Initiate cell-specific schedulers
cells.reserve(cell_list.size());
for (uint32_t cc = 0; cc < cell_list.size(); ++cc) {
cells.emplace_back(new serv_cell_manager{cfg.cells[cc]});
}
pending_results.reset(new sched_result_manager(cell_list.size()));
sched_workers.reset(new sched_nr_impl::sched_worker_manager(ue_db, cfg));
sched_workers.reset(new sched_nr_impl::sched_worker_manager(ue_db, cfg, cells));
return SRSRAN_SUCCESS;
}
@@ -164,7 +113,7 @@ int sched_nr::cell_cfg(srsran::const_span<cell_cfg_t> cell_list)
void sched_nr::ue_cfg(uint16_t rnti, const ue_cfg_t& uecfg)
{
srsran_assert(assert_ue_cfg_valid(rnti, uecfg) == SRSRAN_SUCCESS, "Invalid UE configuration");
pending_events->push_event([this, rnti, uecfg]() { ue_cfg_impl(rnti, uecfg); });
sched_workers->enqueue_event(rnti, [this, rnti, uecfg]() { ue_cfg_impl(rnti, uecfg); });
}
void sched_nr::ue_cfg_impl(uint16_t rnti, const ue_cfg_t& uecfg)
@@ -179,22 +128,8 @@ void sched_nr::ue_cfg_impl(uint16_t rnti, const ue_cfg_t& uecfg)
/// Generate {tti,cc} scheduling decision
int sched_nr::generate_slot_result(tti_point pdcch_tti, uint32_t cc)
{
tti_point tti_rx = pdcch_tti - TX_ENB_DELAY;
// Lock carrier workers for provided tti_rx
sched_workers->start_slot(tti_rx, [this]() {
// In case it is first worker for the given slot
// synchronize {tti,cc} state. e.g. reserve UE resources for {tti,cc} decision, process feedback
pending_events->new_slot();
});
// unlocked, parallel region
bool all_workers_finished = sched_workers->run_slot(tti_rx, cc);
if (all_workers_finished) {
// once all workers of the same subframe finished, synchronize sched outcome with ue_db
sched_workers->release_slot(tti_rx);
}
// Generate {slot_idx,cc} result
sched_workers->run_slot(pdcch_tti, cc);
// Copy results to intermediate buffer
dl_sched_t& dl_res = pending_results->add_dl_result(pdcch_tti, cc);
@@ -225,17 +160,13 @@ int sched_nr::get_ul_sched(tti_point tti_rx, uint32_t cc, ul_sched_t& result)
void sched_nr::dl_ack_info(uint16_t rnti, uint32_t cc, uint32_t pid, uint32_t tb_idx, bool ack)
{
pending_events->push_cc_feedback(
sched_workers->enqueue_cc_feedback(
rnti, cc, [pid, tb_idx, ack](ue_carrier& ue_cc) { ue_cc.harq_ent.dl_ack_info(pid, tb_idx, ack); });
}
void sched_nr::ul_sr_info(tti_point tti_rx, uint16_t rnti)
{
pending_events->push_event([this, rnti, tti_rx]() {
if (ue_db.contains(rnti)) {
ue_db[rnti]->ul_sr_info(tti_rx);
}
});
sched_workers->enqueue_event(rnti, [this, rnti, tti_rx]() { ue_db[rnti]->ul_sr_info(tti_rx); });
}
#define VERIFY_INPUT(cond, msg, ...) \

@@ -10,7 +10,7 @@
*
*/
#include "srsenb/hdr/stack/mac/nr/sched_nr_bwp.h"
#include "srsenb/hdr/stack/mac/nr/sched_nr_cell.h"
#include "srsran/common/standard_streams.h"
#include "srsran/common/string_helpers.h"
@@ -147,14 +147,15 @@ bwp_ctxt::bwp_ctxt(const bwp_params& bwp_cfg) : cfg(&bwp_cfg), ra(bwp_cfg), grid
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
serv_cell_ctxt::serv_cell_ctxt(const sched_cell_params& cell_cfg_) : cfg(&cell_cfg_)
serv_cell_manager::serv_cell_manager(const sched_cell_params& cell_cfg_) :
cfg(cell_cfg_), logger(srslog::fetch_basic_logger("MAC"))
{
for (uint32_t bwp_id = 0; bwp_id < cfg->cell_cfg.bwps.size(); ++bwp_id) {
for (uint32_t bwp_id = 0; bwp_id < cfg.cell_cfg.bwps.size(); ++bwp_id) {
bwps.emplace_back(cell_cfg_.bwps[bwp_id]);
}
// Pre-allocate HARQs in common pool of softbuffers
harq_softbuffer_pool::get_instance().init_pool(cfg->nof_prb());
harq_softbuffer_pool::get_instance().init_pool(cfg.nof_prb());
}
} // namespace sched_nr_impl

@@ -16,9 +16,7 @@
namespace srsenb {
namespace sched_nr_impl {
slot_ue::slot_ue(resource_guard::token ue_token_, uint16_t rnti_, tti_point tti_rx_, uint32_t cc_) :
ue_token(std::move(ue_token_)), rnti(rnti_), tti_rx(tti_rx_), cc(cc_)
{}
slot_ue::slot_ue(uint16_t rnti_, tti_point tti_rx_, uint32_t cc_) : rnti(rnti_), tti_rx(tti_rx_), cc(cc_) {}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -30,40 +28,22 @@ ue_carrier::ue_carrier(uint16_t rnti_, const ue_cfg_t& uecfg_, const sched_cell_
harq_ent(cell_params_.nof_prb())
{}
void ue_carrier::push_feedback(srsran::move_callback<void(ue_carrier&)> callback)
{
pending_feedback.push_back(std::move(callback));
}
slot_ue ue_carrier::try_reserve(tti_point tti_rx, const ue_cfg_t& uecfg_)
void ue_carrier::new_tti(tti_point pdcch_tti, const ue_cfg_t& uecfg_)
{
slot_ue sfu(busy, rnti, tti_rx, cc);
if (sfu.empty()) {
return sfu;
}
// successfully acquired. Process any CC-specific pending feedback
if (bwp_cfg.ue_cfg() != &uecfg_) {
bwp_cfg = bwp_ue_cfg(rnti, cell_params.bwps[0], uecfg_);
}
while (not pending_feedback.empty()) {
pending_feedback.front()(*this);
pending_feedback.pop_front();
}
if (not last_tti_rx.is_valid()) {
last_tti_rx = tti_rx;
harq_ent.new_tti(tti_rx);
} else {
while (last_tti_rx++ < tti_rx) {
harq_ent.new_tti(tti_rx);
}
}
harq_ent.new_tti(pdcch_tti - TX_ENB_DELAY);
}
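new_tti() above advances HARQ state using pdcch_tti - TX_ENB_DELAY: the scheduler is generating slot pdcch_tti while feedback for slot pdcch_tti - TX_ENB_DELAY is arriving. A worked example of that arithmetic, with plain integers standing in for tti_point (TX_ENB_DELAY == 4 is assumed here for illustration):

#include <cassert>
#include <cstdint>

int main()
{
  const uint32_t TX_ENB_DELAY = 4;               // assumed value, for illustration
  uint32_t pdcch_tti = 104;                      // slot whose PDCCH is being built
  uint32_t tti_rx    = pdcch_tti - TX_ENB_DELAY; // slot whose feedback just arrived
  assert(tti_rx == 100);
  // harq_ent.new_tti(tti_rx) therefore ages HARQ processes in lock-step
  // with the receive timeline, once per generated slot.
  return 0;
}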
// set UE parameters common to all carriers
sfu.cfg = &bwp_cfg;
slot_ue ue_carrier::try_reserve(tti_point pdcch_tti)
{
tti_point tti_rx = pdcch_tti - TX_ENB_DELAY;
// copy cc-specific parameters and find available HARQs
sfu.cc_cfg = &uecfg_.carriers[cc];
sfu.pdcch_tti = tti_rx + TX_ENB_DELAY;
slot_ue sfu(rnti, tti_rx, cc);
sfu.cfg = &bwp_cfg;
sfu.pdcch_tti = pdcch_tti;
const uint32_t k0 = 0;
sfu.pdsch_tti = sfu.pdcch_tti + k0;
uint32_t k1 =
@@ -100,9 +80,9 @@ slot_ue ue_carrier::try_reserve(tti_point tti_rx, const ue_cfg_t& uecfg_)
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
ue::ue(uint16_t rnti_, const ue_cfg_t& cfg, const sched_params& sched_cfg_) : rnti(rnti_), sched_cfg(sched_cfg_)
ue::ue(uint16_t rnti_, const ue_cfg_t& cfg, const sched_params& sched_cfg_) :
rnti(rnti_), sched_cfg(sched_cfg_), ue_cfg(cfg)
{
ue_cfgs[0] = cfg;
for (uint32_t cc = 0; cc < cfg.carriers.size(); ++cc) {
if (cfg.carriers[cc].active) {
carriers[cc].reset(new ue_carrier(rnti, cfg, sched_cfg.cells[cc]));
@@ -112,19 +92,19 @@ ue::ue(uint16_t rnti_, const ue_cfg_t& cfg, const sched_params& sched_cfg_) : rn
void ue::set_cfg(const ue_cfg_t& cfg)
{
current_idx = (current_idx + 1U) % ue_cfgs.size();
ue_cfgs[current_idx] = ue_cfg_extended(rnti, cfg);
ue_cfg = cfg;
}
slot_ue ue::try_reserve(tti_point tti_rx, uint32_t cc)
slot_ue ue::try_reserve(tti_point pdcch_tti, uint32_t cc)
{
if (carriers[cc] == nullptr) {
return slot_ue();
}
slot_ue sfu = carriers[cc]->try_reserve(tti_rx, ue_cfgs[current_idx]);
slot_ue sfu = carriers[cc]->try_reserve(pdcch_tti);
if (sfu.empty()) {
return slot_ue();
}
// set UE-common parameters
sfu.pending_sr = pending_sr;

@@ -16,24 +16,57 @@
namespace srsenb {
namespace sched_nr_impl {
slot_cc_worker::slot_cc_worker(serv_cell_ctxt& cc_sched) :
cell(cc_sched), cfg(*cc_sched.cfg), bwp_alloc(cc_sched.bwps[0].grid), logger(srslog::fetch_basic_logger("MAC"))
slot_cc_worker::slot_cc_worker(serv_cell_manager& cc_sched) :
cell(cc_sched), cfg(cc_sched.cfg), bwp_alloc(cc_sched.bwps[0].grid), logger(srslog::fetch_basic_logger("MAC"))
{}
void slot_cc_worker::enqueue_cc_feedback(uint16_t rnti, feedback_callback_t fdbk)
{
std::lock_guard<std::mutex> lock(feedback_mutex);
pending_feedback.emplace_back();
pending_feedback.back().rnti = rnti;
pending_feedback.back().fdbk = std::move(fdbk);
}
void slot_cc_worker::run_feedback(ue_map_t& ue_db)
{
{
std::lock_guard<std::mutex> lock(feedback_mutex);
tmp_feedback_to_run.swap(pending_feedback);
}
for (feedback_t& f : tmp_feedback_to_run) {
if (ue_db.contains(f.rnti) and ue_db[f.rnti]->carriers[cfg.cc] != nullptr) {
f.fdbk(*ue_db[f.rnti]->carriers[cfg.cc]);
} else {
logger.warning("SCHED: feedback received for invalid rnti=0x%x, cc=%d", f.rnti, cfg.cc);
}
}
tmp_feedback_to_run.clear();
}
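run_feedback() above uses a swap-under-lock double buffer: producers (e.g. sched_nr::dl_ack_info) append to pending_feedback under the mutex, and the carrier worker swaps the whole deque out at slot start and runs the callbacks without holding the lock. The generic shape of the pattern, as a self-contained sketch (feedback_queue and its names are illustrative, not part of the commit):

#include <deque>
#include <functional>
#include <mutex>

template <typename Ctx>
class feedback_queue {
public:
  void enqueue(std::function<void(Ctx&)> cb)
  {
    std::lock_guard<std::mutex> lock(mtx);
    pending.push_back(std::move(cb));
  }
  void drain(Ctx& ctx)
  {
    {
      std::lock_guard<std::mutex> lock(mtx);
      to_run.swap(pending); // O(1); reuses to_run's memory on the next slot
    }
    for (auto& cb : to_run) {
      cb(ctx); // callbacks run outside the critical section
    }
    to_run.clear();
  }

private:
  std::mutex                            mtx;
  std::deque<std::function<void(Ctx&)>> pending, to_run;
};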
/// Called at the beginning of TTI in a locked context, to reserve available UE resources
void slot_cc_worker::start(tti_point tti_rx_, ue_map_t& ue_db)
void slot_cc_worker::start(tti_point pdcch_tti, ue_map_t& ue_db)
{
srsran_assert(not running(), "scheduler worker::start() called for active worker");
tti_rx = tti_rx_;
tti_rx = pdcch_tti - TX_ENB_DELAY;
// Try reserve UE cells for this worker
// Run pending cell feedback
run_feedback(ue_db);
// Reserve UEs for this worker slot
for (auto& ue_pair : ue_db) {
uint16_t rnti = ue_pair.first;
ue& u = *ue_pair.second;
if (u.carriers[cfg.cc] == nullptr) {
continue;
}
slot_ues.insert(rnti, u.try_reserve(tti_rx, cfg.cc));
u.carriers[cfg.cc]->new_tti(pdcch_tti, u.cfg());
slot_ues.insert(rnti, u.try_reserve(pdcch_tti, cfg.cc));
if (slot_ues[rnti].empty()) {
// Failed to synchronize because UE is being used by another worker
// Failed to generate a slot UE because the UE is not eligible for DL/UL tx in this slot
slot_ues.erase(rnti);
continue;
}
@@ -56,18 +89,17 @@ void slot_cc_worker::run()
// Log CC scheduler result
log_result();
}
void slot_cc_worker::end_tti()
{
srsran_assert(running(), "scheduler worker::end() called for non-active worker");
// releases UE resources
slot_ues.clear();
tti_rx = {};
}
void slot_cc_worker::finish()
{
// synchronize results
}
void slot_cc_worker::alloc_dl_ues()
{
if (slot_ues.empty()) {
@@ -108,7 +140,7 @@ void slot_cc_worker::log_result() const
fmt::format_to(fmtbuf,
"SCHED: DL {}, cc={}, rnti=0x{:x}, pid={}, nrtx={}, dai={}, tti_pdsch={}, tti_ack={}",
ue.h_dl->nof_retx() == 0 ? "tx" : "retx",
cell.cfg->cc,
cell.cfg.cc,
ue.rnti,
ue.h_dl->pid,
ue.h_dl->nof_retx(),
@@ -116,7 +148,7 @@
ue.pdsch_tti,
ue.uci_tti);
} else if (pdcch.dci.ctx.rnti_type == srsran_rnti_type_ra) {
fmt::format_to(fmtbuf, "SCHED: DL RAR, cc={}", cell.cfg->cc);
fmt::format_to(fmtbuf, "SCHED: DL RAR, cc={}", cell.cfg.cc);
} else {
fmt::format_to(fmtbuf, "SCHED: unknown format");
}
@@ -127,105 +159,112 @@ void slot_cc_worker::log_result() const
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
sched_worker_manager::sched_worker_manager(ue_map_t& ue_db_, const sched_params& cfg_) :
cfg(cfg_), ue_db(ue_db_), logger(srslog::fetch_basic_logger("MAC"))
sched_worker_manager::sched_worker_manager(ue_map_t& ue_db_,
const sched_params& cfg_,
srsran::span<std::unique_ptr<serv_cell_manager> > cells_) :
cfg(cfg_), ue_db(ue_db_), logger(srslog::fetch_basic_logger("MAC")), cells(cells_)
{
cc_worker_list.reserve(cfg.cells.size());
for (uint32_t cc = 0; cc < cfg.cells.size(); ++cc) {
cell_grid_list.emplace_back(cfg.cells[cc]);
}
// Note: For now, we only allow parallelism at the sector level
slot_worker_ctxts.resize(cfg.sched_cfg.nof_concurrent_subframes);
for (size_t i = 0; i < cfg.sched_cfg.nof_concurrent_subframes; ++i) {
slot_worker_ctxts[i].reset(new slot_worker_ctxt());
slot_worker_ctxts[i]->workers.reserve(cfg.cells.size());
for (uint32_t cc = 0; cc < cfg.cells.size(); ++cc) {
slot_worker_ctxts[i]->workers.emplace_back(cell_grid_list[cc]);
}
cc_worker_list.emplace_back(new cc_context{*cells[cc]});
}
}
sched_worker_manager::~sched_worker_manager() = default;
sched_worker_manager::slot_worker_ctxt& sched_worker_manager::get_sf(tti_point tti_rx)
void sched_worker_manager::enqueue_event(uint16_t rnti, srsran::move_callback<void()> ev)
{
return *slot_worker_ctxts[tti_rx.to_uint() % slot_worker_ctxts.size()];
std::lock_guard<std::mutex> lock(event_mutex);
next_slot_events.push_back(ue_event_t{rnti, std::move(ev)});
}
void sched_worker_manager::start_slot(tti_point tti_rx, srsran::move_callback<void()> process_feedback)
void sched_worker_manager::run_slot(tti_point tti_tx, uint32_t cc)
{
auto& sf_worker_ctxt = get_sf(tti_rx);
std::unique_lock<std::mutex> lock(sf_worker_ctxt.slot_mutex);
while ((sf_worker_ctxt.tti_rx.is_valid() and sf_worker_ctxt.tti_rx != tti_rx)) {
// wait for previous slot to finish
sf_worker_ctxt.nof_workers_waiting++;
sf_worker_ctxt.cvar.wait(lock);
sf_worker_ctxt.nof_workers_waiting--;
}
if (sf_worker_ctxt.tti_rx == tti_rx) {
// another worker with the same slot idx already started
return;
}
srsran::bounded_vector<std::condition_variable*, SRSRAN_MAX_CARRIERS> waiting_cvars;
{
std::lock_guard<std::mutex> db_lock(ue_db_mutex);
process_feedback();
std::unique_lock<std::mutex> lock(slot_mutex);
while (current_tti.is_valid() and current_tti != tti_tx) {
// Wait for previous slot to finish
cc_worker_list[cc]->waiting = true;
cc_worker_list[cc]->cvar.wait(lock);
cc_worker_list[cc]->waiting = false;
}
if (not current_tti.is_valid()) {
/* First Worker to start slot */
// process non-cc specific feedback if pending (e.g. SRs, buffer updates, UE config) for UEs with CA
// NOTE: there is no parallelism in these operations
slot_events.clear();
{
std::lock_guard<std::mutex> ev_lock(event_mutex);
next_slot_events.swap(slot_events);
}
for (ue_event_t& ev : slot_events) {
if (not ue_db.contains(ev.rnti) or ue_db[ev.rnti]->has_ca()) {
ev.callback();
}
}
for (uint32_t cc = 0; cc < sf_worker_ctxt.workers.size(); ++cc) {
sf_worker_ctxt.workers[cc].start(tti_rx, ue_db);
// mark the start of slot. awake remaining workers if locking on the mutex
current_tti = tti_tx;
worker_count.store(static_cast<int>(cc_worker_list.size()), std::memory_order_relaxed);
for (auto& w : cc_worker_list) {
if (w->waiting) {
waiting_cvars.push_back(&w->cvar);
}
}
lock.unlock();
for (auto& w : waiting_cvars) {
w->notify_one();
}
waiting_cvars.clear();
}
}
sf_worker_ctxt.tti_rx = tti_rx;
sf_worker_ctxt.worker_count.store(static_cast<int>(sf_worker_ctxt.workers.size()), std::memory_order_relaxed);
if (sf_worker_ctxt.nof_workers_waiting > 0) {
sf_worker_ctxt.cvar.notify_all();
/* Parallel Region */
// process non-cc specific feedback if pending (e.g. SRs, buffer updates, UE config) for UEs without CA
for (ue_event_t& ev : slot_events) {
if (ue_db.contains(ev.rnti) and not ue_db[ev.rnti]->has_ca() and ue_db[ev.rnti]->pcell_cc() == cc) {
ev.callback();
}
}
}
bool sched_worker_manager::run_slot(tti_point tti_rx_, uint32_t cc)
{
auto& sf_worker_ctxt = get_sf(tti_rx_);
srsran_assert(sf_worker_ctxt.tti_rx == tti_rx_, "invalid run_tti(tti, cc) arguments");
// process pending feedback and pre-cache UE state for slot decision
cc_worker_list[cc]->worker.start(tti_tx, ue_db);
// Get {tti, cc} scheduling decision
sf_worker_ctxt.workers[cc].run();
cc_worker_list[cc]->worker.run();
// decrement the number of active workers
int rem_workers = sf_worker_ctxt.worker_count.fetch_sub(1, std::memory_order_release) - 1;
int rem_workers = worker_count.fetch_sub(1, std::memory_order_release) - 1;
srsran_assert(rem_workers >= 0, "invalid number of calls to run_tti(tti, cc)");
if (rem_workers == 0) {
/* Last Worker to finish slot */
return rem_workers == 0;
}
void sched_worker_manager::release_slot(tti_point tti_rx_)
{
auto& sf_worker_ctxt = get_sf(tti_rx_);
srsran_assert(sf_worker_ctxt.tti_rx == tti_rx_, "invalid run_tti(tti, cc) arguments");
srsran_assert(sf_worker_ctxt.worker_count == 0, "invalid number of calls to run_tti(tti, cc)");
{
std::lock_guard<std::mutex> lock(ue_db_mutex);
// The last worker to finish its own grant generation signals the release of the slot
std::unique_lock<std::mutex> lock(slot_mutex);
current_tti = {};
// All the workers of the same slot have finished. Synchronize scheduling decisions with UEs state
for (slot_cc_worker& worker : sf_worker_ctxt.workers) {
worker.end_tti();
for (auto& c : cc_worker_list) {
c->worker.finish();
if (c->waiting) {
waiting_cvars.push_back(&c->cvar);
}
}
}
std::unique_lock<std::mutex> lock(sf_worker_ctxt.slot_mutex);
sf_worker_ctxt.tti_rx = {};
if (sf_worker_ctxt.nof_workers_waiting > 0) {
// Awake waiting workers
lock.unlock();
sf_worker_ctxt.cvar.notify_one();
for (auto& c : waiting_cvars) {
c->notify_one();
}
}
}
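run_slot() above implements a first-in/last-out slot barrier: the first carrier worker to reach a new tti_tx publishes current_tti, runs the common (non-CC-specific) events, and arms worker_count; every carrier then schedules in parallel; the worker that decrements worker_count to zero clears current_tti and wakes anyone waiting to start the next slot. A self-contained model of the pattern (slot_barrier is illustrative; the real code additionally routes non-CA UE events to their pcell worker):

#include <atomic>
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

struct slot_barrier {
  void run_slot(int tti, int cc, int nof_cc)
  {
    std::unique_lock<std::mutex> lock(mtx);
    cvar.wait(lock, [&] { return current_tti == -1 or current_tti == tti; });
    if (current_tti == -1) { // first worker: open the slot
      current_tti = tti;     // (common per-slot events would run here)
      count.store(nof_cc, std::memory_order_relaxed);
      cvar.notify_all();
    }
    lock.unlock();

    std::printf("tti=%d cc=%d scheduled\n", tti, cc); // parallel region

    if (count.fetch_sub(1, std::memory_order_release) == 1) {
      std::lock_guard<std::mutex> guard(mtx); // last worker: close the slot
      current_tti = -1;
      cvar.notify_all();
    }
  }
  std::mutex              mtx;
  std::condition_variable cvar;
  int                     current_tti = -1;
  std::atomic<int>        count{0};
};

int main()
{
  slot_barrier b;
  for (int tti = 0; tti < 3; ++tti) {
    std::vector<std::thread> ts;
    for (int cc = 0; cc < 2; ++cc) {
      ts.emplace_back([&, tti, cc] { b.run_slot(tti, cc, 2); });
    }
    for (auto& t : ts) {
      t.join();
    }
  }
}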
bool sched_worker_manager::save_sched_result(tti_point pdcch_tti, uint32_t cc, dl_sched_t& dl_res, ul_sched_t& ul_res)
{
auto& bwp_slot = cell_grid_list[cc].bwps[0].grid[pdcch_tti];
auto& bwp_slot = cells[cc]->bwps[0].grid[pdcch_tti];
dl_res.pdcch_dl = bwp_slot.dl_pdcchs;
dl_res.pdcch_ul = bwp_slot.ul_pdcchs;

@@ -142,7 +142,7 @@ struct task_job_manager {
void sched_nr_cfg_serialized_test()
{
uint32_t max_nof_ttis = 1000, nof_sectors = 2;
uint32_t max_nof_ttis = 1000, nof_sectors = 4;
task_job_manager tasks;
sched_nr_interface::sched_cfg_t cfg;
@@ -150,7 +150,7 @@ void sched_nr_cfg_serialized_test()
sched_nr_sim_base sched_tester(cfg, cells_cfg, "Serialized Test");
sched_nr_interface::ue_cfg_t uecfg = get_default_ue_cfg(2);
sched_nr_interface::ue_cfg_t uecfg = get_default_ue_cfg(nof_sectors);
sched_tester.add_user(0x46, uecfg, 0);
@@ -188,7 +188,7 @@ void sched_nr_cfg_serialized_test()
void sched_nr_cfg_parallel_cc_test()
{
uint32_t nof_sectors = 2;
uint32_t nof_sectors = 4;
uint32_t max_nof_ttis = 1000;
task_job_manager tasks;
@@ -236,64 +236,14 @@ void sched_nr_cfg_parallel_cc_test()
printf("Total time taken per slot [usec]: %f\n", final_avg_usec);
}
void sched_nr_cfg_parallel_sf_test()
{
uint32_t max_nof_ttis = 1000;
uint32_t nof_sectors = 2;
task_job_manager tasks;
sched_nr_interface::sched_cfg_t cfg;
cfg.nof_concurrent_subframes = 2;
std::vector<sched_nr_interface::cell_cfg_t> cells_cfg = get_default_cells_cfg(nof_sectors);
sched_nr_sim_base sched_tester(cfg, cells_cfg, "Parallel SF Test");
sched_nr_interface::ue_cfg_t uecfg = get_default_ue_cfg(cells_cfg.size());
sched_tester.add_user(0x46, uecfg, 0);
std::array<std::atomic<long>, SRSRAN_MAX_CARRIERS> nano_count{};
for (uint32_t nof_ttis = 0; nof_ttis < max_nof_ttis; ++nof_ttis) {
tti_point tti_rx(nof_ttis % 10240);
tti_point tti_tx = tti_rx + TX_ENB_DELAY;
tasks.start_slot(tti_tx, nof_sectors);
sched_tester.new_slot(tti_tx);
for (uint32_t cc = 0; cc < cells_cfg.size(); ++cc) {
srsran::get_background_workers().push_task([cc, tti_tx, &sched_tester, &tasks, &nano_count]() {
sched_nr_interface::dl_sched_t dl_res;
sched_nr_interface::ul_sched_t ul_res;
auto tp1 = std::chrono::steady_clock::now();
TESTASSERT(sched_tester.get_sched()->get_dl_sched(tti_tx, cc, dl_res) == SRSRAN_SUCCESS);
TESTASSERT(sched_tester.get_sched()->get_ul_sched(tti_tx, cc, ul_res) == SRSRAN_SUCCESS);
auto tp2 = std::chrono::steady_clock::now();
nano_count[cc].fetch_add(std::chrono::duration_cast<std::chrono::nanoseconds>(tp2 - tp1).count(),
std::memory_order_relaxed);
sched_nr_cc_output_res_t out{tti_tx, cc, &dl_res, &ul_res};
sched_tester.update(out);
tasks.finish_cc(tti_tx, dl_res, ul_res);
});
}
}
tasks.wait_task_finish();
tasks.print_results();
double final_avg_usec = 0;
for (uint32_t i = 0; i < nof_sectors; ++i) {
final_avg_usec += nano_count[i];
}
final_avg_usec = final_avg_usec / 1000.0 / max_nof_ttis / nof_sectors;
printf("Total time taken per slot [usec]: %f\n", final_avg_usec);
}
} // namespace srsenb
int main()
{
auto& test_logger = srslog::fetch_basic_logger("TEST");
test_logger.set_level(srslog::basic_levels::info);
test_logger.set_level(srslog::basic_levels::warning);
auto& mac_logger = srslog::fetch_basic_logger("MAC");
mac_logger.set_level(srslog::basic_levels::info);
mac_logger.set_level(srslog::basic_levels::warning);
auto& pool_logger = srslog::fetch_basic_logger("POOL");
pool_logger.set_level(srslog::basic_levels::info);
@@ -304,5 +254,4 @@ int main()
srsenb::sched_nr_cfg_serialized_test();
srsenb::sched_nr_cfg_parallel_cc_test();
srsenb::sched_nr_cfg_parallel_sf_test();
}