unified stack task defer method

master
Francisco Paisana 5 years ago committed by Francisco Paisana
parent d35c9e2b89
commit c09f76ed6c

@ -85,6 +85,7 @@ public:
virtual srslte::timer_handler::unique_timer get_unique_timer() = 0; virtual srslte::timer_handler::unique_timer get_unique_timer() = 0;
virtual srslte::task_multiqueue::queue_handler make_task_queue() = 0; virtual srslte::task_multiqueue::queue_handler make_task_queue() = 0;
virtual void defer_callback(uint32_t duration_ms, std::function<void()> func) = 0; virtual void defer_callback(uint32_t duration_ms, std::function<void()> func) = 0;
virtual void defer_task(srslte::move_task_t func) = 0;
virtual void enqueue_background_task(std::function<void(uint32_t)> task) = 0; virtual void enqueue_background_task(std::function<void(uint32_t)> task) = 0;
virtual void notify_background_task_result(srslte::move_task_t task) = 0; virtual void notify_background_task_result(srslte::move_task_t task) = 0;
}; };

@ -174,20 +174,12 @@ public:
{ {
std::unique_lock<std::mutex> lock(mutex); std::unique_lock<std::mutex> lock(mutex);
while (running) { while (running) {
// Round-robin for all queues if (round_robin_pop_(value)) {
for (const queue_wrapper& q : queues) { if (nof_threads_waiting > 0) {
spin_idx = (spin_idx + 1) % queues.size(); lock.unlock();
if (is_queue_active_(spin_idx) and not queues[spin_idx].empty()) { queues[spin_idx].cv_full.notify_one();
if (value) {
*value = std::move(queues[spin_idx].front());
}
queues[spin_idx].pop();
if (nof_threads_waiting > 0) {
lock.unlock();
queues[spin_idx].cv_full.notify_one();
}
return spin_idx;
} }
return spin_idx;
} }
nof_threads_waiting++; nof_threads_waiting++;
cv_empty.wait(lock); cv_empty.wait(lock);
@ -197,6 +189,24 @@ public:
return -1; return -1;
} }
int try_pop(myobj* value)
{
std::unique_lock<std::mutex> lock(mutex);
if (running) {
if (round_robin_pop_(value)) {
if (nof_threads_waiting > 0) {
lock.unlock();
queues[spin_idx].cv_full.notify_one();
}
return spin_idx;
}
// didn't find any task
return -1;
}
cv_exit.notify_one();
return -1;
}
bool empty(int qidx) bool empty(int qidx)
{ {
std::lock_guard<std::mutex> lck(mutex); std::lock_guard<std::mutex> lck(mutex);
@ -237,6 +247,22 @@ public:
private: private:
bool is_queue_active_(int qidx) const { return running and queues[qidx].active; } bool is_queue_active_(int qidx) const { return running and queues[qidx].active; }
bool round_robin_pop_(myobj* value)
{
// Round-robin for all queues
for (const queue_wrapper& q : queues) {
spin_idx = (spin_idx + 1) % queues.size();
if (is_queue_active_(spin_idx) and not queues[spin_idx].empty()) {
if (value) {
*value = std::move(queues[spin_idx].front());
}
queues[spin_idx].pop();
return true;
}
}
return false;
}
std::mutex mutex; std::mutex mutex;
std::condition_variable cv_empty, cv_exit; std::condition_variable cv_empty, cv_exit;
uint32_t spin_idx = 0; uint32_t spin_idx = 0;

@ -29,6 +29,8 @@ namespace srsue {
class stack_test_dummy : public stack_interface_rrc class stack_test_dummy : public stack_interface_rrc
{ {
public: public:
stack_test_dummy() { stack_queue_id = pending_tasks.add_queue(); }
srslte::timer_handler::unique_timer get_unique_timer() override { return timers.get_unique_timer(); } srslte::timer_handler::unique_timer get_unique_timer() override { return timers.get_unique_timer(); }
void start_cell_search() override {} void start_cell_search() override {}
void start_cell_select(const phy_interface_rrc_lte::phy_cell_t* cell) override {} void start_cell_select(const phy_interface_rrc_lte::phy_cell_t* cell) override {}
@ -40,9 +42,31 @@ public:
{ {
timers.defer_callback(duration_ms, func); timers.defer_callback(duration_ms, func);
} }
void defer_task(srslte::move_task_t task) final { pending_tasks.push(stack_queue_id, std::move(task)); }
// Testing utility functions
void call_on_every_tti(srslte::move_task_t t) { tti_callbacks.push_back(std::move(t)); }
void process_tasks()
{
// Make sure to process any stack pending tasks
srslte::move_task_t task;
while (pending_tasks.try_pop(&task) >= 0) {
task();
}
}
void run_tti()
{
process_tasks();
for (auto& t : tti_callbacks) {
t();
}
timers.step_all();
}
srslte::timer_handler timers{100}; srslte::timer_handler timers{100};
srslte::task_multiqueue pending_tasks; srslte::task_multiqueue pending_tasks;
std::vector<srslte::move_task_t> tti_callbacks;
int stack_queue_id = -1;
}; };
class rlc_dummy_interface : public rlc_interface_mac class rlc_dummy_interface : public rlc_interface_mac

@ -65,16 +65,13 @@ public:
void write_pdu_pcch(unique_byte_buffer_t sdu); void write_pdu_pcch(unique_byte_buffer_t sdu);
private: private:
srsue::rlc_interface_pdcp* rlc = nullptr; srsue::rlc_interface_pdcp* rlc = nullptr;
srsue::rrc_interface_pdcp* rrc = nullptr; srsue::rrc_interface_pdcp* rrc = nullptr;
srsue::gw_interface_pdcp* gw = nullptr; srsue::gw_interface_pdcp* gw = nullptr;
typedef std::map<uint16_t, pdcp_entity_lte*> pdcp_map_t;
typedef std::pair<uint16_t, pdcp_entity_lte*> pdcp_map_pair_t;
srslte::task_handler_interface* task_executor = nullptr; srslte::task_handler_interface* task_executor = nullptr;
srslte::log_ref pdcp_log; srslte::log_ref pdcp_log;
pdcp_map_t pdcp_array, pdcp_array_mrb;
std::map<uint16_t, std::unique_ptr<pdcp_entity_lte> > pdcp_array, pdcp_array_mrb;
// cache valid lcids to be checked from separate thread // cache valid lcids to be checked from separate thread
std::mutex cache_mutex; std::mutex cache_mutex;

@ -36,14 +36,7 @@ pdcp::~pdcp()
valid_lcids_cached.clear(); valid_lcids_cached.clear();
} }
// destroy all remaining entities // destroy all remaining entities
for (pdcp_map_t::iterator it = pdcp_array.begin(); it != pdcp_array.end(); ++it) {
delete (it->second);
}
pdcp_array.clear(); pdcp_array.clear();
for (pdcp_map_t::iterator it = pdcp_array_mrb.begin(); it != pdcp_array_mrb.end(); ++it) {
delete (it->second);
}
pdcp_array_mrb.clear(); pdcp_array_mrb.clear();
} }
@ -58,8 +51,8 @@ void pdcp::stop() {}
void pdcp::reestablish() void pdcp::reestablish()
{ {
for (pdcp_map_t::iterator it = pdcp_array.begin(); it != pdcp_array.end(); ++it) { for (auto& lcid_it : pdcp_array) {
it->second->reestablish(); lcid_it.second->reestablish();
} }
} }
@ -77,11 +70,7 @@ void pdcp::reset()
valid_lcids_cached.clear(); valid_lcids_cached.clear();
} }
// destroy all bearers // destroy all bearers
for (pdcp_map_t::iterator it = pdcp_array.begin(); it != pdcp_array.end(); /* post increment in erase */) { pdcp_array.clear();
it->second->reset();
delete (it->second);
pdcp_array.erase(it++);
}
} }
/******************************************************************************* /*******************************************************************************
@ -114,7 +103,7 @@ void pdcp::write_sdu_mch(uint32_t lcid, unique_byte_buffer_t sdu)
void pdcp::add_bearer(uint32_t lcid, pdcp_config_t cfg) void pdcp::add_bearer(uint32_t lcid, pdcp_config_t cfg)
{ {
if (not valid_lcid(lcid)) { if (not valid_lcid(lcid)) {
if (not pdcp_array.insert(pdcp_map_pair_t(lcid, new pdcp_entity_lte(rlc, rrc, gw, task_executor, pdcp_log))) if (not pdcp_array.insert(std::make_pair(lcid, new pdcp_entity_lte(rlc, rrc, gw, task_executor, pdcp_log)))
.second) { .second) {
pdcp_log->error("Error inserting PDCP entity in to array\n."); pdcp_log->error("Error inserting PDCP entity in to array\n.");
return; return;
@ -137,7 +126,7 @@ void pdcp::add_bearer(uint32_t lcid, pdcp_config_t cfg)
void pdcp::add_bearer_mrb(uint32_t lcid, pdcp_config_t cfg) void pdcp::add_bearer_mrb(uint32_t lcid, pdcp_config_t cfg)
{ {
if (not valid_mch_lcid(lcid)) { if (not valid_mch_lcid(lcid)) {
if (not pdcp_array_mrb.insert(pdcp_map_pair_t(lcid, new pdcp_entity_lte(rlc, rrc, gw, task_executor, pdcp_log))) if (not pdcp_array_mrb.insert(std::make_pair(lcid, new pdcp_entity_lte(rlc, rrc, gw, task_executor, pdcp_log)))
.second) { .second) {
pdcp_log->error("Error inserting PDCP entity in to array\n."); pdcp_log->error("Error inserting PDCP entity in to array\n.");
return; return;
@ -160,8 +149,6 @@ void pdcp::del_bearer(uint32_t lcid)
valid_lcids_cached.erase(lcid); valid_lcids_cached.erase(lcid);
} }
if (valid_lcid(lcid)) { if (valid_lcid(lcid)) {
pdcp_map_t::iterator it = pdcp_array.find(lcid);
delete (it->second);
pdcp_array.erase(it); pdcp_array.erase(it);
pdcp_log->warning("Deleted PDCP bearer %s\n", rrc->get_rb_name(lcid).c_str()); pdcp_log->warning("Deleted PDCP bearer %s\n", rrc->get_rb_name(lcid).c_str());
} else { } else {
@ -174,19 +161,17 @@ void pdcp::change_lcid(uint32_t old_lcid, uint32_t new_lcid)
// make sure old LCID exists and new LCID is still free // make sure old LCID exists and new LCID is still free
if (valid_lcid(old_lcid) && not valid_lcid(new_lcid)) { if (valid_lcid(old_lcid) && not valid_lcid(new_lcid)) {
// insert old PDCP entity into new LCID // insert old PDCP entity into new LCID
pdcp_map_t::iterator it = pdcp_array.find(old_lcid); std::lock_guard<std::mutex> lock(cache_mutex);
pdcp_entity_lte* pdcp_entity = it->second; auto it = pdcp_array.find(old_lcid);
if (not pdcp_array.insert(pdcp_map_pair_t(new_lcid, pdcp_entity)).second) { std::unique_ptr<pdcp_entity_lte> pdcp_entity = std::move(it->second);
if (not pdcp_array.insert(std::make_pair(new_lcid, std::move(pdcp_entity))).second) {
pdcp_log->error("Error inserting PDCP entity into array\n."); pdcp_log->error("Error inserting PDCP entity into array\n.");
return; return;
} }
{
std::lock_guard<std::mutex> lock(cache_mutex);
valid_lcids_cached.erase(old_lcid);
valid_lcids_cached.insert(new_lcid);
}
// erase from old position // erase from old position
pdcp_array.erase(it); pdcp_array.erase(it);
valid_lcids_cached.erase(old_lcid);
valid_lcids_cached.insert(new_lcid);
pdcp_log->warning("Changed LCID of PDCP bearer from %d to %d\n", old_lcid, new_lcid); pdcp_log->warning("Changed LCID of PDCP bearer from %d to %d\n", old_lcid, new_lcid);
} else { } else {
pdcp_log->error( pdcp_log->error(
@ -206,8 +191,8 @@ void pdcp::config_security(uint32_t lcid, as_security_config_t sec_cfg)
void pdcp::config_security_all(as_security_config_t sec_cfg) void pdcp::config_security_all(as_security_config_t sec_cfg)
{ {
for (pdcp_map_t::iterator it = pdcp_array.begin(); it != pdcp_array.end(); ++it) { for (auto& it : pdcp_array) {
it->second->config_security(sec_cfg); it.second->config_security(sec_cfg);
} }
} }
@ -284,11 +269,7 @@ bool pdcp::valid_lcid(uint32_t lcid)
return false; return false;
} }
if (pdcp_array.find(lcid) == pdcp_array.end()) { return pdcp_array.find(lcid) != pdcp_array.end();
return false;
}
return true;
} }
bool pdcp::valid_mch_lcid(uint32_t lcid) bool pdcp::valid_mch_lcid(uint32_t lcid)
@ -298,11 +279,7 @@ bool pdcp::valid_mch_lcid(uint32_t lcid)
return false; return false;
} }
if (pdcp_array_mrb.find(lcid) == pdcp_array_mrb.end()) { return pdcp_array_mrb.find(lcid) != pdcp_array_mrb.end();
return false;
}
return true;
} }
} // namespace srslte } // namespace srslte

@ -36,7 +36,10 @@ pdcp_entity_lte::pdcp_entity_lte(srsue::rlc_interface_pdcp* rlc_,
{ {
} }
pdcp_entity_lte::~pdcp_entity_lte() {} pdcp_entity_lte::~pdcp_entity_lte()
{
reset();
}
void pdcp_entity_lte::init(uint32_t lcid_, pdcp_config_t cfg_) void pdcp_entity_lte::init(uint32_t lcid_, pdcp_config_t cfg_)
{ {
@ -87,10 +90,10 @@ void pdcp_entity_lte::reestablish()
// Used to stop/pause the entity (called on RRC conn release) // Used to stop/pause the entity (called on RRC conn release)
void pdcp_entity_lte::reset() void pdcp_entity_lte::reset()
{ {
active = false; if (active and log) {
if (log) {
log->debug("Reset %s\n", rrc->get_rb_name(lcid).c_str()); log->debug("Reset %s\n", rrc->get_rb_name(lcid).c_str());
} }
active = false;
} }
// GW/RRC interface // GW/RRC interface

@ -117,6 +117,7 @@ public:
void defer_callback(uint32_t duration_ms, std::function<void()> func) final; void defer_callback(uint32_t duration_ms, std::function<void()> func) final;
void enqueue_background_task(std::function<void(uint32_t)> task) final; void enqueue_background_task(std::function<void(uint32_t)> task) final;
void notify_background_task_result(srslte::move_task_t task) final; void notify_background_task_result(srslte::move_task_t task) final;
void defer_task(srslte::move_task_t task) final;
private: private:
static const int STACK_MAIN_THREAD_PRIO = -1; // Use default high-priority below UHD static const int STACK_MAIN_THREAD_PRIO = -1; // Use default high-priority below UHD
@ -164,7 +165,9 @@ private:
// state // state
bool started = false; bool started = false;
srslte::task_multiqueue pending_tasks; srslte::task_multiqueue pending_tasks;
int enb_queue_id = -1, sync_queue_id = -1, mme_queue_id = -1, gtpu_queue_id = -1, mac_queue_id = -1; int enb_queue_id = -1, sync_queue_id = -1, mme_queue_id = -1, gtpu_queue_id = -1, mac_queue_id = -1,
stack_queue_id = -1;
std::vector<srslte::move_task_t> deferred_stack_tasks; ///< enqueues stack tasks from within. Avoids locking
srslte::block_queue<stack_metrics_t> pending_stack_metrics; srslte::block_queue<stack_metrics_t> pending_stack_metrics;
}; };

@ -35,11 +35,12 @@ enb_stack_lte::enb_stack_lte(srslte::logger* logger_) :
pdcp(this, "PDCP"), pdcp(this, "PDCP"),
thread("STACK") thread("STACK")
{ {
enb_queue_id = pending_tasks.add_queue(); enb_queue_id = pending_tasks.add_queue();
sync_queue_id = pending_tasks.add_queue(); sync_queue_id = pending_tasks.add_queue();
mme_queue_id = pending_tasks.add_queue(); mme_queue_id = pending_tasks.add_queue();
gtpu_queue_id = pending_tasks.add_queue(); gtpu_queue_id = pending_tasks.add_queue();
mac_queue_id = pending_tasks.add_queue(); mac_queue_id = pending_tasks.add_queue();
stack_queue_id = pending_tasks.add_queue();
pool = byte_buffer_pool::get_instance(); pool = byte_buffer_pool::get_instance();
} }
@ -135,6 +136,10 @@ void enb_stack_lte::tti_clock()
void enb_stack_lte::tti_clock_impl() void enb_stack_lte::tti_clock_impl()
{ {
for (auto& t : deferred_stack_tasks) {
t();
}
deferred_stack_tasks.clear();
timers.step_all(); timers.step_all();
rrc.tti_clock(); rrc.tti_clock();
} }
@ -276,4 +281,9 @@ void enb_stack_lte::notify_background_task_result(srslte::move_task_t task)
task(); task();
} }
void enb_stack_lte::defer_task(srslte::move_task_t task)
{
deferred_stack_tasks.push_back(std::move(task));
}
} // namespace srsenb } // namespace srsenb

@ -127,6 +127,7 @@ public:
void enqueue_background_task(std::function<void(uint32_t)> f) final; void enqueue_background_task(std::function<void(uint32_t)> f) final;
void notify_background_task_result(srslte::move_task_t task) final; void notify_background_task_result(srslte::move_task_t task) final;
void defer_callback(uint32_t duration_ms, std::function<void()> func) final; void defer_callback(uint32_t duration_ms, std::function<void()> func) final;
void defer_task(srslte::move_task_t task) final;
private: private:
void run_thread() final; void run_thread() final;
@ -156,16 +157,6 @@ private:
srslte::log_ref nas_log{"NAS"}; srslte::log_ref nas_log{"NAS"};
srslte::log_ref pool_log{"POOL"}; srslte::log_ref pool_log{"POOL"};
// stack components
srsue::mac mac;
srslte::mac_pcap mac_pcap;
srslte::nas_pcap nas_pcap;
srslte::rlc rlc;
srslte::pdcp pdcp;
srsue::rrc rrc;
srsue::nas nas;
std::unique_ptr<usim_base> usim;
// RAT-specific interfaces // RAT-specific interfaces
phy_interface_stack_lte* phy = nullptr; phy_interface_stack_lte* phy = nullptr;
gw_interface_stack* gw = nullptr; gw_interface_stack* gw = nullptr;
@ -173,12 +164,23 @@ private:
// Thread // Thread
static const int STACK_MAIN_THREAD_PRIO = -1; // Use default high-priority below UHD static const int STACK_MAIN_THREAD_PRIO = -1; // Use default high-priority below UHD
srslte::task_multiqueue pending_tasks; srslte::task_multiqueue pending_tasks;
int sync_queue_id = -1, ue_queue_id = -1, gw_queue_id = -1, mac_queue_id = -1, background_queue_id = -1; int sync_queue_id = -1, ue_queue_id = -1, gw_queue_id = -1, stack_queue_id = -1, background_queue_id = -1;
srslte::task_thread_pool background_tasks; ///< Thread pool used for long, low-priority tasks srslte::task_thread_pool background_tasks; ///< Thread pool used for long, low-priority tasks
std::vector<srslte::move_task_t> deferred_stack_tasks; ///< enqueues stack tasks from within. Avoids locking
srslte::block_queue<stack_metrics_t> pending_stack_metrics; srslte::block_queue<stack_metrics_t> pending_stack_metrics;
// TTI stats // TTI stats
srslte::tprof<srslte::sliding_window_stats_ms> tti_tprof; srslte::tprof<srslte::sliding_window_stats_ms> tti_tprof;
// stack components
srsue::mac mac;
srslte::mac_pcap mac_pcap;
srslte::nas_pcap nas_pcap;
srslte::rlc rlc;
srslte::pdcp pdcp;
srsue::rrc rrc;
srsue::nas nas;
std::unique_ptr<usim_base> usim;
}; };
} // namespace srsue } // namespace srsue

@ -50,7 +50,7 @@ ue_stack_lte::ue_stack_lte() :
ue_queue_id = pending_tasks.add_queue(); ue_queue_id = pending_tasks.add_queue();
sync_queue_id = pending_tasks.add_queue(); sync_queue_id = pending_tasks.add_queue();
gw_queue_id = pending_tasks.add_queue(); gw_queue_id = pending_tasks.add_queue();
mac_queue_id = pending_tasks.add_queue(); stack_queue_id = pending_tasks.add_queue();
background_queue_id = pending_tasks.add_queue(); background_queue_id = pending_tasks.add_queue();
background_tasks.start(); background_tasks.start();
@ -288,6 +288,12 @@ void ue_stack_lte::run_tti_impl(uint32_t tti, uint32_t tti_jump)
} }
current_tti = tti_point{tti}; current_tti = tti_point{tti};
// Perform pending stack deferred tasks
for (auto& task : deferred_stack_tasks) {
task();
}
deferred_stack_tasks.clear();
// perform tasks for the received TTI range // perform tasks for the received TTI range
for (uint32_t i = 0; i < tti_jump; ++i) { for (uint32_t i = 0; i < tti_jump; ++i) {
uint32_t next_tti = TTI_SUB(tti, (tti_jump - i - 1)); uint32_t next_tti = TTI_SUB(tti, (tti_jump - i - 1));
@ -332,6 +338,11 @@ void ue_stack_lte::defer_callback(uint32_t duration_ms, std::function<void()> fu
timers.defer_callback(duration_ms, func); timers.defer_callback(duration_ms, func);
} }
void ue_stack_lte::defer_task(srslte::move_task_t task)
{
deferred_stack_tasks.push_back(std::move(task));
}
/******************** /********************
* RRC Interface * RRC Interface
*******************/ *******************/

@ -212,8 +212,8 @@ public:
log->step(tti); log->step(tti);
log->debug("Start TTI\n"); log->debug("Start TTI\n");
// Make sure to step SS timers // Make sure to step SS
step_timer(); step_stack();
// inform UE about new TTI // inform UE about new TTI
ue->set_current_tti(tti); ue->set_current_tti(tti);
@ -907,7 +907,7 @@ public:
ue->new_tb(dl_grant, (const uint8_t*)pdu->msg); ue->new_tb(dl_grant, (const uint8_t*)pdu->msg);
} }
void step_timer() { stack.timers.step_all(); } void step_stack() { stack.run_tti(); }
void add_srb(const ttcn3_helpers::timing_info_t timing, const uint32_t lcid, const pdcp_config_t pdcp_config) void add_srb(const ttcn3_helpers::timing_info_t timing, const uint32_t lcid, const pdcp_config_t pdcp_config)
{ {

Loading…
Cancel
Save