use global rx multisocket handler, and remove stack-s1ap interface

master
Francisco 4 years ago committed by Francisco Paisana
parent 39de2efa69
commit 4aac7ac238

@ -117,7 +117,7 @@ public:
using sctp_recv_callback_t =
std::function<void(srsran::unique_byte_buffer_t, const sockaddr_in&, const sctp_sndrcvinfo&, int)>;
rx_multisocket_handler(std::string name_, srslog::basic_logger& logger, int thread_prio = 65);
rx_multisocket_handler();
rx_multisocket_handler(rx_multisocket_handler&&) = delete;
rx_multisocket_handler(const rx_multisocket_handler&) = delete;
rx_multisocket_handler& operator=(const rx_multisocket_handler&) = delete;
@ -135,6 +135,8 @@ public:
void run_thread() override;
private:
const int thread_prio = 65;
// used to unlock select
struct ctrl_cmd_t {
enum class cmd_id_t { EXIT, NEW_FD, RM_FD };
@ -146,7 +148,6 @@ private:
remove_socket_unprotected(int fd, fd_set* total_fd_set, int* max_fd);
// args
std::string name;
srslog::basic_logger& logger;
// state
@ -158,6 +159,8 @@ private:
std::condition_variable rem_cvar;
};
rx_multisocket_handler& get_stack_socket_manager();
} // namespace srsran
#endif // SRSRAN_RX_SOCKET_HANDLER_H

@ -123,7 +123,8 @@ public:
{
sched->defer_callback(duration_ms, std::forward<F>(func));
}
void defer_task(srsran::move_task_t func) { sched->defer_task(std::move(func)); }
void defer_task(srsran::move_task_t func) { sched->defer_task(std::move(func)); }
srsran::task_queue_handle make_task_queue() { return sched->make_task_queue(); }
private:
task_scheduler* sched;

@ -20,12 +20,6 @@
namespace srsenb {
class stack_interface_phy_lte;
class stack_interface_s1ap_lte
{
public:
virtual void add_mme_socket(int fd) = 0;
virtual void remove_mme_socket(int fd) = 0;
};
class stack_interface_gtpu_lte
{

@ -16,10 +16,10 @@
#include <sys/socket.h>
#include <sys/types.h>
#define rxSockError(fmt, ...) logger.error("%s: " fmt, name.c_str(), ##__VA_ARGS__)
#define rxSockWarn(fmt, ...) logger.warning("%s: " fmt, name.c_str(), ##__VA_ARGS__)
#define rxSockInfo(fmt, ...) logger.info("%s: " fmt, name.c_str(), ##__VA_ARGS__)
#define rxSockDebug(fmt, ...) logger.debug("%s: " fmt, name.c_str(), ##__VA_ARGS__)
#define rxSockError(fmt, ...) logger.error("RxSockets: " fmt, ##__VA_ARGS__)
#define rxSockWarn(fmt, ...) logger.warning("RxSockets: " fmt, ##__VA_ARGS__)
#define rxSockInfo(fmt, ...) logger.info("RxSockets: " fmt, ##__VA_ARGS__)
#define rxSockDebug(fmt, ...) logger.debug("RxSockets: " fmt, ##__VA_ARGS__)
namespace srsran {
@ -412,14 +412,10 @@ private:
* Rx Multisocket Handler
**************************************************************/
rx_multisocket_handler::rx_multisocket_handler(std::string name_, srslog::basic_logger& logger, int thread_prio) :
thread(name_), name(std::move(name_)), logger(logger)
rx_multisocket_handler::rx_multisocket_handler() : thread("RXsockets"), logger(srslog::fetch_basic_logger("COMN"))
{
// register control pipe fd
if (pipe(pipefd) == -1) {
rxSockInfo("Failed to open control pipe");
return;
}
srsran_assert(pipe(pipefd) != -1, "Failed to open control pipe");
start(thread_prio);
}
@ -630,4 +626,10 @@ void rx_multisocket_handler::run_thread()
}
}
rx_multisocket_handler& get_stack_socket_manager()
{
static rx_multisocket_handler handler;
return handler;
}
} // namespace srsran

@ -28,7 +28,7 @@ int test_socket_handler()
int counter = 0;
srsran::unique_socket server_socket, client_socket, client_socket2;
srsran::rx_multisocket_handler sockhandler("RXSOCKETS", logger);
srsran::rx_multisocket_handler sockhandler;
int server_port = 36412;
const char* server_addr = "127.0.100.1";
using namespace srsran::net_utils;

