converted byte_buffer_queue to use new circular buffer-based queue to avoid mallocs

master
Francisco 4 years ago committed by Francisco Paisana
parent 822e26b63f
commit 0ba93d274f

@ -128,20 +128,35 @@ public:
iterator begin() { return iterator(*this, rpos); } iterator begin() { return iterator(*this, rpos); }
iterator end() { return iterator(*this, (rpos + count) % max_size()); } iterator end() { return iterator(*this, (rpos + count) % max_size()); }
template <typename = std::enable_if<std::is_same<Container, std::vector<T> >::value> >
void set_size(size_t size)
{
buffer.resize(size);
}
private: private:
Container buffer; Container buffer;
size_t rpos = 0; size_t rpos = 0;
size_t count = 0; size_t count = 0;
}; };
template <typename CircBuffer> struct noop_operator {
template <typename T>
void operator()(const T&)
{
// noop
}
};
template <typename CircBuffer, typename PushingFunc, typename PoppingFunc>
class base_block_queue class base_block_queue
{ {
using T = typename CircBuffer::value_type; using T = typename CircBuffer::value_type;
public: public:
template <typename... Args> template <typename... Args>
base_block_queue(Args&&... args) : circ_buffer(std::forward<Args>(args)...) base_block_queue(PushingFunc push_func_, PoppingFunc pop_func_, Args&&... args) :
circ_buffer(std::forward<Args>(args)...), push_func(push_func_), pop_func(pop_func_)
{} {}
~base_block_queue() { stop(); } ~base_block_queue() { stop(); }
@ -165,10 +180,10 @@ public:
bool try_push(const T& t) { return push_(t, false); } bool try_push(const T& t) { return push_(t, false); }
srslte::error_type<T> try_push(T&& t) { return push_(std::move(t), false); } srslte::error_type<T> try_push(T&& t) { return push_(std::move(t), false); }
bool push(const T& t) { return push_(t, true); } bool push_blocking(const T& t) { return push_(t, true); }
srslte::error_type<T> push(T&& t) { return push_(std::move(t), true); } srslte::error_type<T> push_blocking(T&& t) { return push_(std::move(t), true); }
bool try_pop(T& obj) { return pop_(obj, false); } bool try_pop(T& obj) { return pop_(obj, false); }
T pop() T pop_blocking()
{ {
T obj{}; T obj{};
pop_(obj, true); pop_(obj, true);
@ -187,11 +202,16 @@ public:
std::lock_guard<std::mutex> lock(mutex); std::lock_guard<std::mutex> lock(mutex);
return circ_buffer.size(); return circ_buffer.size();
} }
size_t empty() const bool empty() const
{ {
std::lock_guard<std::mutex> lock(mutex); std::lock_guard<std::mutex> lock(mutex);
return circ_buffer.empty(); return circ_buffer.empty();
} }
bool full() const
{
std::lock_guard<std::mutex> lock(mutex);
return circ_buffer.full();
}
size_t max_size() const size_t max_size() const
{ {
std::lock_guard<std::mutex> lock(mutex); std::lock_guard<std::mutex> lock(mutex);
@ -202,12 +222,24 @@ public:
std::lock_guard<std::mutex> lock(mutex); std::lock_guard<std::mutex> lock(mutex);
return not active; return not active;
} }
template <typename F>
bool try_call_on_front(F&& f)
{
std::lock_guard<std::mutex> lock(mutex);
if (not circ_buffer.empty()) {
f(circ_buffer.top());
return true;
}
return false;
}
private: protected:
bool active = true; bool active = true;
uint8_t nof_waiting = 0; uint8_t nof_waiting = 0;
mutable std::mutex mutex; mutable std::mutex mutex;
std::condition_variable cvar_empty, cvar_full; std::condition_variable cvar_empty, cvar_full;
PushingFunc push_func;
PoppingFunc pop_func;
CircBuffer circ_buffer; CircBuffer circ_buffer;
bool push_(const T& t, bool block_mode) bool push_(const T& t, bool block_mode)
@ -229,6 +261,7 @@ private:
return false; return false;
} }
} }
push_func(t);
circ_buffer.push(t); circ_buffer.push(t);
lock.unlock(); lock.unlock();
cvar_empty.notify_one(); cvar_empty.notify_one();
@ -253,7 +286,8 @@ private:
return std::move(t); return std::move(t);
} }
} }
circ_buffer.push(t); push_func(t);
circ_buffer.push(std::move(t));
lock.unlock(); lock.unlock();
cvar_empty.notify_one(); cvar_empty.notify_one();
return {}; return {};
@ -279,6 +313,7 @@ private:
} }
} }
obj = std::move(circ_buffer.top()); obj = std::move(circ_buffer.top());
pop_func(obj);
circ_buffer.pop(); circ_buffer.pop();
lock.unlock(); lock.unlock();
cvar_full.notify_one(); cvar_full.notify_one();
@ -305,22 +340,37 @@ public:
{ {
// Note: dynamic resizes not supported. // Note: dynamic resizes not supported.
assert(base_t::empty()); assert(base_t::empty());
base_t::buffer.resize(size); base_t::set_size(size);
} }
}; };
template <typename T, size_t N> template <typename T,
class static_block_queue : public detail::base_block_queue<static_circular_buffer<T, N> > size_t N,
{}; typename PushingCallback = detail::noop_operator,
typename PoppingCallback = detail::noop_operator>
class static_block_queue
: public detail::base_block_queue<static_circular_buffer<T, N>, PushingCallback, PoppingCallback>
{
using base_t = detail::base_block_queue<static_circular_buffer<T, N>, PushingCallback, PoppingCallback>;
template <typename T> public:
class dyn_block_queue : public detail::base_block_queue<dyn_circular_buffer<T> > explicit static_block_queue(PushingCallback push_callback = {}, PoppingCallback pop_callback = {}) :
base_t(push_callback, pop_callback)
{}
};
template <typename T,
typename PushingCallback = detail::noop_operator,
typename PoppingCallback = detail::noop_operator>
class dyn_block_queue : public detail::base_block_queue<dyn_circular_buffer<T>, PushingCallback, PoppingCallback>
{ {
using base_t = detail::base_block_queue<dyn_circular_buffer<T> >; using base_t = detail::base_block_queue<dyn_circular_buffer<T>, PushingCallback, PoppingCallback>;
public: public:
dyn_block_queue() = default; dyn_block_queue() = default;
explicit dyn_block_queue(size_t size) : base_t(size) {} explicit dyn_block_queue(size_t size, PushingCallback push_callback = {}, PoppingCallback pop_callback = {}) :
base_t(push_callback, pop_callback, size)
{}
void set_size(size_t size) { base_t::circ_buffer.set_size(size); } void set_size(size_t size) { base_t::circ_buffer.set_size(size); }
}; };

