documentation,bugfix - added documentation to new blocking queues, changed class names, and removed blocking pushes from the critical path

master
Francisco 4 years ago committed by Francisco Paisana
parent d1236fd62f
commit f0ed1e06a8

@ -39,6 +39,14 @@ size_t get_max_size(const std::vector<T>& a)
return a.capacity();
}
/**
* Base common class for definition of circular buffer data structures with the following features:
* - no allocations while pushing/popping new elements. Just an internal index update
* - it provides helper methods to add/remove objects
* - it provides an iterator interface to iterate over added elements in the buffer
* - not thread-safe
* @tparam Container underlying container type used as buffer (e.g. std::array<T, N> or std::vector<T>)
*/
template <typename Container>
class base_circular_buffer
{
@ -85,9 +93,16 @@ public:
};
template <typename... Args>
base_circular_buffer(Args&&... args) : buffer(std::forward<Args>(args)...)
explicit base_circular_buffer(Args&&... args) : buffer(std::forward<Args>(args)...)
{}
bool try_push(T&& t)
{
if (full()) {
return false;
}
push(std::move(t));
}
void push(T&& t)
{
assert(not full());
@ -95,6 +110,13 @@ public:
buffer[wpos] = std::move(t);
count++;
}
bool try_push(const T& t)
{
if (full()) {
return false;
}
push(t);
}
void push(const T& t)
{
assert(not full());
@ -148,17 +170,26 @@ struct noop_operator {
}
};
/**
* Base common class for definition of blocking queue data structures with the following features:
* - it stores pushed/popped samples in an internal circular buffer
* - provides blocking and non-blocking push/pop APIs
* - thread-safe
* @tparam CircBuffer underlying circular buffer data type (e.g. static_circular_buffer<T, N> or dyn_circular_buffer<T>)
* @tparam PushingFunc function void(const T&) called while pushing an element to the queue
* @tparam PoppingFunc function void(const T&) called while popping an element from the queue
*/
template <typename CircBuffer, typename PushingFunc, typename PoppingFunc>
class base_block_queue
class base_blocking_queue
{
using T = typename CircBuffer::value_type;
public:
template <typename... Args>
base_block_queue(PushingFunc push_func_, PoppingFunc pop_func_, Args&&... args) :
base_blocking_queue(PushingFunc push_func_, PoppingFunc pop_func_, Args&&... args) :
circ_buffer(std::forward<Args>(args)...), push_func(push_func_), pop_func(pop_func_)
{}
~base_block_queue() { stop(); }
~base_blocking_queue() { stop(); }
void stop()
{
@ -323,10 +354,24 @@ protected:
} // namespace detail
/**
* Circular buffer with fixed, embedded buffer storage via a std::array<T, N>.
* - Single allocation at object creation for std::array. Given that the buffer size is known at compile-time, the
* circular iteration over the buffer may be more optimized (e.g. when N is a power of 2, % operator can be avoided)
* - not thread-safe
* @tparam T value type stored by buffer
* @tparam N size of the queue
*/
template <typename T, size_t N>
class static_circular_buffer : public detail::base_circular_buffer<std::array<T, N> >
{};
/**
* Circular buffer with buffer storage via a std::vector<T>.
* - size can be defined at run-time.
* - not thread-safe
* @tparam T value type stored by buffer
*/
template <typename T>
class dyn_circular_buffer : public detail::base_circular_buffer<std::vector<T> >
{
@ -344,31 +389,52 @@ public:
}
};
/**
* Blocking queue with fixed, embedded buffer storage via a std::array<T, N>.
* - Blocking push/pop API via push_blocking(...) and pop_blocking(...) methods
* - Non-blocking push/pop API via try_push(...) and try_pop(...) methods
* - Only one initial allocation for the std::array<T, N>
* - thread-safe
* @tparam T value type stored by buffer
* @tparam N size of queue
* @tparam PushingCallback function void(const T&) called while pushing an element to the queue
* @tparam PoppingCallback function void(const T&) called while popping an element from the queue
*/
template <typename T,
size_t N,
typename PushingCallback = detail::noop_operator,
typename PoppingCallback = detail::noop_operator>
class static_block_queue
: public detail::base_block_queue<static_circular_buffer<T, N>, PushingCallback, PoppingCallback>
class static_blocking_queue
: public detail::base_blocking_queue<static_circular_buffer<T, N>, PushingCallback, PoppingCallback>
{
using base_t = detail::base_block_queue<static_circular_buffer<T, N>, PushingCallback, PoppingCallback>;
using base_t = detail::base_blocking_queue<static_circular_buffer<T, N>, PushingCallback, PoppingCallback>;
public:
explicit static_block_queue(PushingCallback push_callback = {}, PoppingCallback pop_callback = {}) :
explicit static_blocking_queue(PushingCallback push_callback = {}, PoppingCallback pop_callback = {}) :
base_t(push_callback, pop_callback)
{}
};
/**
* Blocking queue with buffer storage represented via a std::vector<T>. Features:
* - Blocking push/pop API via push_blocking(...) and pop_blocking(...) methods
* - Non-blocking push/pop API via try_push(...) and try_pop(...) methods
* - Size can be defined at runtime.
* - thread-safe
* @tparam T value type stored by buffer
* @tparam PushingCallback function void(const T&) called while pushing an element to the queue
* @tparam PoppingCallback function void(const T&) called while popping an element from the queue
*/
template <typename T,
typename PushingCallback = detail::noop_operator,
typename PoppingCallback = detail::noop_operator>
class dyn_block_queue : public detail::base_block_queue<dyn_circular_buffer<T>, PushingCallback, PoppingCallback>
class dyn_blocking_queue : public detail::base_blocking_queue<dyn_circular_buffer<T>, PushingCallback, PoppingCallback>
{
using base_t = detail::base_block_queue<dyn_circular_buffer<T>, PushingCallback, PoppingCallback>;
using base_t = detail::base_blocking_queue<dyn_circular_buffer<T>, PushingCallback, PoppingCallback>;
public:
dyn_block_queue() = default;
explicit dyn_block_queue(size_t size, PushingCallback push_callback = {}, PoppingCallback pop_callback = {}) :
dyn_blocking_queue() = default;
explicit dyn_blocking_queue(size_t size, PushingCallback push_callback = {}, PoppingCallback pop_callback = {}) :
base_t(push_callback, pop_callback, size)
{}
void set_size(size_t size) { base_t::circ_buffer.set_size(size); }

@ -93,11 +93,11 @@ protected:
virtual void write_pdu(pcap_pdu_t& pdu) = 0;
void run_thread() final;
std::mutex mutex;
srslog::basic_logger& logger;
bool running = false;
static_block_queue<pcap_pdu_t, 256> queue;
uint16_t ue_id = 0;
std::mutex mutex;
srslog::basic_logger& logger;
bool running = false;
static_blocking_queue<pcap_pdu_t, 256> queue;
uint16_t ue_id = 0;
private:
void pack_and_queue(uint8_t* payload,

@ -61,8 +61,8 @@ private:
} pdu_t;
dyn_block_queue<pdu_t*> pdu_q;
buffer_pool<pdu_t> pool;
dyn_blocking_queue<pdu_t*> pdu_q;
buffer_pool<pdu_t> pool;
process_callback* callback;
srslog::basic_logger& logger;

@ -79,8 +79,8 @@ private:
uint32_t* unread_bytes;
};
dyn_block_queue<unique_byte_buffer_t, push_callback, pop_callback> queue;
uint32_t unread_bytes = 0;
dyn_blocking_queue<unique_byte_buffer_t, push_callback, pop_callback> queue;
uint32_t unread_bytes = 0;
};
} // namespace srslte

