changed block_queue api to return back the unique buffer in case it fails to push it to the queue

master
Francisco Paisana 6 years ago committed by Andre Puschmann
parent f4aa03154d
commit bc01a5ecda

@ -100,7 +100,7 @@ public:
return push_(value, false); return push_(value, false);
} }
bool try_push(myobj&& value) { return push_(std::move(value), false); } std::pair<bool, myobj> try_push(myobj&& value) { return push_(std::move(value), false); }
bool try_pop(myobj *value) { bool try_pop(myobj *value) {
return pop_(value, false); return pop_(value, false);
@ -163,13 +163,8 @@ private:
return ret; return ret;
} }
template <typename MyObj> // universal ref bool check_queue_space_unlocked(bool block)
bool push_(MyObj&& value, bool block)
{ {
if (!enable) {
return false;
}
pthread_mutex_lock(&mutex);
num_threads++; num_threads++;
bool ret = false; bool ret = false;
if (capacity > 0) { if (capacity > 0) {
@ -178,20 +173,48 @@ private:
pthread_cond_wait(&cv_full, &mutex); pthread_cond_wait(&cv_full, &mutex);
} }
if (!enable) { if (!enable) {
goto exit; return false;
} }
} else if (q.size() >= (uint32_t) capacity) { } else if (q.size() >= (uint32_t) capacity) {
goto exit; return false;
} }
} }
if (mutexed_callback) {
mutexed_callback->pushing(value);
}
q.push(std::forward<MyObj>(value));
ret = true;
pthread_cond_signal(&cv_empty);
exit:
num_threads--; num_threads--;
return true;
}
std::pair<bool, myobj> push_(myobj&& value, bool block)
{
if (!enable) {
return std::make_pair(false, std::move(value));
}
pthread_mutex_lock(&mutex);
bool ret = check_queue_space_unlocked(block);
if (ret) {
if (mutexed_callback) {
mutexed_callback->pushing(value);
}
q.push(std::move(value));
pthread_cond_signal(&cv_empty);
}
pthread_mutex_unlock(&mutex);
return std::make_pair(ret, std::move(value));
}
bool push_(const myobj& value, bool block)
{
if (!enable) {
return false;
}
pthread_mutex_lock(&mutex);
bool ret = check_queue_space_unlocked(block);
if (ret) {
if (mutexed_callback) {
mutexed_callback->pushing(value);
}
q.push(value);
pthread_cond_signal(&cv_empty);
}
pthread_mutex_unlock(&mutex); pthread_mutex_unlock(&mutex);
return ret; return ret;
} }

@ -55,7 +55,7 @@ public:
} }
void write(unique_byte_buffer msg) { queue.push(std::move(msg)); } void write(unique_byte_buffer msg) { queue.push(std::move(msg)); }
bool try_write(unique_byte_buffer msg) { return queue.try_push(std::move(msg)); } std::pair<bool, unique_byte_buffer> try_write(unique_byte_buffer&& msg) { return queue.try_push(std::move(msg)); }
unique_byte_buffer read() { return queue.wait_pop(); } unique_byte_buffer read() { return queue.wait_pop(); }

