diff --git a/lib/include/srsran/common/network_utils.h b/lib/include/srsran/common/network_utils.h index 3c7446047..f3dbeab35 100644 --- a/lib/include/srsran/common/network_utils.h +++ b/lib/include/srsran/common/network_utils.h @@ -117,7 +117,7 @@ public: using sctp_recv_callback_t = std::function; - rx_multisocket_handler(std::string name_, srslog::basic_logger& logger, int thread_prio = 65); + rx_multisocket_handler(); rx_multisocket_handler(rx_multisocket_handler&&) = delete; rx_multisocket_handler(const rx_multisocket_handler&) = delete; rx_multisocket_handler& operator=(const rx_multisocket_handler&) = delete; @@ -135,6 +135,8 @@ public: void run_thread() override; private: + const int thread_prio = 65; + // used to unlock select struct ctrl_cmd_t { enum class cmd_id_t { EXIT, NEW_FD, RM_FD }; @@ -146,7 +148,6 @@ private: remove_socket_unprotected(int fd, fd_set* total_fd_set, int* max_fd); // args - std::string name; srslog::basic_logger& logger; // state @@ -158,6 +159,8 @@ private: std::condition_variable rem_cvar; }; +rx_multisocket_handler& get_stack_socket_manager(); + } // namespace srsran #endif // SRSRAN_RX_SOCKET_HANDLER_H diff --git a/lib/include/srsran/common/task_scheduler.h b/lib/include/srsran/common/task_scheduler.h index ea1785ae3..5bba56fee 100644 --- a/lib/include/srsran/common/task_scheduler.h +++ b/lib/include/srsran/common/task_scheduler.h @@ -123,7 +123,8 @@ public: { sched->defer_callback(duration_ms, std::forward(func)); } - void defer_task(srsran::move_task_t func) { sched->defer_task(std::move(func)); } + void defer_task(srsran::move_task_t func) { sched->defer_task(std::move(func)); } + srsran::task_queue_handle make_task_queue() { return sched->make_task_queue(); } private: task_scheduler* sched; diff --git a/lib/include/srsran/interfaces/enb_interfaces.h b/lib/include/srsran/interfaces/enb_interfaces.h index 7b42ed9fb..47e4898d5 100644 --- a/lib/include/srsran/interfaces/enb_interfaces.h +++ b/lib/include/srsran/interfaces/enb_interfaces.h @@ -20,12 +20,6 @@ namespace srsenb { class stack_interface_phy_lte; -class stack_interface_s1ap_lte -{ -public: - virtual void add_mme_socket(int fd) = 0; - virtual void remove_mme_socket(int fd) = 0; -}; class stack_interface_gtpu_lte { diff --git a/lib/src/common/network_utils.cc b/lib/src/common/network_utils.cc index d55fc8720..88d4852e7 100644 --- a/lib/src/common/network_utils.cc +++ b/lib/src/common/network_utils.cc @@ -16,10 +16,10 @@ #include #include -#define rxSockError(fmt, ...) logger.error("%s: " fmt, name.c_str(), ##__VA_ARGS__) -#define rxSockWarn(fmt, ...) logger.warning("%s: " fmt, name.c_str(), ##__VA_ARGS__) -#define rxSockInfo(fmt, ...) logger.info("%s: " fmt, name.c_str(), ##__VA_ARGS__) -#define rxSockDebug(fmt, ...) logger.debug("%s: " fmt, name.c_str(), ##__VA_ARGS__) +#define rxSockError(fmt, ...) logger.error("RxSockets: " fmt, ##__VA_ARGS__) +#define rxSockWarn(fmt, ...) logger.warning("RxSockets: " fmt, ##__VA_ARGS__) +#define rxSockInfo(fmt, ...) logger.info("RxSockets: " fmt, ##__VA_ARGS__) +#define rxSockDebug(fmt, ...) logger.debug("RxSockets: " fmt, ##__VA_ARGS__) namespace srsran { @@ -412,14 +412,10 @@ private: * Rx Multisocket Handler **************************************************************/ -rx_multisocket_handler::rx_multisocket_handler(std::string name_, srslog::basic_logger& logger, int thread_prio) : - thread(name_), name(std::move(name_)), logger(logger) +rx_multisocket_handler::rx_multisocket_handler() : thread("RXsockets"), logger(srslog::fetch_basic_logger("COMN")) { // register control pipe fd - if (pipe(pipefd) == -1) { - rxSockInfo("Failed to open control pipe"); - return; - } + srsran_assert(pipe(pipefd) != -1, "Failed to open control pipe"); start(thread_prio); } @@ -630,4 +626,10 @@ void rx_multisocket_handler::run_thread() } } +rx_multisocket_handler& get_stack_socket_manager() +{ + static rx_multisocket_handler handler; + return handler; +} + } // namespace srsran diff --git a/lib/test/common/network_utils_test.cc b/lib/test/common/network_utils_test.cc index e8240998c..9e3c1ec2b 100644 --- a/lib/test/common/network_utils_test.cc +++ b/lib/test/common/network_utils_test.cc @@ -28,7 +28,7 @@ int test_socket_handler() int counter = 0; srsran::unique_socket server_socket, client_socket, client_socket2; - srsran::rx_multisocket_handler sockhandler("RXSOCKETS", logger); + srsran::rx_multisocket_handler sockhandler; int server_port = 36412; const char* server_addr = "127.0.100.1"; using namespace srsran::net_utils; diff --git a/srsenb/hdr/stack/enb_stack_lte.h b/srsenb/hdr/stack/enb_stack_lte.h index 2c954383b..609658572 100644 --- a/srsenb/hdr/stack/enb_stack_lte.h +++ b/srsenb/hdr/stack/enb_stack_lte.h @@ -35,7 +35,6 @@ namespace srsenb { class enb_stack_lte final : public enb_stack_base, public stack_interface_phy_lte, - public stack_interface_s1ap_lte, public stack_interface_gtpu_lte, public srsran::thread { @@ -97,9 +96,6 @@ public: } void tti_clock() override; - /* STACK-S1AP interface*/ - void add_mme_socket(int fd) override; - void remove_mme_socket(int fd) override; void add_gtpu_s1u_socket_handler(int fd) override; void add_gtpu_m1u_socket_handler(int fd) override; @@ -135,9 +131,6 @@ private: srsran::task_scheduler task_sched; srsran::task_queue_handle enb_task_queue, gtpu_task_queue, mme_task_queue, sync_task_queue; - // components that layers depend on (need to be destroyed after layers) - std::unique_ptr rx_sockets; - srsenb::mac mac; srsenb::rlc rlc; srsenb::pdcp pdcp; diff --git a/srsenb/hdr/stack/upper/s1ap.h b/srsenb/hdr/stack/upper/s1ap.h index 53edebddd..c6eed8f6f 100644 --- a/srsenb/hdr/stack/upper/s1ap.h +++ b/srsenb/hdr/stack/upper/s1ap.h @@ -53,7 +53,7 @@ public: static const uint32_t ts1_reloc_overall_timeout_ms = 10000; s1ap(srsran::task_sched_handle task_sched_, srslog::basic_logger& logger); - int init(s1ap_args_t args_, rrc_interface_s1ap* rrc_, srsenb::stack_interface_s1ap_lte* stack_); + int init(s1ap_args_t args_, rrc_interface_s1ap* rrc_); void stop(); void get_metrics(s1ap_metrics_t& m); @@ -109,11 +109,11 @@ private: static const int NONUE_STREAM_ID = 0; // args - rrc_interface_s1ap* rrc = nullptr; - s1ap_args_t args; - srslog::basic_logger& logger; - srsenb::stack_interface_s1ap_lte* stack = nullptr; - srsran::task_sched_handle task_sched; + rrc_interface_s1ap* rrc = nullptr; + s1ap_args_t args; + srslog::basic_logger& logger; + srsran::task_sched_handle task_sched; + srsran::task_queue_handle mme_task_queue; srsran::unique_socket mme_socket; struct sockaddr_in mme_addr = {}; // MME address diff --git a/srsenb/src/stack/enb_stack_lte.cc b/srsenb/src/stack/enb_stack_lte.cc index ecca7314b..dd57f4b98 100644 --- a/srsenb/src/stack/enb_stack_lte.cc +++ b/srsenb/src/stack/enb_stack_lte.cc @@ -109,9 +109,6 @@ int enb_stack_lte::init(const stack_args_t& args_, const rrc_cfg_t& rrc_cfg_) s1ap.start_pcap(&s1ap_pcap); } - // Init Rx socket handler - rx_sockets.reset(new srsran::rx_multisocket_handler("ENBSOCKETS", stack_logger)); - // add sync queue sync_task_queue = task_sched.make_task_queue(args.sync_queue_size); @@ -126,7 +123,7 @@ int enb_stack_lte::init(const stack_args_t& args_, const rrc_cfg_t& rrc_cfg_) stack_logger.error("Couldn't initialize RRC"); return SRSRAN_ERROR; } - if (s1ap.init(args.s1ap, &rrc, this) != SRSRAN_SUCCESS) { + if (s1ap.init(args.s1ap, &rrc) != SRSRAN_SUCCESS) { stack_logger.error("Couldn't initialize S1AP"); return SRSRAN_ERROR; } @@ -169,7 +166,7 @@ void enb_stack_lte::stop() void enb_stack_lte::stop_impl() { - rx_sockets->stop(); + srsran::get_stack_socket_manager().stop(); s1ap.stop(); gtpu.stop(); @@ -241,21 +238,6 @@ void enb_stack_lte::handle_mme_rx_packet(srsran::unique_byte_buffer_t pdu, mme_task_queue.push(std::bind(task_handler, std::move(pdu))); } -void enb_stack_lte::add_mme_socket(int fd) -{ - // Pass MME Rx packet handler functor to socket handler to run in socket thread - auto mme_rx_handler = - [this](srsran::unique_byte_buffer_t pdu, const sockaddr_in& from, const sctp_sndrcvinfo& sri, int flags) { - handle_mme_rx_packet(std::move(pdu), from, sri, flags); - }; - rx_sockets->add_socket_sctp_pdu_handler(fd, mme_rx_handler); -} - -void enb_stack_lte::remove_mme_socket(int fd) -{ - rx_sockets->remove_socket(fd); -} - void enb_stack_lte::add_gtpu_s1u_socket_handler(int fd) { auto gtpu_s1u_handler = [this](srsran::unique_byte_buffer_t pdu, const sockaddr_in& from) { @@ -264,7 +246,7 @@ void enb_stack_lte::add_gtpu_s1u_socket_handler(int fd) }; gtpu_task_queue.push(std::bind(task_handler, std::move(pdu))); }; - rx_sockets->add_socket_pdu_handler(fd, gtpu_s1u_handler); + srsran::get_stack_socket_manager().add_socket_pdu_handler(fd, gtpu_s1u_handler); } void enb_stack_lte::add_gtpu_m1u_socket_handler(int fd) @@ -275,7 +257,7 @@ void enb_stack_lte::add_gtpu_m1u_socket_handler(int fd) }; gtpu_task_queue.push(std::bind(task_handler, std::move(pdu))); }; - rx_sockets->add_socket_pdu_handler(fd, gtpu_m1u_handler); + srsran::get_stack_socket_manager().add_socket_pdu_handler(fd, gtpu_m1u_handler); } } // namespace srsenb diff --git a/srsenb/src/stack/upper/s1ap.cc b/srsenb/src/stack/upper/s1ap.cc index a19533798..8f93c29a1 100644 --- a/srsenb/src/stack/upper/s1ap.cc +++ b/srsenb/src/stack/upper/s1ap.cc @@ -209,7 +209,7 @@ void s1ap::s1_setup_proc_t::then(const srsran::proc_state_t& result) const s1ap_ptr->mme_connect_timer.duration() / 1000); srsran::console("Failed to initiate S1 connection. Attempting reconnection in %d seconds\n", s1ap_ptr->mme_connect_timer.duration() / 1000); - s1ap_ptr->stack->remove_mme_socket(s1ap_ptr->mme_socket.get_socket()); + srsran::get_stack_socket_manager().remove_socket(s1ap_ptr->mme_socket.get_socket()); s1ap_ptr->mme_socket.close(); procInfo("S1AP socket closed."); s1ap_ptr->mme_connect_timer.run(); @@ -223,13 +223,14 @@ void s1ap::s1_setup_proc_t::then(const srsran::proc_state_t& result) const s1ap::s1ap(srsran::task_sched_handle task_sched_, srslog::basic_logger& logger) : s1setup_proc(this), logger(logger), task_sched(task_sched_) -{} +{ + mme_task_queue = task_sched.make_task_queue(); +} -int s1ap::init(s1ap_args_t args_, rrc_interface_s1ap* rrc_, srsenb::stack_interface_s1ap_lte* stack_) +int s1ap::init(s1ap_args_t args_, rrc_interface_s1ap* rrc_) { - rrc = rrc_; - args = args_; - stack = stack_; + rrc = rrc_; + args = args_; build_tai_cgi(); @@ -422,6 +423,24 @@ bool s1ap::is_mme_connected() /* S1AP connection helpers ********************************************************************************/ +/// Callback that will run inside the Sockets thread, and is going to be called whenever a SDU is received from the MME +struct sctp_rx_packet_handler { + s1ap* s1ap_ptr; + srsran::task_queue_handle* task_queue; + + sctp_rx_packet_handler(s1ap* ptr, srsran::task_queue_handle& task_queue_) : s1ap_ptr(ptr), task_queue(&task_queue_) {} + + void operator()(srsran::unique_byte_buffer_t pdu, const sockaddr_in& from, const sctp_sndrcvinfo& sri, int flags) + { + // Defer the handling of MME packet to eNB stack main thread + auto packet_handler = [this, from, sri, flags](srsran::unique_byte_buffer_t& t) { + s1ap_ptr->handle_mme_rx_msg(std::move(t), from, sri, flags); + }; + // Defer the handling of MME packet to main stack thread + task_queue->push(std::bind(packet_handler, std::move(pdu))); + } +}; + bool s1ap::connect_mme() { using namespace srsran::net_utils; @@ -440,7 +459,8 @@ bool s1ap::connect_mme() logger.info("SCTP socket connected with MME. fd=%d", mme_socket.fd()); // Assign a handler to rx MME packets (going to run in a different thread) - stack->add_mme_socket(mme_socket.fd()); + srsran::get_stack_socket_manager().add_socket_sctp_pdu_handler(mme_socket.fd(), + sctp_rx_packet_handler(this, mme_task_queue)); logger.info("SCTP socket established with MME"); return true; @@ -499,13 +519,13 @@ bool s1ap::handle_mme_rx_msg(srsran::unique_byte_buffer_t pdu, if (notification->sn_header.sn_type == SCTP_SHUTDOWN_EVENT) { logger.info("SCTP Association Shutdown. Association: %d", sri.sinfo_assoc_id); srsran::console("SCTP Association Shutdown. Association: %d\n", sri.sinfo_assoc_id); - stack->remove_mme_socket(mme_socket.get_socket()); + srsran::get_stack_socket_manager().remove_socket(mme_socket.get_socket()); mme_socket.close(); } else if (notification->sn_header.sn_type == SCTP_PEER_ADDR_CHANGE && notification->sn_paddr_change.spc_state == SCTP_ADDR_UNREACHABLE) { logger.info("SCTP peer addres unreachable. Association: %d", sri.sinfo_assoc_id); srsran::console("SCTP peer address unreachable. Association: %d\n", sri.sinfo_assoc_id); - stack->remove_mme_socket(mme_socket.get_socket()); + srsran::get_stack_socket_manager().remove_socket(mme_socket.get_socket()); mme_socket.close(); } } else if (pdu->N_bytes == 0) { diff --git a/srsenb/test/upper/s1ap_test.cc b/srsenb/test/upper/s1ap_test.cc index 76106313a..18406c244 100644 --- a/srsenb/test/upper/s1ap_test.cc +++ b/srsenb/test/upper/s1ap_test.cc @@ -17,13 +17,6 @@ using namespace srsenb; -class stack_dummy : public srsenb::stack_interface_s1ap_lte -{ -public: - void add_mme_socket(int fd) {} - void remove_mme_socket(int fd) {} -}; - struct mme_dummy { mme_dummy(const char* addr_str_, int port_) : addr_str(addr_str_), port(port_) { @@ -171,7 +164,6 @@ void test_s1ap_erab_setup(test_event event) srslog::basic_logger& logger = srslog::fetch_basic_logger("S1AP"); s1ap s1ap_obj(&task_sched, logger); rrc_tester rrc; - stack_dummy stack; asn1::s1ap::s1ap_pdu_c s1ap_pdu; srsran::unique_byte_buffer_t sdu; @@ -190,7 +182,7 @@ void test_s1ap_erab_setup(test_event event) args.mme_addr = mme_addr_str; args.enb_name = "srsenb01"; - TESTASSERT(s1ap_obj.init(args, &rrc, &stack) == SRSRAN_SUCCESS); + TESTASSERT(s1ap_obj.init(args, &rrc) == SRSRAN_SUCCESS); run_s1_setup(s1ap_obj, mme); add_rnti(s1ap_obj, mme);