Refactor thread_pool: use std::mutex and fix some hidden potential bugs

master
Ismael Gomez 5 years ago committed by Xavier Arteaga
parent 82cb6baef0
commit fd3d4a7874

@ -51,7 +51,6 @@ public:
public: public:
worker(); worker();
void setup(uint32_t id, thread_pool* parent, uint32_t prio = 0, uint32_t mask = 255); void setup(uint32_t id, thread_pool* parent, uint32_t prio = 0, uint32_t mask = 255);
virtual void stop();
uint32_t get_id(); uint32_t get_id();
void release(); void release();
@ -59,9 +58,9 @@ public:
virtual void work_imp() = 0; virtual void work_imp() = 0;
private: private:
uint32_t my_id; uint32_t my_id = 0;
thread_pool* my_parent; thread_pool* my_parent = nullptr;
bool running;
void run_thread(); void run_thread();
void wait_to_start(); void wait_to_start();
void finished(); void finished();
@ -70,7 +69,7 @@ public:
thread_pool(uint32_t nof_workers); thread_pool(uint32_t nof_workers);
void init_worker(uint32_t id, worker*, uint32_t prio = 0, uint32_t mask = 255); void init_worker(uint32_t id, worker*, uint32_t prio = 0, uint32_t mask = 255);
void stop(); void stop();
worker* wait_worker(); worker* wait_worker_id(uint32_t id);
worker* wait_worker(uint32_t tti); worker* wait_worker(uint32_t tti);
worker* wait_worker_nb(uint32_t tti); worker* wait_worker_nb(uint32_t tti);
void start_worker(worker*); void start_worker(worker*);
@ -81,18 +80,16 @@ public:
private: private:
bool find_finished_worker(uint32_t tti, uint32_t* id); bool find_finished_worker(uint32_t tti, uint32_t* id);
typedef enum { IDLE, START_WORK, WORKER_READY, WORKING } worker_status; typedef enum { STOP, IDLE, START_WORK, WORKER_READY, WORKING } worker_status;
std::vector<worker*> workers; std::vector<worker*> workers = {};
uint32_t nof_workers; uint32_t nof_workers = 0;
uint32_t max_workers; uint32_t max_workers = 0;
bool running; bool running = false;
pthread_cond_t cvar_queue; std::condition_variable cvar_queue = {};
pthread_mutex_t mutex_queue; std::mutex mutex_queue = {};
std::vector<worker_status> status; std::vector<worker_status> status = {};
std::vector<pthread_cond_t> cvar; std::vector<std::condition_variable> cvar_worker = {};
std::vector<pthread_mutex_t> mutex;
std::stack<worker*> available_workers;
}; };
class task_thread_pool class task_thread_pool

@ -133,7 +133,7 @@ int main(int argc, char** argv)
srslte::log_filter radio_log("radio", &logger); srslte::log_filter radio_log("radio", &logger);
std::vector<std::unique_ptr<srslte::log_filter> > worker_logs; std::vector<std::unique_ptr<srslte::log_filter> > worker_logs;
radio_log.set_level("info"); radio_log.set_level("none");
// Radio // Radio
dummy_radio radio(radio_log); dummy_radio radio(radio_log);
@ -146,7 +146,7 @@ int main(int argc, char** argv)
// Create log filter // Create log filter
srslte::log_filter* log_filter = new srslte::log_filter(log_name, &logger); srslte::log_filter* log_filter = new srslte::log_filter(log_name, &logger);
log_filter->set_level("info"); log_filter->set_level("none");
// Create worker // Create worker
auto* worker = new dummy_worker(i, &tti_semaphore, log_filter, &radio); auto* worker = new dummy_worker(i, &tti_semaphore, log_filter, &radio);

