created task scheduler class to deal with timers, thread pool, multiqueue, internal tasks

master
Francisco Paisana 5 years ago
parent 89b6e0f714
commit 4f5e65781f

@ -93,6 +93,7 @@ class task_handler_interface
public: public:
virtual srslte::timer_handler::unique_timer get_unique_timer() = 0; virtual srslte::timer_handler::unique_timer get_unique_timer() = 0;
virtual srslte::task_multiqueue::queue_handler make_task_queue() = 0; virtual srslte::task_multiqueue::queue_handler make_task_queue() = 0;
virtual srslte::task_multiqueue::queue_handler make_task_queue(uint32_t queue_size) = 0;
virtual void defer_callback(uint32_t duration_ms, std::function<void()> func) = 0; virtual void defer_callback(uint32_t duration_ms, std::function<void()> func) = 0;
virtual void defer_task(srslte::move_task_t func) = 0; virtual void defer_task(srslte::move_task_t func) = 0;
virtual void enqueue_background_task(std::function<void(uint32_t)> task) = 0; virtual void enqueue_background_task(std::function<void(uint32_t)> task) = 0;

@ -102,6 +102,7 @@ public:
} }
bool try_push(const myobj& value) { return parent->try_push(queue_id, value); } bool try_push(const myobj& value) { return parent->try_push(queue_id, value); }
std::pair<bool, myobj> try_push(myobj&& value) { return parent->try_push(queue_id, std::move(value)); } std::pair<bool, myobj> try_push(myobj&& value) { return parent->try_push(queue_id, std::move(value)); }
size_t size() { return parent->size(queue_id); }
private: private:
multiqueue_handler<myobj>* parent = nullptr; multiqueue_handler<myobj>* parent = nullptr;
@ -293,6 +294,7 @@ public:
} }
queue_handler get_queue_handler() { return {this, add_queue()}; } queue_handler get_queue_handler() { return {this, add_queue()}; }
queue_handler get_queue_handler(uint32_t size) { return {this, add_queue(size)}; }
private: private:
bool is_queue_active_(int qidx) const { return running and queues[qidx].active; } bool is_queue_active_(int qidx) const { return running and queues[qidx].active; }

@ -0,0 +1,161 @@
/*
* Copyright 2013-2020 Software Radio Systems Limited
*
* This file is part of srsLTE.
*
* srsLTE is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* srsLTE is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* A copy of the GNU Affero General Public License can be found in
* the LICENSE file in the top-level directory of this distribution
* and at http://www.gnu.org/licenses/.
*
*/
#ifndef SRSLTE_TASK_SCHEDULER_H
#define SRSLTE_TASK_SCHEDULER_H
#include "interfaces_common.h"
#include "multiqueue.h"
#include "thread_pool.h"
namespace srslte {
class task_scheduler : public srslte::task_handler_interface
{
public:
explicit task_scheduler(uint32_t default_extern_tasks_size = 512,
uint32_t nof_background_threads = 0,
uint32_t nof_timers_prealloc = 100) :
external_tasks{default_extern_tasks_size},
timers{nof_timers_prealloc},
background_tasks{nof_background_threads}
{
background_queue_id = external_tasks.add_queue();
// Start background thread
if (background_tasks.nof_workers() > 0) {
background_tasks.start();
}
}
srslte::timer_handler::unique_timer get_unique_timer() final { return timers.get_unique_timer(); }
//! Creates new queue for tasks coming from external thread
srslte::task_multiqueue::queue_handler make_task_queue() final { return external_tasks.get_queue_handler(); }
srslte::task_multiqueue::queue_handler make_task_queue(uint32_t size) final
{
return external_tasks.get_queue_handler(size);
}
//! Delays a task processing by duration_ms
void defer_callback(uint32_t duration_ms, std::function<void()> func) final
{
timers.defer_callback(duration_ms, func);
}
//! Enqueues internal task to be run in next tic
void defer_task(srslte::move_task_t func) final { internal_tasks.push_back(std::move(func)); }
//! Delegates a task to a thread pool that runs in the background
void enqueue_background_task(std::function<void(uint32_t)> f) final
{
if (background_tasks.nof_workers() > 0) {
background_tasks.push_task(std::move(f));
} else {
external_tasks.push(background_queue_id,
std::bind([](const std::function<void(uint32_t)>& task) { task(-1); }, std::move(f)));
}
}
//! Defer the handling of the result of a background task to next tic
void notify_background_task_result(srslte::move_task_t task) final
{
// run the notification in next tic
external_tasks.push(background_queue_id, std::move(task));
}
//! Updates timers, and run any pending internal tasks.
// CAUTION: Should be called in main thread
void tic()
{
timers.step_all();
run_all_internal_tasks();
}
//! Processes the next task in the multiqueue.
// CAUTION: This is a blocking call
bool run_next_external_task()
{
srslte::move_task_t task{};
if (external_tasks.wait_pop(&task) >= 0) {
task();
return true;
}
return false;
}
//! Processes the next task in the multiqueue if it exists.
bool try_run_next_external_task()
{
srslte::move_task_t task{};
if (external_tasks.try_pop(&task) >= 0) {
task();
return true;
}
return false;
}
srslte::timer_handler* get_timer_handler() { return &timers; }
private:
void run_all_internal_tasks()
{
// Perform pending stack deferred tasks
// Note: Keep it indexed-based, bc a task may enqueue another task, which may cause vector reallocation,
// and iterator invalidation
for (size_t i = 0; i < internal_tasks.size(); ++i) {
internal_tasks[i]();
}
internal_tasks.clear();
}
int background_queue_id = -1; ///< Queue for handling the outcomes of tasks run in the background
srslte::task_multiqueue external_tasks;
srslte::timer_handler timers;
srslte::task_thread_pool background_tasks; ///< Thread pool used for long, low-priority tasks
std::vector<srslte::move_task_t> internal_tasks; ///< enqueues stack tasks from within main thread. Avoids locking
};
//! Handle to provide to classes/functions running within main thread
class task_sched_handle
{
public:
task_sched_handle(task_scheduler* sched_) : sched(sched_) {}
srslte::timer_handler::unique_timer get_unique_timer() { return sched->get_unique_timer(); }
void enqueue_background_task(std::function<void(uint32_t)> f) { sched->enqueue_background_task(std::move(f)); }
void notify_background_task_result(srslte::move_task_t task)
{
sched->notify_background_task_result(std::move(task));
}
void defer_callback(uint32_t duration_ms, std::function<void()> func)
{
sched->defer_callback(duration_ms, std::move(func));
}
void defer_task(srslte::move_task_t func) { sched->defer_task(std::move(func)); }
private:
task_scheduler* sched;
};
} // namespace srslte
#endif // SRSLTE_TASK_SCHEDULER_H

