Merge pull request #201 from softwareradiosystems/mutex_rlc_tx_queue

Increase/decrease nbytes counter inside mutexed queue
master
Andre Puschmann 7 years ago committed by GitHub
commit 8beb3cab10
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -49,11 +49,23 @@ template<typename myobj>
class block_queue {
public:
// Callback functions for mutexed operations inside pop/push methods
class call_mutexed_itf {
public:
virtual void popping(myobj obj) = 0;
virtual void pushing(myobj obj) = 0;
};
block_queue<myobj>(int capacity = -1) {
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&cv_empty, NULL);
pthread_cond_init(&cv_full, NULL);
this->capacity = capacity;
mutexed_callback = NULL;
}
void set_mutexed_itf(call_mutexed_itf *itf) {
mutexed_callback = itf;
}
void resize(int new_capacity) {
capacity = new_capacity;
@ -71,6 +83,9 @@ public:
}
}
q.push(value);
if (mutexed_callback) {
mutexed_callback->pushing(value);
}
pthread_cond_signal(&cv_empty);
pthread_mutex_unlock(&mutex);
return true;
@ -94,6 +109,9 @@ public:
*value = q.front();
q.pop();
}
if (mutexed_callback) {
mutexed_callback->popping(*value);
}
pthread_cond_signal(&cv_full);
pthread_mutex_unlock(&mutex);
return true;
@ -106,6 +124,9 @@ public:
}
myobj value = q.front();
q.pop();
if (mutexed_callback) {
mutexed_callback->popping(value);
}
pthread_cond_signal(&cv_full);
pthread_mutex_unlock(&mutex);
return value;
@ -136,6 +157,7 @@ private:
pthread_mutex_t mutex;
pthread_cond_t cv_empty;
pthread_cond_t cv_full;
call_mutexed_itf *mutexed_callback;
int capacity;
};

@ -41,41 +41,38 @@
namespace srslte {
class rlc_tx_queue
class rlc_tx_queue : public block_queue<byte_buffer_t*>::call_mutexed_itf
{
public:
rlc_tx_queue(uint32_t capacity = 128) : queue((int) capacity) {
unread_bytes = 0;
queue.set_mutexed_itf(this);
}
// increase/decrease unread_bytes inside push/pop mutexed operations
void pushing(byte_buffer_t *msg) {
unread_bytes += msg->N_bytes;
}
void popping(byte_buffer_t *msg) {
if (unread_bytes > msg->N_bytes) {
unread_bytes -= msg->N_bytes;
} else {
unread_bytes = 0;
}
}
void write(byte_buffer_t *msg)
{
queue.push(msg);
unread_bytes += msg->N_bytes;
}
void read(byte_buffer_t **msg)
{
byte_buffer_t *m = queue.wait_pop();
*msg = m;
if (unread_bytes > (*msg)->N_bytes) {
unread_bytes -= (*msg)->N_bytes;
} else {
unread_bytes = 0;
}
}
bool try_read(byte_buffer_t **msg)
{
if (queue.try_pop(msg)) {
if (unread_bytes > (*msg)->N_bytes) {
unread_bytes -= (*msg)->N_bytes;
} else {
unread_bytes = 0;
}
return true;
} else {
return false;
}
return queue.try_pop(msg);
}
uint32_t size()

Loading…
Cancel
Save