|
|
|
@ -11,6 +11,7 @@
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
#include "srslte/common/thread_pool.h"
|
|
|
|
|
#include "srslte/srslog/srslog.h"
|
|
|
|
|
#include <assert.h>
|
|
|
|
|
#include <chrono>
|
|
|
|
|
#include <stdio.h>
|
|
|
|
@ -244,11 +245,11 @@ uint32_t thread_pool::get_nof_workers()
|
|
|
|
|
* once a worker is available
|
|
|
|
|
*************************************************************************/
|
|
|
|
|
|
|
|
|
|
task_thread_pool::task_thread_pool(uint32_t nof_workers) : running(false)
|
|
|
|
|
task_thread_pool::task_thread_pool(uint32_t nof_workers, bool start_deferred, int32_t prio_, uint32_t mask_) :
|
|
|
|
|
logger(srslog::fetch_basic_logger("POOL")), workers(std::max(1u, nof_workers))
|
|
|
|
|
{
|
|
|
|
|
workers.reserve(nof_workers);
|
|
|
|
|
for (uint32_t i = 0; i < nof_workers; ++i) {
|
|
|
|
|
workers.emplace_back(this, i);
|
|
|
|
|
if (not start_deferred) {
|
|
|
|
|
start(prio_, mask_);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -257,12 +258,34 @@ task_thread_pool::~task_thread_pool()
|
|
|
|
|
stop();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void task_thread_pool::start(int32_t prio, uint32_t mask)
|
|
|
|
|
void task_thread_pool::set_nof_workers(uint32_t nof_workers)
|
|
|
|
|
{
|
|
|
|
|
std::lock_guard<std::mutex> lock(queue_mutex);
|
|
|
|
|
if (workers.size() > nof_workers) {
|
|
|
|
|
logger.error("Reducing the number of workers dynamically not supported");
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
uint32_t old_size = workers.size();
|
|
|
|
|
workers.resize(nof_workers);
|
|
|
|
|
if (running) {
|
|
|
|
|
for (uint32_t i = old_size; i < nof_workers; ++i) {
|
|
|
|
|
workers[i].reset(new worker_t(this, i));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void task_thread_pool::start(int32_t prio_, uint32_t mask_)
|
|
|
|
|
{
|
|
|
|
|
std::lock_guard<std::mutex> lock(queue_mutex);
|
|
|
|
|
if (running) {
|
|
|
|
|
logger.error("Starting thread pool that has already started");
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
prio = prio_;
|
|
|
|
|
mask = mask_;
|
|
|
|
|
running = true;
|
|
|
|
|
for (worker_t& w : workers) {
|
|
|
|
|
w.setup(prio, mask);
|
|
|
|
|
for (uint32_t i = 0; i < workers.size(); ++i) {
|
|
|
|
|
workers[i].reset(new worker_t(this, i));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -272,8 +295,8 @@ void task_thread_pool::stop()
|
|
|
|
|
if (running) {
|
|
|
|
|
running = false;
|
|
|
|
|
bool workers_running = false;
|
|
|
|
|
for (worker_t& w : workers) {
|
|
|
|
|
if (w.is_running()) {
|
|
|
|
|
for (std::unique_ptr<worker_t>& w : workers) {
|
|
|
|
|
if (w->is_running()) {
|
|
|
|
|
workers_running = true;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
@ -282,21 +305,12 @@ void task_thread_pool::stop()
|
|
|
|
|
if (workers_running) {
|
|
|
|
|
cv_empty.notify_all();
|
|
|
|
|
}
|
|
|
|
|
for (worker_t& w : workers) {
|
|
|
|
|
w.stop();
|
|
|
|
|
for (std::unique_ptr<worker_t>& w : workers) {
|
|
|
|
|
w->stop();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void task_thread_pool::push_task(const task_t& task)
|
|
|
|
|
{
|
|
|
|
|
{
|
|
|
|
|
std::lock_guard<std::mutex> lock(queue_mutex);
|
|
|
|
|
pending_tasks.push(task);
|
|
|
|
|
}
|
|
|
|
|
cv_empty.notify_one();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void task_thread_pool::push_task(task_t&& task)
|
|
|
|
|
{
|
|
|
|
|
{
|
|
|
|
@ -306,17 +320,20 @@ void task_thread_pool::push_task(task_t&& task)
|
|
|
|
|
cv_empty.notify_one();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
uint32_t task_thread_pool::nof_pending_tasks()
|
|
|
|
|
uint32_t task_thread_pool::nof_pending_tasks() const
|
|
|
|
|
{
|
|
|
|
|
std::lock_guard<std::mutex> lock(queue_mutex);
|
|
|
|
|
return pending_tasks.size();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
task_thread_pool::worker_t::worker_t(srslte::task_thread_pool* parent_, uint32_t my_id) :
|
|
|
|
|
parent(parent_),
|
|
|
|
|
thread(std::string("TASKWORKER") + std::to_string(my_id)),
|
|
|
|
|
id_(my_id)
|
|
|
|
|
parent(parent_), thread(std::string("TASKWORKER") + std::to_string(my_id)), id_(my_id), running(true)
|
|
|
|
|
{
|
|
|
|
|
if (parent->mask == 255) {
|
|
|
|
|
start(parent->prio);
|
|
|
|
|
} else {
|
|
|
|
|
start_cpu_mask(parent->prio, parent->mask);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void task_thread_pool::worker_t::stop()
|
|
|
|
@ -324,16 +341,6 @@ void task_thread_pool::worker_t::stop()
|
|
|
|
|
wait_thread_finish();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void task_thread_pool::worker_t::setup(int32_t prio, uint32_t mask)
|
|
|
|
|
{
|
|
|
|
|
running = true;
|
|
|
|
|
if (mask == 255) {
|
|
|
|
|
start(prio);
|
|
|
|
|
} else {
|
|
|
|
|
start_cpu_mask(prio, mask);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool task_thread_pool::worker_t::wait_task(task_t* task)
|
|
|
|
|
{
|
|
|
|
|
std::unique_lock<std::mutex> lock(parent->queue_mutex);
|
|
|
|
@ -355,7 +362,7 @@ void task_thread_pool::worker_t::run_thread()
|
|
|
|
|
// main loop
|
|
|
|
|
task_t task;
|
|
|
|
|
while (wait_task(&task)) {
|
|
|
|
|
task(id());
|
|
|
|
|
task();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// on exit, notify pool class
|
|
|
|
@ -363,4 +370,11 @@ void task_thread_pool::worker_t::run_thread()
|
|
|
|
|
running = false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Global thread pool for long, low-priority tasks
|
|
|
|
|
task_thread_pool& get_background_workers()
|
|
|
|
|
{
|
|
|
|
|
static task_thread_pool background_workers;
|
|
|
|
|
return background_workers;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
} // namespace srslte
|
|
|
|
|