diff --git a/lib/include/srslte/adt/circular_buffer.h b/lib/include/srslte/adt/circular_buffer.h new file mode 100644 index 000000000..8c455bf55 --- /dev/null +++ b/lib/include/srslte/adt/circular_buffer.h @@ -0,0 +1,329 @@ +/** + * + * \section COPYRIGHT + * + * Copyright 2013-2020 Software Radio Systems Limited + * + * By using this file, you agree to the terms and conditions set + * forth in the LICENSE file which can be found at the top level of + * the distribution. + * + */ + +#ifndef SRSLTE_CIRCULAR_BUFFER_H +#define SRSLTE_CIRCULAR_BUFFER_H + +#include "srslte/adt/expected.h" + +#include +#include +#include +#include +#include +#include +#include + +namespace srslte { + +namespace detail { + +template +size_t get_max_size(const std::array& a) +{ + return a.max_size(); +} + +template +size_t get_max_size(const std::vector& a) +{ + return a.capacity(); +} + +template +class base_circular_buffer +{ + using T = typename Container::value_type; + +public: + using value_type = T; + using difference_type = ptrdiff_t; + + struct iterator { + iterator(base_circular_buffer& parent_, size_t i) : parent(&parent_), idx(i) {} + iterator& operator++() + { + idx = (idx + 1) % parent->max_size(); + return *this; + } + iterator operator++(int) + { + iterator tmp(*this); + ++(*this); + return tmp; + } + iterator operator+(difference_type n) + { + iterator tmp(*this); + tmp += n; + return tmp; + } + iterator& operator+=(difference_type n) + { + idx = (idx + n) % parent->max_size(); + return *this; + } + value_type* operator->() { return &parent->buffer[idx]; } + const value_type* operator->() const { return &parent->buffer[idx]; } + value_type& operator*() { return parent->buffer[idx]; } + const value_type& operator*() const { return parent->buffer[idx]; } + bool operator==(const iterator& it) const { return it.parent == parent and it.idx == idx; } + bool operator!=(const iterator& it) const { return not(*this == it); } + + private: + base_circular_buffer* parent; + size_t idx; + }; + + template + base_circular_buffer(Args&&... args) : buffer(std::forward(args)...) + {} + + void push(T&& t) + { + assert(not full()); + size_t wpos = (rpos + count) % max_size(); + buffer[wpos] = std::move(t); + count++; + } + void push(const T& t) + { + assert(not full()); + size_t wpos = (rpos + count) % max_size(); + buffer[wpos] = t; + count++; + } + void pop() + { + assert(not empty()); + rpos = (rpos + 1) % max_size(); + count--; + } + T& top() + { + assert(not empty()); + return buffer[rpos]; + } + const T& top() const + { + assert(not empty()); + return buffer[rpos]; + } + void clear() { count = 0; } + + bool full() const { return count == max_size(); } + bool empty() const { return count == 0; } + size_t size() const { return count; } + size_t max_size() const { return detail::get_max_size(buffer); } + + iterator begin() { return iterator(*this, rpos); } + iterator end() { return iterator(*this, (rpos + count) % max_size()); } + +private: + Container buffer; + size_t rpos = 0; + size_t count = 0; +}; + +template +class base_block_queue +{ + using T = typename CircBuffer::value_type; + +public: + template + base_block_queue(Args&&... args) : circ_buffer(std::forward(args)...) + {} + ~base_block_queue() { stop(); } + + void stop() + { + std::unique_lock lock(mutex); + if (active) { + active = false; + if (nof_waiting == 0) { + return; + } + do { + lock.unlock(); + cvar_empty.notify_all(); + cvar_full.notify_all(); + std::this_thread::yield(); + lock.lock(); + } while (nof_waiting > 0); + } + } + + bool try_push(const T& t) { return push_(t, false); } + srslte::error_type try_push(T&& t) { return push_(std::move(t), false); } + bool push(const T& t) { return push_(t, true); } + srslte::error_type push(T&& t) { return push_(std::move(t), true); } + bool try_pop(T& obj) { return pop_(obj, false); } + T pop() + { + T obj{}; + pop_(obj, true); + return obj; + } + void clear() + { + std::lock_guard lock(mutex); + T obj; + while (pop_(obj, false)) { + } + } + + size_t size() const + { + std::lock_guard lock(mutex); + return circ_buffer.size(); + } + size_t empty() const + { + std::lock_guard lock(mutex); + return circ_buffer.empty(); + } + size_t max_size() const + { + std::lock_guard lock(mutex); + return circ_buffer.max_size(); + } + bool is_stopped() const + { + std::lock_guard lock(mutex); + return not active; + } + +private: + bool active = true; + uint8_t nof_waiting = 0; + mutable std::mutex mutex; + std::condition_variable cvar_empty, cvar_full; + CircBuffer circ_buffer; + + bool push_(const T& t, bool block_mode) + { + std::unique_lock lock(mutex); + if (not active) { + return false; + } + if (circ_buffer.full()) { + if (not block_mode) { + return false; + } + nof_waiting++; + while (circ_buffer.full() and active) { + cvar_full.wait(lock); + } + nof_waiting--; + if (not active) { + return false; + } + } + circ_buffer.push(t); + lock.unlock(); + cvar_empty.notify_one(); + return true; + } + srslte::error_type push_(T&& t, bool block_mode) + { + std::unique_lock lock(mutex); + if (not active) { + return std::move(t); + } + if (circ_buffer.full()) { + if (not block_mode) { + return std::move(t); + } + nof_waiting++; + while (circ_buffer.full() and active) { + cvar_full.wait(lock); + } + nof_waiting--; + if (not active) { + return std::move(t); + } + } + circ_buffer.push(t); + lock.unlock(); + cvar_empty.notify_one(); + return {}; + } + + bool pop_(T& obj, bool block) + { + std::unique_lock lock(mutex); + if (not active) { + return false; + } + if (circ_buffer.empty()) { + if (not block) { + return false; + } + nof_waiting++; + while (circ_buffer.empty() and active) { + cvar_empty.wait(lock); + } + nof_waiting--; + if (not active) { + return false; + } + } + obj = std::move(circ_buffer.top()); + circ_buffer.pop(); + lock.unlock(); + cvar_full.notify_one(); + return true; + } +}; + +} // namespace detail + +template +class static_circular_buffer : public detail::base_circular_buffer > +{}; + +template +class dyn_circular_buffer : public detail::base_circular_buffer > +{ + using base_t = detail::base_circular_buffer >; + +public: + dyn_circular_buffer() = default; + explicit dyn_circular_buffer(size_t size) : base_t(size) {} + + void set_size(size_t size) + { + // Note: dynamic resizes not supported. + assert(base_t::empty()); + base_t::buffer.resize(size); + } +}; + +template +class static_block_queue : public detail::base_block_queue > +{}; + +template +class dyn_block_queue : public detail::base_block_queue > +{ + using base_t = detail::base_block_queue >; + +public: + dyn_block_queue() = default; + explicit dyn_block_queue(size_t size) : base_t(size) {} + void set_size(size_t size) { base_t::circ_buffer.set_size(size); } +}; + +} // namespace srslte + +#endif // SRSLTE_CIRCULAR_BUFFER_H diff --git a/lib/test/adt/CMakeLists.txt b/lib/test/adt/CMakeLists.txt index 357572300..0ccf53d06 100644 --- a/lib/test/adt/CMakeLists.txt +++ b/lib/test/adt/CMakeLists.txt @@ -41,3 +41,7 @@ add_test(bounded_vector_test bounded_vector_test) add_executable(mem_pool_test mem_pool_test.cc) target_link_libraries(mem_pool_test srslte_common) add_test(mem_pool_test mem_pool_test) + +add_executable(circular_buffer_test circular_buffer_test.cc) +target_link_libraries(circular_buffer_test srslte_common) +add_test(circular_buffer_test circular_buffer_test) diff --git a/lib/test/adt/circular_buffer_test.cc b/lib/test/adt/circular_buffer_test.cc new file mode 100644 index 000000000..5464f1be5 --- /dev/null +++ b/lib/test/adt/circular_buffer_test.cc @@ -0,0 +1,125 @@ +/** + * + * \section COPYRIGHT + * + * Copyright 2013-2020 Software Radio Systems Limited + * + * By using this file, you agree to the terms and conditions set + * forth in the LICENSE file which can be found at the top level of + * the distribution. + * + */ + +#include "srslte/adt/circular_buffer.h" +#include "srslte/common/test_common.h" + +namespace srslte { + +int test_static_circular_buffer() +{ + static_circular_buffer circ_buffer; + TESTASSERT(circ_buffer.max_size() == 10); + TESTASSERT(circ_buffer.empty() and not circ_buffer.full() and circ_buffer.size() == 0); + + // push until full + for (size_t i = 0; i < circ_buffer.max_size(); ++i) { + TESTASSERT(circ_buffer.size() == i and not circ_buffer.full()); + circ_buffer.push(i); + TESTASSERT(not circ_buffer.empty()); + } + TESTASSERT(circ_buffer.size() == 10 and circ_buffer.full()); + + // test iterator + int count = 0; + for (int& it : circ_buffer) { + TESTASSERT(it == count); + count++; + } + TESTASSERT(*circ_buffer.begin() == circ_buffer.top()); + + // pop until empty + for (size_t i = 0; i < circ_buffer.max_size(); ++i) { + TESTASSERT(circ_buffer.size() == circ_buffer.max_size() - i and not circ_buffer.empty()); + TESTASSERT(circ_buffer.top() == (int)i); + circ_buffer.pop(); + } + TESTASSERT(circ_buffer.empty() and circ_buffer.size() == 0); + + // test iteration with wrap-around in memory + for (size_t i = 0; i < circ_buffer.max_size(); ++i) { + circ_buffer.push(i); + } + for (size_t i = 0; i < circ_buffer.max_size() / 2; ++i) { + circ_buffer.pop(); + } + circ_buffer.push(circ_buffer.max_size()); + circ_buffer.push(circ_buffer.max_size() + 1); + TESTASSERT(circ_buffer.size() == circ_buffer.max_size() / 2 + 2); + count = circ_buffer.max_size() / 2; + for (int& it : circ_buffer) { + TESTASSERT(it == count); + count++; + } + + return SRSLTE_SUCCESS; +} + +int test_queue_block_api() +{ + dyn_block_queue queue(100); + + std::thread t([&queue]() { + int count = 0; + while (true) { + int val = queue.pop(); + if (queue.is_stopped()) { + break; + } + assert(val == count); + count++; + } + }); + + for (int i = 0; i < 10000; ++i) { + queue.push(i); + } + + queue.stop(); + t.join(); + return SRSLTE_SUCCESS; +} + +int test_queue_block_api_2() +{ + std::thread t; + + { + dyn_block_queue queue(100); + + t = std::thread([&queue]() { + int count = 0; + while (queue.push(count++)) { + } + }); + + for (int i = 0; i < 10000; ++i) { + TESTASSERT(queue.pop() == i); + } + + // queue dtor called + } + + t.join(); + return SRSLTE_SUCCESS; +} + +} // namespace srslte + +int main() +{ + TESTASSERT(srslte::test_static_circular_buffer() == SRSLTE_SUCCESS); + TESTASSERT(srslte::test_queue_block_api() == SRSLTE_SUCCESS); + TESTASSERT(srslte::test_queue_block_api_2() == SRSLTE_SUCCESS); + srslte::console("Success\n"); + return SRSLTE_SUCCESS; +} \ No newline at end of file diff --git a/lib/test/common/CMakeLists.txt b/lib/test/common/CMakeLists.txt index 3c43defb3..69a5ba96d 100644 --- a/lib/test/common/CMakeLists.txt +++ b/lib/test/common/CMakeLists.txt @@ -51,7 +51,7 @@ target_link_libraries(bcd_helpers_test srslte_common) add_executable(stack_procedure_test stack_procedure_test.cc) add_test(stack_procedure_test stack_procedure_test) -add_executable(queue_test queue_test.cc) +add_executable(queue_test multiqueue_test.cc) target_link_libraries(queue_test srslte_common ${CMAKE_THREAD_LIBS_INIT}) add_test(queue_test queue_test) diff --git a/lib/test/common/queue_test.cc b/lib/test/common/multiqueue_test.cc similarity index 100% rename from lib/test/common/queue_test.cc rename to lib/test/common/multiqueue_test.cc