fixed size buffer pool bugfix - deleted threads now return the cached memory blocks to the pool shared memory block container

master
Francisco 4 years ago committed by Francisco Paisana
parent be771e5c23
commit eb38ff43ab

@ -34,6 +34,7 @@ template <size_t ObjSize, bool DebugSanitizeAddress = false>
class concurrent_fixed_memory_pool class concurrent_fixed_memory_pool
{ {
static_assert(ObjSize > 256, "This pool is particularly designed for large objects"); static_assert(ObjSize > 256, "This pool is particularly designed for large objects");
using pool_type = concurrent_fixed_memory_pool<ObjSize, DebugSanitizeAddress>;
struct obj_storage_t { struct obj_storage_t {
typename std::aligned_storage<ObjSize, alignof(detail::max_alignment_t)>::type buffer; typename std::aligned_storage<ObjSize, alignof(detail::max_alignment_t)>::type buffer;
@ -41,7 +42,7 @@ class concurrent_fixed_memory_pool
explicit obj_storage_t(std::thread::id id_) : worker_id(id_) {} explicit obj_storage_t(std::thread::id id_) : worker_id(id_) {}
}; };
const static size_t batch_steal_size = 10; const static size_t batch_steal_size = 16;
// ctor only accessible from singleton get_instance() // ctor only accessible from singleton get_instance()
explicit concurrent_fixed_memory_pool(size_t nof_objects_) explicit concurrent_fixed_memory_pool(size_t nof_objects_)
@ -82,18 +83,18 @@ public:
void* allocate_node(size_t sz) void* allocate_node(size_t sz)
{ {
srsran_assert(sz <= ObjSize, "Allocated node size=%zd exceeds max object size=%zd", sz, ObjSize); srsran_assert(sz <= ObjSize, "Allocated node size=%zd exceeds max object size=%zd", sz, ObjSize);
memblock_cache* worker_cache = get_worker_cache(); worker_ctxt* worker_ctxt = get_worker_cache();
void* node = worker_cache->try_pop(); void* node = worker_ctxt->cache.try_pop();
if (node == nullptr) { if (node == nullptr) {
// fill the thread local cache enough for this and next allocations // fill the thread local cache enough for this and next allocations
std::array<void*, batch_steal_size> popped_blocks; std::array<void*, batch_steal_size> popped_blocks;
size_t n = shared_mem_cache.try_pop(popped_blocks); size_t n = shared_mem_cache.try_pop(popped_blocks);
for (size_t i = 0; i < n; ++i) { for (size_t i = 0; i < n; ++i) {
new (popped_blocks[i]) obj_storage_t(std::this_thread::get_id()); new (popped_blocks[i]) obj_storage_t(worker_ctxt->id);
worker_cache->push(static_cast<uint8_t*>(popped_blocks[i])); worker_ctxt->cache.push(static_cast<void*>(popped_blocks[i]));
} }
node = worker_cache->try_pop(); node = worker_ctxt->cache.try_pop();
} }
#ifdef SRSRAN_BUFFER_POOL_LOG_ENABLED #ifdef SRSRAN_BUFFER_POOL_LOG_ENABLED
@ -107,8 +108,8 @@ public:
void deallocate_node(void* p) void deallocate_node(void* p)
{ {
srsran_assert(p != nullptr, "Deallocated nodes must have valid address"); srsran_assert(p != nullptr, "Deallocated nodes must have valid address");
memblock_cache* worker_cache = get_worker_cache(); worker_ctxt* worker_ctxt = get_worker_cache();
obj_storage_t* block_ptr = static_cast<obj_storage_t*>(p); obj_storage_t* block_ptr = static_cast<obj_storage_t*>(p);
if (DebugSanitizeAddress) { if (DebugSanitizeAddress) {
std::lock_guard<std::mutex> lock(mutex); std::lock_guard<std::mutex> lock(mutex);
@ -119,7 +120,7 @@ public:
(long unsigned)block_ptr); (long unsigned)block_ptr);
} }
if (block_ptr->worker_id != std::this_thread::get_id() or worker_cache->size() >= max_objs_per_cache) { if (block_ptr->worker_id != worker_ctxt->id or worker_ctxt->cache.size() >= max_objs_per_cache) {
// if block was allocated in a different thread or local cache reached max capacity, send block to shared // if block was allocated in a different thread or local cache reached max capacity, send block to shared
// container // container
shared_mem_cache.push(static_cast<void*>(block_ptr)); shared_mem_cache.push(static_cast<void*>(block_ptr));
@ -127,7 +128,7 @@ public:
} }
// push to local memory block cache // push to local memory block cache
worker_cache->push(static_cast<uint8_t*>(p)); worker_ctxt->cache.push(static_cast<void*>(p));
} }
void enable_logger(bool enabled) void enable_logger(bool enabled)
@ -147,9 +148,22 @@ public:
} }
private: private:
memblock_cache* get_worker_cache() struct worker_ctxt {
std::thread::id id;
memblock_cache cache;
worker_ctxt() : id(std::this_thread::get_id()) {}
~worker_ctxt()
{
while (not cache.is_empty()) {
pool_type::get_instance()->shared_mem_cache.push(cache.try_pop());
}
}
};
worker_ctxt* get_worker_cache()
{ {
thread_local memblock_cache worker_cache; thread_local worker_ctxt worker_cache;
return &worker_cache; return &worker_cache;
} }

@ -134,6 +134,7 @@ void test_fixedsize_pool()
TESTASSERT(obj != nullptr); TESTASSERT(obj != nullptr);
} }
stop.store(true); stop.store(true);
fixed_pool->print_all_buffers();
t.join(); t.join();
} }
fixed_pool->print_all_buffers(); fixed_pool->print_all_buffers();

Loading…
Cancel
Save