From e1523692c2aab80623340ed5028b5ff1b12f3428 Mon Sep 17 00:00:00 2001 From: Francisco Date: Thu, 25 Mar 2021 14:18:00 +0000 Subject: [PATCH] implementation of concurrent fixed size pool that leverages thread local caches to avoid mutexing --- lib/include/srsran/adt/detail/type_storage.h | 12 ++ lib/include/srsran/adt/pool/fixed_size_pool.h | 113 +++++++++++++ lib/include/srsran/adt/{ => pool}/mem_pool.h | 124 +------------- lib/include/srsran/adt/pool/memblock_cache.h | 151 ++++++++++++++++++ .../srsran/{common => adt}/singleton.h | 0 lib/test/adt/mem_pool_test.cc | 62 ++++++- srsenb/hdr/stack/rrc/rrc_ue.h | 2 +- srsenb/src/stack/rrc/rrc_ue.cc | 2 +- 8 files changed, 344 insertions(+), 122 deletions(-) create mode 100644 lib/include/srsran/adt/pool/fixed_size_pool.h rename lib/include/srsran/adt/{ => pool}/mem_pool.h (64%) create mode 100644 lib/include/srsran/adt/pool/memblock_cache.h rename lib/include/srsran/{common => adt}/singleton.h (100%) diff --git a/lib/include/srsran/adt/detail/type_storage.h b/lib/include/srsran/adt/detail/type_storage.h index b97cc522e..ca9c6b8f0 100644 --- a/lib/include/srsran/adt/detail/type_storage.h +++ b/lib/include/srsran/adt/detail/type_storage.h @@ -13,6 +13,7 @@ #ifndef SRSRAN_TYPE_STORAGE_H #define SRSRAN_TYPE_STORAGE_H +#include #include #include @@ -20,6 +21,17 @@ namespace srsran { namespace detail { +// NOTE: gcc 4.8.5 is missing std::max_align_t. 
Need to create an equivalent union +union max_alignment_t { + char c; + float f; + uint32_t i; + uint64_t i2; + double d; + long double d2; + uint32_t* ptr; +}; + template struct type_storage { using value_type = T; diff --git a/lib/include/srsran/adt/pool/fixed_size_pool.h b/lib/include/srsran/adt/pool/fixed_size_pool.h new file mode 100644 index 000000000..32a058878 --- /dev/null +++ b/lib/include/srsran/adt/pool/fixed_size_pool.h @@ -0,0 +1,113 @@ +/** + * + * \section COPYRIGHT + * + * Copyright 2013-2021 Software Radio Systems Limited + * + * By using this file, you agree to the terms and conditions set + * forth in the LICENSE file which can be found at the top level of + * the distribution. + * + */ + +#ifndef SRSRAN_FIXED_SIZE_POOL_H +#define SRSRAN_FIXED_SIZE_POOL_H + +#include "memblock_cache.h" +#include "srsran/adt/circular_buffer.h" +#include "srsran/adt/singleton.h" +#include + +namespace srsran { + +/** + * Concurrent fixed size memory pool made of blocks of equal size + * Each worker keeps a separate thread-local memory block cache that it uses for fast allocation/deallocation. + * When this cache gets depleted, the worker tries to obtain blocks from a shared memory block cache + * Note: This pool does not implement stealing of blocks between workers, so it is possible that a worker can't allocate + * while another worker still has blocks in its own cache. 
This situation is avoided by upper bounding the + * size of each worker cache + * Note2: Taking into account the usage of thread_local, this class is made a singleton + * @tparam NofObjects number of objects in the pool + * @tparam ObjSize object size + */ +template +class concurrent_fixed_memory_pool : public singleton_t > +{ + static_assert(NofObjects > 256, "This pool is particularly designed for a high number of objects"); + static_assert(ObjSize > 256, "This pool is particularly designed for large objects"); + + struct obj_storage_t { + typename std::aligned_storage::type buffer; + std::thread::id worker_id; + explicit obj_storage_t(std::thread::id id_) : worker_id(id_) {} + }; + + const static size_t batch_steal_size = 10; + +protected: + // ctor only accessible from singleton + concurrent_fixed_memory_pool() + { + allocated_blocks.resize(NofObjects); + for (std::unique_ptr& b : allocated_blocks) { + b.reset(new obj_storage_t(std::this_thread::get_id())); + srsran_assert(b.get() != nullptr, "Failed to instantiate fixed memory pool"); + shared_mem_cache.push(static_cast(b.get())); + } + } + +public: + static size_t size() { return NofObjects; } + + void* allocate_node(size_t sz) + { + srsran_assert(sz <= ObjSize, "Allocated node size=%zd exceeds max object size=%zd", sz, ObjSize); + memblock_cache* worker_cache = get_worker_cache(); + + void* node = worker_cache->try_pop(); + if (node == nullptr) { + // fill the thread local cache enough for this and next allocations + std::array popped_blocks; + size_t n = shared_mem_cache.try_pop(popped_blocks); + for (size_t i = 0; i < n; ++i) { + new (popped_blocks[i]) obj_storage_t(std::this_thread::get_id()); + worker_cache->push(static_cast(popped_blocks[i])); + } + node = worker_cache->try_pop(); + } + return node; + } + + void deallocate_node(void* p) + { + srsran_assert(p != nullptr, "Deallocated nodes must have valid address"); + memblock_cache* worker_cache = get_worker_cache(); + obj_storage_t* block_ptr = 
static_cast(p); + + if (block_ptr->worker_id != std::this_thread::get_id() or worker_cache->size() >= MaxWorkerCacheSize) { + // if block was allocated in a different thread or local cache reached max capacity, send block to shared + // container + shared_mem_cache.push(static_cast(block_ptr)); + return; + } + + // push to local memory block cache + worker_cache->push(static_cast(p)); + } + +private: + memblock_cache* get_worker_cache() + { + thread_local memblock_cache worker_cache; + return &worker_cache; + } + + mutexed_memblock_cache shared_mem_cache; + std::mutex mutex; + std::vector > allocated_blocks; +}; + +} // namespace srsran + +#endif // SRSRAN_FIXED_SIZE_POOL_H diff --git a/lib/include/srsran/adt/mem_pool.h b/lib/include/srsran/adt/pool/mem_pool.h similarity index 64% rename from lib/include/srsran/adt/mem_pool.h rename to lib/include/srsran/adt/pool/mem_pool.h index abe826481..028ea414c 100644 --- a/lib/include/srsran/adt/mem_pool.h +++ b/lib/include/srsran/adt/pool/mem_pool.h @@ -13,6 +13,7 @@ #ifndef SRSRAN_MEM_POOL_H #define SRSRAN_MEM_POOL_H +#include "memblock_cache.h" #include "srsran/common/thread_pool.h" #include #include @@ -21,121 +22,6 @@ namespace srsran { -/// Stores provided mem blocks in a stack in an non-owning manner. 
Not thread-safe -class memblock_stack -{ - struct node { - node* prev; - explicit node(node* prev_) : prev(prev_) {} - }; - -public: - constexpr static size_t min_memblock_size() { return sizeof(node); } - - memblock_stack() = default; - - memblock_stack(const memblock_stack&) = delete; - - memblock_stack(memblock_stack&& other) noexcept : head(other.head) { other.head = nullptr; } - - memblock_stack& operator=(const memblock_stack&) = delete; - - memblock_stack& operator=(memblock_stack&& other) noexcept - { - head = other.head; - other.head = nullptr; - return *this; - } - - void push(uint8_t* block) noexcept - { - // printf("head: %ld\n", (long)head); - node* next = ::new (block) node(head); - head = next; - count++; - } - - uint8_t* try_pop() noexcept - { - if (is_empty()) { - return nullptr; - } - node* last_head = head; - head = head->prev; - count--; - return (uint8_t*)last_head; - } - - bool is_empty() const { return head == nullptr; } - - size_t size() const { return count; } - - void clear() { head = nullptr; } - -private: - node* head = nullptr; - size_t count = 0; -}; - -/// memblock stack that mutexes pushing/popping -class mutexed_memblock_stack -{ -public: - mutexed_memblock_stack() = default; - - mutexed_memblock_stack(const mutexed_memblock_stack&) = delete; - - mutexed_memblock_stack(mutexed_memblock_stack&& other) noexcept - { - std::unique_lock lk1(other.mutex, std::defer_lock); - std::unique_lock lk2(mutex, std::defer_lock); - std::lock(lk1, lk2); - stack = std::move(other.stack); - } - - mutexed_memblock_stack& operator=(const mutexed_memblock_stack&) = delete; - - mutexed_memblock_stack& operator=(mutexed_memblock_stack&& other) noexcept - { - std::unique_lock lk1(other.mutex, std::defer_lock); - std::unique_lock lk2(mutex, std::defer_lock); - std::lock(lk1, lk2); - stack = std::move(other.stack); - return *this; - } - - void push(uint8_t* block) noexcept - { - std::lock_guard lock(mutex); - stack.push(block); - } - - uint8_t* try_pop() 
noexcept - { - std::lock_guard lock(mutex); - uint8_t* block = stack.try_pop(); - return block; - } - - bool is_empty() const noexcept { return stack.is_empty(); } - - size_t size() const noexcept - { - std::lock_guard lock(mutex); - return stack.size(); - } - - void clear() - { - std::lock_guard lock(mutex); - stack.clear(); - } - -private: - memblock_stack stack; - mutable std::mutex mutex; -}; - /** * Pool specialized for big objects. Created objects are not contiguous in memory. * Relevant methods: @@ -149,7 +35,7 @@ template class big_obj_pool { // memory stack type derivation (thread safe or not) - using stack_type = typename std::conditional::type; + using stack_type = typename std::conditional::type; // memory stack to cache allocate memory chunks stack_type stack; @@ -161,7 +47,7 @@ public: void* allocate_node(size_t sz) { assert(sz == sizeof(T)); - static const size_t blocksize = std::max(sizeof(T), memblock_stack::min_memblock_size()); + static const size_t blocksize = std::max(sizeof(T), memblock_cache::min_memblock_size()); uint8_t* block = stack.try_pop(); if (block == nullptr) { block = new uint8_t[blocksize]; @@ -179,7 +65,7 @@ public: /// Pre-reserve N memory chunks for future object allocations void reserve(size_t N) { - static const size_t blocksize = std::max(sizeof(T), memblock_stack::min_memblock_size()); + static const size_t blocksize = std::max(sizeof(T), memblock_cache::min_memblock_size()); for (size_t i = 0; i < N; ++i) { stack.push(new uint8_t[blocksize]); } @@ -284,7 +170,7 @@ private: // memory stack to cache allocate memory chunks std::mutex mutex; - memblock_stack obj_cache; + memblock_cache obj_cache; std::vector > batches; }; diff --git a/lib/include/srsran/adt/pool/memblock_cache.h b/lib/include/srsran/adt/pool/memblock_cache.h new file mode 100644 index 000000000..8bbe60a11 --- /dev/null +++ b/lib/include/srsran/adt/pool/memblock_cache.h @@ -0,0 +1,151 @@ +/** + * + * \section COPYRIGHT + * + * Copyright 2013-2021 Software Radio 
Systems Limited + * + * By using this file, you agree to the terms and conditions set + * forth in the LICENSE file which can be found at the top level of + * the distribution. + * + */ + +#ifndef SRSRAN_MEMBLOCK_CACHE_H +#define SRSRAN_MEMBLOCK_CACHE_H + +#include + +namespace srsran { + +/// Stores provided mem blocks in a stack in a non-owning manner. Not thread-safe +class memblock_cache +{ + struct node { + node* prev; + explicit node(node* prev_) : prev(prev_) {} + }; + +public: + constexpr static size_t min_memblock_size() { return sizeof(node); } + + memblock_cache() = default; + + memblock_cache(const memblock_cache&) = delete; + + memblock_cache(memblock_cache&& other) noexcept : head(other.head) { other.head = nullptr; } + + memblock_cache& operator=(const memblock_cache&) = delete; + + memblock_cache& operator=(memblock_cache&& other) noexcept + { + head = other.head; + other.head = nullptr; + return *this; + } + + void push(void* block) noexcept + { + // printf("head: %ld\n", (long)head); + node* next = ::new (block) node(head); + head = next; + count++; + } + + uint8_t* try_pop() noexcept + { + if (is_empty()) { + return nullptr; + } + node* last_head = head; + head = head->prev; + count--; + return (uint8_t*)last_head; + } + + bool is_empty() const { return head == nullptr; } + + size_t size() const { return count; } + + void clear() { head = nullptr; } + +private: + node* head = nullptr; + size_t count = 0; +}; + +/// memblock stack that mutexes pushing/popping +class mutexed_memblock_cache +{ +public: + mutexed_memblock_cache() = default; + + mutexed_memblock_cache(const mutexed_memblock_cache&) = delete; + + mutexed_memblock_cache(mutexed_memblock_cache&& other) noexcept + { + std::unique_lock lk1(other.mutex, std::defer_lock); + std::unique_lock lk2(mutex, std::defer_lock); + std::lock(lk1, lk2); + stack = std::move(other.stack); + } + + mutexed_memblock_cache& operator=(const mutexed_memblock_cache&) = delete; + + mutexed_memblock_cache& 
operator=(mutexed_memblock_cache&& other) noexcept + { + std::unique_lock lk1(other.mutex, std::defer_lock); + std::unique_lock lk2(mutex, std::defer_lock); + std::lock(lk1, lk2); + stack = std::move(other.stack); + return *this; + } + + void push(void* block) noexcept + { + std::lock_guard lock(mutex); + stack.push(block); + } + + uint8_t* try_pop() noexcept + { + std::lock_guard lock(mutex); + uint8_t* block = stack.try_pop(); + return block; + } + + template + size_t try_pop(std::array& result) noexcept + { + std::lock_guard lock(mutex); + size_t i = 0; + for (; i < N; ++i) { + result[i] = stack.try_pop(); + if (result[i] == nullptr) { + break; + } + } + return i; + } + + bool is_empty() const noexcept { return stack.is_empty(); } + + size_t size() const noexcept + { + std::lock_guard lock(mutex); + return stack.size(); + } + + void clear() + { + std::lock_guard lock(mutex); + stack.clear(); + } + +private: + memblock_cache stack; + mutable std::mutex mutex; +}; + +} // namespace srsran + +#endif // SRSRAN_MEMBLOCK_CACHE_H diff --git a/lib/include/srsran/common/singleton.h b/lib/include/srsran/adt/singleton.h similarity index 100% rename from lib/include/srsran/common/singleton.h rename to lib/include/srsran/adt/singleton.h diff --git a/lib/test/adt/mem_pool_test.cc b/lib/test/adt/mem_pool_test.cc index f0c1f28b8..4e94df408 100644 --- a/lib/test/adt/mem_pool_test.cc +++ b/lib/test/adt/mem_pool_test.cc @@ -10,7 +10,8 @@ * */ -#include "srsran/adt/mem_pool.h" +#include "srsran/adt/pool/fixed_size_pool.h" +#include "srsran/adt/pool/mem_pool.h" #include "srsran/common/test_common.h" class C @@ -75,9 +76,68 @@ int test_nontrivial_obj_pool() return SRSRAN_SUCCESS; } +struct BigObj { + C c; + std::array space; + + using pool_t = srsran::concurrent_fixed_memory_pool<1024, 512>; + + void* operator new(size_t sz) + { + srsran_assert(sz == sizeof(BigObj), "Allocated node size and object size do not match"); + return pool_t::get_instance()->allocate_node(sizeof(BigObj)); + 
} + void* operator new(size_t sz, const std::nothrow_t& nothrow_value) noexcept + { + srsran_assert(sz == sizeof(BigObj), "Allocated node size and object size do not match"); + return pool_t::get_instance()->allocate_node(sizeof(BigObj)); + } + void operator delete(void* ptr) { pool_t::get_instance()->deallocate_node(ptr); } +}; + +void test_fixedsize_pool() +{ + { + std::vector > vec(BigObj::pool_t::size()); + for (size_t i = 0; i < BigObj::pool_t::size(); ++i) { + vec[i].reset(new BigObj()); + TESTASSERT(vec[i].get() != nullptr); + } + std::unique_ptr obj(new (std::nothrow) BigObj()); + TESTASSERT(obj == nullptr); + vec.clear(); + obj = std::unique_ptr(new (std::nothrow) BigObj()); + TESTASSERT(obj != nullptr); + obj.reset(); + } + + // TEST: one thread allocates, and the other deallocates + { + std::unique_ptr obj; + std::atomic stop(false); + srsran::dyn_blocking_queue > queue(BigObj::pool_t::size() / 2); + std::thread t([&queue, &stop]() { + while (not stop.load(std::memory_order_relaxed)) { + std::unique_ptr obj(new (std::nothrow) BigObj()); + TESTASSERT(obj != nullptr); + queue.push_blocking(std::move(obj)); + } + }); + + for (size_t i = 0; i < BigObj::pool_t::size() * 8; ++i) { + obj = queue.pop_blocking(); + TESTASSERT(obj != nullptr); + } + stop.store(true); + t.join(); + } +} + int main() { TESTASSERT(test_nontrivial_obj_pool() == SRSRAN_SUCCESS); + test_fixedsize_pool(); + srsran::console("Success\n"); return 0; } \ No newline at end of file diff --git a/srsenb/hdr/stack/rrc/rrc_ue.h b/srsenb/hdr/stack/rrc/rrc_ue.h index 821a20969..691ab2709 100644 --- a/srsenb/hdr/stack/rrc/rrc_ue.h +++ b/srsenb/hdr/stack/rrc/rrc_ue.h @@ -15,7 +15,7 @@ #include "mac_controller.h" #include "rrc.h" -#include "srsran/adt/mem_pool.h" +#include "srsran/adt/pool/mem_pool.h" #include "srsran/interfaces/enb_phy_interfaces.h" #include "srsran/interfaces/pdcp_interface_types.h" diff --git a/srsenb/src/stack/rrc/rrc_ue.cc b/srsenb/src/stack/rrc/rrc_ue.cc index 
20fd22a55..9328765ce 100644 --- a/srsenb/src/stack/rrc/rrc_ue.cc +++ b/srsenb/src/stack/rrc/rrc_ue.cc @@ -14,7 +14,7 @@ #include "srsenb/hdr/stack/rrc/mac_controller.h" #include "srsenb/hdr/stack/rrc/rrc_mobility.h" #include "srsenb/hdr/stack/rrc/ue_rr_cfg.h" -#include "srsran/adt/mem_pool.h" +#include "srsran/adt/pool/mem_pool.h" #include "srsran/asn1/rrc_utils.h" #include "srsran/common/enb_events.h" #include "srsran/common/int_helpers.h"