/* * Copyright 2013-2020 Software Radio Systems Limited * * This file is part of srsLTE. * * srsLTE is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation, either version 3 of * the License, or (at your option) any later version. * * srsLTE is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * A copy of the GNU Affero General Public License can be found in * the LICENSE file in the top-level directory of this distribution * and at http://www.gnu.org/licenses/. * */ /****************************************************************************** * File: multiqueue.h * Description: General-purpose non-blocking multiqueue. It behaves as a list * of bounded/unbounded queues. *****************************************************************************/ #ifndef SRSLTE_MULTIQUEUE_H #define SRSLTE_MULTIQUEUE_H #include "inplace_task.h" #include #include #include #include #include #include namespace srslte { template class multiqueue_handler { class circular_buffer { public: circular_buffer(uint32_t cap) : buffer(cap + 1) {} circular_buffer(circular_buffer&& other) noexcept { active = other.active; other.active = false; widx = other.widx; ridx = other.ridx; buffer = std::move(other.buffer); } std::condition_variable cv_full; bool active = true; bool empty() const { return widx == ridx; } size_t size() const { return widx >= ridx ? widx - ridx : widx + (buffer.size() - ridx); } bool full() const { return (ridx > 0) ? widx == ridx - 1 : widx == buffer.size() - 1; } size_t capacity() const { return buffer.size() - 1; } template void push(T&& o) noexcept { buffer[widx++] = std::forward(o); if (widx >= buffer.size()) { widx = 0; } } void pop() noexcept { ridx++; if (ridx >= buffer.size()) { ridx = 0; } } myobj& front() noexcept { return buffer[ridx]; } const myobj& front() const noexcept { return buffer[ridx]; } private: std::vector buffer; size_t widx = 0, ridx = 0; }; public: class queue_handler { public: queue_handler() = default; queue_handler(multiqueue_handler* parent_, int id) : parent(parent_), queue_id(id) {} template void push(FwdRef&& value) { parent->push(queue_id, std::forward(value)); } bool try_push(const myobj& value) { return parent->try_push(queue_id, value); } std::pair try_push(myobj&& value) { return parent->try_push(queue_id, std::move(value)); } private: multiqueue_handler* parent = nullptr; int queue_id = -1; }; explicit multiqueue_handler(uint32_t capacity_ = 8192) : capacity(capacity_) {} ~multiqueue_handler() { reset(); } void reset() { std::unique_lock lock(mutex); running = false; while (nof_threads_waiting > 0) { uint32_t size = queues.size(); lock.unlock(); cv_empty.notify_one(); for (uint32_t i = 0; i < size; ++i) { queues[i].cv_full.notify_all(); } lock.lock(); // wait for all threads to unblock cv_exit.wait(lock); } queues.clear(); } int add_queue() { uint32_t qidx = 0; std::lock_guard lock(mutex); if (not running) { return -1; } for (; qidx < queues.size() and queues[qidx].active; ++qidx) ; if (qidx == queues.size()) { // create new queue queues.emplace_back(capacity); } else { queues[qidx].active = true; } return (int)qidx; } int nof_queues() { std::lock_guard lock(mutex); uint32_t count = 0; for (uint32_t i = 0; i < queues.size(); ++i) { count += queues[i].active ? 1 : 0; } return count; } template void push(int q_idx, FwdRef&& value) { { std::unique_lock lock(mutex); while (is_queue_active_(q_idx) and queues[q_idx].full()) { nof_threads_waiting++; queues[q_idx].cv_full.wait(lock); nof_threads_waiting--; } if (not is_queue_active_(q_idx)) { cv_exit.notify_one(); return; } queues[q_idx].push(std::forward(value)); } cv_empty.notify_one(); } bool try_push(int q_idx, const myobj& value) { { std::lock_guard lock(mutex); if (not is_queue_active_(q_idx) or queues[q_idx].full()) { return false; } queues[q_idx].push(value); } cv_empty.notify_one(); return true; } std::pair try_push(int q_idx, myobj&& value) { { std::lock_guard lck(mutex); if (not is_queue_active_(q_idx) or queues[q_idx].full()) { return {false, std::move(value)}; } queues[q_idx].push(std::move(value)); } cv_empty.notify_one(); return {true, std::move(value)}; } int wait_pop(myobj* value) { std::unique_lock lock(mutex); while (running) { if (round_robin_pop_(value)) { if (nof_threads_waiting > 0) { lock.unlock(); queues[spin_idx].cv_full.notify_one(); } return spin_idx; } nof_threads_waiting++; cv_empty.wait(lock); nof_threads_waiting--; } cv_exit.notify_one(); return -1; } int try_pop(myobj* value) { std::unique_lock lock(mutex); if (running) { if (round_robin_pop_(value)) { if (nof_threads_waiting > 0) { lock.unlock(); queues[spin_idx].cv_full.notify_one(); } return spin_idx; } // didn't find any task return -1; } cv_exit.notify_one(); return -1; } bool empty(int qidx) { std::lock_guard lck(mutex); return queues[qidx].empty(); } size_t size(int qidx) { std::lock_guard lck(mutex); return queues[qidx].size(); } const myobj& front(int qidx) { std::lock_guard lck(mutex); return queues[qidx].front(); } void erase_queue(int qidx) { std::lock_guard lck(mutex); if (is_queue_active_(qidx)) { queues[qidx].active = false; while (not queues[qidx].empty()) { queues[qidx].pop(); } } } bool is_queue_active(int qidx) { std::lock_guard lck(mutex); return is_queue_active_(qidx); } queue_handler get_queue_handler() { return {this, add_queue()}; } private: bool is_queue_active_(int qidx) const { return running and queues[qidx].active; } bool round_robin_pop_(myobj* value) { // Round-robin for all queues for (const circular_buffer& q : queues) { spin_idx = (spin_idx + 1) % queues.size(); if (is_queue_active_(spin_idx) and not queues[spin_idx].empty()) { if (value) { *value = std::move(queues[spin_idx].front()); } queues[spin_idx].pop(); return true; } } return false; } std::mutex mutex; std::condition_variable cv_empty, cv_exit; uint32_t spin_idx = 0; bool running = true; std::vector queues; uint32_t capacity = 0; uint32_t nof_threads_waiting = 0; }; /*********************************************************** * Specialization for tasks with content that is move-only **********************************************************/ template class move_function { public: move_function() = default; template move_function(Func&& f) : task_ptr(new derived_task(std::forward(f))) {} void operator()(Args&&... args) { (*task_ptr)(std::forward(args)...); } private: struct base_task { virtual ~base_task() {} virtual void operator()(Args&&...) = 0; }; template struct derived_task : public base_task { derived_task(Func&& f_) : f(std::forward(f_)) {} void operator()(Args&&... args) final { f(std::forward(args)...); } private: Func f; }; std::unique_ptr task_ptr; }; // using move_task_t = move_function<>; using move_task_t = inplace_task; using task_multiqueue = multiqueue_handler; } // namespace srslte #endif // SRSLTE_MULTIQUEUE_H