adt - improved concurrent fixed memory pool policy to send buffers from thread cache to central cache

master
Francisco 4 years ago committed by Francisco Paisana
parent eb38ff43ab
commit 60cd7e6cfe

@ -21,25 +21,26 @@ namespace srsran {
/** /**
* Concurrent fixed size memory pool made of blocks of equal size * Concurrent fixed size memory pool made of blocks of equal size
* Each worker keeps a separate thread-local memory block cache that uses for fast allocation/deallocation. * Each worker keeps a separate thread-local memory block cache that it uses for fast allocation/deallocation.
* When this cache gets depleted, the worker tries to obtain blocks from a shared memory block cache * When this cache gets depleted, the worker tries to obtain blocks from a central memory block cache.
* Note: This pool does not implement stealing of blocks between workers, so it is possible that a worker can't allocate * When accessing a thread local cache, no locks are required.
* while another worker still has blocks in its own cache. This situation is avoided by upper bounding the * Since there is no stealing of blocks between workers, it is possible that a worker can't allocate while another
* size of each worker cache * worker still has blocks in its own cache. To minimize the impact of this event, an upper bound is placed on a worker
* Note2: Taking into account the usage of thread_local, this class is made a singleton * thread cache size. Once a worker reaches that upper bound, it sends half of its stored blocks to the central cache.
* Note: Taking into account the usage of thread_local, this class is made a singleton
* Note2: No considerations were made regarding false sharing between threads. It is assumed that the blocks are big
* enough to fill a cache line.
* @tparam NofObjects number of objects in the pool * @tparam NofObjects number of objects in the pool
* @tparam ObjSize object size * @tparam ObjSize object size
*/ */
template <size_t ObjSize, bool DebugSanitizeAddress = false> template <size_t ObjSize, bool DebugSanitizeAddress = false>
class concurrent_fixed_memory_pool class concurrent_fixed_memory_pool
{ {
static_assert(ObjSize > 256, "This pool is particularly designed for large objects"); static_assert(ObjSize > 256, "This pool is particularly designed for large objects.");
using pool_type = concurrent_fixed_memory_pool<ObjSize, DebugSanitizeAddress>; using pool_type = concurrent_fixed_memory_pool<ObjSize, DebugSanitizeAddress>;
struct obj_storage_t { struct obj_storage_t {
typename std::aligned_storage<ObjSize, alignof(detail::max_alignment_t)>::type buffer; typename std::aligned_storage<ObjSize, alignof(detail::max_alignment_t)>::type buffer;
std::thread::id worker_id;
explicit obj_storage_t(std::thread::id id_) : worker_id(id_) {}
}; };
const static size_t batch_steal_size = 16; const static size_t batch_steal_size = 16;
@ -47,17 +48,17 @@ class concurrent_fixed_memory_pool
// ctor only accessible from singleton get_instance() // ctor only accessible from singleton get_instance()
explicit concurrent_fixed_memory_pool(size_t nof_objects_) explicit concurrent_fixed_memory_pool(size_t nof_objects_)
{ {
srsran_assert(nof_objects_ > 0, "A positive pool size must be provided"); srsran_assert(nof_objects_ > batch_steal_size, "A positive pool size must be provided");
std::lock_guard<std::mutex> lock(mutex); std::lock_guard<std::mutex> lock(mutex);
allocated_blocks.resize(nof_objects_); allocated_blocks.resize(nof_objects_);
for (std::unique_ptr<obj_storage_t>& b : allocated_blocks) { for (std::unique_ptr<obj_storage_t>& b : allocated_blocks) {
b.reset(new obj_storage_t(std::this_thread::get_id())); b.reset(new obj_storage_t());
srsran_assert(b.get() != nullptr, "Failed to instantiate fixed memory pool"); srsran_assert(b.get() != nullptr, "Failed to instantiate fixed memory pool");
shared_mem_cache.push(static_cast<void*>(b.get())); central_mem_cache.push(static_cast<void*>(b.get()));
} }
max_objs_per_cache = allocated_blocks.size() / 16; local_growth_thres = allocated_blocks.size() / 16;
max_objs_per_cache = max_objs_per_cache < batch_steal_size ? batch_steal_size : max_objs_per_cache; local_growth_thres = local_growth_thres < batch_steal_size ? batch_steal_size : local_growth_thres;
} }
public: public:
@ -89,9 +90,9 @@ public:
if (node == nullptr) { if (node == nullptr) {
// fill the thread local cache enough for this and next allocations // fill the thread local cache enough for this and next allocations
std::array<void*, batch_steal_size> popped_blocks; std::array<void*, batch_steal_size> popped_blocks;
size_t n = shared_mem_cache.try_pop(popped_blocks); size_t n = central_mem_cache.try_pop(popped_blocks);
for (size_t i = 0; i < n; ++i) { for (size_t i = 0; i < n; ++i) {
new (popped_blocks[i]) obj_storage_t(worker_ctxt->id); new (popped_blocks[i]) obj_storage_t();
worker_ctxt->cache.push(static_cast<void*>(popped_blocks[i])); worker_ctxt->cache.push(static_cast<void*>(popped_blocks[i]));
} }
node = worker_ctxt->cache.try_pop(); node = worker_ctxt->cache.try_pop();
@ -116,19 +117,17 @@ public:
srsran_assert(std::any_of(allocated_blocks.begin(), srsran_assert(std::any_of(allocated_blocks.begin(),
allocated_blocks.end(), allocated_blocks.end(),
[block_ptr](const std::unique_ptr<obj_storage_t>& b) { return b.get() == block_ptr; }), [block_ptr](const std::unique_ptr<obj_storage_t>& b) { return b.get() == block_ptr; }),
"Error deallocating block with address 0x%lx.", "Error deallocating block with address 0x%lx",
(long unsigned)block_ptr); (long unsigned)block_ptr);
} }
if (block_ptr->worker_id != worker_ctxt->id or worker_ctxt->cache.size() >= max_objs_per_cache) {
// if block was allocated in a different thread or local cache reached max capacity, send block to shared
// container
shared_mem_cache.push(static_cast<void*>(block_ptr));
return;
}
// push to local memory block cache // push to local memory block cache
worker_ctxt->cache.push(static_cast<void*>(p)); worker_ctxt->cache.push(static_cast<void*>(p));
if (worker_ctxt->cache.size() >= local_growth_thres) {
// if local cache reached max capacity, send half of the blocks to central cache
central_mem_cache.steal_blocks(worker_ctxt->cache, worker_ctxt->cache.size() / 2);
}
} }
void enable_logger(bool enabled) void enable_logger(bool enabled)
@ -142,9 +141,17 @@ public:
} }
void print_all_buffers() void print_all_buffers()
{
auto* worker = get_worker_cache();
size_t tot_blocks = 0;
{ {
std::lock_guard<std::mutex> lock(mutex); std::lock_guard<std::mutex> lock(mutex);
printf("There are %zd/%zd buffers in shared block container.\n", shared_mem_cache.size(), allocated_blocks.size()); tot_blocks = allocated_blocks.size();
}
printf("There are %zd/%zd buffers in shared block container. This thread contains %zd in its local cache\n",
central_mem_cache.size(),
tot_blocks,
worker->cache.size());
} }
private: private:
@ -155,9 +162,8 @@ private:
worker_ctxt() : id(std::this_thread::get_id()) {} worker_ctxt() : id(std::this_thread::get_id()) {}
~worker_ctxt() ~worker_ctxt()
{ {
while (not cache.is_empty()) { mutexed_memblock_cache& central_cache = pool_type::get_instance()->central_mem_cache;
pool_type::get_instance()->shared_mem_cache.push(cache.try_pop()); central_cache.steal_blocks(cache, cache.size());
}
} }
}; };
@ -178,10 +184,10 @@ private:
} }
} }
size_t max_objs_per_cache = 0; size_t local_growth_thres = 0;
srslog::basic_logger* logger = nullptr; srslog::basic_logger* logger = nullptr;
mutexed_memblock_cache shared_mem_cache; mutexed_memblock_cache central_mem_cache;
std::mutex mutex; std::mutex mutex;
std::vector<std::unique_ptr<obj_storage_t> > allocated_blocks; std::vector<std::unique_ptr<obj_storage_t> > allocated_blocks;
}; };

@ -106,6 +106,14 @@ public:
stack.push(block); stack.push(block);
} }
/// Transfers blocks from \c other into this cache.
/// Blocks are popped from \c other one at a time and pushed onto this cache's stack until either
/// \c max_n blocks have been moved or \c other has no blocks left, whichever happens first.
/// This cache's mutex is held for the whole transfer; \c other is accessed without any extra locking here.
/// \param other cache that the blocks are taken from.
/// \param max_n upper bound on the number of blocks to move.
void steal_blocks(memblock_cache& other, size_t max_n) noexcept
{
  std::lock_guard<std::mutex> guard(mutex);
  size_t remaining = max_n;
  while (remaining > 0 and not other.is_empty()) {
    stack.push(other.try_pop());
    --remaining;
  }
}
uint8_t* try_pop() noexcept uint8_t* try_pop() noexcept
{ {
std::lock_guard<std::mutex> lock(mutex); std::lock_guard<std::mutex> lock(mutex);

Loading…
Cancel
Save