std::functions do not accept move-only captures. So I had to create a wrapper earlier. In this PR, I cleaned a bit the previous API for the code to be a bit more readable

master
Francisco Paisana 5 years ago
parent c9f092e8e9
commit ac4d3b9624

@ -225,6 +225,40 @@ private:
uint32_t nof_threads_waiting = 0; uint32_t nof_threads_waiting = 0;
}; };
/***********************************************************
* Specialization for tasks with content that is move-only
**********************************************************/
template <typename Capture>
class moveable_task_t
{
public:
moveable_task_t() = default;
template <typename Func>
moveable_task_t(Func&& f) : func(std::forward<Func>(f))
{
}
template <typename Func>
moveable_task_t(Func&& f, Capture&& c) :
func([this, f]() { f(std::move(capture)); }),
capture(std::forward<Capture>(c))
{
}
void operator()() { func(); }
private:
std::function<void()> func;
Capture capture;
};
template <typename Func, typename Capture>
moveable_task_t<Capture> bind_task(Func&& f, Capture&& c)
{
return moveable_task_t<Capture>{std::forward<Func>(f), std::forward<Capture>(c)};
}
using multiqueue_task_handler = multiqueue_handler<moveable_task_t<srslte::unique_byte_buffer_t> >;
} // namespace srslte } // namespace srslte
#endif // SRSLTE_MULTIQUEUE_H #endif // SRSLTE_MULTIQUEUE_H

@ -147,20 +147,8 @@ private:
// state // state
bool started = false; bool started = false;
// NOTE: we use this struct instead of a std::function bc lambdas can't capture by move in C++11 using task_t = srslte::moveable_task_t<srslte::unique_byte_buffer_t>;
struct task_t { srslte::multiqueue_task_handler pending_tasks;
std::function<void(task_t*)> func;
srslte::unique_byte_buffer_t pdu;
task_t() = default;
explicit task_t(std::function<void(task_t*)> f_) : func(std::move(f_)) {}
task_t(std::function<void(task_t*)> f_, srslte::unique_byte_buffer_t pdu_) :
func(std::move(f_)),
pdu(std::move(pdu_))
{
}
void operator()() { func(this); }
};
srslte::multiqueue_handler<task_t> pending_tasks;
int enb_queue_id = -1, sync_queue_id = -1, mme_queue_id = -1, gtpu_queue_id = -1, mac_queue_id = -1; int enb_queue_id = -1, sync_queue_id = -1, mme_queue_id = -1, gtpu_queue_id = -1, mac_queue_id = -1;
}; };

@ -157,7 +157,7 @@ int enb_stack_lte::init(const stack_args_t& args_, const rrc_cfg_t& rrc_cfg_)
void enb_stack_lte::tti_clock() void enb_stack_lte::tti_clock()
{ {
pending_tasks.push(sync_queue_id, task_t{[this](task_t*) { tti_clock_impl(); }}); pending_tasks.push(sync_queue_id, [this]() { tti_clock_impl(); });
} }
void enb_stack_lte::tti_clock_impl() void enb_stack_lte::tti_clock_impl()
@ -169,7 +169,7 @@ void enb_stack_lte::tti_clock_impl()
void enb_stack_lte::stop() void enb_stack_lte::stop()
{ {
if (started) { if (started) {
pending_tasks.push(enb_queue_id, task_t{[this](task_t*) { stop_impl(); }}); pending_tasks.push(enb_queue_id, [this]() { stop_impl(); });
wait_thread_finish(); wait_thread_finish();
} }
} }
@ -223,11 +223,11 @@ void enb_stack_lte::handle_mme_rx_packet(srslte::unique_byte_buffer_t pdu,
int flags) int flags)
{ {
// Defer the handling of MME packet to eNB stack main thread // Defer the handling of MME packet to eNB stack main thread
auto task_handler = [this, from, sri, flags](task_t* t) { auto task_handler = [this, from, sri, flags](srslte::unique_byte_buffer_t t) {
s1ap.handle_mme_rx_msg(std::move(t->pdu), from, sri, flags); s1ap.handle_mme_rx_msg(std::move(t), from, sri, flags);
}; };
// Defer the handling of MME packet to main stack thread // Defer the handling of MME packet to main stack thread
pending_tasks.push(mme_queue_id, task_t{task_handler, std::move(pdu)}); pending_tasks.push(mme_queue_id, srslte::bind_task(task_handler, std::move(pdu)));
} }
void enb_stack_lte::add_mme_socket(int fd) void enb_stack_lte::add_mme_socket(int fd)

@ -160,16 +160,8 @@ private:
// Thread // Thread
static const int STACK_MAIN_THREAD_PRIO = -1; // Use default high-priority below UHD static const int STACK_MAIN_THREAD_PRIO = -1; // Use default high-priority below UHD
using task_t = srslte::moveable_task_t<srslte::unique_byte_buffer_t>;
// NOTE: we use this struct instead of a std::function bc lambdas can't capture by move in C++11 srslte::multiqueue_task_handler pending_tasks;
struct task_t {
std::function<void(task_t*)> func;
srslte::unique_byte_buffer_t pdu;
task_t() = default;
explicit task_t(std::function<void(task_t*)> f_) : func(std::move(f_)) {}
void operator()() { func(this); }
};
srslte::multiqueue_handler<task_t> pending_tasks;
int sync_queue_id = -1, ue_queue_id = -1, gw_queue_id = -1, mac_queue_id = -1, background_queue_id = -1; int sync_queue_id = -1, ue_queue_id = -1, gw_queue_id = -1, mac_queue_id = -1, background_queue_id = -1;
srslte::task_thread_pool background_tasks; ///< Thread pool used for long, low-priority tasks srslte::task_thread_pool background_tasks; ///< Thread pool used for long, low-priority tasks
}; };