@ -106,6 +106,7 @@ public:
void push_task(const task_t& task); void push_task(const task_t& task);
void push_task(task_t&& task); void push_task(task_t&& task);
uint32_t nof_pending_tasks(); uint32_t nof_pending_tasks();
size_t nof_workers() const { return workers.size(); }
private: private:
class worker_t : public thread class worker_t : public thread

@ -22,6 +22,7 @@
#ifndef SRSUE_DUMMY_CLASSES_H #ifndef SRSUE_DUMMY_CLASSES_H
#define SRSUE_DUMMY_CLASSES_H #define SRSUE_DUMMY_CLASSES_H
#include "srslte/common/task_scheduler.h"
#include "srslte/interfaces/ue_interfaces.h" #include "srslte/interfaces/ue_interfaces.h"
namespace srsue { namespace srsue {
@ -29,44 +30,37 @@ namespace srsue {
class stack_test_dummy : public stack_interface_rrc class stack_test_dummy : public stack_interface_rrc
{ {
public: public:
stack_test_dummy() { stack_queue_id = pending_tasks.add_queue(); } stack_test_dummy() {}
srslte::timer_handler::unique_timer get_unique_timer() override { return timers.get_unique_timer(); } srslte::timer_handler::unique_timer get_unique_timer() override { return task_sched.get_unique_timer(); }
void start_cell_search() override {} void start_cell_search() override {}
void start_cell_select(const phy_interface_rrc_lte::phy_cell_t* cell) override {} void start_cell_select(const phy_interface_rrc_lte::phy_cell_t* cell) override {}
srslte::tti_point get_current_tti() override { return srslte::tti_point{timers.get_cur_time() % 10240}; } srslte::tti_point get_current_tti() override
srslte::task_multiqueue::queue_handler make_task_queue() final { return pending_tasks.get_queue_handler(); } {
return srslte::tti_point{task_sched.get_timer_handler()->get_cur_time() % 10240};
}
srslte::task_multiqueue::queue_handler make_task_queue() final { return task_sched.make_task_queue(); }
srslte::task_multiqueue::queue_handler make_task_queue(uint32_t len) final { return task_sched.make_task_queue(len); }
void enqueue_background_task(std::function<void(uint32_t)> f) override { f(0); } void enqueue_background_task(std::function<void(uint32_t)> f) override { f(0); }
void notify_background_task_result(srslte::move_task_t task) override { task(); } void notify_background_task_result(srslte::move_task_t task) override { task(); }
void defer_callback(uint32_t duration_ms, std::function<void()> func) final void defer_callback(uint32_t duration_ms, std::function<void()> func) final
{ {
timers.defer_callback(duration_ms, func); task_sched.defer_callback(duration_ms, std::move(func));
} }
void defer_task(srslte::move_task_t task) final { pending_tasks.push(stack_queue_id, std::move(task)); } void defer_task(srslte::move_task_t task) final { task_sched.defer_task(std::move(task)); }
// Testing utility functions // Testing utility functions
void call_on_every_tti(srslte::move_task_t t) { tti_callbacks.push_back(std::move(t)); }
void process_tasks()
{
// Make sure to process any stack pending tasks
srslte::move_task_t task;
while (pending_tasks.try_pop(&task) >= 0) {
task();
}
}
void run_tti() void run_tti()
{ {
process_tasks(); // update clock and run internal tasks
for (auto& t : tti_callbacks) { task_sched.tic();
t();
// Runs all pending external tasks
while (task_sched.try_run_next_external_task()) {
} }
timers.step_all();
} }
srslte::timer_handler timers{100}; srslte::task_scheduler task_sched{512, 0, 100};
srslte::task_multiqueue pending_tasks;
std::vector<srslte::move_task_t> tti_callbacks;
int stack_queue_id = -1;
}; };
class rlc_dummy_interface : public rlc_interface_mac class rlc_dummy_interface : public rlc_interface_mac

@ -98,6 +98,10 @@ add_executable(expected_test expected_test.cc)
target_link_libraries(expected_test srslte_common) target_link_libraries(expected_test srslte_common)
add_test(expected_test expected_test) add_test(expected_test expected_test)
add_executable(task_scheduler_test task_scheduler_test.cc)
target_link_libraries(task_scheduler_test srslte_common)
add_test(task_scheduler_test task_scheduler_test)
if(ENABLE_5GNR) if(ENABLE_5GNR)
add_executable(pnf_dummy pnf_dummy.cc) add_executable(pnf_dummy pnf_dummy.cc)
target_link_libraries(pnf_dummy srslte_common ${CMAKE_THREAD_LIBS_INIT} ${Boost_LIBRARIES}) target_link_libraries(pnf_dummy srslte_common ${CMAKE_THREAD_LIBS_INIT} ${Boost_LIBRARIES})

@ -0,0 +1,85 @@
/*
* Copyright 2013-2020 Software Radio Systems Limited
*
* This file is part of srsLTE.
*
* srsLTE is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* srsLTE is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* A copy of the GNU Affero General Public License can be found in
* the LICENSE file in the top-level directory of this distribution
* and at http://www.gnu.org/licenses/.
*
*/
#include "srslte/common/task_scheduler.h"
#include "srslte/common/test_common.h"
enum class task_result { null, internal, external, timer };
int test_task_scheduler_no_pool()
{
srslte::task_scheduler task_sched{5, 0};
task_result state = task_result::null;
// TEST: deferring task does not run the task until the next tic
task_sched.defer_task([&state]() { state = task_result::internal; });
TESTASSERT(state == task_result::null);
task_sched.tic();
TESTASSERT(state == task_result::internal);
// TEST: check delaying of task
state = task_result::null;
int dur = 5;
task_sched.defer_callback(dur, [&state]() { state = task_result::timer; });
for (int i = 0; i < dur; ++i) {
TESTASSERT(state == task_result::null);
task_sched.tic();
}
TESTASSERT(state == task_result::timer);
// TEST: background task is run, despite there are no pool workers
state = task_result::null;
task_sched.enqueue_background_task([&task_sched, &state](uint32_t worker_id) {
task_sched.notify_background_task_result([&state]() { state = task_result::external; });
});
TESTASSERT(state == task_result::null);
task_sched.tic();
TESTASSERT(state == task_result::null);
task_sched.run_next_external_task(); // runs background task
TESTASSERT(state == task_result::null);
task_sched.run_next_external_task(); // runs notification
TESTASSERT(state == task_result::external);
return SRSLTE_SUCCESS;
}
int test_task_scheduler_with_pool()
{
srslte::task_scheduler task_sched{5, 2};
task_result state = task_result::null;
task_sched.enqueue_background_task([&task_sched, &state](uint32_t worker_id) {
task_sched.notify_background_task_result([&state]() { state = task_result::external; });
});
TESTASSERT(state == task_result::null);
task_sched.tic();
TESTASSERT(state == task_result::null);
task_sched.run_next_external_task(); // waits and runs notification
TESTASSERT(state == task_result::external);
return SRSLTE_SUCCESS;
}
int main()
{
TESTASSERT(test_task_scheduler_no_pool() == SRSLTE_SUCCESS);
TESTASSERT(test_task_scheduler_with_pool() == SRSLTE_SUCCESS);
}

@ -43,10 +43,10 @@ int test_rx(std::vector<pdcp_test_event_t> events,
srslte::pdcp_discard_timer_t::infinity}; srslte::pdcp_discard_timer_t::infinity};
pdcp_lte_test_helper pdcp_hlp_rx(cfg_rx, sec_cfg, log); pdcp_lte_test_helper pdcp_hlp_rx(cfg_rx, sec_cfg, log);
srslte::pdcp_entity_lte* pdcp_rx = &pdcp_hlp_rx.pdcp; srslte::pdcp_entity_lte* pdcp_rx = &pdcp_hlp_rx.pdcp;
gw_dummy* gw_rx = &pdcp_hlp_rx.gw; gw_dummy* gw_rx = &pdcp_hlp_rx.gw;
rrc_dummy* rrc_rx = &pdcp_hlp_rx.rrc; rrc_dummy* rrc_rx = &pdcp_hlp_rx.rrc;
srslte::timer_handler* timers_rx = &pdcp_hlp_rx.stack.timers; srsue::stack_test_dummy* stack = &pdcp_hlp_rx.stack;
pdcp_hlp_rx.set_pdcp_initial_state(init_state); pdcp_hlp_rx.set_pdcp_initial_state(init_state);
// Generate test message and encript/decript SDU. // Generate test message and encript/decript SDU.
@ -55,7 +55,7 @@ int test_rx(std::vector<pdcp_test_event_t> events,
// Decript and integrity check the PDU // Decript and integrity check the PDU
pdcp_rx->write_pdu(std::move(event.pkt)); pdcp_rx->write_pdu(std::move(event.pkt));
for (uint32_t i = 0; i < event.ticks; ++i) { for (uint32_t i = 0; i < event.ticks; ++i) {
timers_rx->step_all(); stack->run_tti();
} }
} }

@ -38,10 +38,10 @@ int test_tx_sdu_discard(const pdcp_initial_state& init_state,
srslte::pdcp_t_reordering_t::ms500, srslte::pdcp_t_reordering_t::ms500,
discard_timeout}; discard_timeout};
pdcp_nr_test_helper pdcp_hlp(cfg, sec_cfg, log); pdcp_nr_test_helper pdcp_hlp(cfg, sec_cfg, log);
srslte::pdcp_entity_nr* pdcp = &pdcp_hlp.pdcp; srslte::pdcp_entity_nr* pdcp = &pdcp_hlp.pdcp;
rlc_dummy* rlc = &pdcp_hlp.rlc; rlc_dummy* rlc = &pdcp_hlp.rlc;
srslte::timer_handler* timers = &pdcp_hlp.stack.timers; srsue::stack_test_dummy* stack = &pdcp_hlp.stack;
pdcp_hlp.set_pdcp_initial_state(init_state); pdcp_hlp.set_pdcp_initial_state(init_state);
@ -51,7 +51,7 @@ int test_tx_sdu_discard(const pdcp_initial_state& init_state,
pdcp->write_sdu(std::move(sdu), true); pdcp->write_sdu(std::move(sdu), true);
for (uint32_t i = 0; i < static_cast<uint32_t>(cfg.discard_timer) - 1; ++i) { for (uint32_t i = 0; i < static_cast<uint32_t>(cfg.discard_timer) - 1; ++i) {
timers->step_all(); stack->run_tti();
} }
TESTASSERT(rlc->discard_count == 0); TESTASSERT(rlc->discard_count == 0);
@ -63,7 +63,7 @@ int test_tx_sdu_discard(const pdcp_initial_state& init_state,
} }
// Last timer step // Last timer step
timers->step_all(); stack->run_tti();
// Check if RLC was notified of SDU discard // Check if RLC was notified of SDU discard
if (imediate_notify) { if (imediate_notify) {

@ -42,10 +42,10 @@ int test_rx(std::vector<pdcp_test_event_t> events,
srslte::pdcp_t_reordering_t::ms500, srslte::pdcp_t_reordering_t::ms500,
srslte::pdcp_discard_timer_t::infinity}; srslte::pdcp_discard_timer_t::infinity};
pdcp_nr_test_helper pdcp_hlp_rx(cfg_rx, sec_cfg, log); pdcp_nr_test_helper pdcp_hlp_rx(cfg_rx, sec_cfg, log);
srslte::pdcp_entity_nr* pdcp_rx = &pdcp_hlp_rx.pdcp; srslte::pdcp_entity_nr* pdcp_rx = &pdcp_hlp_rx.pdcp;
gw_dummy* gw_rx = &pdcp_hlp_rx.gw; gw_dummy* gw_rx = &pdcp_hlp_rx.gw;
srslte::timer_handler* timers_rx = &pdcp_hlp_rx.stack.timers; srsue::stack_test_dummy* stack = &pdcp_hlp_rx.stack;
pdcp_hlp_rx.set_pdcp_initial_state(init_state); pdcp_hlp_rx.set_pdcp_initial_state(init_state);
// Generate test message and encript/decript SDU. // Generate test message and encript/decript SDU.
@ -54,7 +54,7 @@ int test_rx(std::vector<pdcp_test_event_t> events,
// Decript and integrity check the PDU // Decript and integrity check the PDU
pdcp_rx->write_pdu(std::move(event.pkt)); pdcp_rx->write_pdu(std::move(event.pkt));
for (uint32_t i = 0; i < event.ticks; ++i) { for (uint32_t i = 0; i < event.ticks; ++i) {
timers_rx->step_all(); stack->run_tti();
} }
} }

