/* * Copyright 2013-2019 Software Radio Systems Limited * * This file is part of srsLTE. * * srsLTE is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation, either version 3 of * the License, or (at your option) any later version. * * srsLTE is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * A copy of the GNU Affero General Public License can be found in * the LICENSE file in the top-level directory of this distribution * and at http://www.gnu.org/licenses/. * */ /****************************************************************************** * File: multiqueue.h * Description: General-purpose non-blocking multiqueue. It behaves as a list * of bounded/unbounded queues. *****************************************************************************/ #ifndef SRSLTE_MULTIQUEUE_H #define SRSLTE_MULTIQUEUE_H #include #include #include #include #include #include namespace srslte { template class multiqueue_handler { // NOTE: needed to create a queue wrapper to make its move ctor noexcept. // otherwise we couldnt use the resize method of std::vector> if myobj is move-only class queue_wrapper : private std::queue { public: queue_wrapper() = default; queue_wrapper(queue_wrapper&& other) noexcept : std::queue(std::move(other)) {} using std::queue::push; using std::queue::pop; using std::queue::size; using std::queue::empty; using std::queue::front; std::condition_variable cv_full; bool active = true; }; public: class queue_handler { public: queue_handler() = default; queue_handler(multiqueue_handler* parent_, int id) : parent(parent_), queue_id(id) {} template void push(FwdRef&& value) { parent->push(queue_id, std::forward(value)); } bool try_push(const myobj& value) { return parent->try_push(queue_id, value); } std::pair try_push(myobj&& value) { return parent->try_push(queue_id, std::move(value)); } private: multiqueue_handler* parent = nullptr; int queue_id = -1; }; explicit multiqueue_handler(uint32_t capacity_ = std::numeric_limits::max()) : capacity(capacity_) {} ~multiqueue_handler() { reset(); } void reset() { std::unique_lock lock(mutex); running = false; while (nof_threads_waiting > 0) { uint32_t size = queues.size(); lock.unlock(); cv_empty.notify_one(); for (uint32_t i = 0; i < size; ++i) { queues[i].cv_full.notify_all(); } lock.lock(); // wait for all threads to unblock cv_exit.wait(lock); } queues.clear(); } int add_queue() { uint32_t qidx = 0; std::lock_guard lock(mutex); if (not running) { return -1; } for (; qidx < queues.size() and queues[qidx].active; ++qidx) ; if (qidx == queues.size()) { // create new queue queues.emplace_back(); } else { queues[qidx].active = true; } return (int)qidx; } int nof_queues() { std::lock_guard lock(mutex); uint32_t count = 0; for (uint32_t i = 0; i < queues.size(); ++i) { count += queues[i].active ? 1 : 0; } return count; } template void push(int q_idx, FwdRef&& value) { { std::unique_lock lock(mutex); while (is_queue_active_(q_idx) and queues[q_idx].size() >= capacity) { nof_threads_waiting++; queues[q_idx].cv_full.wait(lock); nof_threads_waiting--; } if (not is_queue_active_(q_idx)) { cv_exit.notify_one(); return; } queues[q_idx].push(std::forward(value)); } cv_empty.notify_one(); } bool try_push(int q_idx, const myobj& value) { { std::lock_guard lock(mutex); if (not is_queue_active_(q_idx) or queues[q_idx].size() >= capacity) { return false; } queues[q_idx].push(value); } cv_empty.notify_one(); return true; } std::pair try_push(int q_idx, myobj&& value) { { std::lock_guard lck(mutex); if (not is_queue_active_(q_idx) or queues[q_idx].size() >= capacity) { return {false, std::move(value)}; } queues[q_idx].push(std::move(value)); } cv_empty.notify_one(); return {true, std::move(value)}; } int wait_pop(myobj* value) { std::unique_lock lock(mutex); while (running) { // Round-robin for all queues for (const queue_wrapper& q : queues) { spin_idx = (spin_idx + 1) % queues.size(); if (is_queue_active_(spin_idx) and not queues[spin_idx].empty()) { if (value) { *value = std::move(queues[spin_idx].front()); } queues[spin_idx].pop(); if (nof_threads_waiting > 0) { lock.unlock(); queues[spin_idx].cv_full.notify_one(); } return spin_idx; } } nof_threads_waiting++; cv_empty.wait(lock); nof_threads_waiting--; } cv_exit.notify_one(); return -1; } bool empty(int qidx) { std::lock_guard lck(mutex); return queues[qidx].empty(); } size_t size(int qidx) { std::lock_guard lck(mutex); return queues[qidx].size(); } const myobj& front(int qidx) { std::lock_guard lck(mutex); return queues[qidx].front(); } void erase_queue(int qidx) { std::lock_guard lck(mutex); if (is_queue_active_(qidx)) { queues[qidx].active = false; while (not queues[qidx].empty()) { queues[qidx].pop(); } } } bool is_queue_active(int qidx) { std::lock_guard lck(mutex); return is_queue_active_(qidx); } queue_handler get_queue_handler() { return {this, add_queue()}; } private: bool is_queue_active_(int qidx) const { return running and queues[qidx].active; } std::mutex mutex; std::condition_variable cv_empty, cv_exit; uint32_t spin_idx = 0; bool running = true; std::vector queues; uint32_t capacity = 0; uint32_t nof_threads_waiting = 0; }; /*********************************************************** * Specialization for tasks with content that is move-only **********************************************************/ class move_task_t { public: move_task_t() = default; template move_task_t(Func&& f) : task_ptr(new derived_task(std::forward(f))) { } void operator()() { (*task_ptr)(); } private: struct base_task { virtual ~base_task() {} virtual void operator()() = 0; }; template struct derived_task : public base_task { derived_task(Func&& f_) : f(std::forward(f_)) {} void operator()() final { f(); } private: Func f; }; std::unique_ptr task_ptr; }; using task_multiqueue = multiqueue_handler; } // namespace srslte #endif // SRSLTE_MULTIQUEUE_H