Increase/decrease nbytes counter inside mutexed queue

master
Ismael Gomez 7 years ago
parent 467ba4e326
commit bad007cdd9

@ -49,11 +49,23 @@ template<typename myobj>
class block_queue { class block_queue {
public: public:
// Callback functions for mutexed operations inside pop/push methods
class call_mutexed_itf {
public:
virtual void popping(myobj obj) = 0;
virtual void pushing(myobj obj) = 0;
};
block_queue<myobj>(int capacity = -1) { block_queue<myobj>(int capacity = -1) {
pthread_mutex_init(&mutex, NULL); pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&cv_empty, NULL); pthread_cond_init(&cv_empty, NULL);
pthread_cond_init(&cv_full, NULL); pthread_cond_init(&cv_full, NULL);
this->capacity = capacity; this->capacity = capacity;
mutexed_callback = NULL;
}
void set_mutexed_itf(call_mutexed_itf *itf) {
mutexed_callback = itf;
} }
void resize(int new_capacity) { void resize(int new_capacity) {
capacity = new_capacity; capacity = new_capacity;
@ -71,6 +83,9 @@ public:
} }
} }
q.push(value); q.push(value);
if (mutexed_callback) {
mutexed_callback->pushing(value);
}
pthread_cond_signal(&cv_empty); pthread_cond_signal(&cv_empty);
pthread_mutex_unlock(&mutex); pthread_mutex_unlock(&mutex);
return true; return true;
@ -94,6 +109,9 @@ public:
*value = q.front(); *value = q.front();
q.pop(); q.pop();
} }
if (mutexed_callback) {
mutexed_callback->popping(*value);
}
pthread_cond_signal(&cv_full); pthread_cond_signal(&cv_full);
pthread_mutex_unlock(&mutex); pthread_mutex_unlock(&mutex);
return true; return true;
@ -106,6 +124,9 @@ public:
} }
myobj value = q.front(); myobj value = q.front();
q.pop(); q.pop();
if (mutexed_callback) {
mutexed_callback->popping(value);
}
pthread_cond_signal(&cv_full); pthread_cond_signal(&cv_full);
pthread_mutex_unlock(&mutex); pthread_mutex_unlock(&mutex);
return value; return value;
@ -136,6 +157,7 @@ private:
pthread_mutex_t mutex; pthread_mutex_t mutex;
pthread_cond_t cv_empty; pthread_cond_t cv_empty;
pthread_cond_t cv_full; pthread_cond_t cv_full;
call_mutexed_itf *mutexed_callback;
int capacity; int capacity;
}; };

@ -41,41 +41,38 @@
namespace srslte { namespace srslte {
class rlc_tx_queue class rlc_tx_queue : public block_queue<byte_buffer_t*>::call_mutexed_itf
{ {
public: public:
rlc_tx_queue(uint32_t capacity = 128) : queue((int) capacity) { rlc_tx_queue(uint32_t capacity = 128) : queue((int) capacity) {
unread_bytes = 0; unread_bytes = 0;
queue.set_mutexed_itf(this);
}
// increase/decrease unread_bytes inside push/pop mutexed operations
void pushing(byte_buffer_t *msg) {
unread_bytes += msg->N_bytes;
}
void popping(byte_buffer_t *msg) {
if (unread_bytes > msg->N_bytes) {
unread_bytes -= msg->N_bytes;
} else {
unread_bytes = 0;
}
} }
void write(byte_buffer_t *msg) void write(byte_buffer_t *msg)
{ {
queue.push(msg); queue.push(msg);
unread_bytes += msg->N_bytes;
} }
void read(byte_buffer_t **msg) void read(byte_buffer_t **msg)
{ {
byte_buffer_t *m = queue.wait_pop(); byte_buffer_t *m = queue.wait_pop();
*msg = m; *msg = m;
if (unread_bytes > (*msg)->N_bytes) {
unread_bytes -= (*msg)->N_bytes;
} else {
unread_bytes = 0;
}
} }
bool try_read(byte_buffer_t **msg) bool try_read(byte_buffer_t **msg)
{ {
if (queue.try_pop(msg)) { return queue.try_pop(msg);
if (unread_bytes > (*msg)->N_bytes) {
unread_bytes -= (*msg)->N_bytes;
} else {
unread_bytes = 0;
}
return true;
} else {
return false;
}
} }
uint32_t size() uint32_t size()

Loading…
Cancel
Save