@ -35,7 +35,6 @@ namespace srsenb {
class enb_stack_lte final : public enb_stack_base,
public stack_interface_phy_lte,
public stack_interface_s1ap_lte,
public stack_interface_gtpu_lte,
public srsran::thread
{
@ -97,9 +96,6 @@ public:
}
void tti_clock() override;
/* STACK-S1AP interface*/
void add_mme_socket(int fd) override;
void remove_mme_socket(int fd) override;
void add_gtpu_s1u_socket_handler(int fd) override;
void add_gtpu_m1u_socket_handler(int fd) override;
@ -135,9 +131,6 @@ private:
srsran::task_scheduler task_sched;
srsran::task_queue_handle enb_task_queue, gtpu_task_queue, mme_task_queue, sync_task_queue;
// components that layers depend on (need to be destroyed after layers)
std::unique_ptr<srsran::rx_multisocket_handler> rx_sockets;
srsenb::mac mac;
srsenb::rlc rlc;
srsenb::pdcp pdcp;

@ -53,7 +53,7 @@ public:
static const uint32_t ts1_reloc_overall_timeout_ms = 10000;
s1ap(srsran::task_sched_handle task_sched_, srslog::basic_logger& logger);
int init(s1ap_args_t args_, rrc_interface_s1ap* rrc_, srsenb::stack_interface_s1ap_lte* stack_);
int init(s1ap_args_t args_, rrc_interface_s1ap* rrc_);
void stop();
void get_metrics(s1ap_metrics_t& m);
@ -109,11 +109,11 @@ private:
static const int NONUE_STREAM_ID = 0;
// args
rrc_interface_s1ap* rrc = nullptr;
s1ap_args_t args;
srslog::basic_logger& logger;
srsenb::stack_interface_s1ap_lte* stack = nullptr;
srsran::task_sched_handle task_sched;
rrc_interface_s1ap* rrc = nullptr;
s1ap_args_t args;
srslog::basic_logger& logger;
srsran::task_sched_handle task_sched;
srsran::task_queue_handle mme_task_queue;
srsran::unique_socket mme_socket;
struct sockaddr_in mme_addr = {}; // MME address

@ -109,9 +109,6 @@ int enb_stack_lte::init(const stack_args_t& args_, const rrc_cfg_t& rrc_cfg_)
s1ap.start_pcap(&s1ap_pcap);
}
// Init Rx socket handler
rx_sockets.reset(new srsran::rx_multisocket_handler("ENBSOCKETS", stack_logger));
// add sync queue
sync_task_queue = task_sched.make_task_queue(args.sync_queue_size);
@ -126,7 +123,7 @@ int enb_stack_lte::init(const stack_args_t& args_, const rrc_cfg_t& rrc_cfg_)
stack_logger.error("Couldn't initialize RRC");
return SRSRAN_ERROR;
}
if (s1ap.init(args.s1ap, &rrc, this) != SRSRAN_SUCCESS) {
if (s1ap.init(args.s1ap, &rrc) != SRSRAN_SUCCESS) {
stack_logger.error("Couldn't initialize S1AP");
return SRSRAN_ERROR;
}
@ -169,7 +166,7 @@ void enb_stack_lte::stop()
void enb_stack_lte::stop_impl()
{
rx_sockets->stop();
srsran::get_stack_socket_manager().stop();
s1ap.stop();
gtpu.stop();
@ -241,21 +238,6 @@ void enb_stack_lte::handle_mme_rx_packet(srsran::unique_byte_buffer_t pdu,
mme_task_queue.push(std::bind(task_handler, std::move(pdu)));
}
void enb_stack_lte::add_mme_socket(int fd)
{
// Pass MME Rx packet handler functor to socket handler to run in socket thread
auto mme_rx_handler =
[this](srsran::unique_byte_buffer_t pdu, const sockaddr_in& from, const sctp_sndrcvinfo& sri, int flags) {
handle_mme_rx_packet(std::move(pdu), from, sri, flags);
};
rx_sockets->add_socket_sctp_pdu_handler(fd, mme_rx_handler);
}
void enb_stack_lte::remove_mme_socket(int fd)
{
rx_sockets->remove_socket(fd);
}
void enb_stack_lte::add_gtpu_s1u_socket_handler(int fd)
{
auto gtpu_s1u_handler = [this](srsran::unique_byte_buffer_t pdu, const sockaddr_in& from) {
@ -264,7 +246,7 @@ void enb_stack_lte::add_gtpu_s1u_socket_handler(int fd)
};
gtpu_task_queue.push(std::bind(task_handler, std::move(pdu)));
};
rx_sockets->add_socket_pdu_handler(fd, gtpu_s1u_handler);
srsran::get_stack_socket_manager().add_socket_pdu_handler(fd, gtpu_s1u_handler);
}
void enb_stack_lte::add_gtpu_m1u_socket_handler(int fd)
@ -275,7 +257,7 @@ void enb_stack_lte::add_gtpu_m1u_socket_handler(int fd)
};
gtpu_task_queue.push(std::bind(task_handler, std::move(pdu)));
};
rx_sockets->add_socket_pdu_handler(fd, gtpu_m1u_handler);
srsran::get_stack_socket_manager().add_socket_pdu_handler(fd, gtpu_m1u_handler);
}
} // namespace srsenb

