e2_agent: add e2 setup procedure that reconnects on failure

master
Piotr Gawlowicz 2 years ago committed by Justin Tallon
parent 82d2fd1973
commit 346a814498

@ -225,7 +225,7 @@ bool sctp_subscribe_to_events(int fd)
evnts.sctp_data_io_event = 1;
evnts.sctp_shutdown_event = 1;
evnts.sctp_address_event = 1;
if (setsockopt(fd, SOL_SOCKET, SCTP_EVENTS, &evnts, sizeof(evnts)) != 0) {
if (setsockopt(fd, IPPROTO_SCTP, SCTP_EVENTS, &evnts, sizeof(evnts)) != 0) {
srslog::fetch_basic_logger(LOGSERVICE).error("Failed to subscribe to SCTP_SHUTDOWN event: %s", strerror(errno));
perror("Could not register socket to SCTP events\n");
close(fd);

@ -370,6 +370,14 @@ enable = false
#auto_target_papr = 8
#ema_alpha = 0.0143
# Expert configuration options
#
# ric_ip: IP address of the RIC controller
# ric_port: Port of the RIC controller
# ric_bind_ip: Local IP address to bind for RIC connection
# ric_bind_port: Local port to bind for RIC connection
# max_ric_setup_retries: Maximum amount of retries to setup the RIC connection. If this value is exceeded, an alarm is written to the log. -1 means infinity.
# ric_connect_timer: Connection Retry Timer for RIC connection (seconds)
#####################################################################
[e2_agent]
enable = true
@ -377,6 +385,8 @@ enable = true
#ric_port = 36421
#ric_bind_ip = 127.0.0.1
#ric_bind_port = 36425
#max_ric_setup_retries = -1
#ric_connect_timer = 10
#####################################################################
# Expert configuration options

@ -234,6 +234,8 @@ void parse_args(all_args_t* args, int argc, char* argv[])
("e2_agent.ric_port", bpo::value<uint32_t>(&args->e2_agent.ric_port)->default_value(36421), "RIC port")
("e2_agent.ric_bind_ip", bpo::value<string>(&args->e2_agent.ric_bind_ip)->default_value("127.0.0.1"), "Local IP address to bind for RIC connection")
("e2_agent.ric_bind_port", bpo::value<uint32_t>(&args->e2_agent.ric_bind_port)->default_value(36425), "Local port to bind for RIC connection")
("e2_agent.max_ric_setup_retries", bpo::value<int32_t>(&args->e2_agent.max_ric_setup_retries)->default_value(-1), "Max RIC setup retries")
("e2_agent.ric_connect_timer", bpo::value<uint32_t>(&args->e2_agent.ric_connect_timer)->default_value(10), "Connection Retry Timer for RIC connection (seconds)")
/* Expert section */
("expert.metrics_period_secs", bpo::value<float>(&args->general.metrics_period_secs)->default_value(1.0), "Periodicity for metrics in seconds.")

@ -16,10 +16,12 @@
#include "srsgnb/hdr/stack/ric/e2ap.h"
#include "srsran/common/network_utils.h"
#include "srsran/common/stack_procedure.h"
#include "srsran/common/task_scheduler.h"
#include "srsran/common/threads.h"
#include "srsran/interfaces/e2_metrics_interface.h"
#include "srsran/srsran.h"
static const int e2ap_ppid = 70;
enum e2_msg_type_t {
@ -37,6 +39,8 @@ struct e2_agent_args_t {
uint32_t ric_port;
std::string ric_bind_ip;
uint32_t ric_bind_port;
int32_t max_ric_setup_retries;
uint32_t ric_connect_timer;
};
namespace srsenb {
@ -51,6 +55,7 @@ public:
void stop();
void run_thread();
void tic();
bool is_ric_connected();
// Send messages to RIC
bool send_sctp(srsran::unique_byte_buffer_t& buf);
@ -61,7 +66,8 @@ public:
// Handle messages received from RIC
bool
handle_e2_rx_msg(srsran::unique_byte_buffer_t pdu, const sockaddr_in& from, const sctp_sndrcvinfo& sri, int flags);
handle_ric_rx_msg(srsran::unique_byte_buffer_t pdu, const sockaddr_in& from, const sctp_sndrcvinfo& sri, int flags);
bool handle_e2_rx_pdu(srsran::byte_buffer_t* pdu);
bool handle_e2_init_msg(asn1::e2ap::init_msg_s& init_msg);
bool handle_e2_successful_outcome(asn1::e2ap::successful_outcome_s& successful_outcome);
bool handle_e2_unsuccessful_outcome(asn1::e2ap::unsuccessful_outcome_s& unsuccessful_outcome);
@ -76,16 +82,52 @@ public:
bool handle_reset_request(reset_request_s& reset_request);
private:
bool connect_ric();
bool setup_e2();
e2_agent_args_t _args;
srsran::task_scheduler task_sched;
srsran::task_queue_handle ric_rece_task_queue;
srsran::unique_socket ric_socket;
srsran::socket_manager rx_sockets;
srslog::basic_logger& logger;
struct sockaddr_in ric_addr = {}; // RIC address
bool running = false;
struct sockaddr_in ric_addr = {}; // RIC address
bool running = false;
bool ric_connected = false;
srsran::unique_timer ric_connect_timer;
srsran::unique_timer e2_setup_timeout;
srsenb::e2_interface_metrics* gnb_metrics = nullptr;
e2ap e2ap_;
// procedures
class e2_setup_proc_t
{
public:
struct e2setupresult {
bool success = false;
enum class cause_t { timeout, failure } cause;
};
struct e2connectresult {
bool success = false;
};
explicit e2_setup_proc_t(e2_agent* e2_agent_) : e2_agent_ptr(e2_agent_) {}
srsran::proc_outcome_t init();
srsran::proc_outcome_t step() { return srsran::proc_outcome_t::yield; }
srsran::proc_outcome_t react(const e2connectresult& event);
srsran::proc_outcome_t react(const e2setupresult& event);
void then(const srsran::proc_state_t& result);
const char* name() const { return "RIC Connection"; }
uint16_t connect_count = 0;
private:
srsran::proc_outcome_t start_ric_connection();
e2_agent* e2_agent_ptr = nullptr;
};
srsran::proc_t<e2_setup_proc_t> e2_setup_proc;
};
} // namespace srsenb

@ -12,74 +12,226 @@
#include "srsgnb/hdr/stack/ric/e2_agent.h"
#include "srsran/asn1/e2ap.h"
#include "srsran/common/standard_streams.h"
using namespace srsenb;
/*********************************************************
* RIC Connection
*********************************************************/
srsran::proc_outcome_t e2_agent::e2_setup_proc_t::init()
{
e2_agent_ptr->logger.info("Starting new RIC connection.");
connect_count++;
return start_ric_connection();
}
srsran::proc_outcome_t e2_agent::e2_setup_proc_t::start_ric_connection()
{
if (not e2_agent_ptr->running) {
e2_agent_ptr->logger.info("E2 Agent is not running anymore.");
return srsran::proc_outcome_t::error;
}
if (e2_agent_ptr->ric_connected) {
e2_agent_ptr->logger.info("E2 Agent is already connected to RIC");
return srsran::proc_outcome_t::success;
}
auto connect_callback = [this]() {
bool connected = e2_agent_ptr->connect_ric();
auto notify_result = [this, connected]() {
e2_setup_proc_t::e2connectresult res;
res.success = connected;
e2_agent_ptr->e2_setup_proc.trigger(res);
};
e2_agent_ptr->task_sched.notify_background_task_result(notify_result);
};
srsran::get_background_workers().push_task(connect_callback);
e2_agent_ptr->logger.debug("Connection to RIC requested.");
return srsran::proc_outcome_t::yield;
}
srsran::proc_outcome_t e2_agent::e2_setup_proc_t::react(const srsenb::e2_agent::e2_setup_proc_t::e2connectresult& event)
{
if (event.success) {
e2_agent_ptr->logger.info("Connected to RIC. Sending setup request.");
e2_agent_ptr->e2_setup_timeout.run();
if (not e2_agent_ptr->setup_e2()) {
e2_agent_ptr->logger.error("E2 setup failed. Exiting...");
srsran::console("E2 setup failed\n");
e2_agent_ptr->running = false;
return srsran::proc_outcome_t::error;
}
e2_agent_ptr->logger.info("E2 setup request sent. Waiting for response.");
return srsran::proc_outcome_t::yield;
}
e2_agent_ptr->logger.info("Could not connected to RIC. Aborting");
return srsran::proc_outcome_t::error;
}
srsran::proc_outcome_t e2_agent::e2_setup_proc_t::react(const srsenb::e2_agent::e2_setup_proc_t::e2setupresult& event)
{
if (e2_agent_ptr->e2_setup_timeout.is_running()) {
e2_agent_ptr->e2_setup_timeout.stop();
}
if (event.success) {
e2_agent_ptr->logger.info("E2 Setup procedure completed successfully");
return srsran::proc_outcome_t::success;
}
e2_agent_ptr->logger.error("E2 Setup failed.");
srsran::console("E2 setup failed\n");
return srsran::proc_outcome_t::error;
}
void e2_agent::e2_setup_proc_t::then(const srsran::proc_state_t& result)
{
if (result.is_error()) {
e2_agent_ptr->logger.info("Failed to initiate RIC connection. Attempting reconnection in %d seconds",
e2_agent_ptr->ric_connect_timer.duration() / 1000);
srsran::console("Failed to initiate RIC connection. Attempting reconnection in %d seconds\n",
e2_agent_ptr->ric_connect_timer.duration() / 1000);
e2_agent_ptr->rx_sockets.remove_socket(e2_agent_ptr->ric_socket.get_socket());
e2_agent_ptr->ric_socket.close();
e2_agent_ptr->logger.info("R2 Agent socket closed.");
e2_agent_ptr->ric_connect_timer.run();
if (e2_agent_ptr->_args.max_ric_setup_retries > 0 && connect_count > e2_agent_ptr->_args.max_ric_setup_retries) {
srsran_terminate("Error connecting to RIC");
}
// Try again with in 10 seconds
} else {
connect_count = 0;
}
}
/*********************************************************
* E2 Agent class
*********************************************************/
e2_agent::e2_agent(srslog::basic_logger& logger, e2_interface_metrics* _gnb_metrics) :
task_sched(), logger(logger), rx_sockets(), thread("E2_AGENT_THREAD"), e2ap_(logger, this, _gnb_metrics, &task_sched)
task_sched(),
logger(logger),
rx_sockets(),
thread("E2_AGENT_THREAD"),
e2ap_(logger, this, _gnb_metrics, &task_sched),
e2_setup_proc(this)
{
gnb_metrics = _gnb_metrics;
ric_rece_task_queue = task_sched.make_task_queue();
}
bool e2_agent::init(e2_agent_args_t args)
{
printf("E2_AGENT: Init\n");
_args = args;
// Setup RIC reconnection timer
ric_connect_timer = task_sched.get_unique_timer();
auto ric_connect_run = [this](uint32_t tid) {
if (e2_setup_proc.is_busy()) {
logger.error("Failed to initiate RIC Setup procedure: procedure is busy.");
}
e2_setup_proc.launch();
};
ric_connect_timer.set(_args.ric_connect_timer * 1000, ric_connect_run);
// Setup timeout
e2_setup_timeout = task_sched.get_unique_timer();
uint32_t ric_setup_timeout_val = 5000;
e2_setup_timeout.set(ric_setup_timeout_val, [this](uint32_t tid) {
e2_setup_proc_t::e2setupresult res;
res.success = false;
res.cause = e2_setup_proc_t::e2setupresult::cause_t::timeout;
e2_setup_proc.trigger(res);
});
start(0);
running = true;
// starting RIC connection
if (not e2_setup_proc.launch()) {
logger.error("Failed to initiate RIC Setup procedure: error launching procedure.");
}
return SRSRAN_SUCCESS;
}
void e2_agent::stop()
{
running = false;
wait_thread_finish();
}
void e2_agent::tic()
{
// get tick every 1ms to advance timers
task_sched.tic();
}
bool e2_agent::is_ric_connected()
{
return ric_connected;
}
bool e2_agent::connect_ric()
{
using namespace srsran::net_utils;
logger.info("Connecting to RIC %s:%d", _args.ric_ip.c_str(), _args.ric_port);
// Open SCTP socket
if (not ric_socket.open_socket(addr_family::ipv4, socket_type::seqpacket, protocol_type::SCTP)) {
return false;
}
printf("RIC SCTP socket opened. fd=%d\n", ric_socket.fd());
// Subscribe to shutdown events
if (not ric_socket.sctp_subscribe_to_events()) {
ric_socket.close();
return false;
}
// Set SRTO_MAX
if (not ric_socket.sctp_set_rto_opts(6000)) {
return false;
}
// Set SCTP init options
if (not ric_socket.sctp_set_init_msg_opts(3, 5000)) {
return false;
}
// Bind socket
if (not ric_socket.bind_addr(args.ric_bind_ip.c_str(), args.ric_bind_port)) {
if (not ric_socket.bind_addr(_args.ric_bind_ip.c_str(), _args.ric_bind_port)) {
ric_socket.close();
return false;
}
logger.info("SCTP socket opened. fd=%d", ric_socket.fd());
// Connect to the AMF address
if (not ric_socket.connect_to(args.ric_ip.c_str(), args.ric_port, &ric_addr)) {
if (not ric_socket.connect_to(_args.ric_ip.c_str(), _args.ric_port, &ric_addr)) {
ric_socket.close();
return false;
}
logger.info("SCTP socket connected with RIC. fd=%d", ric_socket.fd());
// Assign a handler to rx RIC packets
ric_rece_task_queue = task_sched.make_task_queue();
auto rx_callback =
[this](srsran::unique_byte_buffer_t pdu, const sockaddr_in& from, const sctp_sndrcvinfo& sri, int flags) {
handle_e2_rx_msg(std::move(pdu), from, sri, flags);
handle_ric_rx_msg(std::move(pdu), from, sri, flags);
};
rx_sockets.add_socket_handler(ric_socket.fd(),
srsran::make_sctp_sdu_handler(logger, ric_rece_task_queue, rx_callback));
printf("SCTP socket connected with RIC. fd=%d \n", ric_socket.fd());
running = true;
start(0);
return SRSRAN_SUCCESS;
}
void e2_agent::stop()
{
running = false;
wait_thread_finish();
logger.info("SCTP socket connected established with RIC");
return true;
}
void e2_agent::tic()
bool e2_agent::setup_e2()
{
// get tick every 1ms to advance timers
task_sched.tic();
return send_e2_msg(E2_SETUP_REQUEST);
}
void e2_agent::run_thread()
{
using namespace asn1::e2ap;
while (running) {
if (e2ap_.send_setup_request()) {
send_e2_msg(E2_SETUP_REQUEST);
printf("e2 setup request sent\n");
}
task_sched.run_next_task();
}
}
@ -132,6 +284,11 @@ bool e2_agent::send_e2_msg(e2_msg_type_t msg_type)
bool e2_agent::queue_send_e2ap_pdu(e2_ap_pdu_c e2ap_pdu)
{
if (not ric_connected) {
logger.error("Aborting sending msg. Cause: RIC is not connected.");
return false;
}
auto send_e2ap_pdu_task = [this, e2ap_pdu]() { send_e2ap_pdu(e2ap_pdu); };
ric_rece_task_queue.push(send_e2ap_pdu_task);
return true;
@ -158,12 +315,66 @@ bool e2_agent::send_e2ap_pdu(e2_ap_pdu_c send_pdu)
return true;
}
bool e2_agent::handle_e2_rx_msg(srsran::unique_byte_buffer_t pdu,
const sockaddr_in& from,
const sctp_sndrcvinfo& sri,
int flags)
bool e2_agent::handle_ric_rx_msg(srsran::unique_byte_buffer_t pdu,
const sockaddr_in& from,
const sctp_sndrcvinfo& sri,
int flags)
{
// Handle Notification Case
if (flags & MSG_NOTIFICATION) {
// Received notification
union sctp_notification* notification = (union sctp_notification*)pdu->msg;
logger.info("SCTP Notification %04x", notification->sn_header.sn_type);
bool restart_e2 = false;
if (notification->sn_header.sn_type == SCTP_SHUTDOWN_EVENT) {
logger.info("SCTP Association Shutdown. Association: %d", sri.sinfo_assoc_id);
srsran::console("SCTP Association Shutdown. Association: %d\n", sri.sinfo_assoc_id);
restart_e2 = true;
} else if (notification->sn_header.sn_type == SCTP_PEER_ADDR_CHANGE &&
notification->sn_paddr_change.spc_state == SCTP_ADDR_UNREACHABLE) {
logger.info("SCTP peer addres unreachable. Association: %d", sri.sinfo_assoc_id);
srsran::console("SCTP peer address unreachable. Association: %d\n", sri.sinfo_assoc_id);
restart_e2 = true;
} else if (notification->sn_header.sn_type == SCTP_REMOTE_ERROR) {
logger.info("SCTP remote error. Association: %d", sri.sinfo_assoc_id);
srsran::console("SCTP remote error. Association: %d\n", sri.sinfo_assoc_id);
restart_e2 = true;
} else if (notification->sn_header.sn_type == SCTP_ASSOC_CHANGE) {
logger.info("SCTP association changed. Association: %d", sri.sinfo_assoc_id);
srsran::console("SCTP association changed. Association: %d\n", sri.sinfo_assoc_id);
}
if (restart_e2) {
logger.info("Restarting E2 connection");
srsran::console("Restarting E2 connection\n");
rx_sockets.remove_socket(ric_socket.get_socket());
ric_socket.close();
}
} else if (pdu->N_bytes == 0) {
logger.error("SCTP return 0 bytes. Closing socket");
ric_socket.close();
}
// Restart RIC connection procedure if we lost connection
if (not ric_socket.is_open()) {
ric_connected = false;
if (e2_setup_proc.is_busy()) {
logger.error("Failed to initiate RIC connection procedure, as it is already running.");
return false;
}
e2_setup_proc.launch();
return false;
}
if ((flags & MSG_NOTIFICATION) == 0 && pdu->N_bytes != 0) {
handle_e2_rx_pdu(pdu.get());
}
return true;
}
bool e2_agent::handle_e2_rx_pdu(srsran::byte_buffer_t* pdu)
{
printf("E2_AGENT: Received %d bytes from %s\n", pdu->N_bytes, inet_ntoa(from.sin_addr));
printf("E2_AGENT: Received %d bytes from RIC\n", pdu->N_bytes);
e2_ap_pdu_c pdu_c;
asn1::cbit_ref bref(pdu->msg, pdu->N_bytes);
if (pdu_c.unpack(bref) != asn1::SRSASN_SUCCESS) {
@ -248,8 +459,17 @@ bool e2_agent::handle_e2_setup_response(e2setup_resp_s setup_response)
{
if (e2ap_.process_setup_response(setup_response)) {
logger.error("Failed to process E2 Setup Response \n");
ric_connected = false;
e2_setup_proc_t::e2setupresult res;
res.success = false;
e2_setup_proc.trigger(res);
return false;
}
ric_connected = true;
e2_setup_proc_t::e2setupresult res;
res.success = true;
e2_setup_proc.trigger(res);
return true;
}

@ -232,6 +232,11 @@ e2_ap_pdu_c e2ap::generate_subscription_modification_required()
int e2ap::process_setup_response(e2setup_resp_s setup_response)
{
if (setup_response->transaction_id.value.value == 0) {
// TODO: transaction_id reset? check specs
setup_procedure_transaction_id = 0;
}
if (setup_procedure_transaction_id == setup_response->transaction_id.value.value) {
setup_procedure_transaction_id++;
e2_established = true;
@ -403,6 +408,11 @@ int e2ap::get_reset_id()
// implementation of e2ap failure functions
int e2ap::process_e2_setup_failure(e2setup_fail_s e2setup_failure)
{
if (e2setup_failure->transaction_id.value.value == 0) {
// TODO: transaction_id reset? check specs
setup_procedure_transaction_id = 0;
}
if (setup_procedure_transaction_id == e2setup_failure->transaction_id.value.value) {
setup_procedure_transaction_id++;
} else {

Loading…
Cancel
Save