pcap: make PCAP write thread-safe

* offload PCAP writing to background thread
* use blocking_queue between writer and clients to make it thread-safe
* add basic test case

this fixes point 1-3 of #2161
master
Andre Puschmann 4 years ago
parent 732a108982
commit 4fa89b7039

@ -13,19 +13,24 @@
#ifndef SRSLTE_MAC_PCAP_H
#define SRSLTE_MAC_PCAP_H
#include "srslte/common/block_queue.h"
#include "srslte/common/buffer_pool.h"
#include "srslte/common/common.h"
#include "srslte/common/logmap.h"
#include "srslte/common/pcap.h"
#include "srslte/common/threads.h"
#include <stdint.h>
#include <thread>
namespace srslte {
class mac_pcap
class mac_pcap : srslte::thread
{
public:
mac_pcap();
~mac_pcap();
void enable(bool en);
void open(const char* filename, uint32_t ue_id = 0);
void close();
uint32_t open(const char* filename, uint32_t ue_id = 0);
uint32_t close();
void set_ue_id(uint16_t ue_id);
@ -46,18 +51,29 @@ public:
void write_sl_crnti(uint8_t* pdu, uint32_t pdu_len_bytes, uint16_t rnti, uint32_t reTX, uint32_t tti, uint8_t cc_idx);
private:
bool enable_write;
FILE* pcap_file;
uint32_t ue_id;
void pack_and_write(uint8_t* pdu,
uint32_t pdu_len_bytes,
uint32_t reTX,
bool crc_ok,
uint8_t cc_idx,
uint32_t tti,
uint16_t crnti_,
uint8_t direction,
uint8_t rnti_type);
srslte::byte_buffer_pool* pool = nullptr;
srslte::log_ref log;
bool running = false;
FILE* pcap_file = nullptr;
uint32_t ue_id = 0;
void pack_and_queue(uint8_t* pdu,
uint32_t pdu_len_bytes,
uint32_t reTX,
bool crc_ok,
uint8_t cc_idx,
uint32_t tti,
uint16_t crnti_,
uint8_t direction,
uint8_t rnti_type);
typedef struct {
MAC_Context_Info_t context;
unique_byte_buffer_t pdu;
} pcap_pdu_t;
block_queue<pcap_pdu_t> queue;
void write_pdu(pcap_pdu_t& pdu);
void run_thread() final;
};
} // namespace srslte

