diff --git a/lib/include/srslte/common/block_queue.h b/lib/include/srslte/common/block_queue.h index b481022a8..1d17e9339 100644 --- a/lib/include/srslte/common/block_queue.h +++ b/lib/include/srslte/common/block_queue.h @@ -24,6 +24,15 @@ * */ + +/****************************************************************************** + * File: block_queue.h + * Description: General-purpose blocking queue. It can behave as a bounded or + * unbounded blocking queue and allows blocking and non-blocking + * operations in both push and pop + *****************************************************************************/ + + #ifndef SRSLTE_BLOCK_QUEUE_H #define SRSLTE_BLOCK_QUEUE_H @@ -32,21 +41,47 @@ #include #include #include +#include + namespace srslte { template class block_queue { public: - block_queue() { + block_queue(int capacity = -1) { pthread_mutex_init(&mutex, NULL); - pthread_cond_init(&cvar, NULL); + pthread_cond_init(&cv_empty, NULL); + pthread_cond_init(&cv_full, NULL); + this->capacity = capacity; } - void push(const myobj& value) { + void resize(int new_capacity) { + capacity = new_capacity; + } + bool push_(const myobj& value, bool block) { pthread_mutex_lock(&mutex); + if (capacity > 0) { + if (block) { + while(q.size() > (uint32_t) capacity) { + pthread_cond_wait(&cv_full, &mutex); + } + } else { + pthread_mutex_unlock(&mutex); + return false; + } + } q.push(value); - pthread_cond_signal(&cvar); + pthread_cond_signal(&cv_empty); pthread_mutex_unlock(&mutex); + return true; + } + + void push(const myobj& value) { + push_(value, true); + } + + bool try_push(const myobj& value) { + return push_(value, false); } bool try_pop(myobj *value) { @@ -59,6 +94,7 @@ public: *value = q.front(); q.pop(); } + pthread_cond_signal(&cv_full); pthread_mutex_unlock(&mutex); return true; } @@ -66,15 +102,16 @@ public: myobj wait_pop() { // blocking pop pthread_mutex_lock(&mutex); while(q.empty()) { - pthread_cond_wait(&cvar, &mutex); + pthread_cond_wait(&cv_empty, &mutex); } myobj value = q.front(); q.pop(); + pthread_cond_signal(&cv_full); pthread_mutex_unlock(&mutex); return value; } - bool empty() const { // queue is empty? + bool empty() { // queue is empty? pthread_mutex_lock(&mutex); bool ret = q.empty(); pthread_mutex_unlock(&mutex); @@ -86,6 +123,10 @@ public: while (try_pop(item)); } + myobj front() { + return q.front(); + } + size_t size() { return q.size(); } @@ -93,7 +134,9 @@ public: private: std::queue q; pthread_mutex_t mutex; - pthread_cond_t cvar; + pthread_cond_t cv_empty; + pthread_cond_t cv_full; + int capacity; }; } diff --git a/lib/include/srslte/common/msg_queue.h b/lib/include/srslte/common/msg_queue.h deleted file mode 100644 index e3a86941d..000000000 --- a/lib/include/srslte/common/msg_queue.h +++ /dev/null @@ -1,168 +0,0 @@ -/** - * - * \section COPYRIGHT - * - * Copyright 2013-2015 Software Radio Systems Limited - * - * \section LICENSE - * - * This file is part of the srsUE library. - * - * srsUE is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of - * the License, or (at your option) any later version. - * - * srsUE is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * A copy of the GNU Affero General Public License can be found in - * the LICENSE file in the top-level directory of this distribution - * and at http://www.gnu.org/licenses/. - * - */ - -/****************************************************************************** - * File: msg_queue.h - * Description: Thread-safe bounded circular buffer of srsue_byte_buffer pointers. - * Reference: - *****************************************************************************/ - -#ifndef SRSLTE_MSG_QUEUE_H -#define SRSLTE_MSG_QUEUE_H - -#include "srslte/common/common.h" -#include - -namespace srslte { - -class msg_queue -{ -public: - msg_queue(uint32_t capacity_ = 128) - :head(0) - ,tail(0) - ,unread(0) - ,unread_bytes(0) - ,capacity(capacity_) - { - buf = new byte_buffer_t*[capacity]; - pthread_mutex_init(&mutex, NULL); - pthread_cond_init(¬_empty, NULL); - pthread_cond_init(¬_full, NULL); - } - - ~msg_queue() - { - pthread_mutex_destroy(&mutex); - pthread_cond_destroy(¬_empty); - pthread_cond_destroy(¬_full); - delete [] buf; - } - - void write(byte_buffer_t *msg) - { - pthread_mutex_lock(&mutex); - while(is_full()) { - pthread_cond_wait(¬_full, &mutex); - } - buf[head] = msg; - head = (head+1)%capacity; - unread++; - unread_bytes += msg->N_bytes; - - pthread_cond_signal(¬_empty); - pthread_mutex_unlock(&mutex); - } - - void read(byte_buffer_t **msg) - { - pthread_mutex_lock(&mutex); - while(is_empty()) { - pthread_cond_wait(¬_empty, &mutex); - } - *msg = buf[tail]; - tail = (tail+1)%capacity; - unread--; - /* FIXME: When byte_buffer_t is reset() but is in the queue, it gets corrupted - * because N_bytes becomes 0 and queue is empty but unread_bytes > 0 */ - unread_bytes -= (*msg)->N_bytes; - - pthread_cond_signal(¬_full); - pthread_mutex_unlock(&mutex); - } - - bool try_read(byte_buffer_t **msg) - { - pthread_mutex_lock(&mutex); - if(is_empty()) - { - pthread_mutex_unlock(&mutex); - return false; - }else{ - *msg = buf[tail]; - tail = (tail+1)%capacity; - unread--; - unread_bytes -= (*msg)->N_bytes; - pthread_cond_signal(¬_full); - pthread_mutex_unlock(&mutex); - return true; - } - } - - uint32_t size() - { - pthread_mutex_lock(&mutex); - uint32_t r = unread; - pthread_mutex_unlock(&mutex); - return r; - } - - uint32_t size_bytes() - { - pthread_mutex_lock(&mutex); - uint32_t r = unread_bytes; - pthread_mutex_unlock(&mutex); - return r; - } - - uint32_t size_tail_bytes() - { - pthread_mutex_lock(&mutex); - uint32_t r = 0; - if (buf[tail]) { - r = buf[tail]->N_bytes; - } - pthread_mutex_unlock(&mutex); - return r; - } - - // This is a hack to reset N_bytes counter when queue is corrupted (see line 89) - void reset() { - pthread_mutex_lock(&mutex); - unread_bytes = 0; - unread = 0; - pthread_mutex_unlock(&mutex); - } - -private: - bool is_empty() const { return unread == 0; } - bool is_full() const { return unread == capacity; } - - pthread_cond_t not_empty; - pthread_cond_t not_full; - pthread_mutex_t mutex; - byte_buffer_t **buf; - uint32_t capacity; - uint32_t unread; - uint32_t unread_bytes; - uint32_t head; - uint32_t tail; -}; - -} // namespace srslte - - -#endif // SRSLTE_MSG_QUEUE_H diff --git a/lib/include/srslte/upper/rlc_am.h b/lib/include/srslte/upper/rlc_am.h index 894f96ac5..be6c1f669 100644 --- a/lib/include/srslte/upper/rlc_am.h +++ b/lib/include/srslte/upper/rlc_am.h @@ -31,7 +31,7 @@ #include "srslte/common/log.h" #include "srslte/common/common.h" #include "srslte/interfaces/ue_interfaces.h" -#include "srslte/common/msg_queue.h" +#include "srslte/upper/rlc_tx_queue.h" #include "srslte/common/timeout.h" #include "srslte/upper/rlc_common.h" #include @@ -104,7 +104,7 @@ private: srsue::rrc_interface_rlc *rrc; // TX SDU buffers - msg_queue tx_sdu_queue; + rlc_tx_queue tx_sdu_queue; byte_buffer_t *tx_sdu; // PDU being resegmented diff --git a/lib/include/srslte/upper/rlc_tm.h b/lib/include/srslte/upper/rlc_tm.h index 5408cb835..d3a8aa1ae 100644 --- a/lib/include/srslte/upper/rlc_tm.h +++ b/lib/include/srslte/upper/rlc_tm.h @@ -31,7 +31,7 @@ #include "srslte/common/log.h" #include "srslte/common/common.h" #include "srslte/interfaces/ue_interfaces.h" -#include "srslte/common/msg_queue.h" +#include "srslte/upper/rlc_tx_queue.h" #include "srslte/upper/rlc_common.h" namespace srslte { @@ -72,7 +72,7 @@ private: srsue::rrc_interface_rlc *rrc; // Thread-safe queues for MAC messages - msg_queue ul_queue; + rlc_tx_queue ul_queue; }; } // namespace srsue diff --git a/lib/include/srslte/upper/rlc_tx_queue.h b/lib/include/srslte/upper/rlc_tx_queue.h new file mode 100644 index 000000000..dee326ab6 --- /dev/null +++ b/lib/include/srslte/upper/rlc_tx_queue.h @@ -0,0 +1,117 @@ +/** + * + * \section COPYRIGHT + * + * Copyright 2013-2015 Software Radio Systems Limited + * + * \section LICENSE + * + * This file is part of the srsUE library. + * + * srsUE is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * srsUE is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * A copy of the GNU Affero General Public License can be found in + * the LICENSE file in the top-level directory of this distribution + * and at http://www.gnu.org/licenses/. + * + */ + +/****************************************************************************** + * File: rlc_tx_queue.h + * Description: Queue used in RLC TM/UM/AM TX queues. + * Uses a blocking queue with bounded capacity to block higher layers + * when pushing Uplink traffic + * Reference: + *****************************************************************************/ + +#ifndef SRSLTE_MSG_QUEUE_H +#define SRSLTE_MSG_QUEUE_H + +#include "srslte/common/block_queue.h" +#include "srslte/common/common.h" +#include + +namespace srslte { + +class rlc_tx_queue +{ +public: + rlc_tx_queue(uint32_t capacity = 128) : queue((int) capacity) { + unread_bytes = 0; + } + void write(byte_buffer_t *msg) + { + queue.push(msg); + unread_bytes += msg->N_bytes; + } + + void read(byte_buffer_t **msg) + { + byte_buffer_t *m = queue.wait_pop(); + *msg = m; + if (unread_bytes > (*msg)->N_bytes) { + unread_bytes -= (*msg)->N_bytes; + } else { + unread_bytes = 0; + } + } + + bool try_read(byte_buffer_t **msg) + { + if (queue.try_pop(msg)) { + if (unread_bytes > (*msg)->N_bytes) { + unread_bytes -= (*msg)->N_bytes; + } else { + unread_bytes = 0; + } + return true; + } else { + return false; + } + } + + uint32_t size() + { + return (uint32_t) queue.size(); + } + + uint32_t size_bytes() + { + return unread_bytes; + } + + uint32_t size_tail_bytes() + { + if (!queue.empty()) { + byte_buffer_t *m = queue.front(); + if (m) { + return m->N_bytes; + } + } + return 0; + } + + // This is a hack to reset N_bytes counter when queue is corrupted (see line 89) + void reset() { + unread_bytes = 0; + } + +private: + bool is_empty() { return queue.empty(); } + + block_queue queue; + uint32_t unread_bytes; +}; + +} // namespace srslte + + +#endif // SRSLTE_MSG_QUEUE_H diff --git a/lib/include/srslte/upper/rlc_um.h b/lib/include/srslte/upper/rlc_um.h index 8e6f527e2..d41192d6d 100644 --- a/lib/include/srslte/upper/rlc_um.h +++ b/lib/include/srslte/upper/rlc_um.h @@ -31,7 +31,7 @@ #include "srslte/common/log.h" #include "srslte/common/common.h" #include "srslte/interfaces/ue_interfaces.h" -#include "srslte/common/msg_queue.h" +#include "srslte/upper/rlc_tx_queue.h" #include "srslte/upper/rlc_common.h" #include #include @@ -89,7 +89,7 @@ private: mac_interface_timers *mac_timers; // TX SDU buffers - msg_queue tx_sdu_queue; + rlc_tx_queue tx_sdu_queue; byte_buffer_t *tx_sdu; // Rx window diff --git a/lib/test/common/msg_queue_test.cc b/lib/test/common/msg_queue_test.cc index 393931ab5..5a4e2cc23 100644 --- a/lib/test/common/msg_queue_test.cc +++ b/lib/test/common/msg_queue_test.cc @@ -27,12 +27,12 @@ #define NMSGS 1000000 #include -#include "srslte/common/msg_queue.h" +#include "srslte/upper/rlc_tx_queue.h" using namespace srslte; typedef struct { - msg_queue *q; + rlc_tx_queue *q; }args_t; void* write_thread(void *a) { @@ -49,27 +49,26 @@ void* write_thread(void *a) { int main(int argc, char **argv) { bool result; - msg_queue *q = new msg_queue; + rlc_tx_queue q; byte_buffer_t *b; pthread_t thread; args_t args; u_int32_t r; result = true; - args.q = q; + args.q = &q; pthread_create(&thread, NULL, &write_thread, &args); for(uint32_t i=0;iread(&b); + q.read(&b); memcpy(&r, b->msg, 4); delete b; if(r != i) result = false; } - delete q; pthread_join(thread, NULL); if(result) {