Reimplement batch-based background object/memory pool

Main changes:
- addition of pool utilities
- The node size/alignment and batch allocation threshold are now runtime arguments
- object pool and memory pool are not anymore based on the same class.
  The object pool cannot use intrusive free list because it would overwrite the object
  memory
master
Francisco 4 years ago committed by Francisco Paisana
parent fbeb87c53e
commit cdf72248f3

@ -32,6 +32,7 @@ union max_alignment_t {
long double d2;
uint32_t* ptr;
};
const static size_t max_alignment = alignof(max_alignment_t);
template <typename T, size_t MinSize = 0, size_t AlignSize = 0>
struct type_storage {

@ -1,192 +0,0 @@
/**
*
* \section COPYRIGHT
*
* Copyright 2013-2021 Software Radio Systems Limited
*
* By using this file, you agree to the terms and conditions set
* forth in the LICENSE file which can be found at the top level of
* the distribution.
*
*/
#ifndef SRSRAN_BACKGROUND_MEM_POOL_H
#define SRSRAN_BACKGROUND_MEM_POOL_H
#include "common_pool.h"
#include "memblock_cache.h"
#include "pool_utils.h"
#include "srsran/common/srsran_assert.h"
#include "srsran/common/thread_pool.h"
#include <memory>
#include <mutex>
#include <vector>
namespace srsran {
namespace detail {
/**
* Pool specialized for in allocating batches of objects in a preemptive way in a background thread to minimize latency.
* Note: Current implementation assumes that the pool object will outlive the background callbacks to allocate new
* batches
* @tparam T individual object type that is being allocated
* @tparam BatchSize number of T objects in a batch
* @tparam ThresholdSize number of T objects below which a new batch needs to be allocated
*/
template <typename T, size_t BatchSize, size_t ThresholdSize, typename CtorFunc, typename RecycleFunc>
class base_background_pool
{
static_assert(ThresholdSize > 0, "ThresholdSize needs to be positive");
static_assert(BatchSize > 1, "BatchSize needs to be higher than 1");
using pool_type = base_background_pool<T, BatchSize, ThresholdSize, CtorFunc, RecycleFunc>;
public:
explicit base_background_pool(size_t initial_size = BatchSize,
CtorFunc&& ctor_func_ = {},
RecycleFunc&& recycle_func_ = {}) :
ctor_func(std::forward<CtorFunc>(ctor_func_)),
recycle_func(std::forward<RecycleFunc>(recycle_func_)),
state(std::make_shared<detached_pool_state>(this))
{
int nof_batches = ceilf(initial_size / (float)BatchSize);
while (nof_batches-- > 0) {
allocate_batch_();
}
}
base_background_pool(base_background_pool&&) = delete;
base_background_pool(const base_background_pool&) = delete;
base_background_pool& operator=(base_background_pool&&) = delete;
base_background_pool& operator=(const base_background_pool&) = delete;
~base_background_pool()
{
std::lock_guard<std::mutex> lock(state->mutex);
state->pool = nullptr;
for (std::unique_ptr<batch_obj_t>& batch : batches) {
for (obj_storage_t& obj_store : *batch) {
obj_store.destroy();
}
}
batches.clear();
}
/// alloc new object space. If no memory is pre-reserved in the pool, malloc is called to allocate new batch.
void* allocate_node(size_t sz)
{
srsran_assert(sz == sizeof(T), "Mismatch of allocated node size=%zd and object size=%zd", sz, sizeof(T));
std::lock_guard<std::mutex> lock(state->mutex);
void* block = obj_cache.try_pop();
if (block != nullptr) {
// allocation successful
if (obj_cache.size() < ThresholdSize) {
allocate_batch_in_background();
}
return block;
}
// try allocation of new batch in same thread as caller.
allocate_batch_();
return obj_cache.try_pop();
}
void deallocate_node(void* p)
{
std::lock_guard<std::mutex> lock(state->mutex);
recycle_func(*static_cast<T*>(p));
obj_cache.push(static_cast<void*>(p));
}
void allocate_batch_in_background()
{
std::shared_ptr<detached_pool_state> state_copy = state;
get_background_workers().push_task([state_copy]() {
std::lock_guard<std::mutex> lock(state_copy->mutex);
if (state_copy->pool != nullptr) {
state_copy->pool->allocate_batch_();
}
});
}
private:
using obj_storage_t = type_storage<T, memblock_cache::min_memblock_size(), memblock_cache::min_memblock_align()>;
using batch_obj_t = std::array<obj_storage_t, BatchSize>;
/// Unprotected allocation of new Batch of Objects
void allocate_batch_()
{
std::unique_ptr<batch_obj_t> batch(new batch_obj_t());
if (batch == nullptr) {
srslog::fetch_basic_logger("POOL").warning("Failed to allocate new batch in background thread");
return;
}
for (obj_storage_t& obj_store : *batch) {
ctor_func(obj_store.addr());
obj_cache.push(&obj_store.buffer);
}
batches.emplace_back(std::move(batch));
}
CtorFunc ctor_func;
RecycleFunc recycle_func;
struct detached_pool_state {
std::mutex mutex;
pool_type* pool;
explicit detached_pool_state(pool_type* pool_) : pool(pool_) {}
};
std::shared_ptr<detached_pool_state> state;
// memory stack to cache allocate memory chunks
memblock_cache obj_cache;
std::vector<std::unique_ptr<batch_obj_t> > batches;
};
} // namespace detail
template <typename T, size_t BatchSize, size_t ThresholdSize>
using background_mem_pool = detail::base_background_pool<detail::type_storage<T>,
BatchSize,
ThresholdSize,
detail::noop_operator,
detail::noop_operator>;
template <typename T,
size_t BatchSize,
size_t ThresholdSize,
typename CtorFunc = detail::inplace_default_ctor_operator<T>,
typename RecycleFunc = detail::noop_operator>
class background_obj_pool : public obj_pool_itf<T>
{
using pool_type = background_obj_pool<T, BatchSize, ThresholdSize, CtorFunc, RecycleFunc>;
using mem_pool_type = detail::base_background_pool<T, BatchSize, ThresholdSize, CtorFunc, RecycleFunc>;
struct pool_deleter {
mem_pool_type* pool;
explicit pool_deleter(mem_pool_type* pool_) : pool(pool_) {}
void operator()(void* ptr)
{
if (ptr != nullptr) {
pool->deallocate_node(ptr);
}
}
};
public:
explicit background_obj_pool(size_t initial_size, CtorFunc&& ctor_func = {}, RecycleFunc&& recycle_func = {}) :
pool(initial_size, std::forward<CtorFunc>(ctor_func), std::forward<RecycleFunc>(recycle_func))
{}
unique_pool_ptr<T> allocate_object() final
{
void* ptr = pool.allocate_node(sizeof(T));
return std::unique_ptr<T, pool_deleter>(static_cast<T*>(ptr), pool_deleter(&pool));
}
private:
mem_pool_type pool;
};
} // namespace srsran
#endif // SRSRAN_BACKGROUND_MEM_POOL_H

@ -0,0 +1,161 @@
/**
*
* \section COPYRIGHT
*
* Copyright 2013-2021 Software Radio Systems Limited
*
* By using this file, you agree to the terms and conditions set
* forth in the LICENSE file which can be found at the top level of
* the distribution.
*
*/
#ifndef SRSRAN_BATCH_MEM_POOL_H
#define SRSRAN_BATCH_MEM_POOL_H
#include "memblock_cache.h"
#include "pool_utils.h"
#include "srsran/common/srsran_assert.h"
#include "srsran/common/thread_pool.h"
#include <memory>
#include <mutex>
namespace srsran {
/**
* Non-thread-safe, node-based memory pool that allocates nodes in batches of "objs_per_batch" > 1, and caches
* allocated blocks on deallocation
*/
class growing_batch_mem_pool
{
public:
explicit growing_batch_mem_pool(size_t objs_per_batch_,
size_t node_size_,
size_t node_alignment_,
int init_size = -1) :
objs_per_batch(objs_per_batch_),
memblock_size(std::max(node_size_, free_memblock_list::min_memblock_size())),
allocated(objs_per_batch * memblock_size, std::max(node_alignment_, free_memblock_list::min_memblock_align()))
{
size_t N = init_size < 0 ? objs_per_batch_ : init_size;
while (N > cache_size()) {
allocate_batch();
}
}
~growing_batch_mem_pool()
{
srsran_assert(cache_size() == size(), "Not all nodes have been deallocated yet (%zd < %zd)", cache_size(), size());
}
size_t get_node_max_size() const { return allocated.get_node_max_size(); }
void clear()
{
free_list.clear();
allocated.clear();
}
size_t cache_size() const { return free_list.size(); }
size_t size() const { return allocated.size() * objs_per_batch; }
void allocate_batch()
{
uint8_t* batch_payload = static_cast<uint8_t*>(allocated.allocate_block());
for (size_t i = 0; i < objs_per_batch; ++i) {
void* cache_node = batch_payload + i * memblock_size;
free_list.push(cache_node);
}
}
void* allocate_node()
{
if (free_list.empty()) {
allocate_batch();
}
return free_list.pop();
}
void deallocate_node(void* ptr) { free_list.push(ptr); }
private:
const size_t objs_per_batch;
const size_t memblock_size;
memblock_stack allocated;
free_memblock_list free_list;
};
/**
* Thread-safe object pool specialized in allocating batches of objects in a preemptive way in a background thread
* to minimize latency.
* Note: The dispatched allocation jobs may outlive the pool. To handle this, the pool state is passed to jobs via a
* shared ptr.
*/
class background_mem_pool
{
public:
const size_t batch_threshold;
explicit background_mem_pool(size_t nodes_per_batch_, size_t node_size_, size_t thres_, int initial_size = -1) :
batch_threshold(thres_),
state(std::make_shared<detached_pool_state>(this)),
grow_pool(nodes_per_batch_, node_size_, detail::max_alignment, initial_size)
{
srsran_assert(batch_threshold > 1, "Invalid arguments for background memory pool");
}
~background_mem_pool()
{
std::lock_guard<std::mutex> lock(state->mutex);
state->pool = nullptr;
grow_pool.clear();
}
/// alloc new object space. If no memory is pre-reserved in the pool, malloc is called to allocate new batch.
void* allocate_node(size_t sz)
{
srsran_assert(sz <= grow_pool.get_node_max_size(),
"Mismatch of allocated node size=%zd and object size=%zd",
sz,
grow_pool.get_node_max_size());
std::lock_guard<std::mutex> lock(state->mutex);
void* node = grow_pool.allocate_node();
if (grow_pool.size() < batch_threshold) {
allocate_batch_in_background();
}
return node;
}
void deallocate_node(void* p)
{
std::lock_guard<std::mutex> lock(state->mutex);
grow_pool.deallocate_node(p);
}
size_t get_node_max_size() const { return grow_pool.get_node_max_size(); }
private:
void allocate_batch_in_background()
{
std::shared_ptr<detached_pool_state> state_copy = state;
get_background_workers().push_task([state_copy]() {
std::lock_guard<std::mutex> lock(state_copy->mutex);
if (state_copy->pool != nullptr) {
state_copy->pool->grow_pool.allocate_batch();
}
});
}
struct detached_pool_state {
std::mutex mutex;
background_mem_pool* pool;
explicit detached_pool_state(background_mem_pool* pool_) : pool(pool_) {}
};
std::shared_ptr<detached_pool_state> state;
growing_batch_mem_pool grow_pool;
};
} // namespace srsran
#endif // SRSRAN_BATCH_MEM_POOL_H

@ -1,43 +0,0 @@
/**
*
* \section COPYRIGHT
*
* Copyright 2013-2021 Software Radio Systems Limited
*
* By using this file, you agree to the terms and conditions set
* forth in the LICENSE file which can be found at the top level of
* the distribution.
*
*/
#ifndef SRSRAN_COMMON_POOL_H
#define SRSRAN_COMMON_POOL_H
#include "srsran/adt/move_callback.h"
namespace srsran {
/// unique ptr with type-erased dtor, so that it can be used by any pool
template <typename T>
using unique_pool_ptr = std::unique_ptr<T, srsran::move_callback<void(void*)> >;
/// Common object pool interface
template <typename T>
class obj_pool_itf
{
public:
using object_type = T;
obj_pool_itf() = default;
obj_pool_itf(const obj_pool_itf&) = delete;
obj_pool_itf(obj_pool_itf&&) = delete;
obj_pool_itf& operator=(const obj_pool_itf&) = delete;
obj_pool_itf& operator=(obj_pool_itf&&) = delete;
virtual ~obj_pool_itf() = default;
virtual unique_pool_ptr<T> allocate_object() = 0;
};
} // namespace srsran
#endif // SRSRAN_COMMON_POOL_H

@ -159,12 +159,12 @@ public:
private:
struct worker_ctxt {
std::thread::id id;
memblock_cache cache;
free_memblock_list cache;
worker_ctxt() : id(std::this_thread::get_id()) {}
~worker_ctxt()
{
mutexed_memblock_cache& central_cache = pool_type::get_instance()->central_mem_cache;
concurrent_free_memblock_list& central_cache = pool_type::get_instance()->central_mem_cache;
central_cache.steal_blocks(cache, cache.size());
}
};
@ -189,7 +189,7 @@ private:
size_t local_growth_thres = 0;
srslog::basic_logger* logger = nullptr;
mutexed_memblock_cache central_mem_cache;
concurrent_free_memblock_list central_mem_cache;
std::mutex mutex;
std::vector<std::unique_ptr<obj_storage_t> > allocated_blocks;
};