@ -209,7 +209,7 @@ void s1ap::s1_setup_proc_t::then(const srsran::proc_state_t& result) const
s1ap_ptr->mme_connect_timer.duration() / 1000);
srsran::console("Failed to initiate S1 connection. Attempting reconnection in %d seconds\n",
s1ap_ptr->mme_connect_timer.duration() / 1000);
s1ap_ptr->stack->remove_mme_socket(s1ap_ptr->mme_socket.get_socket());
srsran::get_stack_socket_manager().remove_socket(s1ap_ptr->mme_socket.get_socket());
s1ap_ptr->mme_socket.close();
procInfo("S1AP socket closed.");
s1ap_ptr->mme_connect_timer.run();
@ -223,13 +223,14 @@ void s1ap::s1_setup_proc_t::then(const srsran::proc_state_t& result) const
s1ap::s1ap(srsran::task_sched_handle task_sched_, srslog::basic_logger& logger) :
s1setup_proc(this), logger(logger), task_sched(task_sched_)
{}
{
mme_task_queue = task_sched.make_task_queue();
}
int s1ap::init(s1ap_args_t args_, rrc_interface_s1ap* rrc_, srsenb::stack_interface_s1ap_lte* stack_)
int s1ap::init(s1ap_args_t args_, rrc_interface_s1ap* rrc_)
{
rrc = rrc_;
args = args_;
stack = stack_;
rrc = rrc_;
args = args_;
build_tai_cgi();
@ -422,6 +423,24 @@ bool s1ap::is_mme_connected()
/* S1AP connection helpers
********************************************************************************/
/// Callback that will run inside the Sockets thread, and is going to be called whenever a SDU is received from the MME
struct sctp_rx_packet_handler {
s1ap* s1ap_ptr;
srsran::task_queue_handle* task_queue;
sctp_rx_packet_handler(s1ap* ptr, srsran::task_queue_handle& task_queue_) : s1ap_ptr(ptr), task_queue(&task_queue_) {}
void operator()(srsran::unique_byte_buffer_t pdu, const sockaddr_in& from, const sctp_sndrcvinfo& sri, int flags)
{
// Defer the handling of MME packet to eNB stack main thread
auto packet_handler = [this, from, sri, flags](srsran::unique_byte_buffer_t& t) {
s1ap_ptr->handle_mme_rx_msg(std::move(t), from, sri, flags);
};
// Defer the handling of MME packet to main stack thread
task_queue->push(std::bind(packet_handler, std::move(pdu)));
}
};
bool s1ap::connect_mme()
{
using namespace srsran::net_utils;
@ -440,7 +459,8 @@ bool s1ap::connect_mme()
logger.info("SCTP socket connected with MME. fd=%d", mme_socket.fd());
// Assign a handler to rx MME packets (going to run in a different thread)
stack->add_mme_socket(mme_socket.fd());
srsran::get_stack_socket_manager().add_socket_sctp_pdu_handler(mme_socket.fd(),
sctp_rx_packet_handler(this, mme_task_queue));
logger.info("SCTP socket established with MME");
return true;
@ -499,13 +519,13 @@ bool s1ap::handle_mme_rx_msg(srsran::unique_byte_buffer_t pdu,
if (notification->sn_header.sn_type == SCTP_SHUTDOWN_EVENT) {
logger.info("SCTP Association Shutdown. Association: %d", sri.sinfo_assoc_id);
srsran::console("SCTP Association Shutdown. Association: %d\n", sri.sinfo_assoc_id);
stack->remove_mme_socket(mme_socket.get_socket());
srsran::get_stack_socket_manager().remove_socket(mme_socket.get_socket());
mme_socket.close();
} else if (notification->sn_header.sn_type == SCTP_PEER_ADDR_CHANGE &&
notification->sn_paddr_change.spc_state == SCTP_ADDR_UNREACHABLE) {
logger.info("SCTP peer addres unreachable. Association: %d", sri.sinfo_assoc_id);
srsran::console("SCTP peer address unreachable. Association: %d\n", sri.sinfo_assoc_id);
stack->remove_mme_socket(mme_socket.get_socket());
srsran::get_stack_socket_manager().remove_socket(mme_socket.get_socket());
mme_socket.close();
}
} else if (pdu->N_bytes == 0) {

@ -17,13 +17,6 @@
using namespace srsenb;
class stack_dummy : public srsenb::stack_interface_s1ap_lte
{
public:
void add_mme_socket(int fd) {}
void remove_mme_socket(int fd) {}
};
struct mme_dummy {
mme_dummy(const char* addr_str_, int port_) : addr_str(addr_str_), port(port_)
{
@ -171,7 +164,6 @@ void test_s1ap_erab_setup(test_event event)
srslog::basic_logger& logger = srslog::fetch_basic_logger("S1AP");
s1ap s1ap_obj(&task_sched, logger);
rrc_tester rrc;
stack_dummy stack;
asn1::s1ap::s1ap_pdu_c s1ap_pdu;
srsran::unique_byte_buffer_t sdu;
@ -190,7 +182,7 @@ void test_s1ap_erab_setup(test_event event)
args.mme_addr = mme_addr_str;
args.enb_name = "srsenb01";
TESTASSERT(s1ap_obj.init(args, &rrc, &stack) == SRSRAN_SUCCESS);
TESTASSERT(s1ap_obj.init(args, &rrc) == SRSRAN_SUCCESS);
run_s1_setup(s1ap_obj, mme);
add_rnti(s1ap_obj, mme);

Loading…
Cancel
Save