merged PR#189 and added check for non-empty queue in get size tail

master
Ismael Gomez 7 years ago
parent cd367617ec
commit 040c33497d

@ -24,6 +24,15 @@
* *
*/ */
/******************************************************************************
* File: block_queue.h
* Description: General-purpose blocking queue. It can behave as a bounded or
* unbounded blocking queue and allows blocking and non-blocking
* operations in both push and pop
*****************************************************************************/
#ifndef SRSLTE_BLOCK_QUEUE_H #ifndef SRSLTE_BLOCK_QUEUE_H
#define SRSLTE_BLOCK_QUEUE_H #define SRSLTE_BLOCK_QUEUE_H
@ -32,21 +41,47 @@
#include <utility> #include <utility>
#include <pthread.h> #include <pthread.h>
#include <stdio.h> #include <stdio.h>
#include <stdint.h>
namespace srslte { namespace srslte {
template<typename myobj> template<typename myobj>
class block_queue { class block_queue {
public: public:
block_queue<myobj>() { block_queue<myobj>(int capacity = -1) {
pthread_mutex_init(&mutex, NULL); pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&cvar, NULL); pthread_cond_init(&cv_empty, NULL);
pthread_cond_init(&cv_full, NULL);
this->capacity = capacity;
} }
void push(const myobj& value) { void resize(int new_capacity) {
capacity = new_capacity;
}
bool push_(const myobj& value, bool block) {
pthread_mutex_lock(&mutex); pthread_mutex_lock(&mutex);
if (capacity > 0) {
if (block) {
while(q.size() > (uint32_t) capacity) {
pthread_cond_wait(&cv_full, &mutex);
}
} else {
pthread_mutex_unlock(&mutex);
return false;
}
}
q.push(value); q.push(value);
pthread_cond_signal(&cvar); pthread_cond_signal(&cv_empty);
pthread_mutex_unlock(&mutex); pthread_mutex_unlock(&mutex);
return true;
}
void push(const myobj& value) {
push_(value, true);
}
bool try_push(const myobj& value) {
return push_(value, false);
} }
bool try_pop(myobj *value) { bool try_pop(myobj *value) {
@ -59,6 +94,7 @@ public:
*value = q.front(); *value = q.front();
q.pop(); q.pop();
} }
pthread_cond_signal(&cv_full);
pthread_mutex_unlock(&mutex); pthread_mutex_unlock(&mutex);
return true; return true;
} }
@ -66,15 +102,16 @@ public:
myobj wait_pop() { // blocking pop myobj wait_pop() { // blocking pop
pthread_mutex_lock(&mutex); pthread_mutex_lock(&mutex);
while(q.empty()) { while(q.empty()) {
pthread_cond_wait(&cvar, &mutex); pthread_cond_wait(&cv_empty, &mutex);
} }
myobj value = q.front(); myobj value = q.front();
q.pop(); q.pop();
pthread_cond_signal(&cv_full);
pthread_mutex_unlock(&mutex); pthread_mutex_unlock(&mutex);
return value; return value;
} }
bool empty() const { // queue is empty? bool empty() { // queue is empty?
pthread_mutex_lock(&mutex); pthread_mutex_lock(&mutex);
bool ret = q.empty(); bool ret = q.empty();
pthread_mutex_unlock(&mutex); pthread_mutex_unlock(&mutex);
@ -86,6 +123,10 @@ public:
while (try_pop(item)); while (try_pop(item));
} }
myobj front() {
return q.front();
}
size_t size() { size_t size() {
return q.size(); return q.size();
} }
@ -93,7 +134,9 @@ public:
private: private:
std::queue<myobj> q; std::queue<myobj> q;
pthread_mutex_t mutex; pthread_mutex_t mutex;
pthread_cond_t cvar; pthread_cond_t cv_empty;
pthread_cond_t cv_full;
int capacity;
}; };
} }

