cleanup of multiqueue unused methods, and made queue_handle move semantics correct

master
Francisco 4 years ago committed by Francisco Paisana
parent ef9d1b8c13
commit d574afcd33

@ -32,290 +32,6 @@ namespace srsran {
#define MULTIQUEUE_DEFAULT_CAPACITY (8192) // Default per-queue capacity
// template <typename myobj>
// class multiqueue_handler
//{
// class circular_buffer
// {
// public:
// circular_buffer(uint32_t cap) : buffer(cap + 1) {}
// circular_buffer(circular_buffer&& other) noexcept
// {
// active = other.active;
// other.active = false;
// widx = other.widx;
// ridx = other.ridx;
// buffer = std::move(other.buffer);
// }
//
// std::condition_variable cv_full;
// bool active = true;
//
// bool empty() const { return widx == ridx; }
// size_t size() const { return widx >= ridx ? widx - ridx : widx + (buffer.size() - ridx); }
// bool full() const { return (ridx > 0) ? widx == ridx - 1 : widx == buffer.size() - 1; }
// size_t capacity() const { return buffer.size() - 1; }
//
// template <typename T>
// void push(T&& o) noexcept
// {
// buffer[widx++] = std::forward<T>(o);
// if (widx >= buffer.size()) {
// widx = 0;
// }
// }
//
// void pop() noexcept
// {
// ridx++;
// if (ridx >= buffer.size()) {
// ridx = 0;
// }
// }
//
// myobj& front() noexcept { return buffer[ridx]; }
// const myobj& front() const noexcept { return buffer[ridx]; }
//
// private:
// std::vector<myobj> buffer;
// size_t widx = 0, ridx = 0;
// };
//
// public:
// class queue_handle
// {
// public:
// queue_handle() = default;
// queue_handle(multiqueue_handler<myobj>* parent_, int id) : parent(parent_), queue_id(id) {}
// template <typename FwdRef>
// void push(FwdRef&& value)
// {
// parent->push(queue_id, std::forward<FwdRef>(value));
// }
// bool try_push(const myobj& value) { return parent->try_push(queue_id, value); }
// std::pair<bool, myobj> try_push(myobj&& value) { return parent->try_push(queue_id, std::move(value)); }
// size_t size() { return parent->size(queue_id); }
//
// private:
// multiqueue_handler<myobj>* parent = nullptr;
// int queue_id = -1;
// };
//
// explicit multiqueue_handler(uint32_t capacity_ = MULTIQUEUE_DEFAULT_CAPACITY) : capacity(capacity_) {}
// ~multiqueue_handler() { reset(); }
//
// void reset()
// {
// std::unique_lock<std::mutex> lock(mutex);
// running = false;
// while (nof_threads_waiting > 0) {
// uint32_t size = queues.size();
// cv_empty.notify_one();
// for (uint32_t i = 0; i < size; ++i) {
// queues[i].cv_full.notify_all();
// }
// // wait for all threads to unblock
// cv_exit.wait(lock);
// }
// queues.clear();
// }
//
// /**
// * Adds a new queue with fixed capacity
// * @param capacity_ The capacity of the queue.
// * @return The index of the newly created (or reused) queue within the vector of queues.
// */
// int add_queue(uint32_t capacity_)
// {
// uint32_t qidx = 0;
// std::lock_guard<std::mutex> lock(mutex);
// if (not running) {
// return -1;
// }
// for (; qidx < queues.size() and queues[qidx].active; ++qidx)
// ;
//
// // check if there is a free queue of the required size
// if (qidx == queues.size() || queues[qidx].capacity() != capacity_) {
// // create new queue
// queues.emplace_back(capacity_);
// qidx = queues.size() - 1; // update qidx to the last element
// } else {
// queues[qidx].active = true;
// }
// return (int)qidx;
// }
//
// /**
// * Add queue using the default capacity of the underlying multiqueue
// * @return The queue index
// */
// int add_queue() { return add_queue(capacity); }
//
// int nof_queues()
// {
// std::lock_guard<std::mutex> lock(mutex);
// uint32_t count = 0;
// for (uint32_t i = 0; i < queues.size(); ++i) {
// count += queues[i].active ? 1 : 0;
// }
// return count;
// }
//
// template <typename FwdRef>
// void push(int q_idx, FwdRef&& value)
// {
// {
// std::unique_lock<std::mutex> lock(mutex);
// while (is_queue_active_(q_idx) and queues[q_idx].full()) {
// nof_threads_waiting++;
// queues[q_idx].cv_full.wait(lock);
// nof_threads_waiting--;
// }
// if (not is_queue_active_(q_idx)) {
// cv_exit.notify_one();
// return;
// }
// queues[q_idx].push(std::forward<FwdRef>(value));
// }
// cv_empty.notify_one();
// }
//
// bool try_push(int q_idx, const myobj& value)
// {
// {
// std::lock_guard<std::mutex> lock(mutex);
// if (not is_queue_active_(q_idx) or queues[q_idx].full()) {
// return false;
// }
// queues[q_idx].push(value);
// }
// cv_empty.notify_one();
// return true;
// }
//
// std::pair<bool, myobj> try_push(int q_idx, myobj&& value)
// {
// {
// std::lock_guard<std::mutex> lck(mutex);
// if (not is_queue_active_(q_idx) or queues[q_idx].full()) {
// return {false, std::move(value)};
// }
// queues[q_idx].push(std::move(value));
// }
// cv_empty.notify_one();
// return {true, std::move(value)};
// }
//
// int wait_pop(myobj* value)
// {
// std::unique_lock<std::mutex> lock(mutex);
// while (running) {
// if (round_robin_pop_(value)) {
// if (nof_threads_waiting > 0) {
// lock.unlock();
// queues[spin_idx].cv_full.notify_one();
// }
// return spin_idx;
// }
// nof_threads_waiting++;
// cv_empty.wait(lock);
// nof_threads_waiting--;
// }
// cv_exit.notify_one();
// return -1;
// }
//
// int try_pop(myobj* value)
// {
// std::unique_lock<std::mutex> lock(mutex);
// if (running) {
// if (round_robin_pop_(value)) {
// if (nof_threads_waiting > 0) {
// lock.unlock();
// queues[spin_idx].cv_full.notify_one();
// }
// return spin_idx;
// }
// // didn't find any task
// return -1;
// }
// cv_exit.notify_one();
// return -1;
// }
//
// bool empty(int qidx)
// {
// std::lock_guard<std::mutex> lck(mutex);
// return queues[qidx].empty();
// }
//
// size_t size(int qidx)
// {
// std::lock_guard<std::mutex> lck(mutex);
// return queues[qidx].size();
// }
//
// size_t max_size(int qidx)
// {
// std::lock_guard<std::mutex> lck(mutex);
// return queues[qidx].capacity();
// }
//
// const myobj& front(int qidx)
// {
// std::lock_guard<std::mutex> lck(mutex);
// return queues[qidx].front();
// }
//
// void erase_queue(int qidx)
// {
// std::lock_guard<std::mutex> lck(mutex);
// if (is_queue_active_(qidx)) {
// queues[qidx].active = false;
// while (not queues[qidx].empty()) {
// queues[qidx].pop();
// }
// }
// }
//
// bool is_queue_active(int qidx)
// {
// std::lock_guard<std::mutex> lck(mutex);
// return is_queue_active_(qidx);
// }
//
// queue_handle get_queue_handler() { return {this, add_queue()}; }
// queue_handle get_queue_handler(uint32_t size) { return {this, add_queue(size)}; }
//
// private:
// bool is_queue_active_(int qidx) const { return running and queues[qidx].active; }
//
// bool round_robin_pop_(myobj* value)
// {
// // Round-robin for all queues
// for (const circular_buffer& q : queues) {
// spin_idx = (spin_idx + 1) % queues.size();
// if (is_queue_active_(spin_idx) and not queues[spin_idx].empty()) {
// if (value) {
// *value = std::move(queues[spin_idx].front());
// }
// queues[spin_idx].pop();
// return true;
// }
// }
// return false;
// }
//
// std::mutex mutex;
// std::condition_variable cv_empty, cv_exit;
// uint32_t spin_idx = 0;
// bool running = true;
// std::vector<circular_buffer> queues;
// uint32_t capacity = 0;
// uint32_t nof_threads_waiting = 0;
// };
/**
* N-to-1 Message-Passing Broker that manages the creation, destruction of input ports, and popping of messages that
* are pushed to these ports.
@ -332,15 +48,11 @@ class multiqueue_handler
{
public:
input_port_impl(uint32_t cap, multiqueue_handler<myobj>* parent_) : buffer(cap), parent(parent_) {}
input_port_impl(input_port_impl&& other) noexcept
{
std::lock_guard<std::mutex> lock(other.q_mutex);
active_ = other.active_;
parent = other.parent_;
other.active_ = false;
buffer = std::move(other.buffer);
}
~input_port_impl() { set_active_blocking(false); }
input_port_impl(const input_port_impl&) = delete;
input_port_impl(input_port_impl&&) = delete;
input_port_impl& operator=(const input_port_impl&) = delete;
input_port_impl& operator=(input_port_impl&&) = delete;
~input_port_impl() { deactivate_blocking(); }
size_t capacity() const { return buffer.max_size(); }
size_t size() const
@ -369,18 +81,16 @@ class multiqueue_handler
}
}
void set_active_blocking(bool val)
void deactivate_blocking()
{
set_active(val);
set_active(false);
if (not val) {
// wait for all the pushers to unlock
std::unique_lock<std::mutex> lock(q_mutex);
while (nof_waiting > 0) {
cv_exit.wait(lock);
}
}
}
template <typename T>
void push(T&& o) noexcept
@ -448,8 +158,7 @@ public:
class queue_handle
{
public:
queue_handle() = default;
queue_handle(input_port_impl* impl_) : impl(impl_) {}
explicit queue_handle(input_port_impl* impl_ = nullptr) : impl(impl_) {}
template <typename FwdRef>
void push(FwdRef&& value)
{
@ -460,7 +169,7 @@ public:
void reset()
{
if (impl != nullptr) {
impl->set_active_blocking(false);
impl->deactivate_blocking();
impl = nullptr;
}
}
@ -474,10 +183,20 @@ public:
bool operator!=(const queue_handle& other) const { return impl != other.impl; }
private:
input_port_impl* impl = nullptr;
struct recycle_op {
void operator()(input_port_impl* p)
{
if (p != nullptr) {
p->deactivate_blocking();
}
}
};
std::unique_ptr<input_port_impl, recycle_op> impl;
};
explicit multiqueue_handler(uint32_t default_capacity_ = MULTIQUEUE_DEFAULT_CAPACITY) : capacity(default_capacity_) {}
explicit multiqueue_handler(uint32_t default_capacity_ = MULTIQUEUE_DEFAULT_CAPACITY) :
default_capacity(default_capacity_)
{}
~multiqueue_handler() { reset(); }
void reset()
@ -492,10 +211,7 @@ public:
cv_empty.notify_one();
cv_exit.wait(lock);
}
for (auto& q : queues) {
// ensure that all queues are completed with the deactivation before clearing the memory
q.set_active_blocking(false);
}
// queue destructor ensures that the pushing threads have been notified of the queue deactivation in a blocking way
queues.clear();
}
@ -529,7 +245,7 @@ public:
* Add queue using the default capacity of the underlying multiqueue
* @return The queue index
*/
queue_handle add_queue() { return add_queue(capacity); }
queue_handle add_queue() { return add_queue(default_capacity); }
uint32_t nof_queues() const
{
@ -561,10 +277,7 @@ public:
bool try_pop(myobj* value)
{
std::unique_lock<std::mutex> lock(mutex);
if (running and round_robin_pop_(value)) {
return true;
}
return false;
return running and round_robin_pop_(value);
}
private:
@ -590,7 +303,7 @@ private:
uint32_t spin_idx = 0;
bool running = true, wait_pop_state = false;
std::deque<input_port_impl> queues;
uint32_t capacity = 0;
uint32_t default_capacity = 0;
};
template <typename T>

Loading…
Cancel
Save