diff --git a/lib/include/srslte/interfaces/enb_interfaces.h b/lib/include/srslte/interfaces/enb_interfaces.h index a158441e1..cf60c85e0 100644 --- a/lib/include/srslte/interfaces/enb_interfaces.h +++ b/lib/include/srslte/interfaces/enb_interfaces.h @@ -347,7 +347,8 @@ public: class stack_interface_gtpu_lte { public: - virtual void add_gtpu_socket(int fd) = 0; + virtual void add_gtpu_socket(int fd) = 0; + virtual void add_gtpu_mch_socket(int fd) = 0; }; } // namespace srsenb diff --git a/srsenb/hdr/stack/enb_stack_lte.h b/srsenb/hdr/stack/enb_stack_lte.h index aabcc6048..5fa058fd6 100644 --- a/srsenb/hdr/stack/enb_stack_lte.h +++ b/srsenb/hdr/stack/enb_stack_lte.h @@ -97,6 +97,7 @@ public: void add_mme_socket(int fd) override; void remove_mme_socket(int fd) override; void add_gtpu_socket(int fd) override; + void add_gtpu_mch_socket(int fd) override; private: static const int STACK_MAIN_THREAD_PRIO = -1; // Use default high-priority below UHD diff --git a/srsenb/hdr/stack/upper/gtpu.h b/srsenb/hdr/stack/upper/gtpu.h index ca5f8db60..fb2fa7cba 100644 --- a/srsenb/hdr/stack/upper/gtpu.h +++ b/srsenb/hdr/stack/upper/gtpu.h @@ -60,52 +60,50 @@ public: // stack interface void handle_gtpu_rx_packet(srslte::unique_byte_buffer_t pdu, const sockaddr_in& addr); + void handle_gtpu_mch_rx_packet(srslte::unique_byte_buffer_t pdu, const sockaddr_in& addr); private: static const int THREAD_PRIO = 65; static const int GTPU_PORT = 2152; - srslte::byte_buffer_pool* pool; + srslte::byte_buffer_pool* pool = nullptr; stack_interface_gtpu_lte* stack = nullptr; - bool running; - bool run_enable; - - bool enable_mbsfn; + bool enable_mbsfn = false; std::string gtp_bind_addr; std::string mme_addr; - srsenb::pdcp_interface_gtpu *pdcp; - srslte::log *gtpu_log; + srsenb::pdcp_interface_gtpu* pdcp = nullptr; + srslte::log* gtpu_log = nullptr; // Class to create - class mch_thread : public thread { + class mch_handler + { public: - mch_thread() : initiated(false), running(false), run_enable(false), pool(NULL), lcid_counter(0), thread("MCH") {} - bool init(std::string m1u_multiaddr_, std::string m1u_if_addr_, pdcp_interface_gtpu *pdcp_, srslte::log *gtpu_log_); - void stop(); - private: - void run_thread(); - - bool initiated; - bool running; - bool run_enable; + explicit mch_handler(gtpu* gtpu_) : parent(gtpu_) {} + ~mch_handler(); + mch_handler(const mch_handler&) = delete; + mch_handler(mch_handler&&) = delete; + mch_handler& operator=(const mch_handler&) = delete; + mch_handler& operator=(mch_handler&&) = delete; + bool init(std::string m1u_multiaddr_, std::string m1u_if_addr_); + void handle_rx_packet(srslte::unique_byte_buffer_t pdu, const sockaddr_in& addr); - static const int MCH_THREAD_PRIO = 65; - - pdcp_interface_gtpu *pdcp; - srslte::log *gtpu_log; - int m1u_sd; - int lcid_counter; - std::string m1u_multiaddr; - std::string m1u_if_addr; - - srslte::byte_buffer_pool *pool; + private: + gtpu* parent = nullptr; + pdcp_interface_gtpu* pdcp = nullptr; + srslte::log* gtpu_log = nullptr; + std::string m1u_multiaddr; + std::string m1u_if_addr; + + bool initiated = false; + int m1u_sd = -1; + int lcid_counter = 0; }; // MCH thread insteance - mch_thread mchthread; + mch_handler mch; - typedef struct{ + typedef struct { uint32_t teids_in[SRSENB_N_RADIO_BEARERS]; uint32_t teids_out[SRSENB_N_RADIO_BEARERS]; uint32_t spgw_addrs[SRSENB_N_RADIO_BEARERS]; @@ -113,7 +111,7 @@ private: std::map rnti_bearers; // Socket file descriptor - int fd; + int fd = -1; void echo_response(in_addr_t addr, in_port_t port, uint16_t seq); diff --git a/srsenb/src/stack/enb_stack_lte.cc b/srsenb/src/stack/enb_stack_lte.cc index 0546a5112..959c48e29 100644 --- a/srsenb/src/stack/enb_stack_lte.cc +++ b/srsenb/src/stack/enb_stack_lte.cc @@ -172,6 +172,8 @@ void enb_stack_lte::stop() void enb_stack_lte::stop_impl() { + rx_sockets->stop(); + s1ap.stop(); gtpu.stop(); mac.stop(); @@ -186,8 +188,6 @@ void enb_stack_lte::stop_impl() mac_pcap.close(); } - rx_sockets->stop(); - // erasing the queues is the last thing, bc we need them to call stop_impl() pending_tasks.erase_queue(sync_queue_id); pending_tasks.erase_queue(enb_queue_id); @@ -252,4 +252,13 @@ void enb_stack_lte::add_gtpu_socket(int fd) rx_sockets->add_socket_pdu_handler(fd, gtpu_rx_handler); } +void enb_stack_lte::add_gtpu_mch_socket(int fd) +{ + auto gtpu_mch_handler = [this](srslte::unique_byte_buffer_t pdu, const sockaddr_in& from) { + auto task_handler = [this, from](task_t* t) { gtpu.handle_gtpu_mch_rx_packet(std::move(t->pdu), from); }; + pending_tasks.push(gtpu_queue_id, task_t{task_handler, std::move(pdu)}); + }; + rx_sockets->add_socket_pdu_handler(fd, gtpu_mch_handler); +} + } // namespace srsenb diff --git a/srsenb/src/stack/upper/gtpu.cc b/srsenb/src/stack/upper/gtpu.cc index 31f036148..e602890a5 100644 --- a/srsenb/src/stack/upper/gtpu.cc +++ b/srsenb/src/stack/upper/gtpu.cc @@ -30,13 +30,9 @@ using namespace srslte; namespace srsenb { -gtpu::gtpu() : mchthread() +gtpu::gtpu() : mch(this) { - pdcp = NULL; - gtpu_log = NULL; - pool = NULL; - - pthread_mutex_init(&mutex, NULL); + pthread_mutex_init(&mutex, nullptr); } bool gtpu::init(std::string gtp_bind_addr_, @@ -83,35 +79,20 @@ bool gtpu::init(std::string gtp_bind_addr_, return false; } - run_enable = true; - running = true; stack->add_gtpu_socket(fd); - // Start MCH thread if enabled + // Start MCH socket if enabled enable_mbsfn = enable_mbsfn_; if (enable_mbsfn) { - mchthread.init(m1u_multiaddr_, m1u_if_addr_, pdcp, gtpu_log); + if (not mch.init(m1u_multiaddr_, m1u_if_addr_)) { + return false; + } } return true; } void gtpu::stop() { - if(enable_mbsfn){ - mchthread.stop(); - } - - if (run_enable) { - run_enable = false; - // Wait thread to exit gracefully otherwise might leave a mutex locked - int cnt=0; - while(running && cnt<100) { - usleep(10000); - cnt++; - } - } - running = false; - if (fd) { close(fd); } @@ -254,6 +235,11 @@ void gtpu::handle_gtpu_rx_packet(srslte::unique_byte_buffer_t pdu, const sockadd } } +void gtpu::handle_gtpu_mch_rx_packet(srslte::unique_byte_buffer_t pdu, const sockaddr_in& addr) +{ + mch.handle_rx_packet(std::move(pdu), addr); +} + void gtpu::echo_response(in_addr_t addr, in_port_t port, uint16_t seq) { gtpu_log->info("TX GTPU Echo Response, Seq: %d\n", seq); @@ -294,21 +280,28 @@ void gtpu::rntilcid_to_teidin(uint16_t rnti, uint16_t lcid, uint32_t *teidin) *teidin = (rnti << 16) | lcid; } - /**************************************************************************** -* Class to run the MCH thread -***************************************************************************/ -bool gtpu::mch_thread::init(std::string m1u_multiaddr_, std::string m1u_if_addr_, pdcp_interface_gtpu *pdcp, srslte::log *gtpu_log) + * Class to run the MCH thread + ***************************************************************************/ + +gtpu::mch_handler::~mch_handler() { - pool = byte_buffer_pool::get_instance(); - this->pdcp = pdcp; - this->gtpu_log = gtpu_log; - m1u_multiaddr = m1u_multiaddr_; - m1u_if_addr = m1u_if_addr_; + if (initiated) { + close(m1u_sd); + initiated = false; + } +} - struct sockaddr_in bindaddr; +bool gtpu::mch_handler::init(std::string m1u_multiaddr_, std::string m1u_if_addr_) +{ + m1u_multiaddr = std::move(m1u_multiaddr_); + m1u_if_addr = std::move(m1u_if_addr_); + pdcp = parent->pdcp; + gtpu_log = parent->gtpu_log; // Set up sink socket + struct sockaddr_in bindaddr { + }; m1u_sd = socket(AF_INET, SOCK_DGRAM, 0); if (m1u_sd < 0) { gtpu_log->error("Failed to create M1-U sink socket\n"); @@ -316,101 +309,42 @@ bool gtpu::mch_thread::init(std::string m1u_multiaddr_, std::string m1u_if_addr_ } /* Bind socket */ - bzero((char *)&bindaddr, sizeof(struct sockaddr_in)); bindaddr.sin_family = AF_INET; - bindaddr.sin_addr.s_addr = htonl(INADDR_ANY); //Multicast sockets require bind to INADDR_ANY - bindaddr.sin_port = htons(GTPU_PORT+1); - size_t addrlen = sizeof(bindaddr); - - if (bind(m1u_sd, (struct sockaddr *) &bindaddr, sizeof(bindaddr)) < 0) { + bindaddr.sin_addr.s_addr = htonl(INADDR_ANY); // Multicast sockets require bind to INADDR_ANY + bindaddr.sin_port = htons(GTPU_PORT + 1); + if (bind(m1u_sd, (struct sockaddr*)&bindaddr, sizeof(bindaddr)) < 0) { gtpu_log->error("Failed to bind multicast socket\n"); return false; } /* Send an ADD MEMBERSHIP message via setsockopt */ - struct ip_mreq mreq; - mreq.imr_multiaddr.s_addr = inet_addr(m1u_multiaddr.c_str()); //Multicast address of the service - mreq.imr_interface.s_addr = inet_addr(m1u_if_addr.c_str()); //Address of the IF the socket will listen to. - if (setsockopt(m1u_sd, IPPROTO_IP, IP_ADD_MEMBERSHIP, - &mreq, sizeof(mreq)) < 0) { + struct ip_mreq mreq { + }; + mreq.imr_multiaddr.s_addr = inet_addr(m1u_multiaddr.c_str()); // Multicast address of the service + mreq.imr_interface.s_addr = inet_addr(m1u_if_addr.c_str()); // Address of the IF the socket will listen to. + if (setsockopt(m1u_sd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) { gtpu_log->error("Register musticast group for M1-U\n"); gtpu_log->error("M1-U infterface IP: %s, M1-U Multicast Address %s\n", m1u_if_addr.c_str(),m1u_multiaddr.c_str()); return false; } gtpu_log->info("M1-U initialized\n"); - initiated = true; + initiated = true; lcid_counter = 1; - // Start thread - start(MCH_THREAD_PRIO); + // Register socket in stack rx sockets thread + parent->stack->add_gtpu_mch_socket(m1u_sd); + return true; } -void gtpu::mch_thread::run_thread() +void gtpu::mch_handler::handle_rx_packet(srslte::unique_byte_buffer_t pdu, const sockaddr_in& addr) { - if (!initiated) { - ERROR("Fatal error running mch_thread without initialization\n"); - return; - } + gtpu_log->debug("Received %d bytes from M1-U interface\n", pdu->N_bytes); - unique_byte_buffer_t pdu = allocate_unique_buffer(*pool); - int n; - socklen_t addrlen; - sockaddr_in src_addr; - - bzero((char *)&src_addr, sizeof(src_addr)); - src_addr.sin_family = AF_INET; - src_addr.sin_addr.s_addr = htonl(INADDR_ANY); - src_addr.sin_port = htons(GTPU_PORT+1); - addrlen = sizeof(src_addr); - - run_enable = true; - running=true; - - // Warning: Use mutex here if creating multiple services each with a different thread - uint16_t lcid = lcid_counter; - lcid_counter++; - - while(run_enable) { - - pdu->clear(); - do{ - n = recvfrom(m1u_sd, pdu->msg, SRSENB_MAX_BUFFER_SIZE_BYTES - SRSENB_BUFFER_HEADER_OFFSET, 0, (struct sockaddr *) &src_addr, &addrlen); - } while (n == -1 && errno == EAGAIN); - gtpu_log->debug("Received %d bytes from M1-U interface\n", n); - - pdu->N_bytes = (uint32_t) n; - - gtpu_header_t header; - gtpu_read_header(pdu.get(), &header, gtpu_log); - pdcp->write_sdu(SRSLTE_MRNTI, lcid, std::move(pdu)); - do { - pdu = allocate_unique_buffer(*pool); - if (!pdu.get()) { - gtpu_log->console("GTPU Buffer pool empty. Trying again...\n"); - usleep(10000); - } - } while (!pdu.get()); - } - running = false; -} - -void gtpu::mch_thread::stop() -{ - if (run_enable) { - run_enable = false; - // Wait thread to exit gracefully otherwise might leave a mutex locked - int cnt = 0; - while(running && cnt < 100) { - usleep(10000); - cnt++; - } - if (running) { - thread_cancel(); - } - wait_thread_finish(); - } + gtpu_header_t header; + gtpu_read_header(pdu.get(), &header, gtpu_log); + pdcp->write_sdu(SRSLTE_MRNTI, lcid_counter++, std::move(pdu)); } } // namespace srsenb