diff --git a/lib/include/srslte/adt/circular_buffer.h b/lib/include/srslte/adt/circular_buffer.h index 8c455bf55..4b16f3898 100644 --- a/lib/include/srslte/adt/circular_buffer.h +++ b/lib/include/srslte/adt/circular_buffer.h @@ -128,20 +128,35 @@ public: iterator begin() { return iterator(*this, rpos); } iterator end() { return iterator(*this, (rpos + count) % max_size()); } + template >::value> > + void set_size(size_t size) + { + buffer.resize(size); + } + private: Container buffer; size_t rpos = 0; size_t count = 0; }; -template +struct noop_operator { + template + void operator()(const T&) + { + // noop + } +}; + +template class base_block_queue { using T = typename CircBuffer::value_type; public: template - base_block_queue(Args&&... args) : circ_buffer(std::forward(args)...) + base_block_queue(PushingFunc push_func_, PoppingFunc pop_func_, Args&&... args) : + circ_buffer(std::forward(args)...), push_func(push_func_), pop_func(pop_func_) {} ~base_block_queue() { stop(); } @@ -165,10 +180,10 @@ public: bool try_push(const T& t) { return push_(t, false); } srslte::error_type try_push(T&& t) { return push_(std::move(t), false); } - bool push(const T& t) { return push_(t, true); } - srslte::error_type push(T&& t) { return push_(std::move(t), true); } + bool push_blocking(const T& t) { return push_(t, true); } + srslte::error_type push_blocking(T&& t) { return push_(std::move(t), true); } bool try_pop(T& obj) { return pop_(obj, false); } - T pop() + T pop_blocking() { T obj{}; pop_(obj, true); @@ -187,11 +202,16 @@ public: std::lock_guard lock(mutex); return circ_buffer.size(); } - size_t empty() const + bool empty() const { std::lock_guard lock(mutex); return circ_buffer.empty(); } + bool full() const + { + std::lock_guard lock(mutex); + return circ_buffer.full(); + } size_t max_size() const { std::lock_guard lock(mutex); @@ -202,12 +222,24 @@ public: std::lock_guard lock(mutex); return not active; } + template + bool try_call_on_front(F&& f) + { + std::lock_guard lock(mutex); + if (not circ_buffer.empty()) { + f(circ_buffer.top()); + return true; + } + return false; + } -private: +protected: bool active = true; uint8_t nof_waiting = 0; mutable std::mutex mutex; std::condition_variable cvar_empty, cvar_full; + PushingFunc push_func; + PoppingFunc pop_func; CircBuffer circ_buffer; bool push_(const T& t, bool block_mode) @@ -229,6 +261,7 @@ private: return false; } } + push_func(t); circ_buffer.push(t); lock.unlock(); cvar_empty.notify_one(); @@ -253,7 +286,8 @@ private: return std::move(t); } } - circ_buffer.push(t); + push_func(t); + circ_buffer.push(std::move(t)); lock.unlock(); cvar_empty.notify_one(); return {}; @@ -279,6 +313,7 @@ private: } } obj = std::move(circ_buffer.top()); + pop_func(obj); circ_buffer.pop(); lock.unlock(); cvar_full.notify_one(); @@ -305,22 +340,37 @@ public: { // Note: dynamic resizes not supported. assert(base_t::empty()); - base_t::buffer.resize(size); + base_t::set_size(size); } }; -template -class static_block_queue : public detail::base_block_queue > -{}; +template +class static_block_queue + : public detail::base_block_queue, PushingCallback, PoppingCallback> +{ + using base_t = detail::base_block_queue, PushingCallback, PoppingCallback>; -template -class dyn_block_queue : public detail::base_block_queue > +public: + explicit static_block_queue(PushingCallback push_callback = {}, PoppingCallback pop_callback = {}) : + base_t(push_callback, pop_callback) + {} +}; + +template +class dyn_block_queue : public detail::base_block_queue, PushingCallback, PoppingCallback> { - using base_t = detail::base_block_queue >; + using base_t = detail::base_block_queue, PushingCallback, PoppingCallback>; public: dyn_block_queue() = default; - explicit dyn_block_queue(size_t size) : base_t(size) {} + explicit dyn_block_queue(size_t size, PushingCallback push_callback = {}, PoppingCallback pop_callback = {}) : + base_t(push_callback, pop_callback, size) + {} void set_size(size_t size) { base_t::circ_buffer.set_size(size); } }; diff --git a/lib/include/srslte/upper/byte_buffer_queue.h b/lib/include/srslte/upper/byte_buffer_queue.h index 4013afe91..db8b050ed 100644 --- a/lib/include/srslte/upper/byte_buffer_queue.h +++ b/lib/include/srslte/upper/byte_buffer_queue.h @@ -21,51 +21,43 @@ #ifndef SRSLTE_BYTE_BUFFERQUEUE_H #define SRSLTE_BYTE_BUFFERQUEUE_H +#include "srslte/adt/circular_buffer.h" #include "srslte/common/block_queue.h" #include "srslte/common/common.h" #include namespace srslte { -class byte_buffer_queue : public block_queue::call_mutexed_itf +class byte_buffer_queue { public: - byte_buffer_queue(int capacity = 128) : queue(capacity) { queue.set_mutexed_itf(this); } - // increase/decrease unread_bytes inside push/pop mutexed operations - void pushing(const unique_byte_buffer_t& msg) final { unread_bytes += msg->N_bytes; } - void popping(const unique_byte_buffer_t& msg) final - { - if (unread_bytes > msg->N_bytes) { - unread_bytes -= msg->N_bytes; - } else { - unread_bytes = 0; - } - } - void write(unique_byte_buffer_t msg) { queue.push(std::move(msg)); } + byte_buffer_queue(int capacity = 128) : queue(capacity, push_callback(unread_bytes), pop_callback(unread_bytes)) {} + + void write(unique_byte_buffer_t msg) { queue.push_blocking(std::move(msg)); } srslte::error_type try_write(unique_byte_buffer_t&& msg) { return queue.try_push(std::move(msg)); } - unique_byte_buffer_t read() { return queue.wait_pop(); } + unique_byte_buffer_t read() { return queue.pop_blocking(); } - bool try_read(unique_byte_buffer_t* msg) { return queue.try_pop(msg); } + bool try_read(unique_byte_buffer_t* msg) { return queue.try_pop(*msg); } - void resize(uint32_t capacity) { queue.resize(capacity); } + void resize(uint32_t capacity) { queue.set_size(capacity); } uint32_t size() { return (uint32_t)queue.size(); } uint32_t size_bytes() { return unread_bytes; } uint32_t size_tail_bytes() { - if (!queue.empty()) { - const unique_byte_buffer_t& m = queue.front(); - if (m.get()) { - return m->N_bytes; + uint32_t size_next = 0; + queue.try_call_on_front([&size_next](const unique_byte_buffer_t& front_val) { + if (front_val != nullptr) { + size_next += front_val->N_bytes; } - } - return 0; + }); + return size_next; } // This is a hack to reset N_bytes counter when queue is corrupted (see line 89) @@ -76,8 +68,19 @@ public: bool is_full() { return queue.full(); } private: - block_queue queue; - uint32_t unread_bytes = 0; + struct push_callback { + explicit push_callback(uint32_t& unread_bytes_) : unread_bytes(&unread_bytes_) {} + void operator()(const unique_byte_buffer_t& msg) { *unread_bytes += msg->N_bytes; } + uint32_t* unread_bytes; + }; + struct pop_callback { + explicit pop_callback(uint32_t& unread_bytes_) : unread_bytes(&unread_bytes_) {} + void operator()(const unique_byte_buffer_t& msg) { *unread_bytes -= std::min(msg->N_bytes, *unread_bytes); } + uint32_t* unread_bytes; + }; + + dyn_block_queue queue; + uint32_t unread_bytes = 0; }; } // namespace srslte diff --git a/lib/test/adt/circular_buffer_test.cc b/lib/test/adt/circular_buffer_test.cc index 5464f1be5..cc7b4d3c7 100644 --- a/lib/test/adt/circular_buffer_test.cc +++ b/lib/test/adt/circular_buffer_test.cc @@ -71,7 +71,7 @@ int test_queue_block_api() std::thread t([&queue]() { int count = 0; while (true) { - int val = queue.pop(); + int val = queue.pop_blocking(); if (queue.is_stopped()) { break; } @@ -81,7 +81,7 @@ int test_queue_block_api() }); for (int i = 0; i < 10000; ++i) { - queue.push(i); + queue.push_blocking(i); } queue.stop(); @@ -98,12 +98,12 @@ int test_queue_block_api_2() t = std::thread([&queue]() { int count = 0; - while (queue.push(count++)) { + while (queue.push_blocking(count++)) { } }); for (int i = 0; i < 10000; ++i) { - TESTASSERT(queue.pop() == i); + TESTASSERT(queue.pop_blocking() == i); } // queue dtor called