Refactor of socket manager class

- use of inheritance to simplify testing
- removal of global network manager
- pass of custon socket manager to s1ap and gtpu ctors
- overhauled the registration of socket fd,callback in socket manager
master
Francisco 4 years ago committed by Francisco Paisana
parent 4aac7ac238
commit c24e382c19

@ -45,11 +45,11 @@ template <typename R, typename... Args>
class oper_table_t class oper_table_t
{ {
public: public:
constexpr oper_table_t() = default; constexpr oper_table_t() = default;
virtual R call(void* src, Args&&... args) const = 0; virtual R call(void* src, Args... args) const = 0;
virtual void move(void* src, void* dest) const = 0; virtual void move(void* src, void* dest) const = 0;
virtual void dtor(void* src) const = 0; virtual void dtor(void* src) const = 0;
virtual bool is_in_small_buffer() const = 0; virtual bool is_in_small_buffer() const = 0;
}; };
//! specialization of move/call/destroy operations for when the "move_callback<R(Args...)>" is empty //! specialization of move/call/destroy operations for when the "move_callback<R(Args...)>" is empty
@ -58,7 +58,7 @@ class empty_table_t : public oper_table_t<R, Args...>
{ {
public: public:
constexpr empty_table_t() = default; constexpr empty_table_t() = default;
R call(void* src, Args&&... args) const final R call(void* src, Args... args) const final
{ {
srsran_terminate("ERROR: bad function call (cause: function ptr is empty)"); srsran_terminate("ERROR: bad function call (cause: function ptr is empty)");
} }
@ -73,7 +73,7 @@ class smallbuffer_table_t : public oper_table_t<R, Args...>
{ {
public: public:
constexpr smallbuffer_table_t() = default; constexpr smallbuffer_table_t() = default;
R call(void* src, Args&&... args) const final { return (*static_cast<FunT*>(src))(std::forward<Args>(args)...); } R call(void* src, Args... args) const final { return (*static_cast<FunT*>(src))(std::forward<Args>(args)...); }
void move(void* src, void* dest) const final void move(void* src, void* dest) const final
{ {
::new (dest) FunT(std::move(*static_cast<FunT*>(src))); ::new (dest) FunT(std::move(*static_cast<FunT*>(src)));
@ -89,7 +89,7 @@ class heap_table_t : public oper_table_t<R, Args...>
{ {
public: public:
constexpr heap_table_t() = default; constexpr heap_table_t() = default;
R call(void* src, Args&&... args) const final { return (**static_cast<FunT**>(src))(std::forward<Args>(args)...); } R call(void* src, Args... args) const final { return (**static_cast<FunT**>(src))(std::forward<Args>(args)...); }
void move(void* src, void* dest) const final void move(void* src, void* dest) const final
{ {
*static_cast<FunT**>(dest) = *static_cast<FunT**>(src); *static_cast<FunT**>(dest) = *static_cast<FunT**>(src);
@ -163,7 +163,7 @@ public:
return *this; return *this;
} }
R operator()(Args&&... args) const noexcept { return oper_ptr->call(&buffer, std::forward<Args>(args)...); } R operator()(Args... args) const noexcept { return oper_ptr->call(&buffer, std::forward<Args>(args)...); }
bool is_empty() const { return oper_ptr == &empty_table; } bool is_empty() const { return oper_ptr == &empty_table; }
bool is_in_small_buffer() const { return oper_ptr->is_in_small_buffer(); } bool is_in_small_buffer() const { return oper_ptr->is_in_small_buffer(); }

@ -14,20 +14,18 @@
#define SRSRAN_RX_SOCKET_HANDLER_H #define SRSRAN_RX_SOCKET_HANDLER_H
#include "srsran/common/buffer_pool.h" #include "srsran/common/buffer_pool.h"
#include "srsran/common/multiqueue.h"
#include "srsran/common/threads.h" #include "srsran/common/threads.h"
#include <arpa/inet.h> #include <arpa/inet.h>
#include <functional>
#include <map> #include <map>
#include <mutex> #include <mutex>
#include <netinet/in.h> #include <netinet/in.h>
#include <netinet/sctp.h> #include <netinet/sctp.h>
#include <netinet/tcp.h> #include <netinet/tcp.h>
#include <netinet/udp.h> #include <netinet/udp.h>
#include <queue>
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/types.h> #include <sys/types.h>
#include <unistd.h> // for the pipe
namespace srsran { namespace srsran {
@ -97,40 +95,46 @@ bool sctp_init_server(unique_socket* socket, net_utils::socket_type socktype, co
* Rx multisocket handler * Rx multisocket handler
***************************/ ***************************/
class socket_manager_itf
{
public:
/// Callback called when socket fd (passed as argument) has data
using recv_callback_t = srsran::move_callback<bool(int)>;
explicit socket_manager_itf(srslog::basic_logger& logger_) : logger(logger_) {}
socket_manager_itf(socket_manager_itf&&) = delete;
socket_manager_itf(const socket_manager_itf&) = delete;
socket_manager_itf& operator=(const socket_manager_itf&) = delete;
socket_manager_itf& operator=(socket_manager_itf&&) = delete;
virtual ~socket_manager_itf() = default;
/// Register (fd, callback). callback is called within socket thread when fd has data.
virtual bool add_socket_handler(int fd, recv_callback_t handler) = 0;
/// remove registered socket fd
virtual bool remove_socket(int fd) = 0;
protected:
srslog::basic_logger& logger;
};
/** /**
* Description - Instantiates a thread that will block waiting for IO from multiple sockets, via a select * Description - Instantiates a thread that will block waiting for IO from multiple sockets, via a select
* The user can register their own (socket fd, data handler) in this class via the * The user can register their own (socket fd, data handler) in this class via the
* add_socket_handler(fd, task) API or its other variants * add_socket_handler(fd, task) API or its other variants
*/ */
class rx_multisocket_handler final : public thread class socket_manager final : public thread, public socket_manager_itf
{ {
using recv_callback_t = socket_manager_itf::recv_callback_t;
public: public:
// polymorphic callback to handle the socket recv socket_manager();
class recv_task ~socket_manager() final;
{
public:
virtual ~recv_task() = default;
virtual bool operator()(int fd) = 0; // returns false, if socket needs to be removed
};
using task_callback_t = std::unique_ptr<recv_task>;
using recvfrom_callback_t = std::function<void(srsran::unique_byte_buffer_t, const sockaddr_in&)>;
using sctp_recv_callback_t =
std::function<void(srsran::unique_byte_buffer_t, const sockaddr_in&, const sctp_sndrcvinfo&, int)>;
rx_multisocket_handler();
rx_multisocket_handler(rx_multisocket_handler&&) = delete;
rx_multisocket_handler(const rx_multisocket_handler&) = delete;
rx_multisocket_handler& operator=(const rx_multisocket_handler&) = delete;
rx_multisocket_handler& operator=(rx_multisocket_handler&&) = delete;
~rx_multisocket_handler() final;
void stop(); void stop();
bool remove_socket_nonblocking(int fd, bool signal_completion = false); bool remove_socket_nonblocking(int fd, bool signal_completion = false);
bool remove_socket(int fd); bool remove_socket(int fd) final;
bool add_socket_handler(int fd, task_callback_t handler); bool add_socket_handler(int fd, recv_callback_t handler) final;
// convenience methods for recv using buffer pool
bool add_socket_pdu_handler(int fd, recvfrom_callback_t pdu_task);
bool add_socket_sctp_pdu_handler(int fd, sctp_recv_callback_t task);
void run_thread() override; void run_thread() override;
@ -144,22 +148,43 @@ private:
int new_fd = -1; int new_fd = -1;
bool signal_rm_complete = false; bool signal_rm_complete = false;
}; };
std::map<int, rx_multisocket_handler::task_callback_t>::iterator std::map<int, recv_callback_t>::iterator remove_socket_unprotected(int fd, fd_set* total_fd_set, int* max_fd);
remove_socket_unprotected(int fd, fd_set* total_fd_set, int* max_fd);
// args
srslog::basic_logger& logger;
// state // state
std::mutex socket_mutex; std::mutex socket_mutex;
std::map<int, task_callback_t> active_sockets; std::map<int, recv_callback_t> active_sockets;
bool running = false; bool running = false;
int pipefd[2] = {}; int pipefd[2] = {-1, -1};
std::vector<int> rem_fd_tmp_list; std::vector<int> rem_fd_tmp_list;
std::condition_variable rem_cvar; std::condition_variable rem_cvar;
}; };
rx_multisocket_handler& get_stack_socket_manager(); /// Function signature for SDU byte buffers received from SCTP socket
using sctp_recv_callback_t =
srsran::move_callback<void(srsran::unique_byte_buffer_t, const sockaddr_in&, const sctp_sndrcvinfo&, int)>;
/// Function signature for SDU byte buffers received from any sockaddr_in-based socket
using recvfrom_callback_t = srsran::move_callback<void(srsran::unique_byte_buffer_t, const sockaddr_in&)>;
/**
* Helper function that creates a callback that is called when a SCTP socket has data, and does the following tasks:
* 1. receive SDU byte buffer from SCTP socket and associated metadata - sockaddr_in, sctp_sndrcvinfo, flags
* 2. dispatches the received SDU+metadata+rx_callback into the "queue"
* 3. potentially on a separate thread, the SDU+metadata+callback are popped from the queue, and callback is called with
* the SDU+metadata as arguments
* @param logger logger used by recv_callback_t to log any failure/reception of an SDU
* @param queue queue to which the SDU+metadata+callback are going to be dispatched
* @param rx_callback callback that is run when a new SDU arrives, from the thread that calls queue.pop()
* @return callback void(int) that can be registered in socket_manager
*/
socket_manager_itf::recv_callback_t
make_sctp_sdu_handler(srslog::basic_logger& logger, srsran::task_queue_handle& queue, sctp_recv_callback_t rx_callback);
/**
* Similar to make_sctp_sdu_handler, but for any sockaddr_in-based socket type
*/
socket_manager_itf::recv_callback_t
make_sdu_handler(srslog::basic_logger& logger, srsran::task_queue_handle& queue, recvfrom_callback_t rx_callback);
} // namespace srsran } // namespace srsran

@ -15,6 +15,7 @@
#include <netinet/sctp.h> #include <netinet/sctp.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/types.h> #include <sys/types.h>
#include <unistd.h> // for the pipe
#define rxSockError(fmt, ...) logger.error("RxSockets: " fmt, ##__VA_ARGS__) #define rxSockError(fmt, ...) logger.error("RxSockets: " fmt, ##__VA_ARGS__)
#define rxSockWarn(fmt, ...) logger.warning("RxSockets: " fmt, ##__VA_ARGS__) #define rxSockWarn(fmt, ...) logger.warning("RxSockets: " fmt, ##__VA_ARGS__)
@ -321,110 +322,23 @@ bool sctp_init_server(unique_socket* socket, net_utils::socket_type socktype, co
} // namespace net_utils } // namespace net_utils
/***************************************************************
* Rx Multisocket Task Types
**************************************************************/
/**
* Description: Specialization of recv_task for the case the received data is
* in the form of unique_byte_buffer, and a recvfrom(...) call is used
*/
class recvfrom_pdu_task final : public rx_multisocket_handler::recv_task
{
public:
using callback_t = std::function<void(srsran::unique_byte_buffer_t pdu, const sockaddr_in& from)>;
explicit recvfrom_pdu_task(srslog::basic_logger& logger, callback_t func_) : logger(logger), func(std::move(func_)) {}
bool operator()(int fd) override
{
srsran::unique_byte_buffer_t pdu = srsran::make_byte_buffer();
if (pdu == nullptr) {
logger.error("Unable to allocate byte buffer");
return true;
}
sockaddr_in from = {};
socklen_t fromlen = sizeof(from);
ssize_t n_recv = recvfrom(fd, pdu->msg, pdu->get_tailroom(), 0, (struct sockaddr*)&from, &fromlen);
if (n_recv == -1 and errno != EAGAIN) {
logger.error("Error reading from socket: %s", strerror(errno));
return true;
}
if (n_recv == -1 and errno == EAGAIN) {
logger.debug("Socket timeout reached");
return true;
}
pdu->N_bytes = static_cast<uint32_t>(n_recv);
func(std::move(pdu), from);
return true;
}
private:
srslog::basic_logger& logger;
callback_t func;
};
class sctp_recvmsg_pdu_task final : public rx_multisocket_handler::recv_task
{
public:
using callback_t = std::function<
void(srsran::unique_byte_buffer_t pdu, const sockaddr_in& from, const sctp_sndrcvinfo& sri, int flags)>;
explicit sctp_recvmsg_pdu_task(srslog::basic_logger& logger, callback_t func_) :
logger(logger), func(std::move(func_))
{}
bool operator()(int fd) override
{
// inside rx_sockets thread. Read socket
srsran::unique_byte_buffer_t pdu = srsran::make_byte_buffer();
if (pdu == nullptr) {
logger.error("Unable to allocate byte buffer");
return true;
}
sockaddr_in from = {};
socklen_t fromlen = sizeof(from);
sctp_sndrcvinfo sri = {};
int flags = 0;
ssize_t n_recv = sctp_recvmsg(fd, pdu->msg, pdu->get_tailroom(), (struct sockaddr*)&from, &fromlen, &sri, &flags);
if (n_recv == -1 and errno != EAGAIN) {
logger.error("Error reading from SCTP socket: %s", strerror(errno));
return true;
}
if (n_recv == -1 and errno == EAGAIN) {
logger.debug("Socket timeout reached");
return true;
}
bool ret = true;
pdu->N_bytes = static_cast<uint32_t>(n_recv);
// SCTP notifications handled in callback.
func(std::move(pdu), from, sri, flags);
return ret;
}
private:
srslog::basic_logger& logger;
callback_t func;
};
/*************************************************************** /***************************************************************
* Rx Multisocket Handler * Rx Multisocket Handler
**************************************************************/ **************************************************************/
rx_multisocket_handler::rx_multisocket_handler() : thread("RXsockets"), logger(srslog::fetch_basic_logger("COMN")) socket_manager::socket_manager() : thread("RXsockets"), socket_manager_itf(srslog::fetch_basic_logger("COMN"))
{ {
// register control pipe fd // register control pipe fd
srsran_assert(pipe(pipefd) != -1, "Failed to open control pipe"); srsran_assert(pipe(pipefd) != -1, "Failed to open control pipe");
start(thread_prio); start(thread_prio);
} }
rx_multisocket_handler::~rx_multisocket_handler() socket_manager::~socket_manager()
{ {
stop(); stop();
} }
void rx_multisocket_handler::stop() void socket_manager::stop()
{ {
if (running) { if (running) {
// close thread // close thread
@ -449,27 +363,7 @@ void rx_multisocket_handler::stop()
} }
} }
/** bool socket_manager::add_socket_handler(int fd, recv_callback_t handler)
* Convenience method for read PDUs from socket
*/
bool rx_multisocket_handler::add_socket_pdu_handler(int fd, recvfrom_callback_t pdu_task)
{
std::unique_ptr<srsran::rx_multisocket_handler::recv_task> task;
task.reset(new srsran::recvfrom_pdu_task(logger, std::move(pdu_task)));
return add_socket_handler(fd, std::move(task));
}
/**
* Convenience method for reading PDUs from SCTP socket
*/
bool rx_multisocket_handler::add_socket_sctp_pdu_handler(int fd, sctp_recv_callback_t pdu_task)
{
srsran::rx_multisocket_handler::task_callback_t task;
task.reset(new srsran::sctp_recvmsg_pdu_task(logger, std::move(pdu_task)));
return add_socket_handler(fd, std::move(task));
}
bool rx_multisocket_handler::add_socket_handler(int fd, task_callback_t handler)
{ {
std::lock_guard<std::mutex> lock(socket_mutex); std::lock_guard<std::mutex> lock(socket_mutex);
if (fd < 0) { if (fd < 0) {
@ -481,7 +375,7 @@ bool rx_multisocket_handler::add_socket_handler(int fd, task_callback_t handler)
return false; return false;
} }
active_sockets.insert(std::pair<const int, task_callback_t>(fd, std::move(handler))); active_sockets.insert(std::make_pair(fd, std::move(handler)));
// this unlocks the reading thread to add new connections // this unlocks the reading thread to add new connections
ctrl_cmd_t msg; ctrl_cmd_t msg;
@ -496,7 +390,7 @@ bool rx_multisocket_handler::add_socket_handler(int fd, task_callback_t handler)
return true; return true;
} }
bool rx_multisocket_handler::remove_socket_nonblocking(int fd, bool signal_completion) bool socket_manager::remove_socket_nonblocking(int fd, bool signal_completion)
{ {
std::lock_guard<std::mutex> lock(socket_mutex); std::lock_guard<std::mutex> lock(socket_mutex);
auto it = active_sockets.find(fd); auto it = active_sockets.find(fd);
@ -516,7 +410,7 @@ bool rx_multisocket_handler::remove_socket_nonblocking(int fd, bool signal_compl
return true; return true;
} }
bool rx_multisocket_handler::remove_socket(int fd) bool socket_manager::remove_socket(int fd)
{ {
bool result = remove_socket_nonblocking(fd, true); bool result = remove_socket_nonblocking(fd, true);
@ -531,8 +425,8 @@ bool rx_multisocket_handler::remove_socket(int fd)
return result; return result;
} }
std::map<int, rx_multisocket_handler::task_callback_t>::iterator std::map<int, socket_manager::recv_callback_t>::iterator
rx_multisocket_handler::remove_socket_unprotected(int fd, fd_set* total_fd_set, int* max_fd) socket_manager::remove_socket_unprotected(int fd, fd_set* total_fd_set, int* max_fd)
{ {
if (fd < 0) { if (fd < 0) {
rxSockError("fd to be removed is not valid"); rxSockError("fd to be removed is not valid");
@ -547,7 +441,7 @@ rx_multisocket_handler::remove_socket_unprotected(int fd, fd_set* total_fd_set,
return it; return it;
} }
void rx_multisocket_handler::run_thread() void socket_manager::run_thread()
{ {
running = true; running = true;
fd_set total_fd_set, read_fd_set; fd_set total_fd_set, read_fd_set;
@ -576,13 +470,13 @@ void rx_multisocket_handler::run_thread()
// call read callback for all SCTP/TCP/UDP connections // call read callback for all SCTP/TCP/UDP connections
for (auto handler_it = active_sockets.begin(); handler_it != active_sockets.end();) { for (auto handler_it = active_sockets.begin(); handler_it != active_sockets.end();) {
int fd = handler_it->first; int fd = handler_it->first;
recv_task* callback = handler_it->second.get(); recv_callback_t& callback = handler_it->second;
if (not FD_ISSET(fd, &read_fd_set)) { if (not FD_ISSET(fd, &read_fd_set)) {
++handler_it; ++handler_it;
continue; continue;
} }
bool socket_valid = callback->operator()(fd); bool socket_valid = callback(fd);
if (not socket_valid) { if (not socket_valid) {
rxSockInfo("The socket fd=%d has been closed by peer", fd); rxSockInfo("The socket fd=%d has been closed by peer", fd);
handler_it = remove_socket_unprotected(fd, &total_fd_set, &max_fd); handler_it = remove_socket_unprotected(fd, &total_fd_set, &max_fd);
@ -626,10 +520,115 @@ void rx_multisocket_handler::run_thread()
} }
} }
rx_multisocket_handler& get_stack_socket_manager() /***************************************************************
* Rx Multisocket Task Types
**************************************************************/
class sctp_recvmsg_pdu_task
{
public:
using callback_t = sctp_recv_callback_t;
explicit sctp_recvmsg_pdu_task(srslog::basic_logger& logger, srsran::task_queue_handle& queue_, callback_t func_) :
logger(logger), queue(queue_), func(std::move(func_))
{}
bool operator()(int fd)
{
// inside rx_sockets thread. Read socket
srsran::unique_byte_buffer_t pdu = srsran::make_byte_buffer();
if (pdu == nullptr) {
logger.error("Unable to allocate byte buffer");
return true;
}
sockaddr_in from = {};
socklen_t fromlen = sizeof(from);
sctp_sndrcvinfo sri = {};
int flags = 0;
ssize_t n_recv = sctp_recvmsg(fd, pdu->msg, pdu->get_tailroom(), (struct sockaddr*)&from, &fromlen, &sri, &flags);
if (n_recv == -1 and errno != EAGAIN) {
logger.error("Error reading from SCTP socket: %s", strerror(errno));
return true;
}
if (n_recv == -1 and errno == EAGAIN) {
logger.debug("Socket timeout reached");
return true;
}
bool ret = true;
pdu->N_bytes = static_cast<uint32_t>(n_recv);
// Defer handling of received packet to provided queue
// SCTP notifications handled in callback.
queue.push(std::bind(
[this, from, sri, flags](srsran::unique_byte_buffer_t& sdu) { func(std::move(sdu), from, sri, flags); },
std::move(pdu)));
return ret;
}
private:
srslog::basic_logger& logger;
srsran::task_queue_handle& queue;
callback_t func;
};
socket_manager_itf::recv_callback_t
make_sctp_sdu_handler(srslog::basic_logger& logger, srsran::task_queue_handle& queue, sctp_recv_callback_t rx_callback)
{
return socket_manager_itf::recv_callback_t(sctp_recvmsg_pdu_task(logger, queue, std::move(rx_callback)));
}
/**
* Description: Functor for the case the received data is
* in the form of unique_byte_buffer, and a recvfrom(...) call is used
*/
class recvfrom_pdu_task
{
public:
using callback_t = recvfrom_callback_t;
explicit recvfrom_pdu_task(srslog::basic_logger& logger, srsran::task_queue_handle& queue_, callback_t func_) :
logger(logger), queue(queue_), func(std::move(func_))
{}
bool operator()(int fd)
{
srsran::unique_byte_buffer_t pdu = srsran::make_byte_buffer();
if (pdu == nullptr) {
logger.error("Unable to allocate byte buffer");
return true;
}
sockaddr_in from = {};
socklen_t fromlen = sizeof(from);
ssize_t n_recv = recvfrom(fd, pdu->msg, pdu->get_tailroom(), 0, (struct sockaddr*)&from, &fromlen);
if (n_recv == -1 and errno != EAGAIN) {
logger.error("Error reading from socket: %s", strerror(errno));
return true;
}
if (n_recv == -1 and errno == EAGAIN) {
logger.debug("Socket timeout reached");
return true;
}
pdu->N_bytes = static_cast<uint32_t>(n_recv);
// Defer handling of received packet to provided queue
queue.push(
std::bind([this, from](srsran::unique_byte_buffer_t& sdu) { func(std::move(sdu), from); }, std::move(pdu)));
return true;
}
private:
srslog::basic_logger& logger;
srsran::task_queue_handle& queue;
callback_t func;
};
socket_manager_itf::recv_callback_t
make_sdu_handler(srslog::basic_logger& logger, srsran::task_queue_handle& queue, recvfrom_callback_t rx_callback)
{ {
static rx_multisocket_handler handler; return socket_manager_itf::recv_callback_t(recvfrom_pdu_task(logger, queue, std::move(rx_callback)));
return handler;
} }
} // namespace srsran } // namespace srsran

@ -11,15 +11,31 @@
*/ */
#include "srsran/common/network_utils.h" #include "srsran/common/network_utils.h"
#include "srsran/common/task_scheduler.h"
#include "srsran/common/test_common.h"
#include <atomic>
#include <iostream> #include <iostream>
#define TESTASSERT(cond) \ struct rx_thread_tester {
do { \ srsran::task_scheduler task_sched;
if (!(cond)) { \ srsran::task_queue_handle task_queue;
std::cout << "[" << __FUNCTION__ << "][Line " << __LINE__ << "]: FAIL at " << (#cond) << std::endl; \ std::atomic<bool> stop_token;
return -1; \ std::thread t;
} \
} while (0) rx_thread_tester() :
task_queue(task_sched.make_task_queue()), t([this]() {
while (not stop_token.load()) {
task_sched.run_pending_tasks();
std::this_thread::yield();
}
})
{}
~rx_thread_tester()
{
stop_token.store(true);
t.join();
}
};
int test_socket_handler() int test_socket_handler()
{ {
@ -27,10 +43,10 @@ int test_socket_handler()
int counter = 0; int counter = 0;
srsran::unique_socket server_socket, client_socket, client_socket2; srsran::unique_socket server_socket, client_socket, client_socket2;
srsran::rx_multisocket_handler sockhandler; srsran::socket_manager sockhandler;
int server_port = 36412; int server_port = 36412;
const char* server_addr = "127.0.100.1"; const char* server_addr = "127.0.100.1";
using namespace srsran::net_utils; using namespace srsran::net_utils;
TESTASSERT(sctp_init_server(&server_socket, socket_type::seqpacket, server_addr, server_port)); TESTASSERT(sctp_init_server(&server_socket, socket_type::seqpacket, server_addr, server_port));
@ -50,7 +66,9 @@ int test_socket_handler()
counter++; counter++;
} }
}; };
sockhandler.add_socket_sctp_pdu_handler(server_socket.fd(), pdu_handler); rx_thread_tester rx_tester;
sockhandler.add_socket_handler(server_socket.fd(),
srsran::make_sctp_sdu_handler(logger, rx_tester.task_queue, pdu_handler));
uint8_t buf[128] = {}; uint8_t buf[128] = {};
int32_t nof_counts = 5; int32_t nof_counts = 5;

@ -105,15 +105,13 @@ private:
void run_thread() override; void run_thread() override;
void stop_impl(); void stop_impl();
void tti_clock_impl(); void tti_clock_impl();
void handle_mme_rx_packet(srsran::unique_byte_buffer_t pdu,
const sockaddr_in& from,
const sctp_sndrcvinfo& sri,
int flags);
// args // args
stack_args_t args = {}; stack_args_t args = {};
rrc_cfg_t rrc_cfg = {}; rrc_cfg_t rrc_cfg = {};
srsran::socket_manager rx_sockets;
srslog::basic_logger& mac_logger; srslog::basic_logger& mac_logger;
srslog::basic_logger& rlc_logger; srslog::basic_logger& rlc_logger;
srslog::basic_logger& pdcp_logger; srslog::basic_logger& pdcp_logger;

@ -52,7 +52,9 @@ public:
static const uint32_t ts1_reloc_prep_timeout_ms = 10000; static const uint32_t ts1_reloc_prep_timeout_ms = 10000;
static const uint32_t ts1_reloc_overall_timeout_ms = 10000; static const uint32_t ts1_reloc_overall_timeout_ms = 10000;
s1ap(srsran::task_sched_handle task_sched_, srslog::basic_logger& logger); s1ap(srsran::task_sched_handle task_sched_,
srslog::basic_logger& logger,
srsran::socket_manager_itf* rx_socket_handler);
int init(s1ap_args_t args_, rrc_interface_s1ap* rrc_); int init(s1ap_args_t args_, rrc_interface_s1ap* rrc_);
void stop(); void stop();
void get_metrics(s1ap_metrics_t& m); void get_metrics(s1ap_metrics_t& m);
@ -109,11 +111,12 @@ private:
static const int NONUE_STREAM_ID = 0; static const int NONUE_STREAM_ID = 0;
// args // args
rrc_interface_s1ap* rrc = nullptr; rrc_interface_s1ap* rrc = nullptr;
s1ap_args_t args; s1ap_args_t args;
srslog::basic_logger& logger; srslog::basic_logger& logger;
srsran::task_sched_handle task_sched; srsran::task_sched_handle task_sched;
srsran::task_queue_handle mme_task_queue; srsran::task_queue_handle mme_task_queue;
srsran::socket_manager_itf* rx_socket_handler;
srsran::unique_socket mme_socket; srsran::unique_socket mme_socket;
struct sockaddr_in mme_addr = {}; // MME address struct sockaddr_in mme_addr = {}; // MME address

@ -34,7 +34,7 @@ enb_stack_lte::enb_stack_lte(srslog::sink& log_sink) :
mac(&task_sched, mac_logger), mac(&task_sched, mac_logger),
rlc(rlc_logger), rlc(rlc_logger),
gtpu(&task_sched, gtpu_logger), gtpu(&task_sched, gtpu_logger),
s1ap(&task_sched, s1ap_logger), s1ap(&task_sched, s1ap_logger, &rx_sockets),
rrc(&task_sched), rrc(&task_sched),
mac_pcap(), mac_pcap(),
pending_stack_metrics(64) pending_stack_metrics(64)
@ -166,7 +166,7 @@ void enb_stack_lte::stop()
void enb_stack_lte::stop_impl() void enb_stack_lte::stop_impl()
{ {
srsran::get_stack_socket_manager().stop(); rx_sockets.stop();
s1ap.stop(); s1ap.stop();
gtpu.stop(); gtpu.stop();
@ -225,39 +225,21 @@ void enb_stack_lte::run_thread()
} }
} }
void enb_stack_lte::handle_mme_rx_packet(srsran::unique_byte_buffer_t pdu,
const sockaddr_in& from,
const sctp_sndrcvinfo& sri,
int flags)
{
// Defer the handling of MME packet to eNB stack main thread
auto task_handler = [this, from, sri, flags](srsran::unique_byte_buffer_t& t) {
s1ap.handle_mme_rx_msg(std::move(t), from, sri, flags);
};
// Defer the handling of MME packet to main stack thread
mme_task_queue.push(std::bind(task_handler, std::move(pdu)));
}
void enb_stack_lte::add_gtpu_s1u_socket_handler(int fd) void enb_stack_lte::add_gtpu_s1u_socket_handler(int fd)
{ {
auto gtpu_s1u_handler = [this](srsran::unique_byte_buffer_t pdu, const sockaddr_in& from) { auto gtpu_s1u_handler = [this](srsran::unique_byte_buffer_t pdu, const sockaddr_in& from) {
auto task_handler = [this, from](srsran::unique_byte_buffer_t& t) { gtpu.handle_gtpu_s1u_rx_packet(std::move(pdu), from);
gtpu.handle_gtpu_s1u_rx_packet(std::move(t), from);
};
gtpu_task_queue.push(std::bind(task_handler, std::move(pdu)));
}; };
srsran::get_stack_socket_manager().add_socket_pdu_handler(fd, gtpu_s1u_handler);
rx_sockets.add_socket_handler(fd, srsran::make_sdu_handler(gtpu_logger, gtpu_task_queue, gtpu_s1u_handler));
} }
void enb_stack_lte::add_gtpu_m1u_socket_handler(int fd) void enb_stack_lte::add_gtpu_m1u_socket_handler(int fd)
{ {
auto gtpu_m1u_handler = [this](srsran::unique_byte_buffer_t pdu, const sockaddr_in& from) { auto gtpu_m1u_handler = [this](srsran::unique_byte_buffer_t pdu, const sockaddr_in& from) {
auto task_handler = [this, from](srsran::unique_byte_buffer_t& t) { gtpu.handle_gtpu_m1u_rx_packet(std::move(pdu), from);
gtpu.handle_gtpu_m1u_rx_packet(std::move(t), from);
};
gtpu_task_queue.push(std::bind(task_handler, std::move(pdu)));
}; };
srsran::get_stack_socket_manager().add_socket_pdu_handler(fd, gtpu_m1u_handler); rx_sockets.add_socket_handler(fd, srsran::make_sdu_handler(gtpu_logger, gtpu_task_queue, gtpu_m1u_handler));
} }
} // namespace srsenb } // namespace srsenb

@ -209,7 +209,7 @@ void s1ap::s1_setup_proc_t::then(const srsran::proc_state_t& result) const
s1ap_ptr->mme_connect_timer.duration() / 1000); s1ap_ptr->mme_connect_timer.duration() / 1000);
srsran::console("Failed to initiate S1 connection. Attempting reconnection in %d seconds\n", srsran::console("Failed to initiate S1 connection. Attempting reconnection in %d seconds\n",
s1ap_ptr->mme_connect_timer.duration() / 1000); s1ap_ptr->mme_connect_timer.duration() / 1000);
srsran::get_stack_socket_manager().remove_socket(s1ap_ptr->mme_socket.get_socket()); s1ap_ptr->rx_socket_handler->remove_socket(s1ap_ptr->mme_socket.get_socket());
s1ap_ptr->mme_socket.close(); s1ap_ptr->mme_socket.close();
procInfo("S1AP socket closed."); procInfo("S1AP socket closed.");
s1ap_ptr->mme_connect_timer.run(); s1ap_ptr->mme_connect_timer.run();
@ -221,8 +221,10 @@ void s1ap::s1_setup_proc_t::then(const srsran::proc_state_t& result) const
* S1AP class * S1AP class
*********************************************************/ *********************************************************/
s1ap::s1ap(srsran::task_sched_handle task_sched_, srslog::basic_logger& logger) : s1ap::s1ap(srsran::task_sched_handle task_sched_,
s1setup_proc(this), logger(logger), task_sched(task_sched_) srslog::basic_logger& logger,
srsran::socket_manager_itf* rx_socket_handler_) :
s1setup_proc(this), logger(logger), task_sched(task_sched_), rx_socket_handler(rx_socket_handler_)
{ {
mme_task_queue = task_sched.make_task_queue(); mme_task_queue = task_sched.make_task_queue();
} }
@ -423,24 +425,6 @@ bool s1ap::is_mme_connected()
/* S1AP connection helpers /* S1AP connection helpers
********************************************************************************/ ********************************************************************************/
/// Callback that will run inside the Sockets thread, and is going to be called whenever a SDU is received from the MME
struct sctp_rx_packet_handler {
s1ap* s1ap_ptr;
srsran::task_queue_handle* task_queue;
sctp_rx_packet_handler(s1ap* ptr, srsran::task_queue_handle& task_queue_) : s1ap_ptr(ptr), task_queue(&task_queue_) {}
void operator()(srsran::unique_byte_buffer_t pdu, const sockaddr_in& from, const sctp_sndrcvinfo& sri, int flags)
{
// Defer the handling of MME packet to eNB stack main thread
auto packet_handler = [this, from, sri, flags](srsran::unique_byte_buffer_t& t) {
s1ap_ptr->handle_mme_rx_msg(std::move(t), from, sri, flags);
};
// Defer the handling of MME packet to main stack thread
task_queue->push(std::bind(packet_handler, std::move(pdu)));
}
};
bool s1ap::connect_mme() bool s1ap::connect_mme()
{ {
using namespace srsran::net_utils; using namespace srsran::net_utils;
@ -458,9 +442,14 @@ bool s1ap::connect_mme()
} }
logger.info("SCTP socket connected with MME. fd=%d", mme_socket.fd()); logger.info("SCTP socket connected with MME. fd=%d", mme_socket.fd());
// Assign a handler to rx MME packets (going to run in a different thread) // Assign a handler to rx MME packets
srsran::get_stack_socket_manager().add_socket_sctp_pdu_handler(mme_socket.fd(), auto rx_callback =
sctp_rx_packet_handler(this, mme_task_queue)); [this](srsran::unique_byte_buffer_t pdu, const sockaddr_in& from, const sctp_sndrcvinfo& sri, int flags) {
// Defer the handling of MME packet to eNB stack main thread
handle_mme_rx_msg(std::move(pdu), from, sri, flags);
};
rx_socket_handler->add_socket_handler(mme_socket.fd(),
srsran::make_sctp_sdu_handler(logger, mme_task_queue, rx_callback));
logger.info("SCTP socket established with MME"); logger.info("SCTP socket established with MME");
return true; return true;
@ -519,13 +508,13 @@ bool s1ap::handle_mme_rx_msg(srsran::unique_byte_buffer_t pdu,
if (notification->sn_header.sn_type == SCTP_SHUTDOWN_EVENT) { if (notification->sn_header.sn_type == SCTP_SHUTDOWN_EVENT) {
logger.info("SCTP Association Shutdown. Association: %d", sri.sinfo_assoc_id); logger.info("SCTP Association Shutdown. Association: %d", sri.sinfo_assoc_id);
srsran::console("SCTP Association Shutdown. Association: %d\n", sri.sinfo_assoc_id); srsran::console("SCTP Association Shutdown. Association: %d\n", sri.sinfo_assoc_id);
srsran::get_stack_socket_manager().remove_socket(mme_socket.get_socket()); rx_socket_handler->remove_socket(mme_socket.get_socket());
mme_socket.close(); mme_socket.close();
} else if (notification->sn_header.sn_type == SCTP_PEER_ADDR_CHANGE && } else if (notification->sn_header.sn_type == SCTP_PEER_ADDR_CHANGE &&
notification->sn_paddr_change.spc_state == SCTP_ADDR_UNREACHABLE) { notification->sn_paddr_change.spc_state == SCTP_ADDR_UNREACHABLE) {
logger.info("SCTP peer addres unreachable. Association: %d", sri.sinfo_assoc_id); logger.info("SCTP peer addres unreachable. Association: %d", sri.sinfo_assoc_id);
srsran::console("SCTP peer address unreachable. Association: %d\n", sri.sinfo_assoc_id); srsran::console("SCTP peer address unreachable. Association: %d\n", sri.sinfo_assoc_id);
srsran::get_stack_socket_manager().remove_socket(mme_socket.get_socket()); rx_socket_handler->remove_socket(mme_socket.get_socket());
mme_socket.close(); mme_socket.close();
} }
} else if (pdu->N_bytes == 0) { } else if (pdu->N_bytes == 0) {

@ -62,6 +62,34 @@ struct mme_dummy {
srsran::unique_byte_buffer_t last_sdu; srsran::unique_byte_buffer_t last_sdu;
}; };
struct dummy_socket_manager : public srsran::socket_manager_itf {
dummy_socket_manager() : srsran::socket_manager_itf(srslog::fetch_basic_logger("TEST")) {}
/// Register (fd, callback). callback is called within socket thread when fd has data.
bool add_socket_handler(int fd, recv_callback_t handler) final
{
if (s1u_fd > 0) {
return false;
}
s1u_fd = fd;
callback = std::move(handler);
return true;
}
/// remove registered socket fd
bool remove_socket(int fd) final
{
if (s1u_fd < 0) {
return false;
}
s1u_fd = -1;
return true;
}
int s1u_fd;
recv_callback_t callback;
};
struct rrc_tester : public rrc_dummy { struct rrc_tester : public rrc_dummy {
void modify_erabs(uint16_t rnti, void modify_erabs(uint16_t rnti,
const asn1::s1ap::erab_modify_request_s& msg, const asn1::s1ap::erab_modify_request_s& msg,
@ -162,7 +190,8 @@ void test_s1ap_erab_setup(test_event event)
{ {
srsran::task_scheduler task_sched; srsran::task_scheduler task_sched;
srslog::basic_logger& logger = srslog::fetch_basic_logger("S1AP"); srslog::basic_logger& logger = srslog::fetch_basic_logger("S1AP");
s1ap s1ap_obj(&task_sched, logger); dummy_socket_manager rx_sockets;
s1ap s1ap_obj(&task_sched, logger, &rx_sockets);
rrc_tester rrc; rrc_tester rrc;
asn1::s1ap::s1ap_pdu_c s1ap_pdu; asn1::s1ap::s1ap_pdu_c s1ap_pdu;
srsran::unique_byte_buffer_t sdu; srsran::unique_byte_buffer_t sdu;

Loading…
Cancel
Save