fix issue with thread pool shut down

master
Francisco Paisana 5 years ago committed by Andre Puschmann
parent 3f1fad3f4e
commit a2f1998350

@ -107,7 +107,7 @@ class task_thread_pool
using task_t = std::function<void(uint32_t worker_id)>;
public:
task_thread_pool(uint32_t nof_workers);
explicit task_thread_pool(uint32_t nof_workers);
~task_thread_pool();
void start(int32_t prio = -1, uint32_t mask = 255);
void stop();
@ -121,6 +121,7 @@ private:
{
public:
explicit worker_t(task_thread_pool* parent_, uint32_t id);
void stop();
void setup(int32_t prio, uint32_t mask);
bool is_running() const { return running; }
uint32_t id() const { return id_; }
@ -138,9 +139,8 @@ private:
std::queue<task_t> pending_tasks;
std::vector<worker_t> workers;
std::mutex queue_mutex;
std::condition_variable cv_empty, cv_exit;
std::condition_variable cv_empty;
bool running;
uint32_t nof_workers_running = 0;
};
} // namespace srslte

@ -311,20 +311,21 @@ void task_thread_pool::start(int32_t prio, uint32_t mask)
void task_thread_pool::stop()
{
std::unique_lock<std::mutex> lock(queue_mutex);
running = false;
nof_workers_running = 0;
// next worker that is still running
for (worker_t& w : workers) {
if (w.is_running()) {
nof_workers_running++;
if (running) {
running = false;
bool workers_running = false;
for (worker_t& w : workers) {
if (w.is_running()) {
workers_running = true;
break;
}
}
}
if (nof_workers_running > 0) {
lock.unlock();
cv_empty.notify_all();
lock.lock();
while (nof_workers_running > 0) {
cv_exit.wait(lock);
if (workers_running) {
cv_empty.notify_all();
}
for (worker_t& w : workers) {
w.stop();
}
}
}
@ -360,8 +361,14 @@ task_thread_pool::worker_t::worker_t(srslte::task_thread_pool* parent_, uint32_t
{
}
void task_thread_pool::worker_t::stop()
{
wait_thread_finish();
}
void task_thread_pool::worker_t::setup(int32_t prio, uint32_t mask)
{
running = true;
if (mask == 255) {
start(prio);
} else {
@ -387,8 +394,6 @@ bool task_thread_pool::worker_t::wait_task(task_t* task)
void task_thread_pool::worker_t::run_thread()
{
running = true;
// main loop
task_t task;
while (wait_task(&task)) {
@ -398,11 +403,6 @@ void task_thread_pool::worker_t::run_thread()
// on exit, notify pool class
std::unique_lock<std::mutex> lock(parent->queue_mutex);
running = false;
parent->nof_workers_running--;
if (parent->nof_workers_running == 0) {
lock.unlock();
parent->cv_exit.notify_one();
}
}
} // namespace srslte

@ -305,6 +305,21 @@ int test_task_thread_pool2()
return 0;
}
int test_task_thread_pool3()
{
std::cout << "\n====== TEST task thread pool test 3: start ======\n";
// Description: create many workers and shut down the pool before all of them started yet. Should exit cleanly
uint32_t nof_workers = 100;
task_thread_pool thread_pool(nof_workers);
thread_pool.start();
std::cout << "outcome: Success\n";
std::cout << "===================================================\n";
return 0;
}
int main()
{
TESTASSERT(test_multiqueue() == 0);
@ -314,4 +329,5 @@ int main()
TESTASSERT(test_task_thread_pool() == 0);
TESTASSERT(test_task_thread_pool2() == 0);
TESTASSERT(test_task_thread_pool3() == 0);
}

@ -285,7 +285,7 @@ void phy::configure_mbsfn(sib_type2_s* sib2, sib_type13_r9_s* sib13, mcch_msg_s
// Start GUI
void phy::start_plot()
{
((sf_worker*)&workers[0])->start_plot();
workers[0].start_plot();
}
} // namespace srsenb

Loading…
Cancel
Save