@ -1,168 +0,0 @@
/**
*
* \section COPYRIGHT
*
* Copyright 2013-2015 Software Radio Systems Limited
*
* \section LICENSE
*
* This file is part of the srsUE library.
*
* srsUE is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* srsUE is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* A copy of the GNU Affero General Public License can be found in
* the LICENSE file in the top-level directory of this distribution
* and at http://www.gnu.org/licenses/.
*
*/
/******************************************************************************
* File: msg_queue.h
* Description: Thread-safe bounded circular buffer of srsue_byte_buffer pointers.
* Reference:
*****************************************************************************/
#ifndef SRSLTE_MSG_QUEUE_H
#define SRSLTE_MSG_QUEUE_H
#include "srslte/common/common.h"
#include <pthread.h>
namespace srslte {
class msg_queue
{
public:
msg_queue(uint32_t capacity_ = 128)
:head(0)
,tail(0)
,unread(0)
,unread_bytes(0)
,capacity(capacity_)
{
buf = new byte_buffer_t*[capacity];
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&not_empty, NULL);
pthread_cond_init(&not_full, NULL);
}
~msg_queue()
{
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&not_empty);
pthread_cond_destroy(&not_full);
delete [] buf;
}
void write(byte_buffer_t *msg)
{
pthread_mutex_lock(&mutex);
while(is_full()) {
pthread_cond_wait(&not_full, &mutex);
}
buf[head] = msg;
head = (head+1)%capacity;
unread++;
unread_bytes += msg->N_bytes;
pthread_cond_signal(&not_empty);
pthread_mutex_unlock(&mutex);
}
void read(byte_buffer_t **msg)
{
pthread_mutex_lock(&mutex);
while(is_empty()) {
pthread_cond_wait(&not_empty, &mutex);
}
*msg = buf[tail];
tail = (tail+1)%capacity;
unread--;
/* FIXME: When byte_buffer_t is reset() but is in the queue, it gets corrupted
* because N_bytes becomes 0 and queue is empty but unread_bytes > 0 */
unread_bytes -= (*msg)->N_bytes;
pthread_cond_signal(&not_full);
pthread_mutex_unlock(&mutex);
}
bool try_read(byte_buffer_t **msg)
{
pthread_mutex_lock(&mutex);
if(is_empty())
{
pthread_mutex_unlock(&mutex);
return false;
}else{
*msg = buf[tail];
tail = (tail+1)%capacity;
unread--;
unread_bytes -= (*msg)->N_bytes;
pthread_cond_signal(&not_full);
pthread_mutex_unlock(&mutex);
return true;
}
}
uint32_t size()
{
pthread_mutex_lock(&mutex);
uint32_t r = unread;
pthread_mutex_unlock(&mutex);
return r;
}
uint32_t size_bytes()
{
pthread_mutex_lock(&mutex);
uint32_t r = unread_bytes;
pthread_mutex_unlock(&mutex);
return r;
}
uint32_t size_tail_bytes()
{
pthread_mutex_lock(&mutex);
uint32_t r = 0;
if (buf[tail]) {
r = buf[tail]->N_bytes;
}
pthread_mutex_unlock(&mutex);
return r;
}
// This is a hack to reset N_bytes counter when queue is corrupted (see line 89)
void reset() {
pthread_mutex_lock(&mutex);
unread_bytes = 0;
unread = 0;
pthread_mutex_unlock(&mutex);
}
private:
bool is_empty() const { return unread == 0; }
bool is_full() const { return unread == capacity; }
pthread_cond_t not_empty;
pthread_cond_t not_full;
pthread_mutex_t mutex;
byte_buffer_t **buf;
uint32_t capacity;
uint32_t unread;
uint32_t unread_bytes;
uint32_t head;
uint32_t tail;
};
} // namespace srslte
#endif // SRSLTE_MSG_QUEUE_H

@ -31,7 +31,7 @@
#include "srslte/common/log.h" #include "srslte/common/log.h"
#include "srslte/common/common.h" #include "srslte/common/common.h"
#include "srslte/interfaces/ue_interfaces.h" #include "srslte/interfaces/ue_interfaces.h"
#include "srslte/common/msg_queue.h" #include "srslte/upper/rlc_tx_queue.h"
#include "srslte/common/timeout.h" #include "srslte/common/timeout.h"
#include "srslte/upper/rlc_common.h" #include "srslte/upper/rlc_common.h"
#include <map> #include <map>
@ -104,7 +104,7 @@ private:
srsue::rrc_interface_rlc *rrc; srsue::rrc_interface_rlc *rrc;
// TX SDU buffers // TX SDU buffers
msg_queue tx_sdu_queue; rlc_tx_queue tx_sdu_queue;
byte_buffer_t *tx_sdu; byte_buffer_t *tx_sdu;
// PDU being resegmented // PDU being resegmented

@ -31,7 +31,7 @@
#include "srslte/common/log.h" #include "srslte/common/log.h"
#include "srslte/common/common.h" #include "srslte/common/common.h"
#include "srslte/interfaces/ue_interfaces.h" #include "srslte/interfaces/ue_interfaces.h"
#include "srslte/common/msg_queue.h" #include "srslte/upper/rlc_tx_queue.h"
#include "srslte/upper/rlc_common.h" #include "srslte/upper/rlc_common.h"
namespace srslte { namespace srslte {
@ -72,7 +72,7 @@ private:
srsue::rrc_interface_rlc *rrc; srsue::rrc_interface_rlc *rrc;
// Thread-safe queues for MAC messages // Thread-safe queues for MAC messages
msg_queue ul_queue; rlc_tx_queue ul_queue;
}; };
} // namespace srsue } // namespace srsue

