fix PNF PDU size cutting

master
Francisco Paisana 5 years ago
parent a0606669e2
commit e3e9bbcd62

@ -24,6 +24,10 @@
#include "basic_vnf_api.h"
#include "common.h"
#include "srslte/common/block_queue.h"
#include "srslte/common/buffer_pool.h"
#include "srslte/common/choice_type.h"
#include "srslte/common/logmap.h"
#include <arpa/inet.h>
#include <atomic>
#include <errno.h>
@ -56,6 +60,11 @@ struct pnf_metrics_t {
class srslte_basic_pnf
{
using msg_header_t = basic_vnf_api::msg_header_t;
const static size_t buffer_size =
srslte::static_max<sizeof(basic_vnf_api::dl_conf_msg_t), sizeof(basic_vnf_api::tx_request_msg_t)>::value;
using msg_buffer_t = std::array<uint8_t, buffer_size>;
public:
srslte_basic_pnf(const std::string& type_,
const std::string& vnf_p5_addr,
@ -72,7 +81,11 @@ public:
num_sf(num_sf_),
tb_len(tb_len_),
rand_gen(RAND_SEED),
rand_dist(MIN_TB_LEN, MAX_TB_LEN){};
rand_dist(MIN_TB_LEN, MAX_TB_LEN)
{
log_h->set_level(srslte::LOG_LEVEL_INFO);
}
~srslte_basic_pnf() { stop(); };
@ -106,10 +119,10 @@ public:
running = true;
if (type == "gnb") {
rx_thread = std::unique_ptr<std::thread>(new std::thread(&srslte_basic_pnf::rx_thread_function, this));
tx_thread = std::unique_ptr<std::thread>(new std::thread(&srslte_basic_pnf::tx_thread_function, this));
rx_thread = std::unique_ptr<std::thread>(new std::thread(&srslte_basic_pnf::dl_handler_thread, this));
tx_thread = std::unique_ptr<std::thread>(new std::thread(&srslte_basic_pnf::ul_handler_thread, this));
} else {
tx_thread = std::unique_ptr<std::thread>(new std::thread(&srslte_basic_pnf::tx_thread_function_ue, this));
tx_thread = std::unique_ptr<std::thread>(new std::thread(&srslte_basic_pnf::ue_dl_handler_thread, this));
}
return true;
@ -141,8 +154,12 @@ public:
return tmp;
}
void connect_out_rf_queue(srslte::block_queue<srslte::unique_byte_buffer_t>* rf_queue_) { rf_out_queue = rf_queue_; }
srslte::block_queue<srslte::unique_byte_buffer_t>* get_in_rf_queue() { return &rf_in_queue; }
private:
void rx_thread_function()
//! Waits for DL Config or Tx Request Msg from VNF and forwards to RF
void dl_handler_thread()
{
pthread_setname_np(pthread_self(), rx_thread_name.c_str());
@ -152,25 +169,26 @@ private:
fd.fd = sockfd;
fd.events = POLLIN;
const uint32_t max_basic_api_pdu = sizeof(basic_vnf_api::dl_conf_msg_t) + 32; // larger than biggest message
std::unique_ptr<std::array<uint8_t, max_basic_api_pdu> > rx_buffer =
std::unique_ptr<std::array<uint8_t, max_basic_api_pdu> >(new std::array<uint8_t, max_basic_api_pdu>);
// const uint32_t max_basic_api_pdu = sizeof(basic_vnf_api::dl_conf_msg_t) + 32; // larger than biggest message
// std::unique_ptr<std::array<uint8_t, max_basic_api_pdu> > rx_buffer =
// std::unique_ptr<std::array<uint8_t, max_basic_api_pdu> >(new std::array<uint8_t, max_basic_api_pdu>);
std::unique_ptr<msg_buffer_t> rx_buffer{new msg_buffer_t{}};
while (running) {
// receive response
int ret = poll(&fd, 1, RX_TIMEOUT_MS);
switch (ret) {
case -1:
printf("Error occured.\n");
printf("Error occurred.\n");
running = false;
break;
case 0:
// Timeout
printf("Error: Didn't receive response after %dms\n", RX_TIMEOUT_MS);
running = false;
// running = false;
break;
default:
int recv_ret = recv(sockfd, rx_buffer->data(), rx_buffer->size(), 0);
int recv_ret = recv(sockfd, rx_buffer->data(), sizeof(*rx_buffer), 0);
handle_msg(rx_buffer->data(), recv_ret);
break;
}
@ -185,7 +203,7 @@ private:
}
};
void tx_thread_function()
void ul_handler_thread()
{
pthread_setname_np(pthread_self(), tx_thread_name.c_str());
@ -200,6 +218,7 @@ private:
std::unique_ptr<std::array<uint8_t, max_basic_api_pdu> >(new std::array<uint8_t, max_basic_api_pdu>);
int32_t sf_counter = 0;
bool using_rf_queue = false;
while (running && (num_sf > 0 ? sf_counter < num_sf : true)) {
{
std::lock_guard<std::mutex> lock(mutex);
@ -213,10 +232,15 @@ private:
// Send request
send_sf_ind(tti);
if (not rf_in_queue.empty()) {
using_rf_queue = true;
send_rx_data_ind(tti);
} else if (not using_rf_queue) {
// provide UL data every 2nd TTI
if (tti % 2 == 0) {
send_rx_data_ind(tti);
}
}
sf_counter++;
}
@ -227,7 +251,7 @@ private:
printf("Leaving Tx thread after %d subframes\n", sf_counter);
};
void tx_thread_function_ue()
void ue_dl_handler_thread()
{
pthread_setname_np(pthread_self(), tx_thread_name.c_str());
@ -242,6 +266,7 @@ private:
std::unique_ptr<std::array<uint8_t, max_basic_api_pdu> >(new std::array<uint8_t, max_basic_api_pdu>);
int32_t sf_counter = 0;
bool using_rf_queue = false;
while (running && (num_sf > 0 ? sf_counter < num_sf : true)) {
{
std::lock_guard<std::mutex> lock(mutex);
@ -255,12 +280,18 @@ private:
// Send SF indication
send_sf_ind(tti);
if (not rf_in_queue.empty()) {
using_rf_queue = true;
srslte::unique_byte_buffer_t tb = rf_in_queue.wait_pop();
send_dl_ind(tti, std::move(tb));
} else if (not using_rf_queue) {
// provide DL grant every even TTI, and UL grant every odd
if (tti % 2 == 0) {
send_dl_ind(tti);
} else {
send_ul_ind(tti);
}
}
sf_counter++;
}
@ -291,7 +322,7 @@ private:
{
basic_vnf_api::msg_header_t* header = (basic_vnf_api::msg_header_t*)buffer;
// printf("Received %s (%d B) in TTI\n", msg_type_text[header->type], len);
log_h->info("Received %s (%d B) in TTI\n", basic_vnf_api::msg_type_text[header->type], len);
switch (header->type) {
case basic_vnf_api::SF_IND:
@ -330,7 +361,7 @@ private:
if (msg->tti != tti) {
metrics.num_timing_errors++;
// printf("Received TX request for TTI=%d but current TTI is %d\n", msg->tti, tti.load());
return -1;
// return -1;
}
for (uint32_t i = 0; i < msg->nof_pdus; ++i) {
@ -339,6 +370,15 @@ private:
metrics.num_pdus += msg->nof_pdus;
if (rf_out_queue != nullptr) {
for (uint32_t i = 0; i < msg->nof_pdus; ++i) {
srslte::unique_byte_buffer_t pdu = srslte::allocate_unique_buffer(*pool);
pdu->N_bytes = msg->pdus[i].length;
memcpy(pdu->msg, msg->pdus[i].data, msg->pdus[i].length);
rf_out_queue->push(std::move(pdu));
}
}
return 0;
}
@ -377,7 +417,7 @@ private:
}
}
void send_dl_ind(uint32_t tti_)
void send_dl_ind(uint32_t tti_, srslte::unique_byte_buffer_t tb = {})
{
#if PING_REQUEST_PDU
static uint8_t tv[] = {
@ -403,6 +443,8 @@ private:
0x3f,
};
#endif // PING_REQUEST_PDU
uint8_t* data = tb != nullptr ? tb->msg : tv;
uint32_t N_bytes = tb != nullptr ? tb->N_bytes : sizeof(tv);
basic_vnf_api::dl_ind_msg_t dl_ind = {};
@ -415,16 +457,18 @@ private:
dl_ind.pdus[0].type = basic_vnf_api::PDSCH;
dl_ind.pdus[0].length = tb_len > 0 ? tb_len : rand_dist(rand_gen);
if (dl_ind.pdus[0].length >= sizeof(tv)) {
if (dl_ind.pdus[0].length >= N_bytes) {
// copy TV
memcpy(dl_ind.pdus[0].data, tv, sizeof(tv));
memcpy(dl_ind.pdus[0].data, data, N_bytes);
// set remaining bytes to zero
memset(dl_ind.pdus[0].data + sizeof(tv), 0xaa, dl_ind.pdus[0].length - sizeof(tv));
memset(dl_ind.pdus[0].data + N_bytes, 0xaa, dl_ind.pdus[0].length - N_bytes);
} else {
// just fill with dummy bytes
memset(dl_ind.pdus[0].data, 0xab, dl_ind.pdus[0].length);
}
log_h->info_hex(dl_ind.pdus[0].data, N_bytes, "Sending to UE a TB (%d bytes)\n", N_bytes);
int n = 0;
if ((n = sendto(sockfd, &dl_ind, sizeof(dl_ind), 0, (struct sockaddr*)&servaddr, sizeof(servaddr))) < 0) {
printf("sendto failed, ret=%d\n", n);
@ -452,6 +496,8 @@ private:
std::unique_ptr<std::thread> tx_thread, rx_thread;
std::string tx_thread_name = "TX_PNF", rx_thread_name = "RX_PNF";
bool running = false;
srslte::byte_buffer_pool* pool = srslte::byte_buffer_pool::get_instance();
srslte::log_ref log_h{"PNF"};
std::mutex mutex;
std::atomic<std::uint32_t> tti;
@ -474,6 +520,9 @@ private:
// For random number generation
std::mt19937 rand_gen;
std::uniform_int_distribution<uint16_t> rand_dist;
srslte::block_queue<srslte::unique_byte_buffer_t>* rf_out_queue = nullptr;
srslte::block_queue<srslte::unique_byte_buffer_t> rf_in_queue;
};
} // namespace srslte

@ -24,7 +24,7 @@
#include "basic_vnf_api.h"
#include "common.h"
#include "srslte/common/log_filter.h"
#include "srslte/common/logmap.h"
#include "srslte/common/threads.h"
#include "srslte/interfaces/gnb_interfaces.h"
#include "srslte/interfaces/ue_nr_interfaces.h"
@ -70,7 +70,7 @@ private:
uint32_t calc_full_msg_len(const basic_vnf_api::tx_request_msg_t& msg);
srslte::logger* m_logger = nullptr;
std::unique_ptr<srslte::log_filter> m_log = nullptr;
srslte::log_ref log_h;
srsenb::stack_interface_phy_nr* m_gnb_stack = nullptr;
srsue::stack_interface_phy_nr* m_ue_stack = nullptr;
srslte::byte_buffer_pool* m_pool = nullptr;

@ -41,12 +41,11 @@ srslte_basic_vnf::srslte_basic_vnf(const vnf_args_t& args_, srslte::logger* logg
m_logger(logger_),
thread("BASIC_VNF_P7"),
m_tx_req_msg(new basic_vnf_api::tx_request_msg_t),
m_log(new srslte::log_filter),
log_h("VNF"),
m_pool(srslte::byte_buffer_pool::get_instance())
{
m_log->init("VNF ", m_logger);
m_log->set_level(m_args.log_level);
m_log->set_hex_limit(m_args.log_hex_limit);
log_h->set_level(m_args.log_level);
log_h->set_hex_limit(m_args.log_hex_limit);
if (m_args.type == "gnb" || m_args.type == "ue") {
if (m_args.type == "gnb") {
@ -55,10 +54,10 @@ srslte_basic_vnf::srslte_basic_vnf(const vnf_args_t& args_, srslte::logger* logg
m_ue_stack = (srsue::stack_interface_phy_nr*)stack_;
}
m_log->info("Initializing VNF for gNB\n");
log_h->info("Initializing VNF for gNB\n");
start();
} else {
m_log->error("Unknown VNF type. Exiting\n.");
log_h->error("Unknown VNF type. Exiting\n.");
}
}
@ -108,7 +107,7 @@ void srslte_basic_vnf::run_thread()
running = true;
m_log->info("Started VNF handler listening on %s:%d\n", m_args.bind_addr.c_str(), m_args.bind_port);
log_h->info("Started VNF handler listening on %s:%d\n", m_args.bind_addr.c_str(), m_args.bind_port);
while (running) {
int ret = poll(&fd, 1, RX_TIMEOUT_MS);
@ -128,14 +127,14 @@ void srslte_basic_vnf::run_thread()
break;
}
}
m_log->info("VNF thread stopped\n");
log_h->info("VNF thread stopped\n");
}
int srslte_basic_vnf::handle_msg(const uint8_t* buffer, const uint32_t len)
{
basic_vnf_api::msg_header_t* header = (basic_vnf_api::msg_header_t*)buffer;
m_log->info("Received %s (%d B)\n", basic_vnf_api::msg_type_text[header->type], len);
log_h->info("Received %s (%d B)\n", basic_vnf_api::msg_type_text[header->type], len);
switch (header->type) {
case basic_vnf_api::SF_IND:
@ -163,7 +162,7 @@ int srslte_basic_vnf::handle_msg(const uint8_t* buffer, const uint32_t len)
int srslte_basic_vnf::handle_sf_ind(basic_vnf_api::sf_ind_msg_t* msg)
{
int ret = SRSLTE_SUCCESS;
m_log->info("Received %s for TTI=%d\n", basic_vnf_api::msg_type_text[msg->header.type], msg->tti);
log_h->info("Received %s for TTI=%d\n", basic_vnf_api::msg_type_text[msg->header.type], msg->tti);
// store Rx timestamp
last_sf_indication_time = msg->t1;
@ -182,7 +181,7 @@ int srslte_basic_vnf::handle_sf_ind(basic_vnf_api::sf_ind_msg_t* msg)
int srslte_basic_vnf::handle_dl_ind(basic_vnf_api::dl_ind_msg_t* msg)
{
int ret = SRSLTE_ERROR;
m_log->info("Received %s for TTI=%d\n", basic_vnf_api::msg_type_text[msg->header.type], msg->tti);
log_h->info("Received %s for TTI=%d\n", basic_vnf_api::msg_type_text[msg->header.type], msg->tti);
uint32_t cc_idx = 0;
@ -191,7 +190,7 @@ int srslte_basic_vnf::handle_dl_ind(basic_vnf_api::dl_ind_msg_t* msg)
dl_grant.tti = msg->tti;
if (msg->nof_pdus > SRSLTE_MAX_TB) {
m_log->error("Too many TBs (%d > %d)\n", msg->nof_pdus, SRSLTE_MAX_TB);
log_h->error("Too many TBs (%d > %d)\n", msg->nof_pdus, SRSLTE_MAX_TB);
goto exit;
}
@ -204,7 +203,7 @@ int srslte_basic_vnf::handle_dl_ind(basic_vnf_api::dl_ind_msg_t* msg)
m_ue_stack->tb_decoded(cc_idx, dl_grant);
}
} else {
m_log->error("TB too big to fit into buffer (%d > %d)\n", msg->pdus[i].length, dl_grant.tb[i]->get_tailroom());
log_h->error("TB too big to fit into buffer (%d > %d)\n", msg->pdus[i].length, dl_grant.tb[i]->get_tailroom());
}
}
@ -217,10 +216,10 @@ exit:
int srslte_basic_vnf::handle_ul_ind(basic_vnf_api::ul_ind_msg_t* msg)
{
m_log->info("Received %s for TTI=%d\n", basic_vnf_api::msg_type_text[msg->header.type], msg->tti);
log_h->info("Received %s for TTI=%d\n", basic_vnf_api::msg_type_text[msg->header.type], msg->tti);
if (msg->pdus.type != basic_vnf_api::PUSCH) {
m_log->error("Received UL indication for wrong PDU type\n");
log_h->error("Received UL indication for wrong PDU type\n");
return SRSLTE_ERROR;
}
@ -238,10 +237,10 @@ int srslte_basic_vnf::handle_ul_ind(basic_vnf_api::ul_ind_msg_t* msg)
int srslte_basic_vnf::handle_rx_data_ind(basic_vnf_api::rx_data_ind_msg_t* msg)
{
m_log->info("Received %s for TTI=%d\n", basic_vnf_api::msg_type_text[msg->header.type], msg->sfn);
log_h->info("Received %s for TTI=%d\n", basic_vnf_api::msg_type_text[msg->header.type], msg->sfn);
if (msg->nof_pdus != 1 || msg->pdus[0].type != basic_vnf_api::PUSCH) {
m_log->error("Received UL indication for wrong PDU type\n");
log_h->error("Received UL indication for wrong PDU type\n");
return SRSLTE_ERROR;
}
@ -277,10 +276,10 @@ int srslte_basic_vnf::dl_config_request(const srsenb::phy_interface_stack_nr::dl
uint32_t len = sizeof(dl_conf);
// Send it to PNF
m_log->info("Sending %s (%d B)\n", basic_vnf_api::msg_type_text[dl_conf.header.type], len);
log_h->info("Sending %s (%d B)\n", basic_vnf_api::msg_type_text[dl_conf.header.type], len);
int n = 0;
if ((n = sendto(sockfd, &dl_conf, len, MSG_CONFIRM, (struct sockaddr*)&client_addr, sizeof(client_addr))) < 0) {
m_log->error("sendto failed, ret=%d\n", n);
log_h->error("sendto failed, ret=%d\n", n);
}
return 0;
@ -304,7 +303,7 @@ int srslte_basic_vnf::tx_request(const srsue::phy_interface_stack_nr::tx_request
// copy data from TB0
memcpy(m_tx_req_msg->pdus[0].data, request.data, request.tb_len);
} else {
m_log->error("Trying to send %d B PDU. Maximum size is %d B\n", request.tb_len, MAX_PDU_SIZE);
log_h->error("Trying to send %d B PDU. Maximum size is %d B\n", request.tb_len, MAX_PDU_SIZE);
}
// calculate actual length of
@ -314,11 +313,11 @@ int srslte_basic_vnf::tx_request(const srsue::phy_interface_stack_nr::tx_request
m_tx_req_msg->header.msg_len = len - sizeof(basic_vnf_api::msg_header_t);
// Send it to PNF
m_log->info("Sending %s (%d B)\n", basic_vnf_api::msg_type_text[m_tx_req_msg->header.type], len);
log_h->info("Sending %s (%d B)\n", basic_vnf_api::msg_type_text[m_tx_req_msg->header.type], len);
int n = 0;
if ((n = sendto(sockfd, m_tx_req_msg.get(), len, MSG_CONFIRM, (struct sockaddr*)&client_addr, sizeof(client_addr))) <
0) {
m_log->error("sendto failed, ret=%d\n", n);
log_h->error("sendto failed, ret=%d\n", n);
}
return 0;
@ -327,7 +326,7 @@ int srslte_basic_vnf::tx_request(const srsue::phy_interface_stack_nr::tx_request
int srslte_basic_vnf::tx_request(const srsenb::phy_interface_stack_nr::tx_request_t& request)
{
if (request.nof_pdus > MAX_NUM_PDUS) {
m_log->error("Trying to send %d PDUs but only %d supported\n", request.nof_pdus, MAX_NUM_PDUS);
log_h->error("Trying to send %d PDUs but only %d supported\n", request.nof_pdus, MAX_NUM_PDUS);
return SRSLTE_ERROR;
}
@ -346,7 +345,7 @@ int srslte_basic_vnf::tx_request(const srsenb::phy_interface_stack_nr::tx_reques
// copy data from TB0
memcpy(m_tx_req_msg->pdus[i].data, request.pdus[i].data[0], m_tx_req_msg->pdus[i].length);
} else {
m_log->error("Trying to send %d B PDU. Maximum size is %d B\n", request.pdus[i].length, MAX_PDU_SIZE);
log_h->error("Trying to send %d B PDU. Maximum size is %d B\n", request.pdus[i].length, MAX_PDU_SIZE);
}
}
@ -357,11 +356,21 @@ int srslte_basic_vnf::tx_request(const srsenb::phy_interface_stack_nr::tx_reques
m_tx_req_msg->header.msg_len = len - sizeof(basic_vnf_api::msg_header_t);
// Send it to PNF
m_log->info("Sending %s (%d B)\n", basic_vnf_api::msg_type_text[m_tx_req_msg->header.type], len);
log_h->info("Sending %s (%d B)\n", basic_vnf_api::msg_type_text[m_tx_req_msg->header.type], len);
if (log_h->get_level() == LOG_LEVEL_DEBUG) {
for (uint32_t i = 0; i < m_tx_req_msg->nof_pdus; ++i) {
log_h->debug_hex(m_tx_req_msg->pdus[i].data,
m_tx_req_msg->pdus[i].length,
"Sending PDU %s:%d (%d bytes)\n",
basic_vnf_api::msg_type_text[m_tx_req_msg->header.type],
m_tx_req_msg->pdus[i].index,
m_tx_req_msg->pdus[i].length);
}
}
int n = 0;
if ((n = sendto(sockfd, m_tx_req_msg.get(), len, MSG_CONFIRM, (struct sockaddr*)&client_addr, sizeof(client_addr))) <
0) {
m_log->error("sendto failed, ret=%d\n", n);
log_h->error("sendto failed, ret=%d\n", n);
}
return 0;

@ -100,9 +100,9 @@ add_test(expected_test expected_test)
if(ENABLE_5GNR)
add_executable(pnf_dummy pnf_dummy.cc)
target_link_libraries(pnf_dummy ${CMAKE_THREAD_LIBS_INIT} ${Boost_LIBRARIES})
target_link_libraries(pnf_dummy srslte_common ${CMAKE_THREAD_LIBS_INIT} ${Boost_LIBRARIES})
add_executable(pnf_bridge pnf_bridge.cc)
target_link_libraries(pnf_bridge ${CMAKE_THREAD_LIBS_INIT} ${Boost_LIBRARIES})
target_link_libraries(pnf_bridge srslte_common ${CMAKE_THREAD_LIBS_INIT} ${Boost_LIBRARIES})
endif()

@ -100,6 +100,8 @@ int main(int argc, char** argv)
srslte::srslte_basic_pnf gnb_pnf(
"gnb", args.gnb_vnf_addr, args.gnb_vnf_port, args.sf_interval, args.num_sf, args.tb_len);
gnb_pnf.connect_out_rf_queue(ue_pnf.get_in_rf_queue());
ue_pnf.start();
gnb_pnf.start();

@ -128,6 +128,8 @@ int enb::init(const all_args_t& args_, srslte::logger* logger_)
srsenb::nr_phy_cfg_t nr_phy_cfg = {};
args.phy.vnf_args.type = "gnb";
args.phy.vnf_args.log_level = args.phy.log.phy_level;
args.phy.vnf_args.log_hex_limit = args.phy.log.phy_hex_limit;
if (nr_phy->init(args.phy, nr_phy_cfg, nr_stack.get())) {
log->console("Error initializing PHY.\n");
return SRSLTE_ERROR;

Loading…
Cancel
Save