removed thread from GTPU MCH

master
Francisco Paisana 5 years ago
parent 1400777639
commit 2512e0fd79

@@ -347,7 +347,8 @@ public:
 class stack_interface_gtpu_lte
 {
 public:
   virtual void add_gtpu_socket(int fd) = 0;
+  virtual void add_gtpu_mch_socket(int fd) = 0;
 };
 } // namespace srsenb

@@ -97,6 +97,7 @@ public:
   void add_mme_socket(int fd) override;
   void remove_mme_socket(int fd) override;
   void add_gtpu_socket(int fd) override;
+  void add_gtpu_mch_socket(int fd) override;

 private:
   static const int STACK_MAIN_THREAD_PRIO = -1; // Use default high-priority below UHD

@@ -60,52 +60,50 @@ public:
   // stack interface
   void handle_gtpu_rx_packet(srslte::unique_byte_buffer_t pdu, const sockaddr_in& addr);
+  void handle_gtpu_mch_rx_packet(srslte::unique_byte_buffer_t pdu, const sockaddr_in& addr);

 private:
   static const int THREAD_PRIO = 65;
   static const int GTPU_PORT   = 2152;
-  srslte::byte_buffer_pool* pool;
+  srslte::byte_buffer_pool* pool  = nullptr;
   stack_interface_gtpu_lte* stack = nullptr;
-  bool running;
-  bool run_enable;
-  bool enable_mbsfn;
+  bool enable_mbsfn = false;
   std::string gtp_bind_addr;
   std::string mme_addr;
-  srsenb::pdcp_interface_gtpu *pdcp;
-  srslte::log *gtpu_log;
+  srsenb::pdcp_interface_gtpu* pdcp = nullptr;
+  srslte::log* gtpu_log = nullptr;

   // Class to create
-  class mch_thread : public thread {
+  class mch_handler
+  {
   public:
-    mch_thread() : initiated(false), running(false), run_enable(false), pool(NULL), lcid_counter(0), thread("MCH") {}
-    bool init(std::string m1u_multiaddr_, std::string m1u_if_addr_, pdcp_interface_gtpu *pdcp_, srslte::log *gtpu_log_);
-    void stop();
-
-  private:
-    void run_thread();
-
-    bool initiated;
-    bool running;
-    bool run_enable;
-    static const int MCH_THREAD_PRIO = 65;
-    pdcp_interface_gtpu *pdcp;
-    srslte::log *gtpu_log;
-    int m1u_sd;
-    int lcid_counter;
-    std::string m1u_multiaddr;
-    std::string m1u_if_addr;
-    srslte::byte_buffer_pool *pool;
+    explicit mch_handler(gtpu* gtpu_) : parent(gtpu_) {}
+    ~mch_handler();
+    mch_handler(const mch_handler&) = delete;
+    mch_handler(mch_handler&&) = delete;
+    mch_handler& operator=(const mch_handler&) = delete;
+    mch_handler& operator=(mch_handler&&) = delete;
+    bool init(std::string m1u_multiaddr_, std::string m1u_if_addr_);
+    void handle_rx_packet(srslte::unique_byte_buffer_t pdu, const sockaddr_in& addr);
+
+  private:
+    gtpu* parent = nullptr;
+    pdcp_interface_gtpu* pdcp = nullptr;
+    srslte::log* gtpu_log = nullptr;
+    std::string m1u_multiaddr;
+    std::string m1u_if_addr;
+
+    bool initiated = false;
+    int m1u_sd = -1;
+    int lcid_counter = 0;
   };

   // MCH thread instance
-  mch_thread mchthread;
+  mch_handler mch;

-  typedef struct{
+  typedef struct {
     uint32_t teids_in[SRSENB_N_RADIO_BEARERS];
     uint32_t teids_out[SRSENB_N_RADIO_BEARERS];
     uint32_t spgw_addrs[SRSENB_N_RADIO_BEARERS];
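Note on the class change above: mch_handler deletes all four copy/move operations because it keeps a raw back-pointer to its owning gtpu and, once initialized, owns a socket fd that gets registered with the stack; duplicating or relocating such an object would leave stale registrations behind. A minimal self-contained sketch of the pattern (the owner/handler names are illustrative stand-ins, not srsLTE types):

// Sketch: non-copyable, non-movable helper with a stable back-pointer
// to its owner. Deleting copy/move keeps `parent` (and anything
// registered against `this`) from being silently duplicated.
#include <cstdio>

class owner;

class handler
{
public:
  explicit handler(owner* parent_) : parent(parent_) {}
  handler(const handler&) = delete;
  handler(handler&&) = delete;
  handler& operator=(const handler&) = delete;
  handler& operator=(handler&&) = delete;
  void handle() const;

private:
  owner* parent;
};

class owner
{
public:
  owner() : h(this) {} // handler is built with a back-pointer that never moves
  handler h;
  void log(const char* msg) const { std::printf("%s\n", msg); }
};

void handler::handle() const { parent->log("handled on behalf of owner"); }

int main()
{
  owner o;
  o.h.handle();
  return 0;
}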
@@ -113,7 +111,7 @@ private:
   std::map<uint16_t, bearer_map> rnti_bearers;

   // Socket file descriptor
-  int fd;
+  int fd = -1;

   void echo_response(in_addr_t addr, in_port_t port, uint16_t seq);

@@ -172,6 +172,8 @@ void enb_stack_lte::stop()
 void enb_stack_lte::stop_impl()
 {
+  rx_sockets->stop();
+
   s1ap.stop();
   gtpu.stop();
   mac.stop();
@@ -186,8 +188,6 @@ void enb_stack_lte::stop_impl()
     mac_pcap.close();
   }

-  rx_sockets->stop();
-
   // erasing the queues is the last thing, bc we need them to call stop_impl()
   pending_tasks.erase_queue(sync_queue_id);
   pending_tasks.erase_queue(enb_queue_id);
@@ -252,4 +252,13 @@ void enb_stack_lte::add_gtpu_socket(int fd)
   rx_sockets->add_socket_pdu_handler(fd, gtpu_rx_handler);
 }

+void enb_stack_lte::add_gtpu_mch_socket(int fd)
+{
+  auto gtpu_mch_handler = [this](srslte::unique_byte_buffer_t pdu, const sockaddr_in& from) {
+    auto task_handler = [this, from](task_t* t) { gtpu.handle_gtpu_mch_rx_packet(std::move(t->pdu), from); };
+    pending_tasks.push(gtpu_queue_id, task_t{task_handler, std::move(pdu)});
+  };
+  rx_sockets->add_socket_pdu_handler(fd, gtpu_mch_handler);
+}
+
 } // namespace srsenb
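Note on add_gtpu_mch_socket above: the outer lambda runs on the shared rx-sockets thread, and all it does is wrap the PDU in a task and push it onto the stack's queue, so GTPU processing stays single-threaded on the stack thread. A minimal sketch of that handoff, assuming only the standard library (srsLTE's task_t/pending_tasks/block_queue machinery is replaced by generic stand-ins):

// Sketch: hand a packet received on a socket thread to a single
// consumer "stack" thread through a mutex-guarded task queue.
#include <condition_variable>
#include <cstdio>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

class task_queue
{
public:
  void push(std::function<void()> t)
  {
    {
      std::lock_guard<std::mutex> lock(m);
      q.push_back(std::move(t));
    }
    cv.notify_one();
  }
  std::function<void()> pop()
  {
    std::unique_lock<std::mutex> lock(m);
    cv.wait(lock, [this] { return !q.empty(); });
    std::function<void()> t = std::move(q.front());
    q.pop_front();
    return t;
  }

private:
  std::mutex m;
  std::condition_variable cv;
  std::deque<std::function<void()>> q;
};

int main()
{
  task_queue pending_tasks;

  // "rx-sockets thread": receives a PDU and defers the handling.
  std::thread rx([&pending_tasks] {
    std::vector<uint8_t> pdu = {0x30, 0xff, 0x00, 0x00}; // pretend GTPU bytes
    pending_tasks.push([pdu] { std::printf("stack thread handles %zu-byte PDU\n", pdu.size()); });
  });

  // "stack thread": pops and runs exactly one task, then exits.
  pending_tasks.pop()();
  rx.join();
  return 0;
}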

@@ -30,13 +30,9 @@
 using namespace srslte;

 namespace srsenb {

-gtpu::gtpu() : mchthread()
+gtpu::gtpu() : mch(this)
 {
-  pdcp     = NULL;
-  gtpu_log = NULL;
-  pool     = NULL;
-  pthread_mutex_init(&mutex, NULL);
+  pthread_mutex_init(&mutex, nullptr);
 }

 bool gtpu::init(std::string gtp_bind_addr_,
@@ -83,35 +79,20 @@ bool gtpu::init(std::string gtp_bind_addr_,
     return false;
   }

-  run_enable = true;
-  running    = true;
-
   stack->add_gtpu_socket(fd);

-  // Start MCH thread if enabled
+  // Start MCH socket if enabled
   enable_mbsfn = enable_mbsfn_;
   if (enable_mbsfn) {
-    mchthread.init(m1u_multiaddr_, m1u_if_addr_, pdcp, gtpu_log);
+    if (not mch.init(m1u_multiaddr_, m1u_if_addr_)) {
+      return false;
+    }
   }
   return true;
 }

 void gtpu::stop()
 {
-  if (enable_mbsfn) {
-    mchthread.stop();
-  }
-
-  if (run_enable) {
-    run_enable = false;
-    // Wait for the thread to exit gracefully, otherwise it might leave a mutex locked
-    int cnt = 0;
-    while (running && cnt < 100) {
-      usleep(10000);
-      cnt++;
-    }
-  }
-  running = false;
-
   if (fd) {
     close(fd);
   }
@@ -254,6 +235,11 @@ void gtpu::handle_gtpu_rx_packet(srslte::unique_byte_buffer_t pdu, const sockaddr_in& addr)
   }
 }

+void gtpu::handle_gtpu_mch_rx_packet(srslte::unique_byte_buffer_t pdu, const sockaddr_in& addr)
+{
+  mch.handle_rx_packet(std::move(pdu), addr);
+}
+
 void gtpu::echo_response(in_addr_t addr, in_port_t port, uint16_t seq)
 {
   gtpu_log->info("TX GTPU Echo Response, Seq: %d\n", seq);
@@ -294,21 +280,28 @@ void gtpu::rntilcid_to_teidin(uint16_t rnti, uint16_t lcid, uint32_t *teidin)
   *teidin = (rnti << 16) | lcid;
 }

 /****************************************************************************
  * Class to run the MCH thread
  ***************************************************************************/
-bool gtpu::mch_thread::init(std::string m1u_multiaddr_, std::string m1u_if_addr_, pdcp_interface_gtpu *pdcp, srslte::log *gtpu_log)
+gtpu::mch_handler::~mch_handler()
 {
-  pool = byte_buffer_pool::get_instance();
-  this->pdcp = pdcp;
-  this->gtpu_log = gtpu_log;
-  m1u_multiaddr = m1u_multiaddr_;
-  m1u_if_addr = m1u_if_addr_;
+  if (initiated) {
+    close(m1u_sd);
+    initiated = false;
+  }
+}

-  struct sockaddr_in bindaddr;
+bool gtpu::mch_handler::init(std::string m1u_multiaddr_, std::string m1u_if_addr_)
+{
+  m1u_multiaddr = std::move(m1u_multiaddr_);
+  m1u_if_addr   = std::move(m1u_if_addr_);
+  pdcp          = parent->pdcp;
+  gtpu_log      = parent->gtpu_log;

   // Set up sink socket
+  struct sockaddr_in bindaddr{};
   m1u_sd = socket(AF_INET, SOCK_DGRAM, 0);
   if (m1u_sd < 0) {
     gtpu_log->error("Failed to create M1-U sink socket\n");
@@ -316,101 +309,42 @@ bool gtpu::mch_thread::init(std::string m1u_multiaddr_, std::string m1u_if_addr_
     return false;
   }

   /* Bind socket */
-  bzero((char *)&bindaddr, sizeof(struct sockaddr_in));
   bindaddr.sin_family = AF_INET;
-  bindaddr.sin_addr.s_addr = htonl(INADDR_ANY); //Multicast sockets require bind to INADDR_ANY
-  bindaddr.sin_port = htons(GTPU_PORT+1);
-  size_t addrlen = sizeof(bindaddr);
-  if (bind(m1u_sd, (struct sockaddr *) &bindaddr, sizeof(bindaddr)) < 0) {
+  bindaddr.sin_addr.s_addr = htonl(INADDR_ANY); // Multicast sockets require bind to INADDR_ANY
+  bindaddr.sin_port = htons(GTPU_PORT + 1);
+  if (bind(m1u_sd, (struct sockaddr*)&bindaddr, sizeof(bindaddr)) < 0) {
     gtpu_log->error("Failed to bind multicast socket\n");
     return false;
   }

   /* Send an ADD MEMBERSHIP message via setsockopt */
-  struct ip_mreq mreq;
-  mreq.imr_multiaddr.s_addr = inet_addr(m1u_multiaddr.c_str()); //Multicast address of the service
-  mreq.imr_interface.s_addr = inet_addr(m1u_if_addr.c_str()); //Address of the IF the socket will listen to.
-  if (setsockopt(m1u_sd, IPPROTO_IP, IP_ADD_MEMBERSHIP,
-                 &mreq, sizeof(mreq)) < 0) {
+  struct ip_mreq mreq{};
+  mreq.imr_multiaddr.s_addr = inet_addr(m1u_multiaddr.c_str()); // Multicast address of the service
+  mreq.imr_interface.s_addr = inet_addr(m1u_if_addr.c_str());   // Address of the IF the socket will listen to.
+  if (setsockopt(m1u_sd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) {
     gtpu_log->error("Failed to register multicast group for M1-U\n");
     gtpu_log->error("M1-U interface IP: %s, M1-U Multicast Address %s\n", m1u_if_addr.c_str(), m1u_multiaddr.c_str());
     return false;
   }
   gtpu_log->info("M1-U initialized\n");

   initiated    = true;
   lcid_counter = 1;

-  // Start thread
-  start(MCH_THREAD_PRIO);
+  // Register socket in stack rx sockets thread
+  parent->stack->add_gtpu_mch_socket(m1u_sd);

   return true;
 }
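Note on the socket setup in init(): binding the sink socket to INADDR_ANY and then joining the group with IP_ADD_MEMBERSHIP is the standard IPv4 multicast-receiver recipe on Linux. A standalone sketch of the same steps (the group/interface addresses and the 2153 port are placeholders mirroring GTPU_PORT + 1):

// Sketch: open a UDP socket, bind it to INADDR_ANY on the data port,
// and join an IPv4 multicast group on the interface owning if_addr.
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cstdio>

int open_multicast_rx(const char* group, const char* if_addr, uint16_t port)
{
  int sd = socket(AF_INET, SOCK_DGRAM, 0);
  if (sd < 0) {
    return -1;
  }

  // Multicast receivers bind to INADDR_ANY; binding to the group or
  // interface address instead is not portable.
  sockaddr_in bindaddr = {};
  bindaddr.sin_family      = AF_INET;
  bindaddr.sin_addr.s_addr = htonl(INADDR_ANY);
  bindaddr.sin_port        = htons(port);
  if (bind(sd, (sockaddr*)&bindaddr, sizeof(bindaddr)) < 0) {
    close(sd);
    return -1;
  }

  // Ask the kernel to join the group on the given local interface.
  ip_mreq mreq = {};
  mreq.imr_multiaddr.s_addr = inet_addr(group);   // e.g. 239.255.0.1
  mreq.imr_interface.s_addr = inet_addr(if_addr); // local interface address
  if (setsockopt(sd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) {
    close(sd);
    return -1;
  }
  return sd;
}

int main()
{
  int sd = open_multicast_rx("239.255.0.1", "127.0.0.1", 2153);
  std::printf("multicast rx socket: %d\n", sd);
  if (sd >= 0) {
    close(sd);
  }
  return 0;
}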
-void gtpu::mch_thread::run_thread()
+void gtpu::mch_handler::handle_rx_packet(srslte::unique_byte_buffer_t pdu, const sockaddr_in& addr)
 {
-  if (!initiated) {
-    ERROR("Fatal error running mch_thread without initialization\n");
-    return;
-  }
-
-  unique_byte_buffer_t pdu = allocate_unique_buffer(*pool);
-  int n;
-  socklen_t addrlen;
-  sockaddr_in src_addr;
-
-  bzero((char *)&src_addr, sizeof(src_addr));
-  src_addr.sin_family      = AF_INET;
-  src_addr.sin_addr.s_addr = htonl(INADDR_ANY);
-  src_addr.sin_port        = htons(GTPU_PORT+1);
-  addrlen = sizeof(src_addr);
-
-  run_enable = true;
-  running    = true;
-
-  // Warning: Use mutex here if creating multiple services each with a different thread
-  uint16_t lcid = lcid_counter;
-  lcid_counter++;
-
-  while (run_enable) {
-    pdu->clear();
-    do {
-      n = recvfrom(m1u_sd, pdu->msg, SRSENB_MAX_BUFFER_SIZE_BYTES - SRSENB_BUFFER_HEADER_OFFSET, 0, (struct sockaddr *)&src_addr, &addrlen);
-    } while (n == -1 && errno == EAGAIN);
-
-    gtpu_log->debug("Received %d bytes from M1-U interface\n", n);
-    pdu->N_bytes = (uint32_t)n;
-
-    gtpu_header_t header;
-    gtpu_read_header(pdu.get(), &header, gtpu_log);
-
-    pdcp->write_sdu(SRSLTE_MRNTI, lcid, std::move(pdu));
-    do {
-      pdu = allocate_unique_buffer(*pool);
-      if (!pdu.get()) {
-        gtpu_log->console("GTPU Buffer pool empty. Trying again...\n");
-        usleep(10000);
-      }
-    } while (!pdu.get());
-  }
-  running = false;
-}
+  gtpu_log->debug("Received %d bytes from M1-U interface\n", pdu->N_bytes);

-void gtpu::mch_thread::stop()
-{
-  if (run_enable) {
-    run_enable = false;
-    // Wait for the thread to exit gracefully, otherwise it might leave a mutex locked
-    int cnt = 0;
-    while (running && cnt < 100) {
-      usleep(10000);
-      cnt++;
-    }
-    if (running) {
-      thread_cancel();
-    }
-    wait_thread_finish();
-  }
+  gtpu_header_t header;
+  gtpu_read_header(pdu.get(), &header, gtpu_log);
+
+  pdcp->write_sdu(SRSLTE_MRNTI, lcid_counter++, std::move(pdu));
 }

 } // namespace srsenb
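Note on handle_rx_packet: it relies on srsLTE's gtpu_read_header to strip the GTP-U header before handing the SDU to PDCP under SRSLTE_MRNTI. For reference, the mandatory part of a GTPv1-U header (3GPP TS 29.281) is 8 bytes: flags, message type, 16-bit length, 32-bit TEID. A minimal illustrative parser for that fixed part (not srsLTE's implementation):

// Sketch: decode the 8-byte mandatory GTPv1-U header (TS 29.281).
#include <cstddef>
#include <cstdint>
#include <cstdio>

struct gtpu_hdr {
  uint8_t  flags;        // version (3 bits), PT, E/S/PN bits
  uint8_t  message_type; // 0xff = G-PDU (carries user data)
  uint16_t length;       // payload length after the mandatory header
  uint32_t teid;         // tunnel endpoint identifier
};

bool parse_gtpu(const uint8_t* buf, size_t len, gtpu_hdr* h)
{
  if (len < 8) {
    return false;
  }
  h->flags        = buf[0];
  h->message_type = buf[1];
  h->length       = (uint16_t)((buf[2] << 8) | buf[3]); // network byte order
  h->teid         = ((uint32_t)buf[4] << 24) | ((uint32_t)buf[5] << 16) |
                    ((uint32_t)buf[6] << 8) | (uint32_t)buf[7];
  return (h->flags >> 5) == 1; // accept GTP version 1 only
}

int main()
{
  const uint8_t pkt[] = {0x30, 0xff, 0x00, 0x04, 0x00, 0x00, 0x00, 0x2a,
                         0xde, 0xad, 0xbe, 0xef};
  gtpu_hdr h = {};
  if (parse_gtpu(pkt, sizeof(pkt), &h)) {
    std::printf("G-PDU teid=%u len=%u\n", h.teid, h.length);
  }
  return 0;
}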