@ -0,0 +1,117 @@
/**
*
* \section COPYRIGHT
*
* Copyright 2013-2015 Software Radio Systems Limited
*
* \section LICENSE
*
* This file is part of the srsUE library.
*
* srsUE is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* srsUE is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* A copy of the GNU Affero General Public License can be found in
* the LICENSE file in the top-level directory of this distribution
* and at http://www.gnu.org/licenses/.
*
*/
/******************************************************************************
* File: rlc_tx_queue.h
* Description: Queue used in RLC TM/UM/AM TX queues.
* Uses a blocking queue with bounded capacity to block higher layers
* when pushing Uplink traffic
* Reference:
*****************************************************************************/
#ifndef SRSLTE_MSG_QUEUE_H
#define SRSLTE_MSG_QUEUE_H
#include "srslte/common/block_queue.h"
#include "srslte/common/common.h"
#include <pthread.h>
namespace srslte {
class rlc_tx_queue
{
public:
rlc_tx_queue(uint32_t capacity = 128) : queue((int) capacity) {
unread_bytes = 0;
}
void write(byte_buffer_t *msg)
{
queue.push(msg);
unread_bytes += msg->N_bytes;
}
void read(byte_buffer_t **msg)
{
byte_buffer_t *m = queue.wait_pop();
*msg = m;
if (unread_bytes > (*msg)->N_bytes) {
unread_bytes -= (*msg)->N_bytes;
} else {
unread_bytes = 0;
}
}
bool try_read(byte_buffer_t **msg)
{
if (queue.try_pop(msg)) {
if (unread_bytes > (*msg)->N_bytes) {
unread_bytes -= (*msg)->N_bytes;
} else {
unread_bytes = 0;
}
return true;
} else {
return false;
}
}
uint32_t size()
{
return (uint32_t) queue.size();
}
uint32_t size_bytes()
{
return unread_bytes;
}
uint32_t size_tail_bytes()
{
if (!queue.empty()) {
byte_buffer_t *m = queue.front();
if (m) {
return m->N_bytes;
}
}
return 0;
}
// This is a hack to reset N_bytes counter when queue is corrupted (see line 89)
void reset() {
unread_bytes = 0;
}
private:
bool is_empty() { return queue.empty(); }
block_queue<byte_buffer_t*> queue;
uint32_t unread_bytes;
};
} // namespace srslte
#endif // SRSLTE_MSG_QUEUE_H

@ -31,7 +31,7 @@
#include "srslte/common/log.h" #include "srslte/common/log.h"
#include "srslte/common/common.h" #include "srslte/common/common.h"
#include "srslte/interfaces/ue_interfaces.h" #include "srslte/interfaces/ue_interfaces.h"
#include "srslte/common/msg_queue.h" #include "srslte/upper/rlc_tx_queue.h"
#include "srslte/upper/rlc_common.h" #include "srslte/upper/rlc_common.h"
#include <pthread.h> #include <pthread.h>
#include <map> #include <map>
@ -89,7 +89,7 @@ private:
mac_interface_timers *mac_timers; mac_interface_timers *mac_timers;
// TX SDU buffers // TX SDU buffers
msg_queue tx_sdu_queue; rlc_tx_queue tx_sdu_queue;
byte_buffer_t *tx_sdu; byte_buffer_t *tx_sdu;
// Rx window // Rx window

@ -27,12 +27,12 @@
#define NMSGS 1000000 #define NMSGS 1000000
#include <stdio.h> #include <stdio.h>
#include "srslte/common/msg_queue.h" #include "srslte/upper/rlc_tx_queue.h"
using namespace srslte; using namespace srslte;
typedef struct { typedef struct {
msg_queue *q; rlc_tx_queue *q;
}args_t; }args_t;
void* write_thread(void *a) { void* write_thread(void *a) {
@ -49,27 +49,26 @@ void* write_thread(void *a) {
int main(int argc, char **argv) { int main(int argc, char **argv) {
bool result; bool result;
msg_queue *q = new msg_queue; rlc_tx_queue q;
byte_buffer_t *b; byte_buffer_t *b;
pthread_t thread; pthread_t thread;
args_t args; args_t args;
u_int32_t r; u_int32_t r;
result = true; result = true;
args.q = q; args.q = &q;
pthread_create(&thread, NULL, &write_thread, &args); pthread_create(&thread, NULL, &write_thread, &args);
for(uint32_t i=0;i<NMSGS;i++) for(uint32_t i=0;i<NMSGS;i++)
{ {
q->read(&b); q.read(&b);
memcpy(&r, b->msg, 4); memcpy(&r, b->msg, 4);
delete b; delete b;
if(r != i) if(r != i)
result = false; result = false;
} }
delete q;
pthread_join(thread, NULL); pthread_join(thread, NULL);
if(result) { if(result) {

Loading…
Cancel
Save