@ -42,7 +42,7 @@ ue_stack_lte::ue_stack_lte() :
pending_tasks(1024), pending_tasks(1024),
background_tasks(2) background_tasks(2)
{ {
ue_queue_id = pending_tasks.add_queue(); ue_queue_id = pending_tasks.add_queue();
sync_queue_id = pending_tasks.add_queue(); sync_queue_id = pending_tasks.add_queue();
gw_queue_id = pending_tasks.add_queue(); gw_queue_id = pending_tasks.add_queue();
mac_queue_id = pending_tasks.add_queue(); mac_queue_id = pending_tasks.add_queue();
@ -139,7 +139,7 @@ int ue_stack_lte::init(const stack_args_t& args_, srslte::logger* logger_)
void ue_stack_lte::stop() void ue_stack_lte::stop()
{ {
if (running) { if (running) {
pending_tasks.try_push(ue_queue_id, task_t{[this](task_t*) { stop_impl(); }}); pending_tasks.try_push(ue_queue_id, [this]() { stop_impl(); });
wait_thread_finish(); wait_thread_finish();
} }
} }
@ -167,9 +167,8 @@ void ue_stack_lte::stop_impl()
bool ue_stack_lte::switch_on() bool ue_stack_lte::switch_on()
{ {
if (running) { if (running) {
pending_tasks.try_push(ue_queue_id, task_t{[this](task_t*) { pending_tasks.try_push(ue_queue_id,
nas.start_attach_request(nullptr, srslte::establishment_cause_t::mo_data); [this]() { nas.start_attach_request(nullptr, srslte::establishment_cause_t::mo_data); });
}});
return true; return true;
} }
return false; return false;
@ -243,11 +242,11 @@ void ue_stack_lte::run_thread()
*/ */
void ue_stack_lte::write_sdu(uint32_t lcid, srslte::unique_byte_buffer_t sdu, bool blocking) void ue_stack_lte::write_sdu(uint32_t lcid, srslte::unique_byte_buffer_t sdu, bool blocking)
{ {
task_t task{}; auto task = [this, lcid, blocking](srslte::unique_byte_buffer_t sdu) {
task.pdu = std::move(sdu); pdcp.write_sdu(lcid, std::move(sdu), blocking);
task.func = [this, lcid, blocking](task_t* task_ctxt) { pdcp.write_sdu(lcid, std::move(task_ctxt->pdu), blocking); }; };
std::pair<bool, task_t> ret = pending_tasks.try_push(gw_queue_id, std::move(task)); bool ret = pending_tasks.try_push(gw_queue_id, srslte::bind_task(task, std::move(sdu))).first;
if (not ret.first) { if (not ret) {
pdcp_log.warning("GW SDU with lcid=%d was discarded.\n", lcid); pdcp_log.warning("GW SDU with lcid=%d was discarded.\n", lcid);
} }
} }
@ -261,17 +260,17 @@ void ue_stack_lte::write_sdu(uint32_t lcid, srslte::unique_byte_buffer_t sdu, bo
*/ */
void ue_stack_lte::in_sync() void ue_stack_lte::in_sync()
{ {
pending_tasks.push(sync_queue_id, task_t{[this](task_t*) { rrc.in_sync(); }}); pending_tasks.push(sync_queue_id, [this]() { rrc.in_sync(); });
} }
void ue_stack_lte::out_of_sync() void ue_stack_lte::out_of_sync()
{ {
pending_tasks.push(sync_queue_id, task_t{[this](task_t*) { rrc.out_of_sync(); }}); pending_tasks.push(sync_queue_id, [this]() { rrc.out_of_sync(); });
} }
void ue_stack_lte::run_tti(uint32_t tti) void ue_stack_lte::run_tti(uint32_t tti)
{ {
pending_tasks.push(sync_queue_id, task_t{[this, tti](task_t*) { run_tti_impl(tti); }}); pending_tasks.push(sync_queue_id, [this, tti]() { run_tti_impl(tti); });
} }
void ue_stack_lte::run_tti_impl(uint32_t tti) void ue_stack_lte::run_tti_impl(uint32_t tti)
@ -288,7 +287,7 @@ void ue_stack_lte::run_tti_impl(uint32_t tti)
void ue_stack_lte::process_pdus() void ue_stack_lte::process_pdus()
{ {
pending_tasks.push(mac_queue_id, task_t{[this](task_t*) { mac.process_pdus(); }}); pending_tasks.push(mac_queue_id, [this]() { mac.process_pdus(); });
} }
void ue_stack_lte::wait_ra_completion(uint16_t rnti) void ue_stack_lte::wait_ra_completion(uint16_t rnti)
@ -310,8 +309,7 @@ void ue_stack_lte::start_cell_search()
phy_interface_rrc_lte::phy_cell_t found_cell; phy_interface_rrc_lte::phy_cell_t found_cell;
phy_interface_rrc_lte::cell_search_ret_t ret = phy->cell_search(&found_cell); phy_interface_rrc_lte::cell_search_ret_t ret = phy->cell_search(&found_cell);
// notify back RRC // notify back RRC
pending_tasks.push(background_queue_id, pending_tasks.push(background_queue_id, [this, found_cell, ret]() { rrc.cell_search_completed(ret, found_cell); });
task_t{[this, found_cell, ret](task_t*) { rrc.cell_search_completed(ret, found_cell); }});
}); });
} }

Loading…
Cancel
Save