@ -112,6 +112,7 @@ public:
/* Stack-MAC interface */ /* Stack-MAC interface */
srslte::timer_handler::unique_timer get_unique_timer() final; srslte::timer_handler::unique_timer get_unique_timer() final;
srslte::task_multiqueue::queue_handler make_task_queue() final; srslte::task_multiqueue::queue_handler make_task_queue() final;
srslte::task_multiqueue::queue_handler make_task_queue(uint32_t qsize) final;
void defer_callback(uint32_t duration_ms, std::function<void()> func) final; void defer_callback(uint32_t duration_ms, std::function<void()> func) final;
void enqueue_background_task(std::function<void(uint32_t)> task) final; void enqueue_background_task(std::function<void(uint32_t)> task) final;
void notify_background_task_result(srslte::move_task_t task) final; void notify_background_task_result(srslte::move_task_t task) final;

@ -82,10 +82,14 @@ public:
// Task Handling interface // Task Handling interface
srslte::timer_handler::unique_timer get_unique_timer() final { return timers.get_unique_timer(); } srslte::timer_handler::unique_timer get_unique_timer() final { return timers.get_unique_timer(); }
srslte::task_multiqueue::queue_handler make_task_queue() final { return pending_tasks.get_queue_handler(); } srslte::task_multiqueue::queue_handler make_task_queue() final { return pending_tasks.get_queue_handler(); }
void enqueue_background_task(std::function<void(uint32_t)> f) final; srslte::task_multiqueue::queue_handler make_task_queue(uint32_t qsize) final
void notify_background_task_result(srslte::move_task_t task) final; {
void defer_callback(uint32_t duration_ms, std::function<void()> func) final; return pending_tasks.get_queue_handler(qsize);
void defer_task(srslte::move_task_t task) final; }
void enqueue_background_task(std::function<void(uint32_t)> f) final;
void notify_background_task_result(srslte::move_task_t task) final;
void defer_callback(uint32_t duration_ms, std::function<void()> func) final;
void defer_task(srslte::move_task_t task) final;
private: private:
void run_thread() final; void run_thread() final;