@ -35,7 +35,7 @@ template <typename T, bool ThreadSafe = false>
class big_obj_pool
{
// memory stack type derivation (thread safe or not)
using stack_type = typename std::conditional<ThreadSafe, mutexed_memblock_cache, memblock_cache>::type;
using stack_type = typename std::conditional<ThreadSafe, concurrent_free_memblock_list, free_memblock_list>::type;
// memory stack to cache allocate memory chunks
stack_type stack;
@ -47,7 +47,7 @@ public:
void* allocate_node(size_t sz)
{
assert(sz == sizeof(T));
static const size_t blocksize = std::max(sizeof(T), memblock_cache::min_memblock_size());
static const size_t blocksize = std::max(sizeof(T), free_memblock_list::min_memblock_size());
void* block = stack.try_pop();
if (block == nullptr) {
block = new uint8_t[blocksize];
@ -65,7 +65,7 @@ public:
/// Pre-reserve N memory chunks for future object allocations
void reserve(size_t N)
{
static const size_t blocksize = std::max(sizeof(T), memblock_cache::min_memblock_size());
static const size_t blocksize = std::max(sizeof(T), free_memblock_list::min_memblock_size());
for (size_t i = 0; i < N; ++i) {
stack.push(static_cast<void*>(new uint8_t[blocksize]));
}
@ -170,7 +170,7 @@ private:
// memory stack to cache allocate memory chunks
std::mutex mutex;
memblock_cache obj_cache;
free_memblock_list obj_cache;
std::vector<std::unique_ptr<batch_obj_t> > batches;
};

@ -13,112 +13,209 @@
#ifndef SRSRAN_MEMBLOCK_CACHE_H
#define SRSRAN_MEMBLOCK_CACHE_H
#include "pool_utils.h"
#include <mutex>
namespace srsran {
/// Stores provided mem blocks in a stack in an non-owning manner. Not thread-safe
class memblock_cache
namespace detail {
class intrusive_memblock_list
{
public:
struct node {
node* prev;
explicit node(node* prev_) : prev(prev_) {}
node* next;
explicit node(node* prev_) : next(prev_) {}
};
node* head = nullptr;
size_t count = 0;
public:
constexpr static size_t min_memblock_size() { return sizeof(node); }
constexpr static size_t min_memblock_align() { return alignof(node); }
memblock_cache() = default;
void push(void* block) noexcept
{
srsran_assert(is_aligned(block, min_memblock_align()), "The provided memory block is not aligned");
node* ptr = ::new (block) node(head);
head = ptr;
count++;
}
void* pop() noexcept
{
srsran_assert(not empty(), "pop() called on empty list");
node* last_head = head;
head = head->next;
last_head->~node();
count--;
return static_cast<void*>(last_head);
}
memblock_cache(const memblock_cache&) = delete;
void* try_pop() noexcept { return empty() ? nullptr : pop(); }
memblock_cache(memblock_cache&& other) noexcept : head(other.head) { other.head = nullptr; }
bool empty() const noexcept { return head == nullptr; }
memblock_cache& operator=(const memblock_cache&) = delete;
size_t size() const { return count; }
memblock_cache& operator=(memblock_cache&& other) noexcept
void clear() noexcept
{
head = other.head;
other.head = nullptr;
return *this;
head = nullptr;
count = 0;
}
};
} // namespace detail
template <typename T>
void push(T* block) noexcept
/**
* List of memory blocks. It overwrites bytes of blocks passed via push(void*). Thus, it is not safe to use in any
* pool of initialized objects
*/
class free_memblock_list : public detail::intrusive_memblock_list
{
private:
using base_t = detail::intrusive_memblock_list;
using base_t::count;
using base_t::head;
};
/**
* List of memory blocks, each memory block containing a node. Memory Structure:
* memory block 1 memory block
* [ next | node ] [ next | node ]
* '--------------^ '-----------> nullptr
*/
class memblock_node_list : public detail::intrusive_memblock_list
{
using base_t = detail::intrusive_memblock_list;
using base_t::count;
using base_t::head;
using base_t::try_pop;
public:
const size_t memblock_alignment;
const size_t header_size;
const size_t payload_size;
const size_t memblock_size;
explicit memblock_node_list(size_t node_size_, size_t node_alignment_ = detail::max_alignment) :
memblock_alignment(std::max(free_memblock_list::min_memblock_align(), node_alignment_)),
header_size(align_next(base_t::min_memblock_size(), memblock_alignment)),
payload_size(align_next(node_size_, memblock_alignment)),
memblock_size(header_size + payload_size)
{
static_assert(sizeof(T) >= sizeof(node) and alignof(T) >= alignof(node), "Provided memory block is too small");
push(static_cast<void*>(block));
srsran_assert(node_size_ > 0 and is_valid_alignment(node_alignment_),
"Invalid arguments node size=%zd,alignment=%zd",
node_size_,
node_alignment_);
}
void push(void* block) noexcept
void* get_node_header(void* payload_addr)
{
node* next = ::new (block) node(head);
head = next;
srsran_assert(is_aligned(payload_addr, memblock_alignment), "Provided address is not valid");
return static_cast<void*>(static_cast<uint8_t*>(payload_addr) - header_size);
}
/// returns address of memblock payload (skips memblock header)
void* top() noexcept { return static_cast<void*>(reinterpret_cast<uint8_t*>(this->head) + header_size); }
void steal_top(intrusive_memblock_list& other) noexcept
{
srsran_assert(not other.empty(), "Trying to steal from empty memblock list");
node* other_head = other.head;
other.head = other.head->next;
other_head->next = head;
head = other_head;
other.count--;
count++;
}
};
void* try_pop() noexcept
/// Similar to node_memblock_list, but manages the allocation/deallocation of memory blocks
class memblock_stack
{
public:
explicit memblock_stack(size_t node_size_, size_t node_alignment_ = detail::max_alignment) :
node_list(node_size_, node_alignment_)
{}
memblock_stack(const memblock_stack&) = delete;
memblock_stack(memblock_stack&& other) noexcept = delete;
memblock_stack& operator=(const memblock_stack&) = delete;
memblock_stack& operator=(memblock_stack&&) = delete;
~memblock_stack() { clear(); }
void clear()
{
if (is_empty()) {
return nullptr;
while (not empty()) {
deallocate_block();
}
node* last_head = head;
head = head->prev;
last_head->~node();
count--;
return static_cast<void*>(last_head);
}
bool is_empty() const { return head == nullptr; }
size_t get_memblock_size() const { return node_list.memblock_size; }
size_t get_node_max_size() const { return node_list.payload_size; }
size_t size() const { return count; }
void* allocate_block()
{
node_list.push(new uint8_t[node_list.memblock_size]);
return current_node();
}
void clear() { head = nullptr; }
void deallocate_block() noexcept
{
uint8_t* block = static_cast<uint8_t*>(node_list.pop());
delete[] block;
}
bool empty() const noexcept { return node_list.empty(); }
size_t size() const noexcept { return node_list.size(); }
void* current_node() noexcept { return node_list.top(); }
void steal_top(memblock_stack& other) noexcept { return node_list.steal_top(other.node_list); }
private:
node* head = nullptr;
size_t count = 0;
static size_t get_memblock_start_offset(size_t node_alignment)
{
return align_next(detail::intrusive_memblock_list::min_memblock_size(), node_alignment);
}
static size_t get_memblock_size(size_t node_size, size_t node_alignment)
{
return align_next(get_memblock_start_offset(node_alignment) + node_size, detail::max_alignment);
}
memblock_node_list node_list;
};
/// memblock stack that mutexes pushing/popping
class mutexed_memblock_cache
class concurrent_free_memblock_list
{
public:
mutexed_memblock_cache() = default;
mutexed_memblock_cache(const mutexed_memblock_cache&) = delete;
mutexed_memblock_cache(mutexed_memblock_cache&& other) noexcept
concurrent_free_memblock_list() = default;
concurrent_free_memblock_list(const concurrent_free_memblock_list&) = delete;
concurrent_free_memblock_list(concurrent_free_memblock_list&& other) noexcept
{
std::unique_lock<std::mutex> lk1(other.mutex, std::defer_lock);
std::unique_lock<std::mutex> lk2(mutex, std::defer_lock);
std::lock(lk1, lk2);
stack = std::move(other.stack);
stack = other.stack;
}
mutexed_memblock_cache& operator=(const mutexed_memblock_cache&) = delete;
mutexed_memblock_cache& operator=(mutexed_memblock_cache&& other) noexcept
concurrent_free_memblock_list& operator=(const concurrent_free_memblock_list&) = delete;
concurrent_free_memblock_list& operator=(concurrent_free_memblock_list&& other) noexcept
{
std::unique_lock<std::mutex> lk1(other.mutex, std::defer_lock);
std::unique_lock<std::mutex> lk2(mutex, std::defer_lock);
std::lock(lk1, lk2);
stack = std::move(other.stack);
stack = other.stack;
return *this;
}
template <typename T>
void push(T* block) noexcept
void push(void* block) noexcept
{
std::lock_guard<std::mutex> lock(mutex);
stack.push(block);
}
void steal_blocks(memblock_cache& other, size_t max_n) noexcept
void steal_blocks(free_memblock_list& other, size_t max_n) noexcept
{
std::lock_guard<std::mutex> lock(mutex);
for (size_t i = 0; i < max_n and not other.is_empty(); ++i) {
for (size_t i = 0; i < max_n and not other.empty(); ++i) {
stack.push(other.try_pop());
}
}
@ -144,7 +241,7 @@ public:
return i;
}
bool is_empty() const noexcept { return stack.is_empty(); }
bool empty() const noexcept { return stack.empty(); }
size_t size() const noexcept
{
@ -159,10 +256,39 @@ public:
}
private:
memblock_cache stack;
free_memblock_list stack;
mutable std::mutex mutex;
};
/**
* Manages the allocation, caching and deallocation of memory blocks.
* On alloc, a memory block is stolen from cache. If cache is empty, malloc/new is called.
* Only the last allocated memory block can be deallocated.
*/
class cached_memblock_stack
{
public:
explicit cached_memblock_stack(size_t block_size_) : used(block_size_), cache(block_size_) {}
void* allocate_block()
{
if (cache.empty()) {
used.allocate_block();
} else {
used.steal_top(cache);
}
return used.current_node();
}
void* current_node() noexcept { return used.current_node(); }
void deallocate_block() noexcept { cache.steal_top(used); }
size_t cache_size() const noexcept { return cache.size(); }
private:
memblock_stack used;
memblock_stack cache;
};
} // namespace srsran
#endif // SRSRAN_MEMBLOCK_CACHE_H

@ -0,0 +1,193 @@
/**
*
* \section COPYRIGHT
*
* Copyright 2013-2021 Software Radio Systems Limited
*
* By using this file, you agree to the terms and conditions set
* forth in the LICENSE file which can be found at the top level of
* the distribution.
*
*/
#ifndef SRSRAN_OBJ_POOL_H
#define SRSRAN_OBJ_POOL_H
#include "batch_mem_pool.h"
#include "memblock_cache.h"
#include "pool_interface.h"
namespace srsran {
template <typename T>
class background_obj_pool;
template <typename T>
class growing_batch_obj_pool final : public obj_pool_itf<T>
{
static size_t memblock_size()
{
/// Node Structure [ node header | (pad to node alignment) | node size | (pad to node header alignment) ]
return align_next(align_next(free_memblock_list::min_memblock_size(), alignof(T)) + sizeof(T),
free_memblock_list::min_memblock_align());
}
static size_t batch_size(size_t nof_objs_per_batch)
{
/// Batch Structure: [allocated stack header | (pad max alignment) | [memblock] x objs_per_batch ]
return align_next(detail::max_alignment + (memblock_size() * nof_objs_per_batch), detail::max_alignment);
}
public:
using init_mem_oper_t = srsran::move_callback<void(void*)>;
using recycle_oper_t = srsran::move_callback<void(T&)>;
explicit growing_batch_obj_pool(size_t objs_per_batch_,
int init_size = -1,
init_mem_oper_t init_oper_ = detail::inplace_default_ctor_operator<T>{},
recycle_oper_t recycle_oper_ = detail::noop_operator{}) :
objs_per_batch(objs_per_batch_),
init_oper(std::move(init_oper_)),
recycle_oper(std::move(recycle_oper_)),
allocated(batch_size(objs_per_batch_), detail::max_alignment),
cache(sizeof(T), alignof(T))
{
size_t N = init_size < 0 ? objs_per_batch_ : init_size;
while (N > cache.size()) {
allocate_batch();
}
}
~growing_batch_obj_pool() { clear(); }
void clear()
{
if (not allocated.empty()) {
srsran_assert(allocated.size() * objs_per_batch == cache_size(),
"Not all objects have been deallocated (%zd < %zd)",
cache_size(),
allocated.size() * objs_per_batch);
while (not cache.empty()) {
void* node_payload = cache.top();
static_cast<T*>(node_payload)->~T();
cache.pop();
}
allocated.clear();
}
}
void allocate_batch()
{
uint8_t* batch_payload = static_cast<uint8_t*>(allocated.allocate_block());
for (size_t i = 0; i < objs_per_batch; ++i) {
void* cache_node = batch_payload + (i * cache.memblock_size);
cache.push(cache_node);
init_oper(cache.top());
}
}
size_t cache_size() const { return cache.size(); }
private:
friend class background_obj_pool<T>;
T* do_allocate() final
{
if (cache.empty()) {
allocate_batch();
}
void* top = cache.top();
cache.pop();
return static_cast<T*>(top);
}
void do_deallocate(void* payload_ptr) final
{
recycle_oper(*static_cast<T*>(payload_ptr));
void* header_ptr = cache.get_node_header(payload_ptr);
cache.push(header_ptr);
}
// args
const size_t objs_per_batch;
init_mem_oper_t init_oper;
recycle_oper_t recycle_oper;
memblock_stack allocated;
memblock_node_list cache;
};
/**
* Thread-safe object pool specialized in allocating batches of objects in a preemptive way in a background thread
* to minimize latency.
* Note: The dispatched allocation jobs may outlive the pool. To handle this, the pool state is passed to jobs via a
* shared ptr.
*/
template <typename T>
class background_obj_pool final : public obj_pool_itf<T>
{
public:
using init_mem_oper_t = typename growing_batch_obj_pool<T>::init_mem_oper_t;
using recycle_oper_t = typename growing_batch_obj_pool<T>::recycle_oper_t;
explicit background_obj_pool(size_t nof_objs_per_batch,
size_t thres_,
int init_size = -1,
init_mem_oper_t init_oper_ = detail::inplace_default_ctor_operator<T>{},
recycle_oper_t recycle_oper_ = detail::noop_operator{}) :
thres(thres_),
state(std::make_shared<detached_pool_state>(this)),
grow_pool(nof_objs_per_batch, init_size, std::move(init_oper_), std::move(recycle_oper_))
{
srsran_assert(thres_ > 1, "The provided threshold=%zd is not valid", thres_);
}
~background_obj_pool()
{
std::lock_guard<std::mutex> lock(state->mutex);
state->pool = nullptr;
grow_pool.clear();
}
size_t cache_size() const { return grow_pool.cache_size(); }
private:
T* do_allocate() final
{
std::lock_guard<std::mutex> lock(state->mutex);
T* obj = grow_pool.do_allocate();
if (grow_pool.cache_size() < thres) {
allocate_batch_in_background_();
}
return obj;
}
void do_deallocate(void* ptr) final
{
std::lock_guard<std::mutex> lock(state->mutex);
return grow_pool.do_deallocate(ptr);
}
void allocate_batch_in_background_()
{
std::shared_ptr<detached_pool_state> state_copy = state;
get_background_workers().push_task([state_copy]() {
std::lock_guard<std::mutex> lock(state_copy->mutex);
if (state_copy->pool != nullptr) {
state_copy->pool->grow_pool.allocate_batch();
}
});
}
size_t thres;
// state of pool is detached because pool may be destroyed while batches are being allocated in the background
struct detached_pool_state {
std::mutex mutex;
background_obj_pool<T>* pool;
explicit detached_pool_state(background_obj_pool<T>* pool_) : pool(pool_) {}
};
std::shared_ptr<detached_pool_state> state;
growing_batch_obj_pool<T> grow_pool;
};
} // namespace srsran
#endif // SRSRAN_OBJ_POOL_H

@ -0,0 +1,63 @@
/**
*
* \section COPYRIGHT
*
* Copyright 2013-2021 Software Radio Systems Limited
*
* By using this file, you agree to the terms and conditions set
* forth in the LICENSE file which can be found at the top level of
* the distribution.
*
*/
#ifndef SRSRAN_POOL_INTERFACE_H
#define SRSRAN_POOL_INTERFACE_H
#include "srsran/adt/move_callback.h"
namespace srsran {
/// Common object pool interface
template <typename T>
class obj_pool_itf
{
public:
struct pool_deallocator {
obj_pool_itf<T>* pool;
explicit pool_deallocator(obj_pool_itf<T>* pool_ = nullptr) : pool(pool_) {}
void operator()(void* ptr)
{
if (ptr != nullptr and pool != nullptr) {
pool->do_deallocate(ptr);
}
}
};
using object_type = T;
obj_pool_itf() = default;
// Object pool address should not change
obj_pool_itf(const obj_pool_itf&) = delete;
obj_pool_itf(obj_pool_itf&&) = delete;
obj_pool_itf& operator=(const obj_pool_itf&) = delete;
obj_pool_itf& operator=(obj_pool_itf&&) = delete;
virtual ~obj_pool_itf() = default;
std::unique_ptr<T, pool_deallocator> make()
{
return std::unique_ptr<T, pool_deallocator>(do_allocate(), pool_deallocator(this));
}
private:
// defined in child class
virtual T* do_allocate() = 0;
virtual void do_deallocate(void* ptr) = 0;
};
/// unique ptr with type-erased dtor, so that it can be used by any object pool
template <typename T>
using unique_pool_ptr = std::unique_ptr<T, typename obj_pool_itf<T>::pool_deallocator>;
} // namespace srsran
#endif // SRSRAN_POOL_INTERFACE_H

@ -27,7 +27,7 @@ struct inplace_default_ctor_operator {
struct noop_operator {
template <typename T>
void operator()(const T& ptr)
void operator()(T&& t) const
{
// do nothing
}
@ -35,6 +35,26 @@ struct noop_operator {
} // namespace detail
/// check if alignment is power of 2
constexpr bool is_valid_alignment(std::size_t alignment)
{
return alignment && (alignment & (alignment - 1)) == 0u;
}
inline bool is_aligned(void* ptr, std::size_t alignment)
{
return (reinterpret_cast<std::uintptr_t>(ptr) & (alignment - 1)) == 0;
}
constexpr std::uintptr_t align_next(std::uintptr_t pos, size_t alignment)
{
return (pos + (alignment - 1)) & ~(alignment - 1);
}
inline void* align_to(void* pos, size_t alignment)
{
return reinterpret_cast<void*>(align_next(reinterpret_cast<std::uintptr_t>(pos), alignment));
}
} // namespace srsran
#endif // SRSRAN_POOL_UTILS_H

@ -10,9 +10,9 @@
*
*/
#include "srsran/adt/pool/background_mem_pool.h"
#include "srsran/adt/pool/fixed_size_pool.h"
#include "srsran/adt/pool/mem_pool.h"
#include "srsran/adt/pool/obj_pool.h"
#include "srsran/common/test_common.h"
class C
@ -142,21 +142,32 @@ void test_fixedsize_pool()
TESTASSERT(C::default_ctor_counter == C::dtor_counter);
}
struct D : public C {
char val = '\0';
};
void test_background_pool()
{
C::default_ctor_counter = 0;
C::dtor_counter = 0;
{
srsran::background_obj_pool<C, 16, 4> obj_pool(16);
std::vector<srsran::unique_pool_ptr<C> > objs;
auto init_D_val = [](void* ptr) {
new (ptr) D();
static_cast<D*>(ptr)->val = 'c';
};
srsran::background_obj_pool<D> obj_pool(16, 4, 16, init_D_val);
TESTASSERT(obj_pool.cache_size() == 16);
std::vector<srsran::unique_pool_ptr<D> > objs;
for (size_t i = 0; i < 16 - 4; ++i) {
objs.push_back(obj_pool.allocate_object());
objs.push_back(obj_pool.make());
}
TESTASSERT(
std::all_of(objs.begin(), objs.end(), [](const srsran::unique_pool_ptr<D>& d) { return d->val == 'c'; }));
TESTASSERT(C::default_ctor_counter == 16);
// This will trigger a new batch allocation in the background
objs.push_back(obj_pool.allocate_object());
objs.push_back(obj_pool.make());
}
TESTASSERT(C::dtor_counter == C::default_ctor_counter);
}

@ -16,7 +16,7 @@
#include "sched.h"
#include "srsenb/hdr/stack/mac/schedulers/sched_time_rr.h"
#include "srsran/adt/circular_map.h"
#include "srsran/adt/pool/background_mem_pool.h"
#include "srsran/adt/pool/batch_mem_pool.h"
#include "srsran/common/mac_pcap.h"
#include "srsran/common/mac_pcap_net.h"
#include "srsran/common/task_scheduler.h"

@ -16,7 +16,7 @@
#include "mac_metrics.h"
#include "srsran/adt/circular_array.h"
#include "srsran/adt/circular_map.h"
#include "srsran/adt/pool/common_pool.h"
#include "srsran/adt/pool/pool_interface.h"
#include "srsran/common/block_queue.h"
#include "srsran/common/mac_pcap.h"
#include "srsran/common/mac_pcap_net.h"
@ -48,6 +48,7 @@ struct ue_cc_softbuffers {
cc_softbuffer_rx_list_t softbuffer_rx_list;
ue_cc_softbuffers(uint32_t nof_prb, uint32_t nof_tx_harq_proc_, uint32_t nof_rx_harq_proc_);
ue_cc_softbuffers(ue_cc_softbuffers&&) noexcept = default;
~ue_cc_softbuffers();
void clear();

@ -15,7 +15,7 @@
#include "mac_controller.h"
#include "rrc.h"
#include "srsran/adt/pool/background_mem_pool.h"
#include "srsran/adt/pool/batch_mem_pool.h"
#include "srsran/interfaces/enb_phy_interfaces.h"
#include "srsran/interfaces/pdcp_interface_types.h"
@ -153,7 +153,7 @@ public:
void operator delete(void* ptr)noexcept;
void operator delete[](void* ptr) = delete;
using ue_pool_t = srsran::background_mem_pool<ue, 16, 4>;
using ue_pool_t = srsran::background_mem_pool;
static ue_pool_t* get_ue_pool();
private:

@ -14,6 +14,7 @@
#include <string.h>
#include "srsenb/hdr/stack/mac/mac.h"
#include "srsran/adt/pool/obj_pool.h"
#include "srsran/common/rwlock_guard.h"
#include "srsran/common/standard_streams.h"
#include "srsran/common/time_prof.h"
@ -77,17 +78,15 @@ bool mac::init(const mac_args_t& args_,
reset();
// Initiate common pool of softbuffers
using softbuffer_pool_t = srsran::background_obj_pool<ue_cc_softbuffers,
16,
4,
srsran::move_callback<void(void*)>,
srsran::move_callback<void(ue_cc_softbuffers&)> >;
uint32_t nof_prb = args.nof_prb;
auto init_softbuffers = [nof_prb](void* ptr) {
new (ptr) ue_cc_softbuffers(nof_prb, SRSRAN_FDD_NOF_HARQ, SRSRAN_FDD_NOF_HARQ);
};
auto recycle_softbuffers = [](ue_cc_softbuffers& softbuffers) { softbuffers.clear(); };
softbuffer_pool.reset(new softbuffer_pool_t(std::min(args.max_nof_ues, 16U), // initial allocation size
softbuffer_pool.reset(
new srsran::background_obj_pool<ue_cc_softbuffers>(16,
4,
std::min(args.max_nof_ues, 16U), // initial allocation size
init_softbuffers,
recycle_softbuffers));

@ -219,7 +219,7 @@ ue::ue(uint16_t rnti_,
pdus.init(this);
// Allocate buffer for PCell
cc_buffers[0].allocate_cc(softbuffer_pool->allocate_object());
cc_buffers[0].allocate_cc(softbuffer_pool->make());
}
ue::~ue()
@ -549,7 +549,7 @@ void ue::allocate_ce(srsran::sch_pdu* pdu, uint32_t lcid)
// Allocate and initialize Rx/Tx softbuffers for new carriers (exclude PCell)
for (size_t i = 0; i < std::min(active_scell_list.size(), cc_buffers.size()); ++i) {
if (active_scell_list[i] and cc_buffers[i].empty()) {
cc_buffers[i].allocate_cc(softbuffer_pool->allocate_object());
cc_buffers[i].allocate_cc(softbuffer_pool->make());
}
}
} else {

@ -34,8 +34,8 @@ namespace srsenb {
rrc::rrc(srsran::task_sched_handle task_sched_) :
logger(srslog::fetch_basic_logger("RRC")), task_sched(task_sched_), rx_pdu_queue(64)
{
pending_paging.clear();
rrc::ue::get_ue_pool()->allocate_batch_in_background();
// initialize ue pool
rrc::ue::get_ue_pool();
}
rrc::~rrc() {}

@ -69,7 +69,7 @@ rrc::ue::ue_pool_t* rrc::ue::get_ue_pool()
{
// Note: batch allocation is going to be explicitly called in enb class construction. The pool object, therefore,
// will only be initialized if we instantiate an eNB
static rrc::ue::ue_pool_t ue_pool;
static rrc::ue::ue_pool_t ue_pool(16, sizeof(ue), 4);
return &ue_pool;
}

Loading…
Cancel
Save