@ -21,51 +21,43 @@
#ifndef SRSLTE_BYTE_BUFFERQUEUE_H #ifndef SRSLTE_BYTE_BUFFERQUEUE_H
#define SRSLTE_BYTE_BUFFERQUEUE_H #define SRSLTE_BYTE_BUFFERQUEUE_H
#include "srslte/adt/circular_buffer.h"
#include "srslte/common/block_queue.h" #include "srslte/common/block_queue.h"
#include "srslte/common/common.h" #include "srslte/common/common.h"
#include <pthread.h> #include <pthread.h>
namespace srslte { namespace srslte {
class byte_buffer_queue : public block_queue<unique_byte_buffer_t>::call_mutexed_itf class byte_buffer_queue
{ {
public: public:
byte_buffer_queue(int capacity = 128) : queue(capacity) { queue.set_mutexed_itf(this); } byte_buffer_queue(int capacity = 128) : queue(capacity, push_callback(unread_bytes), pop_callback(unread_bytes)) {}
// increase/decrease unread_bytes inside push/pop mutexed operations
void pushing(const unique_byte_buffer_t& msg) final { unread_bytes += msg->N_bytes; } void write(unique_byte_buffer_t msg) { queue.push_blocking(std::move(msg)); }
void popping(const unique_byte_buffer_t& msg) final
{
if (unread_bytes > msg->N_bytes) {
unread_bytes -= msg->N_bytes;
} else {
unread_bytes = 0;
}
}
void write(unique_byte_buffer_t msg) { queue.push(std::move(msg)); }
srslte::error_type<unique_byte_buffer_t> try_write(unique_byte_buffer_t&& msg) srslte::error_type<unique_byte_buffer_t> try_write(unique_byte_buffer_t&& msg)
{ {
return queue.try_push(std::move(msg)); return queue.try_push(std::move(msg));
} }
unique_byte_buffer_t read() { return queue.wait_pop(); } unique_byte_buffer_t read() { return queue.pop_blocking(); }
bool try_read(unique_byte_buffer_t* msg) { return queue.try_pop(msg); } bool try_read(unique_byte_buffer_t* msg) { return queue.try_pop(*msg); }
void resize(uint32_t capacity) { queue.resize(capacity); } void resize(uint32_t capacity) { queue.set_size(capacity); }
uint32_t size() { return (uint32_t)queue.size(); } uint32_t size() { return (uint32_t)queue.size(); }
uint32_t size_bytes() { return unread_bytes; } uint32_t size_bytes() { return unread_bytes; }
uint32_t size_tail_bytes() uint32_t size_tail_bytes()
{ {
if (!queue.empty()) { uint32_t size_next = 0;
const unique_byte_buffer_t& m = queue.front(); queue.try_call_on_front([&size_next](const unique_byte_buffer_t& front_val) {
if (m.get()) { if (front_val != nullptr) {
return m->N_bytes; size_next += front_val->N_bytes;
} }
} });
return 0; return size_next;
} }
// This is a hack to reset N_bytes counter when queue is corrupted (see line 89) // This is a hack to reset N_bytes counter when queue is corrupted (see line 89)
@ -76,7 +68,18 @@ public:
bool is_full() { return queue.full(); } bool is_full() { return queue.full(); }
private: private:
block_queue<unique_byte_buffer_t> queue; struct push_callback {
explicit push_callback(uint32_t& unread_bytes_) : unread_bytes(&unread_bytes_) {}
void operator()(const unique_byte_buffer_t& msg) { *unread_bytes += msg->N_bytes; }
uint32_t* unread_bytes;
};
struct pop_callback {
explicit pop_callback(uint32_t& unread_bytes_) : unread_bytes(&unread_bytes_) {}
void operator()(const unique_byte_buffer_t& msg) { *unread_bytes -= std::min(msg->N_bytes, *unread_bytes); }
uint32_t* unread_bytes;
};
dyn_block_queue<unique_byte_buffer_t, push_callback, pop_callback> queue;
uint32_t unread_bytes = 0; uint32_t unread_bytes = 0;
}; };

@ -71,7 +71,7 @@ int test_queue_block_api()
std::thread t([&queue]() { std::thread t([&queue]() {
int count = 0; int count = 0;
while (true) { while (true) {
int val = queue.pop(); int val = queue.pop_blocking();
if (queue.is_stopped()) { if (queue.is_stopped()) {
break; break;
} }
@ -81,7 +81,7 @@ int test_queue_block_api()
}); });
for (int i = 0; i < 10000; ++i) { for (int i = 0; i < 10000; ++i) {
queue.push(i); queue.push_blocking(i);
} }
queue.stop(); queue.stop();
@ -98,12 +98,12 @@ int test_queue_block_api_2()
t = std::thread([&queue]() { t = std::thread([&queue]() {
int count = 0; int count = 0;
while (queue.push(count++)) { while (queue.push_blocking(count++)) {
} }
}); });
for (int i = 0; i < 10000; ++i) { for (int i = 0; i < 10000; ++i) {
TESTASSERT(queue.pop() == i); TESTASSERT(queue.pop_blocking() == i);
} }
// queue dtor called // queue dtor called

Loading…
Cancel
Save