diff --git a/lib/src/common/network_utils.cc b/lib/src/common/network_utils.cc index b7c5c3b6d..3bf566556 100644 --- a/lib/src/common/network_utils.cc +++ b/lib/src/common/network_utils.cc @@ -225,7 +225,7 @@ bool sctp_subscribe_to_events(int fd) evnts.sctp_data_io_event = 1; evnts.sctp_shutdown_event = 1; evnts.sctp_address_event = 1; - if (setsockopt(fd, SOL_SOCKET, SCTP_EVENTS, &evnts, sizeof(evnts)) != 0) { + if (setsockopt(fd, IPPROTO_SCTP, SCTP_EVENTS, &evnts, sizeof(evnts)) != 0) { srslog::fetch_basic_logger(LOGSERVICE).error("Failed to subscribe to SCTP_SHUTDOWN event: %s", strerror(errno)); perror("Could not register socket to SCTP events\n"); close(fd); diff --git a/srsenb/enb.conf.example b/srsenb/enb.conf.example index 45db00a7b..55f20045c 100644 --- a/srsenb/enb.conf.example +++ b/srsenb/enb.conf.example @@ -370,6 +370,14 @@ enable = false #auto_target_papr = 8 #ema_alpha = 0.0143 +# Expert configuration options +# +# ric_ip: IP address of the RIC controller +# ric_port: Port of the RIC controller +# ric_bind_ip: Local IP address to bind for RIC connection +# ric_bind_port: Local port to bind for RIC connection +# max_ric_setup_retries: Maximum amount of retries to setup the RIC connection. If this value is exceeded, an alarm is written to the log. -1 means infinity. +# ric_connect_timer: Connection Retry Timer for RIC connection (seconds) ##################################################################### [e2_agent] enable = true @@ -377,6 +385,8 @@ enable = true #ric_port = 36421 #ric_bind_ip = 127.0.0.1 #ric_bind_port = 36425 +#max_ric_setup_retries = -1 +#ric_connect_timer = 10 ##################################################################### # Expert configuration options diff --git a/srsenb/src/main.cc b/srsenb/src/main.cc index c60bf6c1c..441e0ae93 100644 --- a/srsenb/src/main.cc +++ b/srsenb/src/main.cc @@ -234,6 +234,8 @@ void parse_args(all_args_t* args, int argc, char* argv[]) ("e2_agent.ric_port", bpo::value(&args->e2_agent.ric_port)->default_value(36421), "RIC port") ("e2_agent.ric_bind_ip", bpo::value(&args->e2_agent.ric_bind_ip)->default_value("127.0.0.1"), "Local IP address to bind for RIC connection") ("e2_agent.ric_bind_port", bpo::value(&args->e2_agent.ric_bind_port)->default_value(36425), "Local port to bind for RIC connection") + ("e2_agent.max_ric_setup_retries", bpo::value(&args->e2_agent.max_ric_setup_retries)->default_value(-1), "Max RIC setup retries") + ("e2_agent.ric_connect_timer", bpo::value(&args->e2_agent.ric_connect_timer)->default_value(10), "Connection Retry Timer for RIC connection (seconds)") /* Expert section */ ("expert.metrics_period_secs", bpo::value(&args->general.metrics_period_secs)->default_value(1.0), "Periodicity for metrics in seconds.") diff --git a/srsgnb/hdr/stack/ric/e2_agent.h b/srsgnb/hdr/stack/ric/e2_agent.h index 70dee9fdd..3e22bdc58 100644 --- a/srsgnb/hdr/stack/ric/e2_agent.h +++ b/srsgnb/hdr/stack/ric/e2_agent.h @@ -16,10 +16,12 @@ #include "srsgnb/hdr/stack/ric/e2ap.h" #include "srsran/common/network_utils.h" +#include "srsran/common/stack_procedure.h" #include "srsran/common/task_scheduler.h" #include "srsran/common/threads.h" #include "srsran/interfaces/e2_metrics_interface.h" #include "srsran/srsran.h" + static const int e2ap_ppid = 70; enum e2_msg_type_t { @@ -37,6 +39,8 @@ struct e2_agent_args_t { uint32_t ric_port; std::string ric_bind_ip; uint32_t ric_bind_port; + int32_t max_ric_setup_retries; + uint32_t ric_connect_timer; }; namespace srsenb { @@ -51,6 +55,7 @@ public: void stop(); void run_thread(); void tic(); + bool is_ric_connected(); // Send messages to RIC bool send_sctp(srsran::unique_byte_buffer_t& buf); @@ -61,7 +66,8 @@ public: // Handle messages received from RIC bool - handle_e2_rx_msg(srsran::unique_byte_buffer_t pdu, const sockaddr_in& from, const sctp_sndrcvinfo& sri, int flags); + handle_ric_rx_msg(srsran::unique_byte_buffer_t pdu, const sockaddr_in& from, const sctp_sndrcvinfo& sri, int flags); + bool handle_e2_rx_pdu(srsran::byte_buffer_t* pdu); bool handle_e2_init_msg(asn1::e2ap::init_msg_s& init_msg); bool handle_e2_successful_outcome(asn1::e2ap::successful_outcome_s& successful_outcome); bool handle_e2_unsuccessful_outcome(asn1::e2ap::unsuccessful_outcome_s& unsuccessful_outcome); @@ -76,16 +82,52 @@ public: bool handle_reset_request(reset_request_s& reset_request); private: + bool connect_ric(); + bool setup_e2(); + + e2_agent_args_t _args; srsran::task_scheduler task_sched; srsran::task_queue_handle ric_rece_task_queue; srsran::unique_socket ric_socket; srsran::socket_manager rx_sockets; srslog::basic_logger& logger; - struct sockaddr_in ric_addr = {}; // RIC address - bool running = false; + struct sockaddr_in ric_addr = {}; // RIC address + bool running = false; + bool ric_connected = false; + srsran::unique_timer ric_connect_timer; + srsran::unique_timer e2_setup_timeout; srsenb::e2_interface_metrics* gnb_metrics = nullptr; e2ap e2ap_; + + // procedures + class e2_setup_proc_t + { + public: + struct e2setupresult { + bool success = false; + enum class cause_t { timeout, failure } cause; + }; + struct e2connectresult { + bool success = false; + }; + + explicit e2_setup_proc_t(e2_agent* e2_agent_) : e2_agent_ptr(e2_agent_) {} + srsran::proc_outcome_t init(); + srsran::proc_outcome_t step() { return srsran::proc_outcome_t::yield; } + srsran::proc_outcome_t react(const e2connectresult& event); + srsran::proc_outcome_t react(const e2setupresult& event); + void then(const srsran::proc_state_t& result); + const char* name() const { return "RIC Connection"; } + uint16_t connect_count = 0; + + private: + srsran::proc_outcome_t start_ric_connection(); + + e2_agent* e2_agent_ptr = nullptr; + }; + + srsran::proc_t e2_setup_proc; }; } // namespace srsenb diff --git a/srsgnb/src/stack/ric/e2_agent.cc b/srsgnb/src/stack/ric/e2_agent.cc index 7b81ac1b3..f5b2502cc 100644 --- a/srsgnb/src/stack/ric/e2_agent.cc +++ b/srsgnb/src/stack/ric/e2_agent.cc @@ -12,74 +12,226 @@ #include "srsgnb/hdr/stack/ric/e2_agent.h" #include "srsran/asn1/e2ap.h" +#include "srsran/common/standard_streams.h" using namespace srsenb; + +/********************************************************* + * RIC Connection + *********************************************************/ + +srsran::proc_outcome_t e2_agent::e2_setup_proc_t::init() +{ + e2_agent_ptr->logger.info("Starting new RIC connection."); + connect_count++; + return start_ric_connection(); +} + +srsran::proc_outcome_t e2_agent::e2_setup_proc_t::start_ric_connection() +{ + if (not e2_agent_ptr->running) { + e2_agent_ptr->logger.info("E2 Agent is not running anymore."); + return srsran::proc_outcome_t::error; + } + if (e2_agent_ptr->ric_connected) { + e2_agent_ptr->logger.info("E2 Agent is already connected to RIC"); + return srsran::proc_outcome_t::success; + } + + auto connect_callback = [this]() { + bool connected = e2_agent_ptr->connect_ric(); + + auto notify_result = [this, connected]() { + e2_setup_proc_t::e2connectresult res; + res.success = connected; + e2_agent_ptr->e2_setup_proc.trigger(res); + }; + e2_agent_ptr->task_sched.notify_background_task_result(notify_result); + }; + srsran::get_background_workers().push_task(connect_callback); + e2_agent_ptr->logger.debug("Connection to RIC requested."); + + return srsran::proc_outcome_t::yield; +} + +srsran::proc_outcome_t e2_agent::e2_setup_proc_t::react(const srsenb::e2_agent::e2_setup_proc_t::e2connectresult& event) +{ + if (event.success) { + e2_agent_ptr->logger.info("Connected to RIC. Sending setup request."); + e2_agent_ptr->e2_setup_timeout.run(); + if (not e2_agent_ptr->setup_e2()) { + e2_agent_ptr->logger.error("E2 setup failed. Exiting..."); + srsran::console("E2 setup failed\n"); + e2_agent_ptr->running = false; + return srsran::proc_outcome_t::error; + } + e2_agent_ptr->logger.info("E2 setup request sent. Waiting for response."); + return srsran::proc_outcome_t::yield; + } + + e2_agent_ptr->logger.info("Could not connected to RIC. Aborting"); + return srsran::proc_outcome_t::error; +} + +srsran::proc_outcome_t e2_agent::e2_setup_proc_t::react(const srsenb::e2_agent::e2_setup_proc_t::e2setupresult& event) +{ + if (e2_agent_ptr->e2_setup_timeout.is_running()) { + e2_agent_ptr->e2_setup_timeout.stop(); + } + if (event.success) { + e2_agent_ptr->logger.info("E2 Setup procedure completed successfully"); + return srsran::proc_outcome_t::success; + } + e2_agent_ptr->logger.error("E2 Setup failed."); + srsran::console("E2 setup failed\n"); + return srsran::proc_outcome_t::error; +} + +void e2_agent::e2_setup_proc_t::then(const srsran::proc_state_t& result) +{ + if (result.is_error()) { + e2_agent_ptr->logger.info("Failed to initiate RIC connection. Attempting reconnection in %d seconds", + e2_agent_ptr->ric_connect_timer.duration() / 1000); + srsran::console("Failed to initiate RIC connection. Attempting reconnection in %d seconds\n", + e2_agent_ptr->ric_connect_timer.duration() / 1000); + e2_agent_ptr->rx_sockets.remove_socket(e2_agent_ptr->ric_socket.get_socket()); + e2_agent_ptr->ric_socket.close(); + e2_agent_ptr->logger.info("R2 Agent socket closed."); + e2_agent_ptr->ric_connect_timer.run(); + if (e2_agent_ptr->_args.max_ric_setup_retries > 0 && connect_count > e2_agent_ptr->_args.max_ric_setup_retries) { + srsran_terminate("Error connecting to RIC"); + } + // Try again with in 10 seconds + } else { + connect_count = 0; + } +} + +/********************************************************* + * E2 Agent class + *********************************************************/ + e2_agent::e2_agent(srslog::basic_logger& logger, e2_interface_metrics* _gnb_metrics) : - task_sched(), logger(logger), rx_sockets(), thread("E2_AGENT_THREAD"), e2ap_(logger, this, _gnb_metrics, &task_sched) + task_sched(), + logger(logger), + rx_sockets(), + thread("E2_AGENT_THREAD"), + e2ap_(logger, this, _gnb_metrics, &task_sched), + e2_setup_proc(this) { gnb_metrics = _gnb_metrics; + ric_rece_task_queue = task_sched.make_task_queue(); } bool e2_agent::init(e2_agent_args_t args) { - printf("E2_AGENT: Init\n"); + _args = args; + + // Setup RIC reconnection timer + ric_connect_timer = task_sched.get_unique_timer(); + auto ric_connect_run = [this](uint32_t tid) { + if (e2_setup_proc.is_busy()) { + logger.error("Failed to initiate RIC Setup procedure: procedure is busy."); + } + e2_setup_proc.launch(); + }; + ric_connect_timer.set(_args.ric_connect_timer * 1000, ric_connect_run); + // Setup timeout + e2_setup_timeout = task_sched.get_unique_timer(); + uint32_t ric_setup_timeout_val = 5000; + e2_setup_timeout.set(ric_setup_timeout_val, [this](uint32_t tid) { + e2_setup_proc_t::e2setupresult res; + res.success = false; + res.cause = e2_setup_proc_t::e2setupresult::cause_t::timeout; + e2_setup_proc.trigger(res); + }); + + start(0); + running = true; + // starting RIC connection + if (not e2_setup_proc.launch()) { + logger.error("Failed to initiate RIC Setup procedure: error launching procedure."); + } + + return SRSRAN_SUCCESS; +} + +void e2_agent::stop() +{ + running = false; + wait_thread_finish(); +} + +void e2_agent::tic() +{ + // get tick every 1ms to advance timers + task_sched.tic(); +} + +bool e2_agent::is_ric_connected() +{ + return ric_connected; +} + +bool e2_agent::connect_ric() +{ using namespace srsran::net_utils; + logger.info("Connecting to RIC %s:%d", _args.ric_ip.c_str(), _args.ric_port); // Open SCTP socket if (not ric_socket.open_socket(addr_family::ipv4, socket_type::seqpacket, protocol_type::SCTP)) { return false; } - printf("RIC SCTP socket opened. fd=%d\n", ric_socket.fd()); + + // Subscribe to shutdown events if (not ric_socket.sctp_subscribe_to_events()) { ric_socket.close(); return false; } + // Set SRTO_MAX + if (not ric_socket.sctp_set_rto_opts(6000)) { + return false; + } + + // Set SCTP init options + if (not ric_socket.sctp_set_init_msg_opts(3, 5000)) { + return false; + } + // Bind socket - if (not ric_socket.bind_addr(args.ric_bind_ip.c_str(), args.ric_bind_port)) { + if (not ric_socket.bind_addr(_args.ric_bind_ip.c_str(), _args.ric_bind_port)) { ric_socket.close(); return false; } + logger.info("SCTP socket opened. fd=%d", ric_socket.fd()); // Connect to the AMF address - if (not ric_socket.connect_to(args.ric_ip.c_str(), args.ric_port, &ric_addr)) { + if (not ric_socket.connect_to(_args.ric_ip.c_str(), _args.ric_port, &ric_addr)) { + ric_socket.close(); return false; } + logger.info("SCTP socket connected with RIC. fd=%d", ric_socket.fd()); + // Assign a handler to rx RIC packets - ric_rece_task_queue = task_sched.make_task_queue(); auto rx_callback = [this](srsran::unique_byte_buffer_t pdu, const sockaddr_in& from, const sctp_sndrcvinfo& sri, int flags) { - handle_e2_rx_msg(std::move(pdu), from, sri, flags); + handle_ric_rx_msg(std::move(pdu), from, sri, flags); }; rx_sockets.add_socket_handler(ric_socket.fd(), srsran::make_sctp_sdu_handler(logger, ric_rece_task_queue, rx_callback)); - printf("SCTP socket connected with RIC. fd=%d \n", ric_socket.fd()); - running = true; - start(0); - return SRSRAN_SUCCESS; -} - -void e2_agent::stop() -{ - running = false; - wait_thread_finish(); + logger.info("SCTP socket connected established with RIC"); + return true; } -void e2_agent::tic() +bool e2_agent::setup_e2() { - // get tick every 1ms to advance timers - task_sched.tic(); + return send_e2_msg(E2_SETUP_REQUEST); } void e2_agent::run_thread() { - using namespace asn1::e2ap; - while (running) { - if (e2ap_.send_setup_request()) { - send_e2_msg(E2_SETUP_REQUEST); - printf("e2 setup request sent\n"); - } task_sched.run_next_task(); } } @@ -132,6 +284,11 @@ bool e2_agent::send_e2_msg(e2_msg_type_t msg_type) bool e2_agent::queue_send_e2ap_pdu(e2_ap_pdu_c e2ap_pdu) { + if (not ric_connected) { + logger.error("Aborting sending msg. Cause: RIC is not connected."); + return false; + } + auto send_e2ap_pdu_task = [this, e2ap_pdu]() { send_e2ap_pdu(e2ap_pdu); }; ric_rece_task_queue.push(send_e2ap_pdu_task); return true; @@ -158,12 +315,66 @@ bool e2_agent::send_e2ap_pdu(e2_ap_pdu_c send_pdu) return true; } -bool e2_agent::handle_e2_rx_msg(srsran::unique_byte_buffer_t pdu, - const sockaddr_in& from, - const sctp_sndrcvinfo& sri, - int flags) +bool e2_agent::handle_ric_rx_msg(srsran::unique_byte_buffer_t pdu, + const sockaddr_in& from, + const sctp_sndrcvinfo& sri, + int flags) +{ + // Handle Notification Case + if (flags & MSG_NOTIFICATION) { + // Received notification + union sctp_notification* notification = (union sctp_notification*)pdu->msg; + logger.info("SCTP Notification %04x", notification->sn_header.sn_type); + bool restart_e2 = false; + if (notification->sn_header.sn_type == SCTP_SHUTDOWN_EVENT) { + logger.info("SCTP Association Shutdown. Association: %d", sri.sinfo_assoc_id); + srsran::console("SCTP Association Shutdown. Association: %d\n", sri.sinfo_assoc_id); + restart_e2 = true; + } else if (notification->sn_header.sn_type == SCTP_PEER_ADDR_CHANGE && + notification->sn_paddr_change.spc_state == SCTP_ADDR_UNREACHABLE) { + logger.info("SCTP peer addres unreachable. Association: %d", sri.sinfo_assoc_id); + srsran::console("SCTP peer address unreachable. Association: %d\n", sri.sinfo_assoc_id); + restart_e2 = true; + } else if (notification->sn_header.sn_type == SCTP_REMOTE_ERROR) { + logger.info("SCTP remote error. Association: %d", sri.sinfo_assoc_id); + srsran::console("SCTP remote error. Association: %d\n", sri.sinfo_assoc_id); + restart_e2 = true; + } else if (notification->sn_header.sn_type == SCTP_ASSOC_CHANGE) { + logger.info("SCTP association changed. Association: %d", sri.sinfo_assoc_id); + srsran::console("SCTP association changed. Association: %d\n", sri.sinfo_assoc_id); + } + if (restart_e2) { + logger.info("Restarting E2 connection"); + srsran::console("Restarting E2 connection\n"); + rx_sockets.remove_socket(ric_socket.get_socket()); + ric_socket.close(); + } + } else if (pdu->N_bytes == 0) { + logger.error("SCTP return 0 bytes. Closing socket"); + ric_socket.close(); + } + + // Restart RIC connection procedure if we lost connection + if (not ric_socket.is_open()) { + ric_connected = false; + if (e2_setup_proc.is_busy()) { + logger.error("Failed to initiate RIC connection procedure, as it is already running."); + return false; + } + e2_setup_proc.launch(); + return false; + } + + if ((flags & MSG_NOTIFICATION) == 0 && pdu->N_bytes != 0) { + handle_e2_rx_pdu(pdu.get()); + } + + return true; +} + +bool e2_agent::handle_e2_rx_pdu(srsran::byte_buffer_t* pdu) { - printf("E2_AGENT: Received %d bytes from %s\n", pdu->N_bytes, inet_ntoa(from.sin_addr)); + printf("E2_AGENT: Received %d bytes from RIC\n", pdu->N_bytes); e2_ap_pdu_c pdu_c; asn1::cbit_ref bref(pdu->msg, pdu->N_bytes); if (pdu_c.unpack(bref) != asn1::SRSASN_SUCCESS) { @@ -248,8 +459,17 @@ bool e2_agent::handle_e2_setup_response(e2setup_resp_s setup_response) { if (e2ap_.process_setup_response(setup_response)) { logger.error("Failed to process E2 Setup Response \n"); + ric_connected = false; + e2_setup_proc_t::e2setupresult res; + res.success = false; + e2_setup_proc.trigger(res); return false; } + + ric_connected = true; + e2_setup_proc_t::e2setupresult res; + res.success = true; + e2_setup_proc.trigger(res); return true; } diff --git a/srsgnb/src/stack/ric/e2ap.cc b/srsgnb/src/stack/ric/e2ap.cc index ab34e43e3..b7ff88b1f 100644 --- a/srsgnb/src/stack/ric/e2ap.cc +++ b/srsgnb/src/stack/ric/e2ap.cc @@ -232,6 +232,11 @@ e2_ap_pdu_c e2ap::generate_subscription_modification_required() int e2ap::process_setup_response(e2setup_resp_s setup_response) { + if (setup_response->transaction_id.value.value == 0) { + // TODO: transaction_id reset? check specs + setup_procedure_transaction_id = 0; + } + if (setup_procedure_transaction_id == setup_response->transaction_id.value.value) { setup_procedure_transaction_id++; e2_established = true; @@ -403,6 +408,11 @@ int e2ap::get_reset_id() // implementation of e2ap failure functions int e2ap::process_e2_setup_failure(e2setup_fail_s e2setup_failure) { + if (e2setup_failure->transaction_id.value.value == 0) { + // TODO: transaction_id reset? check specs + setup_procedure_transaction_id = 0; + } + if (setup_procedure_transaction_id == e2setup_failure->transaction_id.value.value) { setup_procedure_transaction_id++; } else {