@ -266,6 +266,11 @@ srslte::task_multiqueue::queue_handler enb_stack_lte::make_task_queue()
return pending_tasks.get_queue_handler(); return pending_tasks.get_queue_handler();
} }
srslte::task_multiqueue::queue_handler enb_stack_lte::make_task_queue(uint32_t qsize)
{
return pending_tasks.get_queue_handler(qsize);
}
void enb_stack_lte::defer_callback(uint32_t duration_ms, std::function<void()> func) void enb_stack_lte::defer_callback(uint32_t duration_ms, std::function<void()> func)
{ {
timers.defer_callback(duration_ms, func); timers.defer_callback(duration_ms, func);

@ -42,6 +42,7 @@
#include "srslte/common/buffer_pool.h" #include "srslte/common/buffer_pool.h"
#include "srslte/common/log_filter.h" #include "srslte/common/log_filter.h"
#include "srslte/common/multiqueue.h" #include "srslte/common/multiqueue.h"
#include "srslte/common/task_scheduler.h"
#include "srslte/common/thread_pool.h" #include "srslte/common/thread_pool.h"
#include "srslte/interfaces/ue_interfaces.h" #include "srslte/interfaces/ue_interfaces.h"
@ -125,12 +126,16 @@ public:
tti_point get_current_tti() final { return current_tti; } tti_point get_current_tti() final { return current_tti; }
// Task Handling interface // Task Handling interface
srslte::timer_handler::unique_timer get_unique_timer() final { return timers.get_unique_timer(); } srslte::timer_handler::unique_timer get_unique_timer() final { return task_sched.get_unique_timer(); }
srslte::task_multiqueue::queue_handler make_task_queue() final { return pending_tasks.get_queue_handler(); } srslte::task_multiqueue::queue_handler make_task_queue() final { return task_sched.make_task_queue(); }
void enqueue_background_task(std::function<void(uint32_t)> f) final; srslte::task_multiqueue::queue_handler make_task_queue(uint32_t queue_size) final
void notify_background_task_result(srslte::move_task_t task) final; {
void defer_callback(uint32_t duration_ms, std::function<void()> func) final; return task_sched.make_task_queue(queue_size);
void defer_task(srslte::move_task_t task) final; }
void enqueue_background_task(std::function<void(uint32_t)> f) final;
void notify_background_task_result(srslte::move_task_t task) final;
void defer_callback(uint32_t duration_ms, std::function<void()> func) final;
void defer_task(srslte::move_task_t task) final;
private: private:
void run_thread() final; void run_thread() final;
@ -146,9 +151,6 @@ private:
srslte::tti_point current_tti; srslte::tti_point current_tti;
// timers
srslte::timer_handler timers;
// UE stack logging // UE stack logging
srslte::logger* logger = nullptr; srslte::logger* logger = nullptr;
srslte::log_ref stack_log{"STCK"}; ///< our own log filter srslte::log_ref stack_log{"STCK"}; ///< our own log filter
@ -165,12 +167,10 @@ private:
gw_interface_stack* gw = nullptr; gw_interface_stack* gw = nullptr;
// Thread // Thread
static const int STACK_MAIN_THREAD_PRIO = 4; // Next lower priority after PHY workers static const int STACK_MAIN_THREAD_PRIO = 4; // Next lower priority after PHY workers
srslte::task_multiqueue pending_tasks; srslte::block_queue<stack_metrics_t> pending_stack_metrics;
int sync_queue_id = -1, ue_queue_id = -1, gw_queue_id = -1, stack_queue_id = -1, background_queue_id = -1; task_scheduler task_sched;
srslte::task_thread_pool background_tasks; ///< Thread pool used for long, low-priority tasks srslte::task_multiqueue::queue_handler sync_task_queue, ue_task_queue, gw_queue_id;
std::vector<srslte::move_task_t> deferred_stack_tasks; ///< enqueues stack tasks from within. Avoids locking
srslte::block_queue<stack_metrics_t> pending_stack_metrics;
// TTI stats // TTI stats
srslte::tprof<srslte::sliding_window_stats_ms> tti_tprof; srslte::tprof<srslte::sliding_window_stats_ms> tti_tprof;

@ -99,10 +99,14 @@ public:
// Task Handling interface // Task Handling interface
srslte::timer_handler::unique_timer get_unique_timer() final { return timers.get_unique_timer(); } srslte::timer_handler::unique_timer get_unique_timer() final { return timers.get_unique_timer(); }
srslte::task_multiqueue::queue_handler make_task_queue() final { return pending_tasks.get_queue_handler(); } srslte::task_multiqueue::queue_handler make_task_queue() final { return pending_tasks.get_queue_handler(); }
void enqueue_background_task(std::function<void(uint32_t)> f) final; srslte::task_multiqueue::queue_handler make_task_queue(uint32_t qsize) final
void notify_background_task_result(srslte::move_task_t task) final; {
void defer_callback(uint32_t duration_ms, std::function<void()> func) final; return pending_tasks.get_queue_handler(qsize);
void defer_task(srslte::move_task_t task) final; }
void enqueue_background_task(std::function<void(uint32_t)> f) final;
void notify_background_task_result(srslte::move_task_t task) final;
void defer_callback(uint32_t duration_ms, std::function<void()> func) final;
void defer_task(srslte::move_task_t task) final;
private: private:
void run_thread() final; void run_thread() final;

@ -32,7 +32,6 @@ using namespace srslte;
namespace srsue { namespace srsue {
ue_stack_lte::ue_stack_lte() : ue_stack_lte::ue_stack_lte() :
timers(64),
running(false), running(false),
args(), args(),
logger(nullptr), logger(nullptr),
@ -44,17 +43,12 @@ ue_stack_lte::ue_stack_lte() :
pdcp(this, "PDCP"), pdcp(this, "PDCP"),
nas(this), nas(this),
thread("STACK"), thread("STACK"),
pending_tasks(512), task_sched(512, 2, 64),
background_tasks(2),
tti_tprof("tti_tprof", "STCK", TTI_STAT_PERIOD) tti_tprof("tti_tprof", "STCK", TTI_STAT_PERIOD)
{ {
ue_queue_id = pending_tasks.add_queue(); ue_task_queue = task_sched.make_task_queue();
gw_queue_id = pending_tasks.add_queue(); gw_queue_id = task_sched.make_task_queue();
stack_queue_id = pending_tasks.add_queue();
background_queue_id = pending_tasks.add_queue();
// sync_queue is added in init() // sync_queue is added in init()
background_tasks.start();
} }
ue_stack_lte::~ue_stack_lte() ue_stack_lte::~ue_stack_lte()
@ -126,10 +120,10 @@ int ue_stack_lte::init(const stack_args_t& args_, srslte::logger* logger_)
} }
// add sync queue // add sync queue
sync_queue_id = pending_tasks.add_queue(args.sync_queue_size); sync_task_queue = task_sched.make_task_queue(args.sync_queue_size);
mac.init(phy, &rlc, &rrc, this); mac.init(phy, &rlc, &rrc, this);
rlc.init(&pdcp, &rrc, &timers, 0 /* RB_ID_SRB0 */); rlc.init(&pdcp, &rrc, task_sched.get_timer_handler(), 0 /* RB_ID_SRB0 */);
pdcp.init(&rlc, &rrc, gw); pdcp.init(&rlc, &rrc, gw);
nas.init(usim.get(), &rrc, gw, args.nas); nas.init(usim.get(), &rrc, gw, args.nas);
rrc.init(phy, &mac, &rlc, &pdcp, &nas, usim.get(), gw, args.rrc); rrc.init(phy, &mac, &rlc, &pdcp, &nas, usim.get(), gw, args.rrc);
@ -143,7 +137,7 @@ int ue_stack_lte::init(const stack_args_t& args_, srslte::logger* logger_)
void ue_stack_lte::stop() void ue_stack_lte::stop()
{ {
if (running) { if (running) {
pending_tasks.try_push(ue_queue_id, [this]() { stop_impl(); }); ue_task_queue.try_push([this]() { stop_impl(); });
wait_thread_finish(); wait_thread_finish();
} }
} }
@ -171,8 +165,7 @@ void ue_stack_lte::stop_impl()
bool ue_stack_lte::switch_on() bool ue_stack_lte::switch_on()
{ {
if (running) { if (running) {
pending_tasks.try_push(ue_queue_id, ue_task_queue.try_push([this]() { nas.start_attach_proc(nullptr, srslte::establishment_cause_t::mo_sig); });
[this]() { nas.start_attach_proc(nullptr, srslte::establishment_cause_t::mo_sig); });
return true; return true;
} }
return false; return false;
@ -214,7 +207,7 @@ bool ue_stack_lte::disable_data()
bool ue_stack_lte::get_metrics(stack_metrics_t* metrics) bool ue_stack_lte::get_metrics(stack_metrics_t* metrics)
{ {
// use stack thread to query metrics // use stack thread to query metrics
pending_tasks.try_push(ue_queue_id, [this]() { ue_task_queue.try_push([this]() {
stack_metrics_t metrics{}; stack_metrics_t metrics{};
mac.get_metrics(metrics.mac); mac.get_metrics(metrics.mac);
rlc.get_metrics(metrics.rlc); rlc.get_metrics(metrics.rlc);
@ -230,10 +223,7 @@ bool ue_stack_lte::get_metrics(stack_metrics_t* metrics)
void ue_stack_lte::run_thread() void ue_stack_lte::run_thread()
{ {
while (running) { while (running) {
srslte::move_task_t task{}; task_sched.run_next_external_task();
if (pending_tasks.wait_pop(&task) >= 0) {
task();
}
} }
} }
@ -256,7 +246,7 @@ void ue_stack_lte::write_sdu(uint32_t lcid, srslte::unique_byte_buffer_t sdu, bo
auto task = [this, lcid, blocking](srslte::unique_byte_buffer_t& sdu) { auto task = [this, lcid, blocking](srslte::unique_byte_buffer_t& sdu) {
pdcp.write_sdu(lcid, std::move(sdu), blocking); pdcp.write_sdu(lcid, std::move(sdu), blocking);
}; };
bool ret = pending_tasks.try_push(gw_queue_id, std::bind(task, std::move(sdu))).first; bool ret = gw_queue_id.try_push(std::bind(task, std::move(sdu))).first;
if (not ret) { if (not ret) {
pdcp_log->warning("GW SDU with lcid=%d was discarded.\n", lcid); pdcp_log->warning("GW SDU with lcid=%d was discarded.\n", lcid);
} }
@ -271,17 +261,17 @@ void ue_stack_lte::write_sdu(uint32_t lcid, srslte::unique_byte_buffer_t sdu, bo
*/ */
void ue_stack_lte::in_sync() void ue_stack_lte::in_sync()
{ {
pending_tasks.push(sync_queue_id, [this]() { rrc.in_sync(); }); sync_task_queue.push([this]() { rrc.in_sync(); });
} }
void ue_stack_lte::out_of_sync() void ue_stack_lte::out_of_sync()
{ {
pending_tasks.push(sync_queue_id, [this]() { rrc.out_of_sync(); }); sync_task_queue.push([this]() { rrc.out_of_sync(); });
} }
void ue_stack_lte::run_tti(uint32_t tti, uint32_t tti_jump) void ue_stack_lte::run_tti(uint32_t tti, uint32_t tti_jump)
{ {
pending_tasks.push(sync_queue_id, [this, tti, tti_jump]() { run_tti_impl(tti, tti_jump); }); sync_task_queue.push([this, tti, tti_jump]() { run_tti_impl(tti, tti_jump); });
} }
void ue_stack_lte::run_tti_impl(uint32_t tti, uint32_t tti_jump) void ue_stack_lte::run_tti_impl(uint32_t tti, uint32_t tti_jump)
@ -291,17 +281,11 @@ void ue_stack_lte::run_tti_impl(uint32_t tti, uint32_t tti_jump)
} }
current_tti = tti_point{tti}; current_tti = tti_point{tti};
// Perform pending stack deferred tasks
for (auto& task : deferred_stack_tasks) {
task();
}
deferred_stack_tasks.clear();
// perform tasks for the received TTI range // perform tasks for the received TTI range
for (uint32_t i = 0; i < tti_jump; ++i) { for (uint32_t i = 0; i < tti_jump; ++i) {
uint32_t next_tti = TTI_SUB(tti, (tti_jump - i - 1)); uint32_t next_tti = TTI_SUB(tti, (tti_jump - i - 1));
mac.run_tti(next_tti); mac.run_tti(next_tti);
timers.step_all(); task_sched.tic();
} }
rrc.run_tti(); rrc.run_tti();
nas.run_tti(); nas.run_tti();
@ -316,8 +300,8 @@ void ue_stack_lte::run_tti_impl(uint32_t tti, uint32_t tti_jump)
} }
// print warning if PHY pushes new TTI messages faster than we process them // print warning if PHY pushes new TTI messages faster than we process them
if (pending_tasks.size(sync_queue_id) > SYNC_QUEUE_WARN_THRESHOLD) { if (sync_task_queue.size() > SYNC_QUEUE_WARN_THRESHOLD) {
stack_log->warning("Detected slow task processing (sync_queue_len=%zd).\n", pending_tasks.size(sync_queue_id)); stack_log->warning("Detected slow task processing (sync_queue_len=%zd).\n", sync_task_queue.size());
} }
} }
@ -327,23 +311,23 @@ void ue_stack_lte::run_tti_impl(uint32_t tti, uint32_t tti_jump)
void ue_stack_lte::enqueue_background_task(std::function<void(uint32_t)> f) void ue_stack_lte::enqueue_background_task(std::function<void(uint32_t)> f)
{ {
background_tasks.push_task(std::move(f)); task_sched.enqueue_background_task(std::move(f));
} }
void ue_stack_lte::notify_background_task_result(srslte::move_task_t task) void ue_stack_lte::notify_background_task_result(srslte::move_task_t task)
{ {
// run the notification in the stack thread // run the notification in the stack thread
pending_tasks.push(background_queue_id, std::move(task)); task_sched.notify_background_task_result(std::move(task));
} }
void ue_stack_lte::defer_callback(uint32_t duration_ms, std::function<void()> func) void ue_stack_lte::defer_callback(uint32_t duration_ms, std::function<void()> func)
{ {
timers.defer_callback(duration_ms, func); task_sched.defer_callback(duration_ms, std::move(func));
} }
void ue_stack_lte::defer_task(srslte::move_task_t task) void ue_stack_lte::defer_task(srslte::move_task_t task)
{ {
deferred_stack_tasks.push_back(std::move(task)); task_sched.defer_task(std::move(task));
} }
/******************** /********************
@ -352,21 +336,21 @@ void ue_stack_lte::defer_task(srslte::move_task_t task)
void ue_stack_lte::start_cell_search() void ue_stack_lte::start_cell_search()
{ {
background_tasks.push_task([this](uint32_t worker_id) { task_sched.enqueue_background_task([this](uint32_t worker_id) {
phy_interface_rrc_lte::phy_cell_t found_cell; phy_interface_rrc_lte::phy_cell_t found_cell;
phy_interface_rrc_lte::cell_search_ret_t ret = phy->cell_search(&found_cell); phy_interface_rrc_lte::cell_search_ret_t ret = phy->cell_search(&found_cell);
// notify back RRC // notify back RRC
pending_tasks.push(background_queue_id, [this, found_cell, ret]() { rrc.cell_search_completed(ret, found_cell); }); task_sched.notify_background_task_result([this, found_cell, ret]() { rrc.cell_search_completed(ret, found_cell); });
}); });
} }
void ue_stack_lte::start_cell_select(const phy_interface_rrc_lte::phy_cell_t* phy_cell) void ue_stack_lte::start_cell_select(const phy_interface_rrc_lte::phy_cell_t* phy_cell)
{ {
phy_interface_rrc_lte::phy_cell_t cell_copy = *phy_cell; phy_interface_rrc_lte::phy_cell_t cell_copy = *phy_cell;
background_tasks.push_task([this, cell_copy](uint32_t worker_id) { task_sched.enqueue_background_task([this, cell_copy](uint32_t worker_id) {
bool ret = phy->cell_select(&cell_copy); bool ret = phy->cell_select(&cell_copy);
// notify back RRC // notify back RRC
pending_tasks.push(background_queue_id, [this, ret]() { rrc.cell_select_completed(ret); }); task_sched.notify_background_task_result([this, ret]() { rrc.cell_select_completed(ret); });
}); });
} }

@ -199,12 +199,12 @@ void ue_stack_nr::write_sdu(uint32_t lcid, srslte::unique_byte_buffer_t sdu, boo
*/ */
void ue_stack_nr::in_sync() void ue_stack_nr::in_sync()
{ {
// pending_tasks.push(sync_queue_id, task_t{[this](task_t*) { rrc.in_sync(); }}); // pending_tasks.push(sync_task_queue, task_t{[this](task_t*) { rrc.in_sync(); }});
} }
void ue_stack_nr::out_of_sync() void ue_stack_nr::out_of_sync()
{ {
// pending_tasks.push(sync_queue_id, task_t{[this](task_t*) { rrc.out_of_sync(); }}); // pending_tasks.push(sync_task_queue, task_t{[this](task_t*) { rrc.out_of_sync(); }});
} }
void ue_stack_nr::run_tti(uint32_t tti) void ue_stack_nr::run_tti(uint32_t tti)

@ -342,26 +342,11 @@ public:
mac_h = mac_; mac_h = mac_;
phy_h = phy_; phy_h = phy_;
} }
bool events_exist()
{
for (int i = 0; i < pending_tasks.nof_queues(); ++i) {
if (not pending_tasks.empty(i)) {
return true;
}
}
return false;
}
void run_tti(uint32_t tti) void run_tti(uint32_t tti)
{ {
mac_h->run_tti(tti); mac_h->run_tti(tti);
// flush all events // flush all events
if (events_exist()) { stack_test_dummy::run_tti();
srslte::move_task_t task{};
if (pending_tasks.wait_pop(&task) >= 0) {
task();
}
}
timers.step_all();
} }
private: private:

@ -123,7 +123,7 @@ int ttcn3_syssim::init(const all_args_t& args_)
// Init SS layers // Init SS layers
pdus.init(this, log); pdus.init(this, log);
rlc.init(&pdcp, this, &stack.timers, 0 /* RB_ID_SRB0 */); rlc.init(&pdcp, this, stack.task_sched.get_timer_handler(), 0 /* RB_ID_SRB0 */);
pdcp.init(&rlc, this, this); pdcp.init(&rlc, this, this);
return SRSLTE_SUCCESS; return SRSLTE_SUCCESS;

@ -168,7 +168,9 @@ public:
{ {
running = true; running = true;
while (running) { while (running) {
timers.step_all(); task_sched.tic();
while (task_sched.try_run_next_external_task()) {
}
nas->run_tti(); nas->run_tti();
} }
} }

@ -187,7 +187,9 @@ public:
void run_tti(uint32_t tti_) void run_tti(uint32_t tti_)
{ {
stack->timers.step_all(); stack->task_sched.tic();
while (stack->task_sched.try_run_next_external_task()) {
}
rrc::run_tti(); rrc::run_tti();
} }

Loading…
Cancel
Save