@ -11,47 +11,91 @@
*/
#include "srslte/common/mac_pcap.h"
#include "srslte/common/pcap.h"
#include "srslte/srslte.h"
#include <stdint.h>
namespace srslte {
mac_pcap::mac_pcap() : enable_write(false), ue_id(0), pcap_file(nullptr) {}
mac_pcap::mac_pcap() :
pool(srslte::byte_buffer_pool::get_instance()), log(srslte::logmap::get("MAC")), thread("PCAP_WRITER")
{}
mac_pcap::~mac_pcap()
{
close();
}
void mac_pcap::enable(bool en)
void mac_pcap::enable(bool enable_)
{
enable_write = true;
running = enable_;
}
void mac_pcap::open(const char* filename, uint32_t ue_id)
uint32_t mac_pcap::open(const char* filename, uint32_t ue_id_)
{
pcap_file = LTE_PCAP_Open(MAC_LTE_DLT, filename);
this->ue_id = ue_id;
enable_write = true;
if (pcap_file != nullptr) {
log->error("PCAP writer already running. Close first.\n");
return SRSLTE_ERROR;
}
pcap_file = LTE_PCAP_Open(MAC_LTE_DLT, filename);
ue_id = ue_id_;
running = true;
// start writer thread
start();
return SRSLTE_SUCCESS;
}
void mac_pcap::close()
uint32_t mac_pcap::close()
{
enable_write = false;
if (pcap_file != nullptr) {
fprintf(stdout, "Saving MAC PCAP file\n");
LTE_PCAP_Close(pcap_file);
pcap_file = nullptr;
if (running == false || pcap_file == nullptr) {
return SRSLTE_ERROR;
}
// tell writer thread to stop
running = false;
pcap_pdu_t pdu = {};
queue.push(std::move(pdu));
wait_thread_finish();
// close file handle
srslte::console("Saving MAC PCAP file\n");
LTE_PCAP_Close(pcap_file);
pcap_file = nullptr;
return SRSLTE_SUCCESS;
}
void mac_pcap::set_ue_id(uint16_t ue_id)
void mac_pcap::write_pdu(pcap_pdu_t& pdu)
{
this->ue_id = ue_id;
if (pdu.pdu != nullptr) {
LTE_PCAP_MAC_WritePDU(pcap_file, &pdu.context, pdu.pdu->msg, pdu.pdu->N_bytes);
}
}
void mac_pcap::pack_and_write(uint8_t* pdu,
uint32_t pdu_len_bytes,
void mac_pcap::run_thread()
{
// blocking write until stopped
while (running) {
pcap_pdu_t pdu = queue.wait_pop();
write_pdu(pdu);
}
// write remainder of queue
pcap_pdu_t pdu = {};
while (queue.try_pop(&pdu)) {
write_pdu(pdu);
}
}
void mac_pcap::set_ue_id(uint16_t ue_id_)
{
ue_id = ue_id_;
}
void mac_pcap::pack_and_queue(uint8_t* payload,
uint32_t payload_len,
uint32_t reTX,
bool crc_ok,
uint8_t cc_idx,
@ -60,20 +104,28 @@ void mac_pcap::pack_and_write(uint8_t* pdu,
uint8_t direction,
uint8_t rnti_type)
{
if (enable_write) {
MAC_Context_Info_t context = {};
context.radioType = FDD_RADIO;
context.direction = direction;
context.rntiType = rnti_type;
context.rnti = crnti;
context.ueid = (uint16_t)ue_id;
context.isRetx = (uint8_t)reTX;
context.crcStatusOK = crc_ok;
context.cc_idx = cc_idx;
context.sysFrameNumber = (uint16_t)(tti / 10);
context.subFrameNumber = (uint16_t)(tti % 10);
if (pdu) {
LTE_PCAP_MAC_WritePDU(pcap_file, &context, pdu, pdu_len_bytes);
if (running && payload != nullptr) {
pcap_pdu_t pdu = {};
pdu.context.radioType = FDD_RADIO;
pdu.context.direction = direction;
pdu.context.rntiType = rnti_type;
pdu.context.rnti = crnti;
pdu.context.ueid = (uint16_t)ue_id;
pdu.context.isRetx = (uint8_t)reTX;
pdu.context.crcStatusOK = crc_ok;
pdu.context.cc_idx = cc_idx;
pdu.context.sysFrameNumber = (uint16_t)(tti / 10);
pdu.context.subFrameNumber = (uint16_t)(tti % 10);
// try to allocate PDU buffer
pdu.pdu = srslte::allocate_unique_buffer(*pool);
if (pdu.pdu != nullptr && pdu.pdu->get_tailroom() >= payload_len) {
// copy payload into PDU buffer
memcpy(pdu.pdu->msg, payload, payload_len);
pdu.pdu->N_bytes = payload_len;
queue.push(std::move(pdu));
} else {
log->info("Dropping PDU in PCAP. No buffer available or not enough space (pdu_len=%d).\n", payload_len);
}
}
}
@ -85,7 +137,7 @@ void mac_pcap::write_dl_crnti(uint8_t* pdu,
uint32_t tti,
uint8_t cc_idx)
{
pack_and_write(pdu, pdu_len_bytes, 0, crc_ok, cc_idx, tti, rnti, DIRECTION_DOWNLINK, C_RNTI);
pack_and_queue(pdu, pdu_len_bytes, 0, crc_ok, cc_idx, tti, rnti, DIRECTION_DOWNLINK, C_RNTI);
}
void mac_pcap::write_dl_ranti(uint8_t* pdu,
uint32_t pdu_len_bytes,
@ -94,7 +146,7 @@ void mac_pcap::write_dl_ranti(uint8_t* pdu,
uint32_t tti,
uint8_t cc_idx)
{
pack_and_write(pdu, pdu_len_bytes, 0, crc_ok, cc_idx, tti, rnti, DIRECTION_DOWNLINK, RA_RNTI);
pack_and_queue(pdu, pdu_len_bytes, 0, crc_ok, cc_idx, tti, rnti, DIRECTION_DOWNLINK, RA_RNTI);
}
void mac_pcap::write_ul_crnti(uint8_t* pdu,
uint32_t pdu_len_bytes,
@ -103,7 +155,7 @@ void mac_pcap::write_ul_crnti(uint8_t* pdu,
uint32_t tti,
uint8_t cc_idx)
{
pack_and_write(pdu, pdu_len_bytes, reTX, true, cc_idx, tti, rnti, DIRECTION_UPLINK, C_RNTI);
pack_and_queue(pdu, pdu_len_bytes, reTX, true, cc_idx, tti, rnti, DIRECTION_UPLINK, C_RNTI);
}
void mac_pcap::write_sl_crnti(uint8_t* pdu,
@ -113,24 +165,24 @@ void mac_pcap::write_sl_crnti(uint8_t* pdu,
uint32_t tti,
uint8_t cc_idx)
{
pack_and_write(pdu, pdu_len_bytes, reTX, true, cc_idx, tti, rnti, DIRECTION_UPLINK, SL_RNTI);
pack_and_queue(pdu, pdu_len_bytes, reTX, true, cc_idx, tti, rnti, DIRECTION_UPLINK, SL_RNTI);
}
void mac_pcap::write_dl_bch(uint8_t* pdu, uint32_t pdu_len_bytes, bool crc_ok, uint32_t tti, uint8_t cc_idx)
{
pack_and_write(pdu, pdu_len_bytes, 0, crc_ok, cc_idx, tti, 0, DIRECTION_DOWNLINK, NO_RNTI);
pack_and_queue(pdu, pdu_len_bytes, 0, crc_ok, cc_idx, tti, 0, DIRECTION_DOWNLINK, NO_RNTI);
}
void mac_pcap::write_dl_pch(uint8_t* pdu, uint32_t pdu_len_bytes, bool crc_ok, uint32_t tti, uint8_t cc_idx)
{
pack_and_write(pdu, pdu_len_bytes, 0, crc_ok, cc_idx, tti, SRSLTE_PRNTI, DIRECTION_DOWNLINK, P_RNTI);
pack_and_queue(pdu, pdu_len_bytes, 0, crc_ok, cc_idx, tti, SRSLTE_PRNTI, DIRECTION_DOWNLINK, P_RNTI);
}
void mac_pcap::write_dl_mch(uint8_t* pdu, uint32_t pdu_len_bytes, bool crc_ok, uint32_t tti, uint8_t cc_idx)
{
pack_and_write(pdu, pdu_len_bytes, 0, crc_ok, cc_idx, tti, SRSLTE_MRNTI, DIRECTION_DOWNLINK, M_RNTI);
pack_and_queue(pdu, pdu_len_bytes, 0, crc_ok, cc_idx, tti, SRSLTE_MRNTI, DIRECTION_DOWNLINK, M_RNTI);
}
void mac_pcap::write_dl_sirnti(uint8_t* pdu, uint32_t pdu_len_bytes, bool crc_ok, uint32_t tti, uint8_t cc_idx)
{
pack_and_write(pdu, pdu_len_bytes, 0, crc_ok, cc_idx, tti, SRSLTE_SIRNTI, DIRECTION_DOWNLINK, SI_RNTI);
pack_and_queue(pdu, pdu_len_bytes, 0, crc_ok, cc_idx, tti, SRSLTE_SIRNTI, DIRECTION_DOWNLINK, SI_RNTI);
}
void mac_pcap::write_ul_rrc_pdu(const uint8_t* input, const int32_t input_len)
@ -140,7 +192,7 @@ void mac_pcap::write_ul_rrc_pdu(const uint8_t* input, const int32_t input_len)
// Size is limited by PDU buffer and MAC subheader (format 1 < 128 B)
if (input_len > 128 - 7) {
printf("PDU too large.\n");
log->error("PDU too large.\n");
return;
}

@ -10,6 +10,10 @@ add_executable(pdu_test pdu_test.cc)
target_link_libraries(pdu_test srslte_phy srslte_common srslte_mac ${CMAKE_THREAD_LIBS_INIT})
add_test(pdu_test pdu_test)
add_executable(mac_pcap_test mac_pcap_test.cc)
target_link_libraries(mac_pcap_test srslte_common srslte_mac ${SCTP_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
add_test(mac_pcap_test mac_pcap_test)
if (ENABLE_5GNR)
add_executable(mac_nr_pdu_test mac_nr_pdu_test.cc)
target_link_libraries(mac_nr_pdu_test srslte_phy srslte_mac srslte_common ${CMAKE_THREAD_LIBS_INIT})

@ -0,0 +1,62 @@
/**
*
* \section COPYRIGHT
*
* Copyright 2013-2020 Software Radio Systems Limited
*
* By using this file, you agree to the terms and conditions set
* forth in the LICENSE file which can be found at the top level of
* the distribution.
*
*/
#include "srslte/common/common.h"
#include "srslte/common/mac_pcap.h"
#include "srslte/common/test_common.h"
#include <iostream>
#include <thread>
void write_pcap_thread_function(srslte::mac_pcap* pcap_handle, const std::array<uint8_t, 150>& pdu, uint32_t num_pdus)
{
for (uint32_t i = 0; i < num_pdus; i++) {
pcap_handle->write_ul_crnti(const_cast<uint8_t*>(pdu.data()), pdu.size(), 0x1001, true, 1, 0);
}
std::cout << "Finished thread " << std::this_thread::get_id() << "\n";
}
int main()
{
std::array<uint8_t, 150> tv = {
0x21, 0x08, 0x22, 0x80, 0x82, 0x1f, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x02, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
uint32_t num_threads = 10;
uint32_t num_pdus_per_thread = 100;
std::unique_ptr<srslte::mac_pcap> pcap_handle = std::unique_ptr<srslte::mac_pcap>(new srslte::mac_pcap());
TESTASSERT(pcap_handle->open("mac_pcap_test.pcap") == SRSLTE_SUCCESS);
TESTASSERT(pcap_handle->open("mac_pcap_test.pcap") != SRSLTE_SUCCESS); // open again will fail
std::vector<std::thread> writer_threads;
for (uint32_t i = 0; i < num_threads; i++) {
writer_threads.push_back(std::thread(write_pcap_thread_function, pcap_handle.get(), tv, num_pdus_per_thread));
}
// wait for threads to finish
for (std::thread& thread : writer_threads) {
thread.join();
}
TESTASSERT(pcap_handle->close() == SRSLTE_SUCCESS);
TESTASSERT(pcap_handle->close() != 0); // closing twice will fail
return SRSLTE_SUCCESS;
}
Loading…
Cancel
Save