diff --git a/lib/include/srslte/common/multiqueue.h b/lib/include/srslte/common/multiqueue.h index 32e26beb9..61710a5d9 100644 --- a/lib/include/srslte/common/multiqueue.h +++ b/lib/include/srslte/common/multiqueue.h @@ -225,6 +225,40 @@ private: uint32_t nof_threads_waiting = 0; }; +/*********************************************************** + * Specialization for tasks with content that is move-only + **********************************************************/ + +template +class moveable_task_t +{ +public: + moveable_task_t() = default; + template + moveable_task_t(Func&& f) : func(std::forward(f)) + { + } + template + moveable_task_t(Func&& f, Capture&& c) : + func([this, f]() { f(std::move(capture)); }), + capture(std::forward(c)) + { + } + void operator()() { func(); } + +private: + std::function func; + Capture capture; +}; + +template +moveable_task_t bind_task(Func&& f, Capture&& c) +{ + return moveable_task_t{std::forward(f), std::forward(c)}; +} + +using multiqueue_task_handler = multiqueue_handler >; + } // namespace srslte #endif // SRSLTE_MULTIQUEUE_H diff --git a/srsenb/hdr/stack/enb_stack_lte.h b/srsenb/hdr/stack/enb_stack_lte.h index d2bf7f302..2959d168c 100644 --- a/srsenb/hdr/stack/enb_stack_lte.h +++ b/srsenb/hdr/stack/enb_stack_lte.h @@ -147,20 +147,8 @@ private: // state bool started = false; - // NOTE: we use this struct instead of a std::function bc lambdas can't capture by move in C++11 - struct task_t { - std::function func; - srslte::unique_byte_buffer_t pdu; - task_t() = default; - explicit task_t(std::function f_) : func(std::move(f_)) {} - task_t(std::function f_, srslte::unique_byte_buffer_t pdu_) : - func(std::move(f_)), - pdu(std::move(pdu_)) - { - } - void operator()() { func(this); } - }; - srslte::multiqueue_handler pending_tasks; + using task_t = srslte::moveable_task_t; + srslte::multiqueue_task_handler pending_tasks; int enb_queue_id = -1, sync_queue_id = -1, mme_queue_id = -1, gtpu_queue_id = -1, mac_queue_id = -1; }; diff --git a/srsenb/src/stack/enb_stack_lte.cc b/srsenb/src/stack/enb_stack_lte.cc index 7ec681d04..2026cb2e2 100644 --- a/srsenb/src/stack/enb_stack_lte.cc +++ b/srsenb/src/stack/enb_stack_lte.cc @@ -157,7 +157,7 @@ int enb_stack_lte::init(const stack_args_t& args_, const rrc_cfg_t& rrc_cfg_) void enb_stack_lte::tti_clock() { - pending_tasks.push(sync_queue_id, task_t{[this](task_t*) { tti_clock_impl(); }}); + pending_tasks.push(sync_queue_id, [this]() { tti_clock_impl(); }); } void enb_stack_lte::tti_clock_impl() @@ -169,7 +169,7 @@ void enb_stack_lte::tti_clock_impl() void enb_stack_lte::stop() { if (started) { - pending_tasks.push(enb_queue_id, task_t{[this](task_t*) { stop_impl(); }}); + pending_tasks.push(enb_queue_id, [this]() { stop_impl(); }); wait_thread_finish(); } } @@ -223,11 +223,11 @@ void enb_stack_lte::handle_mme_rx_packet(srslte::unique_byte_buffer_t pdu, int flags) { // Defer the handling of MME packet to eNB stack main thread - auto task_handler = [this, from, sri, flags](task_t* t) { - s1ap.handle_mme_rx_msg(std::move(t->pdu), from, sri, flags); + auto task_handler = [this, from, sri, flags](srslte::unique_byte_buffer_t t) { + s1ap.handle_mme_rx_msg(std::move(t), from, sri, flags); }; // Defer the handling of MME packet to main stack thread - pending_tasks.push(mme_queue_id, task_t{task_handler, std::move(pdu)}); + pending_tasks.push(mme_queue_id, srslte::bind_task(task_handler, std::move(pdu))); } void enb_stack_lte::add_mme_socket(int fd) diff --git a/srsue/hdr/stack/ue_stack_lte.h b/srsue/hdr/stack/ue_stack_lte.h index 32f3e0a89..e09931233 100644 --- a/srsue/hdr/stack/ue_stack_lte.h +++ b/srsue/hdr/stack/ue_stack_lte.h @@ -160,16 +160,8 @@ private: // Thread static const int STACK_MAIN_THREAD_PRIO = -1; // Use default high-priority below UHD - - // NOTE: we use this struct instead of a std::function bc lambdas can't capture by move in C++11 - struct task_t { - std::function func; - srslte::unique_byte_buffer_t pdu; - task_t() = default; - explicit task_t(std::function f_) : func(std::move(f_)) {} - void operator()() { func(this); } - }; - srslte::multiqueue_handler pending_tasks; + using task_t = srslte::moveable_task_t; + srslte::multiqueue_task_handler pending_tasks; int sync_queue_id = -1, ue_queue_id = -1, gw_queue_id = -1, mac_queue_id = -1, background_queue_id = -1; srslte::task_thread_pool background_tasks; ///< Thread pool used for long, low-priority tasks }; diff --git a/srsue/src/stack/ue_stack_lte.cc b/srsue/src/stack/ue_stack_lte.cc index 652ef97bf..7018e398e 100644 --- a/srsue/src/stack/ue_stack_lte.cc +++ b/srsue/src/stack/ue_stack_lte.cc @@ -42,7 +42,7 @@ ue_stack_lte::ue_stack_lte() : pending_tasks(1024), background_tasks(2) { - ue_queue_id = pending_tasks.add_queue(); + ue_queue_id = pending_tasks.add_queue(); sync_queue_id = pending_tasks.add_queue(); gw_queue_id = pending_tasks.add_queue(); mac_queue_id = pending_tasks.add_queue(); @@ -139,7 +139,7 @@ int ue_stack_lte::init(const stack_args_t& args_, srslte::logger* logger_) void ue_stack_lte::stop() { if (running) { - pending_tasks.try_push(ue_queue_id, task_t{[this](task_t*) { stop_impl(); }}); + pending_tasks.try_push(ue_queue_id, [this]() { stop_impl(); }); wait_thread_finish(); } } @@ -167,9 +167,8 @@ void ue_stack_lte::stop_impl() bool ue_stack_lte::switch_on() { if (running) { - pending_tasks.try_push(ue_queue_id, task_t{[this](task_t*) { - nas.start_attach_request(nullptr, srslte::establishment_cause_t::mo_data); - }}); + pending_tasks.try_push(ue_queue_id, + [this]() { nas.start_attach_request(nullptr, srslte::establishment_cause_t::mo_data); }); return true; } return false; @@ -243,11 +242,11 @@ void ue_stack_lte::run_thread() */ void ue_stack_lte::write_sdu(uint32_t lcid, srslte::unique_byte_buffer_t sdu, bool blocking) { - task_t task{}; - task.pdu = std::move(sdu); - task.func = [this, lcid, blocking](task_t* task_ctxt) { pdcp.write_sdu(lcid, std::move(task_ctxt->pdu), blocking); }; - std::pair ret = pending_tasks.try_push(gw_queue_id, std::move(task)); - if (not ret.first) { + auto task = [this, lcid, blocking](srslte::unique_byte_buffer_t sdu) { + pdcp.write_sdu(lcid, std::move(sdu), blocking); + }; + bool ret = pending_tasks.try_push(gw_queue_id, srslte::bind_task(task, std::move(sdu))).first; + if (not ret) { pdcp_log.warning("GW SDU with lcid=%d was discarded.\n", lcid); } } @@ -261,17 +260,17 @@ void ue_stack_lte::write_sdu(uint32_t lcid, srslte::unique_byte_buffer_t sdu, bo */ void ue_stack_lte::in_sync() { - pending_tasks.push(sync_queue_id, task_t{[this](task_t*) { rrc.in_sync(); }}); + pending_tasks.push(sync_queue_id, [this]() { rrc.in_sync(); }); } void ue_stack_lte::out_of_sync() { - pending_tasks.push(sync_queue_id, task_t{[this](task_t*) { rrc.out_of_sync(); }}); + pending_tasks.push(sync_queue_id, [this]() { rrc.out_of_sync(); }); } void ue_stack_lte::run_tti(uint32_t tti) { - pending_tasks.push(sync_queue_id, task_t{[this, tti](task_t*) { run_tti_impl(tti); }}); + pending_tasks.push(sync_queue_id, [this, tti]() { run_tti_impl(tti); }); } void ue_stack_lte::run_tti_impl(uint32_t tti) @@ -288,7 +287,7 @@ void ue_stack_lte::run_tti_impl(uint32_t tti) void ue_stack_lte::process_pdus() { - pending_tasks.push(mac_queue_id, task_t{[this](task_t*) { mac.process_pdus(); }}); + pending_tasks.push(mac_queue_id, [this]() { mac.process_pdus(); }); } void ue_stack_lte::wait_ra_completion(uint16_t rnti) @@ -310,8 +309,7 @@ void ue_stack_lte::start_cell_search() phy_interface_rrc_lte::phy_cell_t found_cell; phy_interface_rrc_lte::cell_search_ret_t ret = phy->cell_search(&found_cell); // notify back RRC - pending_tasks.push(background_queue_id, - task_t{[this, found_cell, ret](task_t*) { rrc.cell_search_completed(ret, found_cell); }}); + pending_tasks.push(background_queue_id, [this, found_cell, ret]() { rrc.cell_search_completed(ret, found_cell); }); }); }