diff --git a/lib/include/srslte/common/block_queue.h b/lib/include/srslte/common/block_queue.h index 1d17e9339..3df22b40d 100644 --- a/lib/include/srslte/common/block_queue.h +++ b/lib/include/srslte/common/block_queue.h @@ -49,11 +49,23 @@ template class block_queue { public: + + // Callback functions for mutexed operations inside pop/push methods + class call_mutexed_itf { + public: + virtual void popping(myobj obj) = 0; + virtual void pushing(myobj obj) = 0; + }; + block_queue(int capacity = -1) { pthread_mutex_init(&mutex, NULL); pthread_cond_init(&cv_empty, NULL); pthread_cond_init(&cv_full, NULL); this->capacity = capacity; + mutexed_callback = NULL; + } + void set_mutexed_itf(call_mutexed_itf *itf) { + mutexed_callback = itf; } void resize(int new_capacity) { capacity = new_capacity; @@ -71,6 +83,9 @@ public: } } q.push(value); + if (mutexed_callback) { + mutexed_callback->pushing(value); + } pthread_cond_signal(&cv_empty); pthread_mutex_unlock(&mutex); return true; @@ -94,6 +109,9 @@ public: *value = q.front(); q.pop(); } + if (mutexed_callback) { + mutexed_callback->popping(*value); + } pthread_cond_signal(&cv_full); pthread_mutex_unlock(&mutex); return true; @@ -106,6 +124,9 @@ public: } myobj value = q.front(); q.pop(); + if (mutexed_callback) { + mutexed_callback->popping(value); + } pthread_cond_signal(&cv_full); pthread_mutex_unlock(&mutex); return value; @@ -136,6 +157,7 @@ private: pthread_mutex_t mutex; pthread_cond_t cv_empty; pthread_cond_t cv_full; + call_mutexed_itf *mutexed_callback; int capacity; }; diff --git a/lib/include/srslte/upper/rlc_tx_queue.h b/lib/include/srslte/upper/rlc_tx_queue.h index dee326ab6..3913670e8 100644 --- a/lib/include/srslte/upper/rlc_tx_queue.h +++ b/lib/include/srslte/upper/rlc_tx_queue.h @@ -41,41 +41,38 @@ namespace srslte { -class rlc_tx_queue +class rlc_tx_queue : public block_queue::call_mutexed_itf { public: rlc_tx_queue(uint32_t capacity = 128) : queue((int) capacity) { unread_bytes = 0; + queue.set_mutexed_itf(this); + } + // increase/decrease unread_bytes inside push/pop mutexed operations + void pushing(byte_buffer_t *msg) { + unread_bytes += msg->N_bytes; + } + void popping(byte_buffer_t *msg) { + if (unread_bytes > msg->N_bytes) { + unread_bytes -= msg->N_bytes; + } else { + unread_bytes = 0; + } } void write(byte_buffer_t *msg) { queue.push(msg); - unread_bytes += msg->N_bytes; } void read(byte_buffer_t **msg) { byte_buffer_t *m = queue.wait_pop(); *msg = m; - if (unread_bytes > (*msg)->N_bytes) { - unread_bytes -= (*msg)->N_bytes; - } else { - unread_bytes = 0; - } } bool try_read(byte_buffer_t **msg) { - if (queue.try_pop(msg)) { - if (unread_bytes > (*msg)->N_bytes) { - unread_bytes -= (*msg)->N_bytes; - } else { - unread_bytes = 0; - } - return true; - } else { - return false; - } + return queue.try_pop(msg); } uint32_t size()