implementation of concurrent fixed size pool that leverages thread local caches to avoid mutexing

master
Francisco 4 years ago committed by Francisco Paisana
parent e200a3359e
commit e1523692c2

@ -13,6 +13,7 @@
#ifndef SRSRAN_TYPE_STORAGE_H #ifndef SRSRAN_TYPE_STORAGE_H
#define SRSRAN_TYPE_STORAGE_H #define SRSRAN_TYPE_STORAGE_H
#include <cstdint>
#include <type_traits> #include <type_traits>
#include <utility> #include <utility>
@ -20,6 +21,17 @@ namespace srsran {
namespace detail { namespace detail {
// NOTE: gcc 4.8.5 is missing std::max_align_t. Need to create a struct
union max_alignment_t {
char c;
float f;
uint32_t i;
uint64_t i2;
double d;
long double d2;
uint32_t* ptr;
};
template <typename T> template <typename T>
struct type_storage { struct type_storage {
using value_type = T; using value_type = T;

@ -0,0 +1,113 @@
/**
*
* \section COPYRIGHT
*
* Copyright 2013-2021 Software Radio Systems Limited
*
* By using this file, you agree to the terms and conditions set
* forth in the LICENSE file which can be found at the top level of
* the distribution.
*
*/
#ifndef SRSRAN_FIXED_SIZE_POOL_H
#define SRSRAN_FIXED_SIZE_POOL_H
#include "memblock_cache.h"
#include "srsran/adt/circular_buffer.h"
#include "srsran/adt/singleton.h"
#include <thread>
namespace srsran {
/**
* Concurrent fixed size memory pool made of blocks of equal size
* Each worker keeps a separate thread-local memory block cache that uses for fast allocation/deallocation.
* When this cache gets depleted, the worker tries to obtain blocks from a shared memory block cache
* Note: This pool does not implement stealing of blocks between workers, so it is possible that a worker can't allocate
* while another worker still has blocks in its own cache. This situation is avoided by upper bounding the
* size of each worker cache
* Note2: Taking into account the usage of thread_local, this class is made a singleton
* @tparam NofObjects number of objects in the pool
* @tparam ObjSize object size
*/
template <size_t NofObjects, size_t ObjSize, size_t MaxWorkerCacheSize = NofObjects / 16>
class concurrent_fixed_memory_pool : public singleton_t<concurrent_fixed_memory_pool<NofObjects, ObjSize> >
{
static_assert(NofObjects > 256, "This pool is particularly designed for a high number of objects");
static_assert(ObjSize > 256, "This pool is particularly designed for large objects");
struct obj_storage_t {
typename std::aligned_storage<ObjSize, alignof(detail::max_alignment_t)>::type buffer;
std::thread::id worker_id;
explicit obj_storage_t(std::thread::id id_) : worker_id(id_) {}
};
const static size_t batch_steal_size = 10;
protected:
// ctor only accessible from singleton
concurrent_fixed_memory_pool()
{
allocated_blocks.resize(NofObjects);
for (std::unique_ptr<obj_storage_t>& b : allocated_blocks) {
b.reset(new obj_storage_t(std::this_thread::get_id()));
srsran_assert(b.get() != nullptr, "Failed to instantiate fixed memory pool");
shared_mem_cache.push(static_cast<void*>(b.get()));
}
}
public:
static size_t size() { return NofObjects; }
void* allocate_node(size_t sz)
{
srsran_assert(sz <= ObjSize, "Allocated node size=%zd exceeds max object size=%zd", sz, ObjSize);
memblock_cache* worker_cache = get_worker_cache();
void* node = worker_cache->try_pop();
if (node == nullptr) {
// fill the thread local cache enough for this and next allocations
std::array<void*, batch_steal_size> popped_blocks;
size_t n = shared_mem_cache.try_pop(popped_blocks);
for (size_t i = 0; i < n; ++i) {
new (popped_blocks[i]) obj_storage_t(std::this_thread::get_id());
worker_cache->push(static_cast<uint8_t*>(popped_blocks[i]));
}
node = worker_cache->try_pop();
}
return node;
}
void deallocate_node(void* p)
{
srsran_assert(p != nullptr, "Deallocated nodes must have valid address");
memblock_cache* worker_cache = get_worker_cache();
obj_storage_t* block_ptr = static_cast<obj_storage_t*>(p);
if (block_ptr->worker_id != std::this_thread::get_id() or worker_cache->size() >= MaxWorkerCacheSize) {
// if block was allocated in a different thread or local cache reached max capacity, send block to shared
// container
shared_mem_cache.push(static_cast<void*>(block_ptr));
return;
}
// push to local memory block cache
worker_cache->push(static_cast<uint8_t*>(p));
}
private:
memblock_cache* get_worker_cache()
{
thread_local memblock_cache worker_cache;
return &worker_cache;
}
mutexed_memblock_cache shared_mem_cache;
std::mutex mutex;
std::vector<std::unique_ptr<obj_storage_t> > allocated_blocks;
};
} // namespace srsran
#endif // SRSRAN_FIXED_SIZE_POOL_H

@ -13,6 +13,7 @@
#ifndef SRSRAN_MEM_POOL_H #ifndef SRSRAN_MEM_POOL_H
#define SRSRAN_MEM_POOL_H #define SRSRAN_MEM_POOL_H
#include "memblock_cache.h"
#include "srsran/common/thread_pool.h" #include "srsran/common/thread_pool.h"
#include <cassert> #include <cassert>
#include <cstdint> #include <cstdint>
@ -21,121 +22,6 @@
namespace srsran { namespace srsran {
/// Stores provided mem blocks in a stack in an non-owning manner. Not thread-safe
class memblock_stack
{
struct node {
node* prev;
explicit node(node* prev_) : prev(prev_) {}
};
public:
constexpr static size_t min_memblock_size() { return sizeof(node); }
memblock_stack() = default;
memblock_stack(const memblock_stack&) = delete;
memblock_stack(memblock_stack&& other) noexcept : head(other.head) { other.head = nullptr; }
memblock_stack& operator=(const memblock_stack&) = delete;
memblock_stack& operator=(memblock_stack&& other) noexcept
{
head = other.head;
other.head = nullptr;
return *this;
}
void push(uint8_t* block) noexcept
{
// printf("head: %ld\n", (long)head);
node* next = ::new (block) node(head);
head = next;
count++;
}
uint8_t* try_pop() noexcept
{
if (is_empty()) {
return nullptr;
}
node* last_head = head;
head = head->prev;
count--;
return (uint8_t*)last_head;
}
bool is_empty() const { return head == nullptr; }
size_t size() const { return count; }
void clear() { head = nullptr; }
private:
node* head = nullptr;
size_t count = 0;
};
/// memblock stack that mutexes pushing/popping
class mutexed_memblock_stack
{
public:
mutexed_memblock_stack() = default;
mutexed_memblock_stack(const mutexed_memblock_stack&) = delete;
mutexed_memblock_stack(mutexed_memblock_stack&& other) noexcept
{
std::unique_lock<std::mutex> lk1(other.mutex, std::defer_lock);
std::unique_lock<std::mutex> lk2(mutex, std::defer_lock);
std::lock(lk1, lk2);
stack = std::move(other.stack);
}
mutexed_memblock_stack& operator=(const mutexed_memblock_stack&) = delete;
mutexed_memblock_stack& operator=(mutexed_memblock_stack&& other) noexcept
{
std::unique_lock<std::mutex> lk1(other.mutex, std::defer_lock);
std::unique_lock<std::mutex> lk2(mutex, std::defer_lock);
std::lock(lk1, lk2);
stack = std::move(other.stack);
return *this;
}
void push(uint8_t* block) noexcept
{
std::lock_guard<std::mutex> lock(mutex);
stack.push(block);
}
uint8_t* try_pop() noexcept
{
std::lock_guard<std::mutex> lock(mutex);
uint8_t* block = stack.try_pop();
return block;
}
bool is_empty() const noexcept { return stack.is_empty(); }
size_t size() const noexcept
{
std::lock_guard<std::mutex> lock(mutex);
return stack.size();
}
void clear()
{
std::lock_guard<std::mutex> lock(mutex);
stack.clear();
}
private:
memblock_stack stack;
mutable std::mutex mutex;
};
/** /**
* Pool specialized for big objects. Created objects are not contiguous in memory. * Pool specialized for big objects. Created objects are not contiguous in memory.
* Relevant methods: * Relevant methods:
@ -149,7 +35,7 @@ template <typename T, bool ThreadSafe = false>
class big_obj_pool class big_obj_pool
{ {
// memory stack type derivation (thread safe or not) // memory stack type derivation (thread safe or not)
using stack_type = typename std::conditional<ThreadSafe, mutexed_memblock_stack, memblock_stack>::type; using stack_type = typename std::conditional<ThreadSafe, mutexed_memblock_cache, memblock_cache>::type;
// memory stack to cache allocate memory chunks // memory stack to cache allocate memory chunks
stack_type stack; stack_type stack;
@ -161,7 +47,7 @@ public:
void* allocate_node(size_t sz) void* allocate_node(size_t sz)
{ {
assert(sz == sizeof(T)); assert(sz == sizeof(T));
static const size_t blocksize = std::max(sizeof(T), memblock_stack::min_memblock_size()); static const size_t blocksize = std::max(sizeof(T), memblock_cache::min_memblock_size());
uint8_t* block = stack.try_pop(); uint8_t* block = stack.try_pop();
if (block == nullptr) { if (block == nullptr) {
block = new uint8_t[blocksize]; block = new uint8_t[blocksize];
@ -179,7 +65,7 @@ public:
/// Pre-reserve N memory chunks for future object allocations /// Pre-reserve N memory chunks for future object allocations
void reserve(size_t N) void reserve(size_t N)
{ {
static const size_t blocksize = std::max(sizeof(T), memblock_stack::min_memblock_size()); static const size_t blocksize = std::max(sizeof(T), memblock_cache::min_memblock_size());
for (size_t i = 0; i < N; ++i) { for (size_t i = 0; i < N; ++i) {
stack.push(new uint8_t[blocksize]); stack.push(new uint8_t[blocksize]);
} }
@ -284,7 +170,7 @@ private:
// memory stack to cache allocate memory chunks // memory stack to cache allocate memory chunks
std::mutex mutex; std::mutex mutex;
memblock_stack obj_cache; memblock_cache obj_cache;
std::vector<std::unique_ptr<batch_obj_t> > batches; std::vector<std::unique_ptr<batch_obj_t> > batches;
}; };

@ -0,0 +1,151 @@
/**
*
* \section COPYRIGHT
*
* Copyright 2013-2021 Software Radio Systems Limited
*
* By using this file, you agree to the terms and conditions set
* forth in the LICENSE file which can be found at the top level of
* the distribution.
*
*/
#ifndef SRSRAN_MEMBLOCK_CACHE_H
#define SRSRAN_MEMBLOCK_CACHE_H
#include <mutex>
namespace srsran {
/// Stores provided mem blocks in a stack in an non-owning manner. Not thread-safe
class memblock_cache
{
struct node {
node* prev;
explicit node(node* prev_) : prev(prev_) {}
};
public:
constexpr static size_t min_memblock_size() { return sizeof(node); }
memblock_cache() = default;
memblock_cache(const memblock_cache&) = delete;
memblock_cache(memblock_cache&& other) noexcept : head(other.head) { other.head = nullptr; }
memblock_cache& operator=(const memblock_cache&) = delete;
memblock_cache& operator=(memblock_cache&& other) noexcept
{
head = other.head;
other.head = nullptr;
return *this;
}
void push(void* block) noexcept
{
// printf("head: %ld\n", (long)head);
node* next = ::new (block) node(head);
head = next;
count++;
}
uint8_t* try_pop() noexcept
{
if (is_empty()) {
return nullptr;
}
node* last_head = head;
head = head->prev;
count--;
return (uint8_t*)last_head;
}
bool is_empty() const { return head == nullptr; }
size_t size() const { return count; }
void clear() { head = nullptr; }
private:
node* head = nullptr;
size_t count = 0;
};
/// memblock stack that mutexes pushing/popping
class mutexed_memblock_cache
{
public:
mutexed_memblock_cache() = default;
mutexed_memblock_cache(const mutexed_memblock_cache&) = delete;
mutexed_memblock_cache(mutexed_memblock_cache&& other) noexcept
{
std::unique_lock<std::mutex> lk1(other.mutex, std::defer_lock);
std::unique_lock<std::mutex> lk2(mutex, std::defer_lock);
std::lock(lk1, lk2);
stack = std::move(other.stack);
}
mutexed_memblock_cache& operator=(const mutexed_memblock_cache&) = delete;
mutexed_memblock_cache& operator=(mutexed_memblock_cache&& other) noexcept
{
std::unique_lock<std::mutex> lk1(other.mutex, std::defer_lock);
std::unique_lock<std::mutex> lk2(mutex, std::defer_lock);
std::lock(lk1, lk2);
stack = std::move(other.stack);
return *this;
}
void push(void* block) noexcept
{
std::lock_guard<std::mutex> lock(mutex);
stack.push(block);
}
uint8_t* try_pop() noexcept
{
std::lock_guard<std::mutex> lock(mutex);
uint8_t* block = stack.try_pop();
return block;
}
template <size_t N>
size_t try_pop(std::array<void*, N>& result) noexcept
{
std::lock_guard<std::mutex> lock(mutex);
size_t i = 0;
for (; i < N; ++i) {
result[i] = stack.try_pop();
if (result[i] == nullptr) {
break;
}
}
return i;
}
bool is_empty() const noexcept { return stack.is_empty(); }
size_t size() const noexcept
{
std::lock_guard<std::mutex> lock(mutex);
return stack.size();
}
void clear()
{
std::lock_guard<std::mutex> lock(mutex);
stack.clear();
}
private:
memblock_cache stack;
mutable std::mutex mutex;
};
} // namespace srsran
#endif // SRSRAN_MEMBLOCK_CACHE_H

@ -10,7 +10,8 @@
* *
*/ */
#include "srsran/adt/mem_pool.h" #include "srsran/adt/pool/fixed_size_pool.h"
#include "srsran/adt/pool/mem_pool.h"
#include "srsran/common/test_common.h" #include "srsran/common/test_common.h"
class C class C
@ -75,9 +76,68 @@ int test_nontrivial_obj_pool()
return SRSRAN_SUCCESS; return SRSRAN_SUCCESS;
} }
struct BigObj {
C c;
std::array<uint8_t, 500> space;
using pool_t = srsran::concurrent_fixed_memory_pool<1024, 512>;
void* operator new(size_t sz)
{
srsran_assert(sz == sizeof(BigObj), "Allocated node size and object size do not match");
return pool_t::get_instance()->allocate_node(sizeof(BigObj));
}
void* operator new(size_t sz, const std::nothrow_t& nothrow_value) noexcept
{
srsran_assert(sz == sizeof(BigObj), "Allocated node size and object size do not match");
return pool_t::get_instance()->allocate_node(sizeof(BigObj));
}
void operator delete(void* ptr) { pool_t::get_instance()->deallocate_node(ptr); }
};
void test_fixedsize_pool()
{
{
std::vector<std::unique_ptr<BigObj> > vec(BigObj::pool_t::size());
for (size_t i = 0; i < BigObj::pool_t::size(); ++i) {
vec[i].reset(new BigObj());
TESTASSERT(vec[i].get() != nullptr);
}
std::unique_ptr<BigObj> obj(new (std::nothrow) BigObj());
TESTASSERT(obj == nullptr);
vec.clear();
obj = std::unique_ptr<BigObj>(new (std::nothrow) BigObj());
TESTASSERT(obj != nullptr);
obj.reset();
}
// TEST: one thread allocates, and the other deallocates
{
std::unique_ptr<BigObj> obj;
std::atomic<bool> stop(false);
srsran::dyn_blocking_queue<std::unique_ptr<BigObj> > queue(BigObj::pool_t::size() / 2);
std::thread t([&queue, &stop]() {
while (not stop.load(std::memory_order_relaxed)) {
std::unique_ptr<BigObj> obj(new (std::nothrow) BigObj());
TESTASSERT(obj != nullptr);
queue.push_blocking(std::move(obj));
}
});
for (size_t i = 0; i < BigObj::pool_t::size() * 8; ++i) {
obj = queue.pop_blocking();
TESTASSERT(obj != nullptr);
}
stop.store(true);
t.join();
}
}
int main() int main()
{ {
TESTASSERT(test_nontrivial_obj_pool() == SRSRAN_SUCCESS); TESTASSERT(test_nontrivial_obj_pool() == SRSRAN_SUCCESS);
test_fixedsize_pool();
srsran::console("Success\n"); srsran::console("Success\n");
return 0; return 0;
} }

@ -15,7 +15,7 @@
#include "mac_controller.h" #include "mac_controller.h"
#include "rrc.h" #include "rrc.h"
#include "srsran/adt/mem_pool.h" #include "srsran/adt/pool/mem_pool.h"
#include "srsran/interfaces/enb_phy_interfaces.h" #include "srsran/interfaces/enb_phy_interfaces.h"
#include "srsran/interfaces/pdcp_interface_types.h" #include "srsran/interfaces/pdcp_interface_types.h"

@ -14,7 +14,7 @@
#include "srsenb/hdr/stack/rrc/mac_controller.h" #include "srsenb/hdr/stack/rrc/mac_controller.h"
#include "srsenb/hdr/stack/rrc/rrc_mobility.h" #include "srsenb/hdr/stack/rrc/rrc_mobility.h"
#include "srsenb/hdr/stack/rrc/ue_rr_cfg.h" #include "srsenb/hdr/stack/rrc/ue_rr_cfg.h"
#include "srsran/adt/mem_pool.h" #include "srsran/adt/pool/mem_pool.h"
#include "srsran/asn1/rrc_utils.h" #include "srsran/asn1/rrc_utils.h"
#include "srsran/common/enb_events.h" #include "srsran/common/enb_events.h"
#include "srsran/common/int_helpers.h" #include "srsran/common/int_helpers.h"

Loading…
Cancel
Save