@ -303,8 +303,8 @@ private:
uint32_t nof_bytes;
} pdu_t;
static_block_queue<pdu_t, 256> rx_pdu_resume_queue;
static_block_queue<unique_byte_buffer_t, 256> tx_sdu_resume_queue;
static_blocking_queue<pdu_t, 256> rx_pdu_resume_queue;
static_blocking_queue<unique_byte_buffer_t, 256> tx_sdu_resume_queue;
};
} // namespace srslte

@ -84,7 +84,9 @@ void mac_pcap_base::pack_and_queue(uint8_t* payload,
// copy payload into PDU buffer
memcpy(pdu.pdu->msg, payload, payload_len);
pdu.pdu->N_bytes = payload_len;
queue.push_blocking(std::move(pdu));
if (not queue.try_push(std::move(pdu))) {
logger.error("Failed to push message to pcap writer queue");
}
} else {
logger.info("Dropping PDU in PCAP. No buffer available or not enough space (pdu_len=%d).", payload_len);
}
@ -119,7 +121,9 @@ void mac_pcap_base::pack_and_queue_nr(uint8_t* payload,
// copy payload into PDU buffer
memcpy(pdu.pdu->msg, payload, payload_len);
pdu.pdu->N_bytes = payload_len;
queue.push_blocking(std::move(pdu));
if (not queue.try_push(std::move(pdu))) {
logger.error("Failed to push message to pcap writer queue");
}
} else {
logger.info("Dropping PDU in NR PCAP. No buffer available or not enough space (pdu_len=%d).", payload_len);
}

@ -66,7 +66,7 @@ int test_static_circular_buffer()
int test_queue_block_api()
{
dyn_block_queue<int> queue(100);
dyn_blocking_queue<int> queue(100);
std::thread t([&queue]() {
int count = 0;
@ -94,7 +94,7 @@ int test_queue_block_api_2()
std::thread t;
{
dyn_block_queue<int> queue(100);
dyn_blocking_queue<int> queue(100);
t = std::thread([&queue]() {
int count = 0;

@ -162,7 +162,7 @@ private:
// state
bool started = false;
srslte::dyn_block_queue<stack_metrics_t> pending_stack_metrics;
srslte::dyn_blocking_queue<stack_metrics_t> pending_stack_metrics;
};
} // namespace srsenb

@ -140,8 +140,8 @@ private:
std::map<uint16_t, std::unique_ptr<ue> > ue_db, ues_to_rem;
uint16_t last_rnti = 70;
srslte::static_block_queue<std::unique_ptr<ue>, 32> ue_pool; ///< Pool of pre-allocated UE objects
void prealloc_ue(uint32_t nof_ue);
srslte::static_blocking_queue<std::unique_ptr<ue>, 32> ue_pool; ///< Pool of pre-allocated UE objects
void prealloc_ue(uint32_t nof_ue);
uint8_t* assemble_rar(sched_interface::dl_sched_rar_grant_t* grants,
uint32_t enb_cc_idx,

@ -189,8 +189,8 @@ private:
const static uint32_t LCID_ACT_USER = 0xffff0004;
const static uint32_t LCID_RTX_USER = 0xffff0005;
bool running = false;
srslte::dyn_block_queue<rrc_pdu> rx_pdu_queue;
bool running = false;
srslte::dyn_blocking_queue<rrc_pdu> rx_pdu_queue;
asn1::rrc::mcch_msg_s mcch;
bool enable_mbms = false;

Loading…
Cancel
Save