@ -367,14 +367,19 @@ void rlc_am::rlc_am_tx::write_sdu(unique_byte_buffer sdu, bool blocking)
// non-blocking write // non-blocking write
uint8_t* msg_ptr = sdu->msg; uint8_t* msg_ptr = sdu->msg;
uint32_t nof_bytes = sdu->N_bytes; uint32_t nof_bytes = sdu->N_bytes;
if (tx_sdu_queue.try_write(std::move(sdu))) { std::pair<bool, unique_byte_buffer> ret = tx_sdu_queue.try_write(std::move(sdu));
if (ret.first) {
log->info_hex( log->info_hex(
msg_ptr, nof_bytes, "%s Tx SDU (%d B, tx_sdu_queue_len=%d)", RB_NAME, nof_bytes, tx_sdu_queue.size()); msg_ptr, nof_bytes, "%s Tx SDU (%d B, tx_sdu_queue_len=%d)", RB_NAME, nof_bytes, tx_sdu_queue.size());
} else { } else {
#warning Find a more elegant solution - the msg was already deallocated at this point // in case of fail, the try_write returns back the sdu
log->info("[Dropped SDU] %s Tx SDU (%d B, tx_sdu_queue_len=%d)", RB_NAME, nof_bytes, tx_sdu_queue.size()); log->info("[Dropped SDU] %s Tx SDU (%d B, tx_sdu_queue_len=%d)", RB_NAME, nof_bytes, tx_sdu_queue.size());
// log->info_hex(msg_ptr, nof_bytes, "[Dropped SDU] %s Tx SDU (%d B, tx_sdu_queue_len=%d)", RB_NAME, log->info_hex(ret.second->msg,
// nof_bytes, tx_sdu_queue.size()); ret.second->N_bytes,
"[Dropped SDU] %s Tx SDU (%d B, tx_sdu_queue_len=%d)",
RB_NAME,
ret.second->N_bytes,
tx_sdu_queue.size());
} }
} }
} else { } else {

@ -104,7 +104,8 @@ void rlc_tm::write_sdu(unique_byte_buffer sdu, bool blocking)
} else { } else {
uint8_t* msg_ptr = sdu->msg; uint8_t* msg_ptr = sdu->msg;
uint32_t nof_bytes = sdu->N_bytes; uint32_t nof_bytes = sdu->N_bytes;
if (ul_queue.try_write(std::move(sdu))) { std::pair<bool, unique_byte_buffer> ret = ul_queue.try_write(std::move(sdu));
if (ret.first) {
log->info_hex(msg_ptr, log->info_hex(msg_ptr,
nof_bytes, nof_bytes,
"%s Tx SDU, queue size=%d, bytes=%d", "%s Tx SDU, queue size=%d, bytes=%d",
@ -112,13 +113,12 @@ void rlc_tm::write_sdu(unique_byte_buffer sdu, bool blocking)
ul_queue.size(), ul_queue.size(),
ul_queue.size_bytes()); ul_queue.size_bytes());
} else { } else {
#warning Find a more elegant solution - the msg was already deallocated at this point log->info_hex(ret.second->msg,
log->info("[Dropped SDU] %s Tx SDU, queue size=%d, bytes=%d", ret.second->N_bytes,
rrc->get_rb_name(lcid).c_str(), "[Dropped SDU] %s Tx SDU, queue size=%d, bytes=%d",
ul_queue.size(), rrc->get_rb_name(lcid).c_str(),
ul_queue.size()); ul_queue.size(),
// log->info_hex(sdu->msg, sdu->N_bytes, "[Dropped SDU] %s Tx SDU, queue size=%d, bytes=%d", ul_queue.size_bytes());
// rrc->get_rb_name(lcid).c_str(), ul_queue.size(), ul_queue.size_bytes());
} }
} }
} else { } else {

@ -347,14 +347,17 @@ void rlc_um::rlc_um_tx::try_write_sdu(unique_byte_buffer sdu)
if (sdu) { if (sdu) {
uint8_t* msg_ptr = sdu->msg; uint8_t* msg_ptr = sdu->msg;
uint32_t nof_bytes = sdu->N_bytes; uint32_t nof_bytes = sdu->N_bytes;
if (tx_sdu_queue.try_write(std::move(sdu))) { std::pair<bool, unique_byte_buffer> ret = tx_sdu_queue.try_write(std::move(sdu));
if (ret.first) {
log->info_hex( log->info_hex(
msg_ptr, nof_bytes, "%s Tx SDU (%d B, tx_sdu_queue_len=%d)", get_rb_name(), nof_bytes, tx_sdu_queue.size()); msg_ptr, nof_bytes, "%s Tx SDU (%d B, tx_sdu_queue_len=%d)", get_rb_name(), nof_bytes, tx_sdu_queue.size());
} else { } else {
#warning Find a more elegant solution - the msg was already deallocated at this point log->info_hex(ret.second->msg,
log->info("[Dropped SDU] %s Tx SDU (%d B, tx_sdu_queue_len=%d)", get_rb_name(), nof_bytes, tx_sdu_queue.size()); ret.second->N_bytes,
// log->info_hex(msg_ptr, nof_bytes, "[Dropped SDU] %s Tx SDU (%d B, tx_sdu_queue_len=%d)", get_rb_name(), "[Dropped SDU] %s Tx SDU (%d B, tx_sdu_queue_len=%d)",
// nof_bytes, tx_sdu_queue.size()); get_rb_name(),
ret.second->N_bytes,
tx_sdu_queue.size());
} }
} else { } else {
log->warning("NULL SDU pointer in write_sdu()\n"); log->warning("NULL SDU pointer in write_sdu()\n");

Loading…
Cancel
Save