@ -31,11 +31,9 @@
printf(fmt, __VA_ARGS__); \ printf(fmt, __VA_ARGS__); \
} while (0) } while (0)
#define USE_QUEUE
namespace srslte { namespace srslte {
thread_pool::worker::worker() : my_id(0), running(false), my_parent(NULL), thread("THREAD_POOL_WORKER") {} thread_pool::worker::worker() : thread("THREAD_POOL_WORKER") {}
void thread_pool::worker::setup(uint32_t id, thread_pool* parent, uint32_t prio, uint32_t mask) void thread_pool::worker::setup(uint32_t id, thread_pool* parent, uint32_t prio, uint32_t mask)
{ {
@ -52,10 +50,9 @@ void thread_pool::worker::setup(uint32_t id, thread_pool* parent, uint32_t prio,
void thread_pool::worker::run_thread() void thread_pool::worker::run_thread()
{ {
set_name(std::string("WORKER") + std::to_string(my_id)); set_name(std::string("WORKER") + std::to_string(my_id));
running = true; while (my_parent->status[my_id] != STOP) {
while (running) {
wait_to_start(); wait_to_start();
if (running) { if (my_parent->status[my_id] != STOP) {
work_imp(); work_imp();
finished(); finished();
} }
@ -67,65 +64,53 @@ uint32_t thread_pool::worker::get_id()
return my_id; return my_id;
} }
void thread_pool::worker::stop() thread_pool::thread_pool(uint32_t max_workers_) : workers(max_workers_), status(max_workers_), cvar_worker(max_workers_)
{
running = false;
pthread_cond_signal(&my_parent->cvar[my_id]);
wait_thread_finish();
}
thread_pool::thread_pool(uint32_t max_workers_) :
workers(max_workers_),
status(max_workers_),
cvar(max_workers_),
mutex(max_workers_)
{ {
max_workers = max_workers_; max_workers = max_workers_;
for (uint32_t i = 0; i < max_workers; i++) { for (uint32_t i = 0; i < max_workers; i++) {
workers[i] = NULL; workers[i] = NULL;
status[i] = IDLE; status[i] = IDLE;
pthread_mutex_init(&mutex[i], NULL);
pthread_cond_init(&cvar[i], NULL);
} }
pthread_mutex_init(&mutex_queue, NULL);
pthread_cond_init(&cvar_queue, NULL);
running = true; running = true;
nof_workers = 0; nof_workers = 0;
} }
void thread_pool::init_worker(uint32_t id, worker* obj, uint32_t prio, uint32_t mask) void thread_pool::init_worker(uint32_t id, worker* obj, uint32_t prio, uint32_t mask)
{ {
std::lock_guard<std::mutex> lock(mutex_queue);
if (id < max_workers) { if (id < max_workers) {
if (id >= nof_workers) { if (id >= nof_workers) {
nof_workers = id + 1; nof_workers = id + 1;
} }
pthread_mutex_lock(&mutex_queue);
workers[id] = obj; workers[id] = obj;
available_workers.push(obj);
obj->setup(id, this, prio, mask); obj->setup(id, this, prio, mask);
pthread_cond_signal(&cvar_queue); cvar_queue.notify_all();
pthread_mutex_unlock(&mutex_queue);
} }
} }
void thread_pool::stop() void thread_pool::stop()
{ {
mutex_queue.lock();
/* Stop any thread waiting for available worker */ /* Stop any thread waiting for available worker */
running = false; running = false;
/* Now stop all workers */ /* Now stop all workers */
for (uint32_t i = 0; i < nof_workers; i++) { for (uint32_t i = 0; i < nof_workers; i++) {
if (workers[i]) { if (workers[i]) {
workers[i]->stop(); debug_thread("stop(): stoping %d\n", i);
// Need to call start to wake it up status[i] = STOP;
start_worker(i); cvar_worker[i].notify_all();
workers[i]->wait_thread_finish(); cvar_queue.notify_all();
} }
pthread_cond_destroy(&cvar[i]);
pthread_mutex_destroy(&mutex[i]);
} }
pthread_cond_destroy(&cvar_queue); mutex_queue.unlock();
pthread_mutex_destroy(&mutex_queue);
for (uint32_t i = 0; i < nof_workers; i++) {
debug_thread("stop(): waiting %d\n", i);
workers[i]->wait_thread_finish();
debug_thread("stop(): done %d\n", i);
}
} }
void thread_pool::worker::release() void thread_pool::worker::release()
@ -135,40 +120,28 @@ void thread_pool::worker::release()
void thread_pool::worker::wait_to_start() void thread_pool::worker::wait_to_start()
{ {
std::unique_lock<std::mutex> lock(my_parent->mutex_queue);
debug_thread("wait_to_start() id=%d, status=%d, enter\n", my_id, my_parent->status[my_id]); debug_thread("wait_to_start() id=%d, status=%d, enter\n", my_id, my_parent->status[my_id]);
pthread_mutex_lock(&my_parent->mutex[my_id]); while (my_parent->status[my_id] != START_WORK && my_parent->status[my_id] != STOP) {
while (my_parent->status[my_id] != START_WORK && running) { my_parent->cvar_worker[my_id].wait(lock);
pthread_cond_wait(&my_parent->cvar[my_id], &my_parent->mutex[my_id]); }
if (my_parent->status[my_id] != STOP) {
my_parent->status[my_id] = WORKING;
} }
my_parent->status[my_id] = WORKING;
pthread_mutex_unlock(&my_parent->mutex[my_id]);
debug_thread("wait_to_start() id=%d, status=%d, exit\n", my_id, my_parent->status[my_id]); debug_thread("wait_to_start() id=%d, status=%d, exit\n", my_id, my_parent->status[my_id]);
} }
void thread_pool::worker::finished() void thread_pool::worker::finished()
{ {
#ifdef USE_QUEUE std::lock_guard<std::mutex> lock(my_parent->mutex_queue);
pthread_mutex_lock(&my_parent->mutex[my_id]); if (my_parent->status[my_id] != STOP) {
my_parent->status[my_id] = IDLE; my_parent->status[my_id] = IDLE;
pthread_mutex_unlock(&my_parent->mutex[my_id]); my_parent->cvar_worker[my_id].notify_all();
my_parent->cvar_queue.notify_all();
pthread_mutex_lock(&my_parent->mutex_queue); }
pthread_cond_signal(&my_parent->cvar_queue);
pthread_mutex_unlock(&my_parent->mutex_queue);
#else
pthread_mutex_lock(&my_parent->mutex[my_id]);
my_parent->status[my_id] = IDLE;
pthread_cond_signal(&my_parent->cvar[my_id]);
pthread_mutex_unlock(&my_parent->mutex[my_id]);
#endif
}
thread_pool::worker* thread_pool::wait_worker()
{
return wait_worker(0);
} }
bool thread_pool::find_finished_worker(uint32_t tti, uint32_t* id) bool thread_pool::find_finished_worker(uint32_t tti, uint32_t* id)
@ -182,77 +155,73 @@ bool thread_pool::find_finished_worker(uint32_t tti, uint32_t* id)
return false; return false;
} }
thread_pool::worker* thread_pool::wait_worker(uint32_t tti) thread_pool::worker* thread_pool::wait_worker_id(uint32_t id)
{ {
thread_pool::worker* x; std::unique_lock<std::mutex> lock(mutex_queue);
#ifdef USE_QUEUE debug_thread("wait_worker_id() - enter - id=%d, state0=%d, state1=%d\n", id, status[0], status[1]);
debug_thread("wait_worker() - enter - tti=%d, state0=%d, state1=%d\n", tti, status[0], status[1]);
pthread_mutex_lock(&mutex_queue); thread_pool::worker* ret = nullptr;
uint32_t id = 0;
while (!find_finished_worker(tti, &id) && running) { while (status[id] != IDLE && running) {
pthread_cond_wait(&cvar_queue, &mutex_queue); cvar_queue.wait(lock);
} }
pthread_mutex_unlock(&mutex_queue);
if (running) { if (running) {
x = workers[id]; ret = workers[id];
pthread_mutex_lock(&mutex[id]);
status[id] = WORKER_READY; status[id] = WORKER_READY;
pthread_mutex_unlock(&mutex[id]);
} else {
x = NULL;
} }
debug_thread("wait_worker() - exit - id=%d\n", id); debug_thread("wait_worker_id() - exit - id=%d\n", id);
#else return ret;
}
uint32_t id = tti % nof_workers; thread_pool::worker* thread_pool::wait_worker(uint32_t tti)
pthread_mutex_lock(&mutex[id]); {
while (status[id] != IDLE && running) { std::unique_lock<std::mutex> lock(mutex_queue);
pthread_cond_wait(&cvar[id], &mutex[id]);
debug_thread("wait_worker() - enter - tti=%d, state0=%d, state1=%d\n", tti, status[0], status[1]);
thread_pool::worker* ret = nullptr;
uint32_t id = 0;
while (!find_finished_worker(tti, &id) && running) {
cvar_queue.wait(lock);
} }
if (running) { if (running) {
x = (worker*)workers[id]; ret = workers[id];
status[id] = WORKER_READY; status[id] = WORKER_READY;
} else {
x = NULL;
} }
pthread_mutex_unlock(&mutex[id]); debug_thread("wait_worker() - exit - id=%d\n", id);
#endif return ret;
return x;
} }
thread_pool::worker* thread_pool::wait_worker_nb(uint32_t tti) thread_pool::worker* thread_pool::wait_worker_nb(uint32_t tti)
{ {
thread_pool::worker* x; std::unique_lock<std::mutex> lock(mutex_queue);
debug_thread("wait_worker() - enter - tti=%d, state0=%d, state1=%d\n", tti, status[0], status[1]); debug_thread("wait_worker_nb() - enter - tti=%d, state0=%d, state1=%d\n", tti, status[0], status[1]);
pthread_mutex_lock(&mutex_queue);
uint32_t id = 0; thread_pool::worker* ret = nullptr;
if (find_finished_worker(tti, &id)) { uint32_t id = 0;
x = workers[id];
} else { if (find_finished_worker(tti, &id) && running) {
x = NULL; ret = workers[id];
}
pthread_mutex_unlock(&mutex_queue);
if (running && x) {
pthread_mutex_lock(&mutex[id]);
status[id] = WORKER_READY; status[id] = WORKER_READY;
pthread_mutex_unlock(&mutex[id]);
} else {
x = NULL;
} }
debug_thread("wait_worker() - exit - id=%d\n", id);
return x; debug_thread("wait_worker_nb() - exit - id=%d\n", id);
return ret;
} }
void thread_pool::start_worker(uint32_t id) void thread_pool::start_worker(uint32_t id)
{ {
std::unique_lock<std::mutex> lock(mutex_queue);
if (id < nof_workers) { if (id < nof_workers) {
pthread_mutex_lock(&mutex[id]);
status[id] = START_WORK;
pthread_cond_signal(&cvar[id]);
pthread_mutex_unlock(&mutex[id]);
debug_thread("start_worker() id=%d, status=%d\n", id, status[id]); debug_thread("start_worker() id=%d, status=%d\n", id, status[id]);
if (status[id] != STOP) {
status[id] = START_WORK;
cvar_worker[id].notify_all();
cvar_queue.notify_all();
}
} }
} }

@ -37,7 +37,6 @@ public:
sf_worker() = default; sf_worker() = default;
~sf_worker(); ~sf_worker();
void init(phy_common* phy, srslte::log* log_h); void init(phy_common* phy, srslte::log* log_h);
void stop() final;
cf_t* get_buffer_rx(uint32_t cc_idx, uint32_t antenna_idx); cf_t* get_buffer_rx(uint32_t cc_idx, uint32_t antenna_idx);
void set_time(uint32_t tti, uint32_t tx_worker_cnt, srslte_timestamp_t tx_time); void set_time(uint32_t tti, uint32_t tx_worker_cnt, srslte_timestamp_t tx_time);

@ -106,13 +106,6 @@ void sf_worker::init(phy_common* phy_, srslte::log* log_h_)
#endif #endif
} }
void sf_worker::stop()
{
std::lock_guard<std::mutex> lock(work_mutex);
running = false;
srslte::thread_pool::worker::stop();
}
cf_t* sf_worker::get_buffer_rx(uint32_t cc_idx, uint32_t antenna_idx) cf_t* sf_worker::get_buffer_rx(uint32_t cc_idx, uint32_t antenna_idx)
{ {
return cc_workers[cc_idx]->get_buffer_rx(antenna_idx); return cc_workers[cc_idx]->get_buffer_rx(antenna_idx);

@ -90,7 +90,7 @@ void sf_worker::reset()
bool sf_worker::set_cell(uint32_t cc_idx, srslte_cell_t cell_) bool sf_worker::set_cell(uint32_t cc_idx, srslte_cell_t cell_)
{ {
std::lock_guard<std::mutex> lock(mutex); // std::lock_guard<std::mutex> lock(mutex);
if (cc_idx < cc_workers.size()) { if (cc_idx < cc_workers.size()) {
if (!cc_workers[cc_idx]->set_cell(cell_)) { if (!cc_workers[cc_idx]->set_cell(cell_)) {

Loading…
Cancel
Save