From be771e5c23cec826a75466e4731d20d0b295ded2 Mon Sep 17 00:00:00 2001 From: Francisco Date: Thu, 25 Mar 2021 15:54:15 +0000 Subject: [PATCH] performance - use of new concurrent fixed size pool for byte_buffer pool --- lib/include/srsran/adt/pool/fixed_size_pool.h | 84 ++++++++++++++++--- lib/include/srsran/common/buffer_pool.h | 58 +------------ lib/include/srsran/common/byte_buffer.h | 3 - lib/src/common/byte_buffer.cc | 6 +- lib/test/adt/mem_pool_test.cc | 22 +++-- 5 files changed, 94 insertions(+), 79 deletions(-) diff --git a/lib/include/srsran/adt/pool/fixed_size_pool.h b/lib/include/srsran/adt/pool/fixed_size_pool.h index 32a058878..e32de3ee4 100644 --- a/lib/include/srsran/adt/pool/fixed_size_pool.h +++ b/lib/include/srsran/adt/pool/fixed_size_pool.h @@ -15,7 +15,6 @@ #include "memblock_cache.h" #include "srsran/adt/circular_buffer.h" -#include "srsran/adt/singleton.h" #include namespace srsran { @@ -31,10 +30,9 @@ namespace srsran { * @tparam NofObjects number of objects in the pool * @tparam ObjSize object size */ -template -class concurrent_fixed_memory_pool : public singleton_t > +template +class concurrent_fixed_memory_pool { - static_assert(NofObjects > 256, "This pool is particularly designed for a high number of objects"); static_assert(ObjSize > 256, "This pool is particularly designed for large objects"); struct obj_storage_t { @@ -45,20 +43,41 @@ class concurrent_fixed_memory_pool : public singleton_t 0, "A positive pool size must be provided"); + + std::lock_guard lock(mutex); + allocated_blocks.resize(nof_objects_); for (std::unique_ptr& b : allocated_blocks) { b.reset(new obj_storage_t(std::this_thread::get_id())); srsran_assert(b.get() != nullptr, "Failed to instantiate fixed memory pool"); shared_mem_cache.push(static_cast(b.get())); } + max_objs_per_cache = allocated_blocks.size() / 16; + max_objs_per_cache = max_objs_per_cache < batch_steal_size ? batch_steal_size : max_objs_per_cache; } public: - static size_t size() { return NofObjects; } + concurrent_fixed_memory_pool(const concurrent_fixed_memory_pool&) = delete; + concurrent_fixed_memory_pool(concurrent_fixed_memory_pool&&) = delete; + concurrent_fixed_memory_pool& operator=(const concurrent_fixed_memory_pool&) = delete; + concurrent_fixed_memory_pool& operator=(concurrent_fixed_memory_pool&&) = delete; + + ~concurrent_fixed_memory_pool() + { + std::lock_guard lock(mutex); + allocated_blocks.clear(); + } + + static concurrent_fixed_memory_pool* get_instance(size_t size = 4096) + { + static concurrent_fixed_memory_pool pool(size); + return &pool; + } + + size_t size() { return allocated_blocks.size(); } void* allocate_node(size_t sz) { @@ -76,6 +95,12 @@ public: } node = worker_cache->try_pop(); } + +#ifdef SRSRAN_BUFFER_POOL_LOG_ENABLED + if (node == nullptr) { + print_error("Error allocating buffer in pool of ObjSize=%zd", ObjSize); + } +#endif return node; } @@ -85,7 +110,16 @@ public: memblock_cache* worker_cache = get_worker_cache(); obj_storage_t* block_ptr = static_cast(p); - if (block_ptr->worker_id != std::this_thread::get_id() or worker_cache->size() >= MaxWorkerCacheSize) { + if (DebugSanitizeAddress) { + std::lock_guard lock(mutex); + srsran_assert(std::any_of(allocated_blocks.begin(), + allocated_blocks.end(), + [block_ptr](const std::unique_ptr& b) { return b.get() == block_ptr; }), + "Error deallocating block with address 0x%lx.", + (long unsigned)block_ptr); + } + + if (block_ptr->worker_id != std::this_thread::get_id() or worker_cache->size() >= max_objs_per_cache) { // if block was allocated in a different thread or local cache reached max capacity, send block to shared // container shared_mem_cache.push(static_cast(block_ptr)); @@ -96,6 +130,22 @@ public: worker_cache->push(static_cast(p)); } + void enable_logger(bool enabled) + { + if (enabled) { + logger = &srslog::fetch_basic_logger("POOL"); + logger->set_level(srslog::basic_levels::debug); + } else { + logger = nullptr; + } + } + + void print_all_buffers() + { + std::lock_guard lock(mutex); + printf("There are %zd/%zd buffers in shared block container.\n", shared_mem_cache.size(), allocated_blocks.size()); + } + private: memblock_cache* get_worker_cache() { @@ -103,6 +153,20 @@ private: return &worker_cache; } + /// Formats and prints the input string and arguments into the configured output stream. + template + void print_error(const char* str, Args&&... args) + { + if (logger != nullptr) { + logger->error(str, std::forward(args)...); + } else { + fmt::printf(std::string(str) + "\n", std::forward(args)...); + } + } + + size_t max_objs_per_cache = 0; + srslog::basic_logger* logger = nullptr; + mutexed_memblock_cache shared_mem_cache; std::mutex mutex; std::vector > allocated_blocks; diff --git a/lib/include/srsran/common/buffer_pool.h b/lib/include/srsran/common/buffer_pool.h index 8642bb01f..7f0f3c9e8 100644 --- a/lib/include/srsran/common/buffer_pool.h +++ b/lib/include/srsran/common/buffer_pool.h @@ -21,10 +21,7 @@ #include #include -/******************************************************************************* - INCLUDES -*******************************************************************************/ - +#include "srsran/adt/pool/fixed_size_pool.h" #include "srsran/common/common.h" #include "srsran/srslog/srslog.h" @@ -165,58 +162,7 @@ private: uint32_t capacity; }; -class byte_buffer_pool -{ - using mem_chunk = typename std::aligned_storage::type; - -public: - // Singleton static methods - static byte_buffer_pool* get_instance(int capacity = -1) - { - static std::unique_ptr instance(new byte_buffer_pool(capacity)); - return instance.get(); - } - byte_buffer_pool(int capacity = -1) : pool(capacity) {} - byte_buffer_pool(const byte_buffer_pool& other) = delete; - byte_buffer_pool(byte_buffer_pool&& other) = delete; - byte_buffer_pool& operator=(const byte_buffer_pool& other) = delete; - byte_buffer_pool& operator=(byte_buffer_pool&& other) = delete; - void* allocate(const char* debug_name = nullptr, bool blocking = false) - { - return pool.allocate(debug_name, blocking); - } - void enable_logger(bool enabled) { print_to_log = enabled; } - void deallocate(void* b) - { - if (!b) { - return; - } - if (!pool.deallocate(static_cast(b))) { -#ifdef SRSRAN_BUFFER_POOL_LOG_ENABLED - print_error("Error deallocating PDU: Addr=0x%p, name=%s not found in pool", (void*)b, b->debug_name); -#else - print_error("Error deallocating PDU: Addr=0x%p", (void*)b); -#endif - } - } - void print_all_buffers() { pool.print_all_buffers(); } - -private: - /// Formats and prints the input string and arguments into the configured output stream. - template - void print_error(const char* str, Args&&... args) - { - if (print_to_log) { - srslog::fetch_basic_logger("POOL", false).error(str, std::forward(args)...); - } else { - fmt::printf(std::string(str) + "\n", std::forward(args)...); - } - } - -private: - bool print_to_log = false; - buffer_pool pool; -}; +using byte_buffer_pool = concurrent_fixed_memory_pool; inline unique_byte_buffer_t make_byte_buffer() noexcept { diff --git a/lib/include/srsran/common/byte_buffer.h b/lib/include/srsran/common/byte_buffer.h index 47042c724..dfa6dd8dd 100644 --- a/lib/include/srsran/common/byte_buffer.h +++ b/lib/include/srsran/common/byte_buffer.h @@ -197,9 +197,6 @@ struct bit_buffer_t { uint32_t get_headroom() { return msg - buffer; } }; -// Create a Managed Life-Time Byte Buffer -class byte_buffer_pool; - using unique_byte_buffer_t = std::unique_ptr; /// diff --git a/lib/src/common/byte_buffer.cc b/lib/src/common/byte_buffer.cc index 991a06036..fb281efee 100644 --- a/lib/src/common/byte_buffer.cc +++ b/lib/src/common/byte_buffer.cc @@ -18,13 +18,13 @@ namespace srsran { void* byte_buffer_t::operator new(size_t sz, const std::nothrow_t& nothrow_value) noexcept { assert(sz == sizeof(byte_buffer_t)); - return byte_buffer_pool::get_instance()->allocate(nullptr, false); + return byte_buffer_pool::get_instance()->allocate_node(sz); } void* byte_buffer_t::operator new(size_t sz) { assert(sz == sizeof(byte_buffer_t)); - void* ptr = byte_buffer_pool::get_instance()->allocate(nullptr, false); + void* ptr = byte_buffer_pool::get_instance()->allocate_node(sz); if (ptr == nullptr) { throw std::bad_alloc(); } @@ -33,7 +33,7 @@ void* byte_buffer_t::operator new(size_t sz) void byte_buffer_t::operator delete(void* ptr) { - byte_buffer_pool::get_instance()->deallocate(ptr); + byte_buffer_pool::get_instance()->deallocate_node(ptr); } } // namespace srsran diff --git a/lib/test/adt/mem_pool_test.cc b/lib/test/adt/mem_pool_test.cc index 4e94df408..cedb2cc6f 100644 --- a/lib/test/adt/mem_pool_test.cc +++ b/lib/test/adt/mem_pool_test.cc @@ -80,7 +80,7 @@ struct BigObj { C c; std::array space; - using pool_t = srsran::concurrent_fixed_memory_pool<1024, 512>; + using pool_t = srsran::concurrent_fixed_memory_pool<512, true>; void* operator new(size_t sz) { @@ -97,9 +97,12 @@ struct BigObj { void test_fixedsize_pool() { + size_t pool_size = 1024; + auto* fixed_pool = BigObj::pool_t::get_instance(pool_size); + fixed_pool->print_all_buffers(); { - std::vector > vec(BigObj::pool_t::size()); - for (size_t i = 0; i < BigObj::pool_t::size(); ++i) { + std::vector > vec(pool_size); + for (size_t i = 0; i < pool_size; ++i) { vec[i].reset(new BigObj()); TESTASSERT(vec[i].get() != nullptr); } @@ -109,32 +112,37 @@ void test_fixedsize_pool() obj = std::unique_ptr(new (std::nothrow) BigObj()); TESTASSERT(obj != nullptr); obj.reset(); + fixed_pool->print_all_buffers(); } + fixed_pool->print_all_buffers(); // TEST: one thread allocates, and the other deallocates { std::unique_ptr obj; std::atomic stop(false); - srsran::dyn_blocking_queue > queue(BigObj::pool_t::size() / 2); + srsran::dyn_blocking_queue > queue(pool_size / 2); std::thread t([&queue, &stop]() { while (not stop.load(std::memory_order_relaxed)) { std::unique_ptr obj(new (std::nothrow) BigObj()); TESTASSERT(obj != nullptr); - queue.push_blocking(std::move(obj)); + queue.try_push(std::move(obj)); } }); - for (size_t i = 0; i < BigObj::pool_t::size() * 8; ++i) { + for (size_t i = 0; i < pool_size * 8; ++i) { obj = queue.pop_blocking(); TESTASSERT(obj != nullptr); } stop.store(true); t.join(); } + fixed_pool->print_all_buffers(); } -int main() +int main(int argc, char** argv) { + srsran::test_init(argc, argv); + TESTASSERT(test_nontrivial_obj_pool() == SRSRAN_SUCCESS); test_fixedsize_pool();