diff --git a/lib/include/srslte/common/buffer_pool.h b/lib/include/srslte/common/buffer_pool.h index 0a87c0df1..283501ff4 100644 --- a/lib/include/srslte/common/buffer_pool.h +++ b/lib/include/srslte/common/buffer_pool.h @@ -63,6 +63,7 @@ public: nof_buffers = (uint32_t) capacity_; } pthread_mutex_init(&mutex, NULL); + pthread_cond_init(&cv_not_empty, NULL); for(uint32_t i=0;i 0) - { + if (available.size() > 0) { b = available.top(); used.push_back(b); available.pop(); - + if (is_almost_empty()) { - printf("Warning buffer pool capacity is %f %%\n", (float) 100*available.size()/capacity); + printf("Warning buffer pool capacity is %f %%\n", (float) 100 * available.size() / capacity); } #ifdef SRSLTE_BUFFER_POOL_LOG_ENABLED - if (debug_name) { - strncpy(b->debug_name, debug_name, SRSLTE_BUFFER_POOL_LOG_NAME_LEN); - b->debug_name[SRSLTE_BUFFER_POOL_LOG_NAME_LEN-1] = 0; - } + if (debug_name) { + strncpy(b->debug_name, debug_name, SRSLTE_BUFFER_POOL_LOG_NAME_LEN); + b->debug_name[SRSLTE_BUFFER_POOL_LOG_NAME_LEN - 1] = 0; + } #endif - - } else { + } else if (blocking) { + // blocking allocation + while(available.size() == 0) { + pthread_cond_wait(&cv_not_empty, &mutex); + } + + // retrieve the new buffer + b = available.top(); + used.push_back(b); + available.pop(); + + // do not print any warning + } + else { printf("Error - buffer pool is empty\n"); #ifdef SRSLTE_BUFFER_POOL_LOG_ENABLED @@ -148,6 +161,7 @@ public: available.push(b); ret = true; } + pthread_cond_signal(&cv_not_empty); pthread_mutex_unlock(&mutex); return ret; } @@ -157,7 +171,8 @@ private: static const int POOL_SIZE = 2048; std::stack available; std::vector used; - pthread_mutex_t mutex; + pthread_mutex_t mutex; + pthread_cond_t cv_not_empty; uint32_t capacity; }; @@ -174,8 +189,8 @@ public: ~byte_buffer_pool() { delete pool; } - byte_buffer_t* allocate(const char *debug_name = NULL) { - return pool->allocate(debug_name); + byte_buffer_t* allocate(const char *debug_name = NULL, bool blocking = false) { + return pool->allocate(debug_name, blocking); } void deallocate(byte_buffer_t *b) { if(!b) { diff --git a/lib/include/srslte/common/common.h b/lib/include/srslte/common/common.h index 3313d0eb4..ed5c9acc1 100644 --- a/lib/include/srslte/common/common.h +++ b/lib/include/srslte/common/common.h @@ -69,6 +69,7 @@ #ifdef SRSLTE_BUFFER_POOL_LOG_ENABLED #define pool_allocate (pool->allocate(__PRETTY_FUNCTION__)) +#define pool_allocate_blocking (pool->allocate(__PRETTY_FUNCTION__, true)) #define SRSLTE_BUFFER_POOL_LOG_NAME_LEN 128 #else #define pool_allocate (pool->allocate()) diff --git a/lib/include/srslte/upper/rlc_um.h b/lib/include/srslte/upper/rlc_um.h index f099f60cf..aae8877bb 100644 --- a/lib/include/srslte/upper/rlc_um.h +++ b/lib/include/srslte/upper/rlc_um.h @@ -45,8 +45,7 @@ struct rlc_umd_pdu_t{ }; class rlc_um - :public timer_callback - ,public rlc_common + :public rlc_common { public: rlc_um(uint32_t queue_len = 32); @@ -59,7 +58,7 @@ public: void configure(srslte_rlc_config_t cnfg); void reestablish(); void stop(); - void empty_queue(); + void empty_queue(); bool is_mrb(); rlc_mode_t get_mode(); @@ -75,72 +74,133 @@ public: int read_pdu(uint8_t *payload, uint32_t nof_bytes); void write_pdu(uint8_t *payload, uint32_t nof_bytes); int get_increment_sequence_num(); - // Timeout callback interface - void timer_expired(uint32_t timeout_id); - - bool reordering_timeout_running(); private: - byte_buffer_pool *pool; - srslte::log *log; - uint32_t lcid; - srsue::pdcp_interface_rlc *pdcp; - srsue::rrc_interface_rlc *rrc; - mac_interface_timers *mac_timers; - - // TX SDU buffers - rlc_tx_queue tx_sdu_queue; - byte_buffer_t *tx_sdu; - byte_buffer_t tx_sdu_temp; - - // Rx window - std::map rx_window; - - // RX SDU buffers - byte_buffer_t *rx_sdu; - uint32_t vr_ur_in_rx_sdu; - - // Mutexes - pthread_mutex_t mutex; - - /**************************************************************************** - * Configurable parameters - * Ref: 3GPP TS 36.322 v10.0.0 Section 7 - ***************************************************************************/ - - srslte_rlc_um_config_t cfg; - - /**************************************************************************** - * State variables and counters - * Ref: 3GPP TS 36.322 v10.0.0 Section 7 - ***************************************************************************/ - - // Tx state variables - uint32_t vt_us; // Send state. SN to be assigned for next PDU. - - // Rx state variables - uint32_t vr_ur; // Receive state. SN of earliest PDU still considered for reordering. - uint32_t vr_ux; // t_reordering state. SN following PDU which triggered t_reordering. - uint32_t vr_uh; // Highest rx state. SN following PDU with highest SN among rxed PDUs. - - /**************************************************************************** - * Timers - * Ref: 3GPP TS 36.322 v10.0.0 Section 7 - ***************************************************************************/ - srslte::timers::timer *reordering_timer; - uint32_t reordering_timer_id; - - bool tx_enabled; - bool pdu_lost; - - int build_data_pdu(uint8_t *payload, uint32_t nof_bytes); - void handle_data_pdu(uint8_t *payload, uint32_t nof_bytes); - void reassemble_rx_sdus(); - bool inside_reordering_window(uint16_t sn); - void debug_state(); - - std::string rb_name(); + // Transmitter sub-class + class rlc_um_tx + { + public: + rlc_um_tx(uint32_t queue_len); + ~rlc_um_tx(); + void init(srslte::log *log_); + void configure(srslte_rlc_config_t cfg, std::string rb_name); + int build_data_pdu(uint8_t *payload, uint32_t nof_bytes); + void stop(); + void reestablish(); + void empty_queue(); + void write_sdu(byte_buffer_t *sdu); + void try_write_sdu(byte_buffer_t *sdu); + uint32_t get_buffer_size_bytes(); + + private: + byte_buffer_pool *pool; + srslte::log *log; + std::string rb_name; + + /**************************************************************************** + * Configurable parameters + * Ref: 3GPP TS 36.322 v10.0.0 Section 7 + ***************************************************************************/ + srslte_rlc_um_config_t cfg; + + // TX SDU buffers + rlc_tx_queue tx_sdu_queue; + byte_buffer_t *tx_sdu; + + /**************************************************************************** + * State variables and counters + * Ref: 3GPP TS 36.322 v10.0.0 Section 7 + ***************************************************************************/ + uint32_t vt_us; // Send state. SN to be assigned for next PDU. + + // Mutexes + pthread_mutex_t mutex; + + bool tx_enabled; + + // helper functions + void debug_state(); + const char* get_rb_name(); + }; + + // Receiver sub-class + class rlc_um_rx : public timer_callback { + public: + rlc_um_rx(); + ~rlc_um_rx(); + void init(srslte::log *log_, + uint32_t lcid_, + srsue::pdcp_interface_rlc *pdcp_, + srsue::rrc_interface_rlc *rrc_, + srslte::mac_interface_timers *mac_timers_); + void stop(); + void reestablish(); + void configure(srslte_rlc_config_t cfg, std::string rb_name); + void handle_data_pdu(uint8_t *payload, uint32_t nof_bytes); + void reassemble_rx_sdus(); + bool inside_reordering_window(uint16_t sn); + + // Timeout callback interface + void timer_expired(uint32_t timeout_id); + + private: + byte_buffer_pool *pool; + srslte::log *log; + mac_interface_timers *mac_timers; + std::string rb_name; + + /**************************************************************************** + * Configurable parameters + * Ref: 3GPP TS 36.322 v10.0.0 Section 7 + ***************************************************************************/ + srslte_rlc_um_config_t cfg; + + // Rx window + std::map rx_window; + + // RX SDU buffers + byte_buffer_t *rx_sdu; + uint32_t vr_ur_in_rx_sdu; + + // Rx state variables + uint32_t vr_ur; // Receive state. SN of earliest PDU still considered for reordering. + uint32_t vr_ux; // t_reordering state. SN following PDU which triggered t_reordering. + uint32_t vr_uh; // Highest rx state. SN following PDU with highest SN among rxed PDUs. + bool pdu_lost; + + // Upper layer handles and variables + srsue::pdcp_interface_rlc *pdcp; + srsue::rrc_interface_rlc *rrc; + uint32_t lcid; + + // Mutexes + pthread_mutex_t mutex; + + /**************************************************************************** + * Timers + * Ref: 3GPP TS 36.322 v10.0.0 Section 7 + ***************************************************************************/ + srslte::timers::timer *reordering_timer; + uint32_t reordering_timer_id; + + // helper functions + void debug_state(); + bool reordering_timeout_running(); + const char* get_rb_name(); + }; + + // Rx and Tx objects + rlc_um_tx tx; + rlc_um_rx rx; + + // Common variables needed by parent class + srsue::rrc_interface_rlc *rrc; + uint32_t lcid; + srslte_rlc_um_config_t cfg; + std::string rb_name; + + std::string get_rb_name(srsue::rrc_interface_rlc *rrc, uint32_t lcid, bool is_mrb); }; /**************************************************************************** diff --git a/lib/src/upper/rlc_um.cc b/lib/src/upper/rlc_um.cc index 77c40aa36..f676bdfc8 100644 --- a/lib/src/upper/rlc_um.cc +++ b/lib/src/upper/rlc_um.cc @@ -33,44 +33,27 @@ namespace srslte { -rlc_um::rlc_um(uint32_t queue_len) : tx_sdu_queue(queue_len) -{ - log = NULL; - pdcp = NULL; - rrc = NULL; - reordering_timer = NULL; - lcid = 0; - reordering_timer_id = 0; +rlc_um::rlc_um(uint32_t queue_len) + :lcid(0) + ,tx(queue_len) + ,rrc(NULL) +{ bzero(&cfg, sizeof(srslte_rlc_um_config_t)); - - tx_sdu = NULL; - - rx_sdu = NULL; - pool = byte_buffer_pool::get_instance(); - - pthread_mutex_init(&mutex, NULL); - - vt_us = 0; - vr_ur = 0; - vr_ux = 0; - vr_uh = 0; - - vr_ur_in_rx_sdu = 0; - - mac_timers = NULL; - - pdu_lost = false; } // Warning: must call stop() to properly deallocate all buffers rlc_um::~rlc_um() { +<<<<<<< HEAD pthread_mutex_destroy(&mutex); pool = NULL; if (mac_timers && reordering_timer) { mac_timers->timer_release_id(reordering_timer_id); reordering_timer = NULL; } +======= + stop(); +>>>>>>> 1f7e9187906c824b2092c71374291042e571d631 } void rlc_um::init(srslte::log *log_, @@ -79,102 +62,81 @@ void rlc_um::init(srslte::log *log_, srsue::rrc_interface_rlc *rrc_, srslte::mac_interface_timers *mac_timers_) { - log = log_; - lcid = lcid_; - pdcp = pdcp_; - rrc = rrc_; - mac_timers = mac_timers_; - reordering_timer_id = mac_timers->timer_get_unique_id(); - reordering_timer = mac_timers->timer_get(reordering_timer_id); - tx_enabled = true; + tx.init(log_); + rx.init(log_, lcid_, pdcp_, rrc_, mac_timers_); + lcid = lcid_; + rrc = rrc_; // needed to determine bearer name during configuration } + void rlc_um::configure(srslte_rlc_config_t cnfg_) { + // determine bearer name and configure Rx/Tx objects + rb_name = get_rb_name(rrc, lcid, cnfg_.um.is_mrb); + + rx.configure(cnfg_, rb_name); + tx.configure(cnfg_, rb_name); + + // store config cfg = cnfg_.um; - if(cnfg_.um.is_mrb){ - tx_sdu_queue.resize(512); - } - switch(cnfg_.rlc_mode) - { - case LIBLTE_RRC_RLC_MODE_UM_BI: - log->warning("%s configured in %s mode: " - "t_reordering=%d ms, rx_sn_field_length=%u bits, tx_sn_field_length=%u bits\n", - rb_name().c_str(), liblte_rrc_rlc_mode_text[cnfg_.rlc_mode], - cfg.t_reordering, rlc_umd_sn_size_num[cfg.rx_sn_field_length], rlc_umd_sn_size_num[cfg.rx_sn_field_length]); - break; - case LIBLTE_RRC_RLC_MODE_UM_UNI_UL: - log->warning("%s configured in %s mode: tx_sn_field_length=%u bits\n", - rrc->get_rb_name(lcid).c_str(), liblte_rrc_rlc_mode_text[cnfg_.rlc_mode], - - rlc_umd_sn_size_num[cfg.rx_sn_field_length]); - break; - case LIBLTE_RRC_RLC_MODE_UM_UNI_DL: - log->warning("%s configured in %s mode: " - "t_reordering=%d ms, rx_sn_field_length=%u bits\n", - rb_name().c_str(), liblte_rrc_rlc_mode_text[cnfg_.rlc_mode], - cfg.t_reordering, rlc_umd_sn_size_num[cfg.rx_sn_field_length]); - break; - default: - log->error("RLC configuration mode not recognized\n"); +} + + +void rlc_um::rlc_um_rx::configure(srslte_rlc_config_t cnfg_, std::string rb_name_) +{ + cfg = cnfg_.um; + rb_name = rb_name_; + switch(cnfg_.rlc_mode) { + case LIBLTE_RRC_RLC_MODE_UM_BI: + log->warning("%s configured in %s mode: " + "t_reordering=%d ms, rx_sn_field_length=%u bits, tx_sn_field_length=%u bits\n", + get_rb_name(), liblte_rrc_rlc_mode_text[cnfg_.rlc_mode], + cfg.t_reordering, rlc_umd_sn_size_num[cfg.rx_sn_field_length], rlc_umd_sn_size_num[cfg.rx_sn_field_length]); + break; + case LIBLTE_RRC_RLC_MODE_UM_UNI_UL: + log->warning("%s configured in %s mode: tx_sn_field_length=%u bits\n", + get_rb_name(), liblte_rrc_rlc_mode_text[cnfg_.rlc_mode], + + rlc_umd_sn_size_num[cfg.rx_sn_field_length]); + break; + case LIBLTE_RRC_RLC_MODE_UM_UNI_DL: + log->warning("%s configured in %s mode: " + "t_reordering=%d ms, rx_sn_field_length=%u bits\n", + get_rb_name(), liblte_rrc_rlc_mode_text[cnfg_.rlc_mode], + cfg.t_reordering, rlc_umd_sn_size_num[cfg.rx_sn_field_length]); + break; + default: + log->error("RLC configuration mode not recognized\n"); } } + void rlc_um::empty_queue() { // Drop all messages in TX SDU queue - byte_buffer_t *buf; - while(tx_sdu_queue.try_read(&buf)) { - pool->deallocate(buf); - } - tx_sdu_queue.reset(); + tx.empty_queue(); } + bool rlc_um::is_mrb() { return cfg.is_mrb; } -void rlc_um::reestablish() { - stop(); - tx_enabled = true; -} -void rlc_um::stop() +void rlc_um::reestablish() { - // Empty tx_sdu_queue before locking the mutex - tx_enabled = false; - empty_queue(); - - pthread_mutex_lock(&mutex); - vt_us = 0; - vr_ur = 0; - vr_ux = 0; - vr_uh = 0; - pdu_lost = false; - if(rx_sdu) { - pool->deallocate(rx_sdu); - rx_sdu = NULL; - } + tx.reestablish(); // calls stop and enables tx again + rx.reestablish(); // nothing else needed +} - if(tx_sdu) { - pool->deallocate(tx_sdu); - tx_sdu = NULL; - } - - if(reordering_timer) { - reordering_timer->stop(); - } - - // Drop all messages in RX window - std::map::iterator it; - for(it = rx_window.begin(); it != rx_window.end(); it++) { - pool->deallocate(it->second.buf); - } - rx_window.clear(); - pthread_mutex_unlock(&mutex); +void rlc_um::stop() +{ + tx.stop(); + rx.stop(); } + rlc_mode_t rlc_um::get_mode() { return RLC_MODE_UM; @@ -190,34 +152,12 @@ uint32_t rlc_um::get_bearer() ***************************************************************************/ void rlc_um::write_sdu(byte_buffer_t *sdu) { - if (!tx_enabled) { - byte_buffer_pool::get_instance()->deallocate(sdu); - return; - } - if (sdu) { - tx_sdu_queue.write(sdu); - log->info_hex(sdu->msg, sdu->N_bytes, "%s Tx SDU (%d B ,tx_sdu_queue_len=%d)", rrc->get_rb_name(lcid).c_str(), sdu->N_bytes, tx_sdu_queue.size()); - } else { - log->warning("NULL SDU pointer in write_sdu()\n"); - } + tx.write_sdu(sdu); } void rlc_um::write_sdu_nb(byte_buffer_t *sdu) { - if (!tx_enabled) { - byte_buffer_pool::get_instance()->deallocate(sdu); - return; - } - if (sdu) { - if (tx_sdu_queue.try_write(sdu)) { - log->info_hex(sdu->msg, sdu->N_bytes, "%s Tx SDU (%d B,tx_sdu_queue_len=%d)", rrc->get_rb_name(lcid).c_str(), sdu->N_bytes, tx_sdu_queue.size()); - } else { - log->debug_hex(sdu->msg, sdu->N_bytes, "[Dropped SDU] %s Tx SDU (%d B,tx_sdu_queue_len=%d)", rrc->get_rb_name(lcid).c_str(), sdu->N_bytes, tx_sdu_queue.size()); - pool->deallocate(sdu); - } - } else { - log->warning("NULL SDU pointer in write_sdu()\n"); - } + tx.try_write_sdu(sdu); } /**************************************************************************** @@ -226,109 +166,191 @@ void rlc_um::write_sdu_nb(byte_buffer_t *sdu) uint32_t rlc_um::get_buffer_state() { - // Bytes needed for tx SDUs - uint32_t n_sdus = tx_sdu_queue.size(); + return tx.get_buffer_size_bytes(); +} - uint32_t n_bytes = tx_sdu_queue.size_bytes(); - if(tx_sdu) - { - n_sdus++; - n_bytes += tx_sdu->N_bytes; +uint32_t rlc_um::get_total_buffer_state() +{ + return get_buffer_state(); +} + +int rlc_um::read_pdu(uint8_t *payload, uint32_t nof_bytes) +{ + return tx.build_data_pdu(payload, nof_bytes); +} + +void rlc_um::write_pdu(uint8_t *payload, uint32_t nof_bytes) +{ + rx.handle_data_pdu(payload, nof_bytes); +} + + +/**************************************************************************** + * Helper functions + ***************************************************************************/ + +std::string rlc_um::get_rb_name(srsue::rrc_interface_rlc *rrc, uint32_t lcid, bool is_mrb) +{ + if(is_mrb) { + std::stringstream ss; + ss << "MRB" << lcid; + return ss.str(); + } else { + return rrc->get_rb_name(lcid); } +} - // Room needed for header extensions? (integer rounding) - if(n_sdus > 1) - n_bytes += ((n_sdus-1)*1.5)+0.5; - // Room needed for fixed header? - if(n_bytes > 0) - n_bytes += (cfg.is_mrb)?2:3; +/**************************************************************************** + * Tx subclass implementation + ***************************************************************************/ - return n_bytes; +rlc_um::rlc_um_tx::rlc_um_tx(uint32_t queue_len) + :tx_sdu_queue(queue_len) + ,pool(byte_buffer_pool::get_instance()) + ,log(NULL) + ,tx_sdu(NULL) + ,vt_us(0) + ,tx_enabled(false) +{ + pthread_mutex_init(&mutex, NULL); } -uint32_t rlc_um::get_total_buffer_state() + +rlc_um::rlc_um_tx::~rlc_um_tx() { - return get_buffer_state(); + pthread_mutex_destroy(&mutex); } -int rlc_um::read_pdu(uint8_t *payload, uint32_t nof_bytes) + +void rlc_um::rlc_um_tx::init(srslte::log *log_) { - int r; - log->debug("MAC opportunity - %d bytes\n", nof_bytes); - pthread_mutex_lock(&mutex); - r = build_data_pdu(payload, nof_bytes); - pthread_mutex_unlock(&mutex); - return r; + log = log_; + tx_enabled = true; } -void rlc_um::write_pdu(uint8_t *payload, uint32_t nof_bytes) + +void rlc_um::rlc_um_tx::configure(srslte_rlc_config_t cnfg_, std::string rb_name_) +{ + cfg = cnfg_.um; + if(cfg.is_mrb){ + tx_sdu_queue.resize(512); + } + rb_name = rb_name_; +} + + +void rlc_um::rlc_um_tx::stop() +{ + tx_enabled = false; + empty_queue(); +} + + +void rlc_um::rlc_um_tx::reestablish() +{ + stop(); + tx_enabled = true; +} + + +void rlc_um::rlc_um_tx::empty_queue() { pthread_mutex_lock(&mutex); - handle_data_pdu(payload, nof_bytes); + + // deallocate all SDUs in transmit queue + while(tx_sdu_queue.size() > 0) { + byte_buffer_t *buf; + tx_sdu_queue.read(&buf); + pool->deallocate(buf); + } + + // deallocate SDU that is currently processed + if(tx_sdu) { + pool->deallocate(tx_sdu); + tx_sdu = NULL; + } + pthread_mutex_unlock(&mutex); } -/**************************************************************************** - * Timeout callback interface - ***************************************************************************/ -void rlc_um::timer_expired(uint32_t timeout_id) +uint32_t rlc_um::rlc_um_tx::get_buffer_size_bytes() { - if(reordering_timer_id == timeout_id) - { - pthread_mutex_lock(&mutex); + // Bytes needed for tx SDUs + uint32_t n_sdus = tx_sdu_queue.size(); + uint32_t n_bytes = tx_sdu_queue.size_bytes(); + if(tx_sdu) { + n_sdus++; + n_bytes += tx_sdu->N_bytes; + } - // 36.322 v10 Section 5.1.2.2.4 - log->info("%s reordering timeout expiry - updating vr_ur and reassembling\n", - rb_name().c_str()); + // Room needed for header extensions? (integer rounding) + if(n_sdus > 1) { + n_bytes += ((n_sdus-1)*1.5)+0.5; + } - log->warning("Lost PDU SN: %d\n", vr_ur); - pdu_lost = true; - rx_sdu->reset(); - while(RX_MOD_BASE(vr_ur) < RX_MOD_BASE(vr_ux)) - { - vr_ur = (vr_ur + 1)%cfg.rx_mod; - log->debug("Entering Reassemble from timeout id=%d\n", timeout_id); - reassemble_rx_sdus(); - log->debug("Finished reassemble from timeout id=%d\n", timeout_id); - } - reordering_timer->stop(); - if(RX_MOD_BASE(vr_uh) > RX_MOD_BASE(vr_ur)) - { - reordering_timer->set(this, cfg.t_reordering); - reordering_timer->run(); - vr_ux = vr_uh; - } + // Room needed for fixed header? + if(n_bytes > 0) + n_bytes += (cfg.is_mrb)?2:3; - debug_state(); - pthread_mutex_unlock(&mutex); + return n_bytes; +} + + +void rlc_um::rlc_um_tx::write_sdu(byte_buffer_t *sdu) +{ + if (!tx_enabled) { + byte_buffer_pool::get_instance()->deallocate(sdu); + return; + } + + if (sdu) { + tx_sdu_queue.write(sdu); + log->info_hex(sdu->msg, sdu->N_bytes, "%s Tx SDU (%d B ,tx_sdu_queue_len=%d)", get_rb_name(), sdu->N_bytes, tx_sdu_queue.size()); + } else { + log->warning("NULL SDU pointer in write_sdu()\n"); } } -bool rlc_um::reordering_timeout_running() + +void rlc_um::rlc_um_tx::try_write_sdu(byte_buffer_t *sdu) { - return reordering_timer->is_running(); + if (!tx_enabled) { + byte_buffer_pool::get_instance()->deallocate(sdu); + return; + } + + if (sdu) { + if (tx_sdu_queue.try_write(sdu)) { + log->info_hex(sdu->msg, sdu->N_bytes, "%s Tx SDU (%d B ,tx_sdu_queue_len=%d)", get_rb_name(), sdu->N_bytes, tx_sdu_queue.size()); + } else { + log->warning_hex(sdu->msg, sdu->N_bytes, "[Dropped SDU] %s Tx SDU (%d B ,tx_sdu_queue_len=%d)", get_rb_name(), sdu->N_bytes, tx_sdu_queue.size()); + pool->deallocate(sdu); + } + } else { + log->warning("NULL SDU pointer in write_sdu()\n"); + } } -/**************************************************************************** - * Helpers - ***************************************************************************/ -int rlc_um::build_data_pdu(uint8_t *payload, uint32_t nof_bytes) +int rlc_um::rlc_um_tx::build_data_pdu(uint8_t *payload, uint32_t nof_bytes) { - if(!tx_sdu && tx_sdu_queue.size() == 0) - { + pthread_mutex_lock(&mutex); + log->debug("MAC opportunity - %d bytes\n", nof_bytes); + if(!tx_sdu && tx_sdu_queue.size() == 0) { log->info("No data available to be sent\n"); + pthread_mutex_unlock(&mutex); return 0; } byte_buffer_t *pdu = pool_allocate; - if(!pdu || pdu->N_bytes != 0) - { + if(!pdu || pdu->N_bytes != 0) { log->error("Failed to allocate PDU buffer\n"); - return -1; + pthread_mutex_unlock(&mutex); + return 0; } + rlc_umd_pdu_header_t header; header.fi = RLC_FI_FIELD_START_AND_END_ALIGNED; header.sn = vt_us; @@ -346,17 +368,17 @@ int rlc_um::build_data_pdu(uint8_t *payload, uint32_t nof_bytes) { pool->deallocate(pdu); log->warning("%s Cannot build a PDU - %d bytes available, %d bytes required for header\n", - rb_name().c_str(), nof_bytes, head_len); + get_rb_name(), nof_bytes, head_len); + pthread_mutex_unlock(&mutex); return 0; } // Check for SDU segment - if(tx_sdu) - { + if(tx_sdu) { uint32_t space = pdu_space-head_len; to_move = space >= tx_sdu->N_bytes ? tx_sdu->N_bytes : space; log->debug("%s adding remainder of SDU segment - %d bytes of %d remaining\n", - rb_name().c_str(), to_move, tx_sdu->N_bytes); + get_rb_name(), to_move, tx_sdu->N_bytes); memcpy(pdu_ptr, tx_sdu->msg, to_move); last_li = to_move; pdu_ptr += to_move; @@ -366,7 +388,7 @@ int rlc_um::build_data_pdu(uint8_t *payload, uint32_t nof_bytes) if(tx_sdu->N_bytes == 0) { log->debug("%s Complete SDU scheduled for tx. Stack latency: %ld us\n", - rrc->get_rb_name(lcid).c_str(), tx_sdu->get_latency_us()); + get_rb_name(), tx_sdu->get_latency_us()); pool->deallocate(tx_sdu); tx_sdu = NULL; @@ -376,8 +398,7 @@ int rlc_um::build_data_pdu(uint8_t *payload, uint32_t nof_bytes) } // Pull SDUs from queue - while(pdu_space > head_len + 1 && tx_sdu_queue.size() > 0) - { + while(pdu_space > head_len + 1 && tx_sdu_queue.size() > 0) { log->debug("pdu_space=%d, head_len=%d\n", pdu_space, head_len); if(last_li > 0) header.li[header.N_li++] = last_li; @@ -386,17 +407,16 @@ int rlc_um::build_data_pdu(uint8_t *payload, uint32_t nof_bytes) uint32_t space = pdu_space-head_len; to_move = space >= tx_sdu->N_bytes ? tx_sdu->N_bytes : space; log->debug("%s adding new SDU segment - %d bytes of %d remaining\n", - rb_name().c_str(), to_move, tx_sdu->N_bytes); + get_rb_name(), to_move, tx_sdu->N_bytes); memcpy(pdu_ptr, tx_sdu->msg, to_move); last_li = to_move; pdu_ptr += to_move; pdu->N_bytes += to_move; tx_sdu->N_bytes -= to_move; tx_sdu->msg += to_move; - if(tx_sdu->N_bytes == 0) - { + if(tx_sdu->N_bytes == 0) { log->debug("%s Complete SDU scheduled for tx. Stack latency: %ld us\n", - rrc->get_rb_name(lcid).c_str(), tx_sdu->get_latency_us()); + get_rb_name(), tx_sdu->get_latency_us()); pool->deallocate(tx_sdu); tx_sdu = NULL; @@ -404,89 +424,179 @@ int rlc_um::build_data_pdu(uint8_t *payload, uint32_t nof_bytes) pdu_space -= to_move; } - if(tx_sdu) + if(tx_sdu) { header.fi |= RLC_FI_FIELD_NOT_END_ALIGNED; // Last byte does not correspond to last byte of SDU + } // Set SN - header.sn = vt_us; vt_us = (vt_us + 1)%cfg.tx_mod; // Add header and TX - log->debug("%s packing PDU with length %d\n", rb_name().c_str(), pdu->N_bytes); + log->debug("%s packing PDU with length %d\n", get_rb_name(), pdu->N_bytes); rlc_um_write_data_pdu_header(&header, pdu); memcpy(payload, pdu->msg, pdu->N_bytes); uint32_t ret = pdu->N_bytes; - log->debug("%s returning length %d\n", rrc->get_rb_name(lcid).c_str(), pdu->N_bytes); + log->debug("%s returning length %d\n", get_rb_name(), pdu->N_bytes); pool->deallocate(pdu); debug_state(); + + pthread_mutex_unlock(&mutex); return ret; } -void rlc_um::handle_data_pdu(uint8_t *payload, uint32_t nof_bytes) + +const char* rlc_um::rlc_um_tx::get_rb_name() +{ + return rb_name.c_str(); +} + + +void rlc_um::rlc_um_tx::debug_state() +{ + log->debug("%s vt_us = %d\n", get_rb_name(), vt_us); +} + +/**************************************************************************** + * Rx subclass implementation + ***************************************************************************/ + +rlc_um::rlc_um_rx::rlc_um_rx() + :reordering_timer(NULL) + ,reordering_timer_id(0) + ,pool(byte_buffer_pool::get_instance()) + ,log(NULL) + ,pdcp(NULL) + ,rrc(NULL) + ,rx_sdu(NULL) + ,vr_ur(0) + ,vr_ux (0) + ,vr_uh(0) + ,vr_ur_in_rx_sdu(0) + ,pdu_lost(false) + ,mac_timers(NULL) + ,lcid(0) +{ + pthread_mutex_init(&mutex, NULL); +} + + +rlc_um::rlc_um_rx::~rlc_um_rx() { + pthread_mutex_destroy(&mutex); +} + + +void rlc_um::rlc_um_rx::init(srslte::log *log_, uint32_t lcid_, srsue::pdcp_interface_rlc *pdcp_, srsue::rrc_interface_rlc *rrc_, srslte::mac_interface_timers *mac_timers_) +{ + log = log_; + lcid = lcid_; + pdcp = pdcp_; + rrc = rrc_; + mac_timers = mac_timers_; + reordering_timer_id = mac_timers->timer_get_unique_id(); + reordering_timer = mac_timers->timer_get(reordering_timer_id); +} + + +void rlc_um::rlc_um_rx::reestablish() +{ + stop(); +} + + +void rlc_um::rlc_um_rx::stop() +{ + pthread_mutex_lock(&mutex); + if(reordering_timer) { + reordering_timer->stop(); + } + vr_ur = 0; + vr_ux = 0; + vr_uh = 0; + pdu_lost = false; + if(rx_sdu) { + pool->deallocate(rx_sdu); + rx_sdu = NULL; + } + + if (mac_timers && reordering_timer) { + mac_timers->timer_release_id(reordering_timer_id); + reordering_timer = NULL; + } + + // Drop all messages in RX window + std::map::iterator it; + for(it = rx_window.begin(); it != rx_window.end(); it++) { + pool->deallocate(it->second.buf); + } + rx_window.clear(); + pthread_mutex_unlock(&mutex); +} + + +void rlc_um::rlc_um_rx::handle_data_pdu(uint8_t *payload, uint32_t nof_bytes) +{ + pthread_mutex_lock(&mutex); + rlc_umd_pdu_t pdu; + int header_len = 0; std::map::iterator it; rlc_umd_pdu_header_t header; rlc_um_read_data_pdu_header(payload, nof_bytes, cfg.rx_sn_field_length, &header); - log->info_hex(payload, nof_bytes, "RX %s Rx data PDU SN: %d", - rb_name().c_str(), header.sn); + log->info_hex(payload, nof_bytes, "RX %s Rx data PDU SN: %d", get_rb_name(), header.sn); if(RX_MOD_BASE(header.sn) >= RX_MOD_BASE(vr_uh-cfg.rx_window_size) && RX_MOD_BASE(header.sn) < RX_MOD_BASE(vr_ur)) { log->info("%s SN: %d outside rx window [%d:%d] - discarding\n", - rb_name().c_str(), header.sn, vr_ur, vr_uh); - return; + get_rb_name(), header.sn, vr_ur, vr_uh); + goto unlock_and_exit; } it = rx_window.find(header.sn); if(rx_window.end() != it) { - log->info("%s Discarding duplicate SN: %d\n", - rb_name().c_str(), header.sn); - return; + log->info("%s Discarding duplicate SN: %d\n", get_rb_name(), header.sn); + goto unlock_and_exit; } // Write to rx window - rlc_umd_pdu_t pdu; pdu.buf = pool_allocate; if (!pdu.buf) { log->error("Discarting packet: no space in buffer pool\n"); - return; + goto unlock_and_exit; } memcpy(pdu.buf->msg, payload, nof_bytes); pdu.buf->N_bytes = nof_bytes; //Strip header from PDU - int header_len = rlc_um_packed_length(&header); + header_len = rlc_um_packed_length(&header); pdu.buf->msg += header_len; pdu.buf->N_bytes -= header_len; pdu.header = header; rx_window[header.sn] = pdu; - + // Update vr_uh - if(!inside_reordering_window(header.sn)) + if(!inside_reordering_window(header.sn)) { vr_uh = (header.sn + 1)%cfg.rx_mod; + } // Reassemble and deliver SDUs, while updating vr_ur log->debug("Entering Reassemble from received PDU\n"); reassemble_rx_sdus(); log->debug("Finished reassemble from received PDU\n"); - + // Update reordering variables and timers - if(reordering_timer->is_running()) - { + if(reordering_timer->is_running()) { if(RX_MOD_BASE(vr_ux) <= RX_MOD_BASE(vr_ur) || (!inside_reordering_window(vr_ux) && vr_ux != vr_uh)) { reordering_timer->stop(); } } - if(!reordering_timer->is_running()) - { - if(RX_MOD_BASE(vr_uh) > RX_MOD_BASE(vr_ur)) - { + if(!reordering_timer->is_running()) { + if(RX_MOD_BASE(vr_uh) > RX_MOD_BASE(vr_ur)) { reordering_timer->set(this, cfg.t_reordering); reordering_timer->run(); vr_ux = vr_uh; @@ -494,9 +604,14 @@ void rlc_um::handle_data_pdu(uint8_t *payload, uint32_t nof_bytes) } debug_state(); + + unlock_and_exit: + pthread_mutex_unlock(&mutex); } -void rlc_um::reassemble_rx_sdus() + +// No locking required as only called from within handle_data_pdu and timer_expired which lock +void rlc_um::rlc_um_rx::reassemble_rx_sdus() { if(!rx_sdu) { rx_sdu = pool_allocate; @@ -536,7 +651,7 @@ void rlc_um::reassemble_rx_sdus() log->warning("Dropping remainder of lost PDU (lower edge middle segments, vr_ur=%d, vr_ur_in_rx_sdu=%d)\n", vr_ur, vr_ur_in_rx_sdu); rx_sdu->reset(); } else { - log->info_hex(rx_sdu->msg, rx_sdu->N_bytes, "%s Rx SDU vr_ur=%d, i=%d (lower edge middle segments)", rb_name().c_str(), vr_ur, i); + log->info_hex(rx_sdu->msg, rx_sdu->N_bytes, "%s Rx SDU vr_ur=%d, i=%d (lower edge middle segments)", get_rb_name(), vr_ur, i); rx_sdu->set_timestamp(); if(cfg.is_mrb){ pdcp->write_pdu_mch(lcid, rx_sdu); @@ -553,7 +668,6 @@ void rlc_um::reassemble_rx_sdus() } // Handle last segment - if (rx_sdu->N_bytes > 0 || rlc_um_start_aligned(rx_window[vr_ur].header.fi)) { log->debug("Writing last segment in SDU buffer. Lower edge vr_ur=%d, Buffer size=%d, segment size=%d\n", vr_ur, rx_sdu->N_bytes, rx_window[vr_ur].buf->N_bytes); @@ -567,7 +681,7 @@ void rlc_um::reassemble_rx_sdus() log->warning("Dropping remainder of lost PDU (lower edge last segments)\n"); rx_sdu->reset(); } else { - log->info_hex(rx_sdu->msg, rx_sdu->N_bytes, "%s Rx SDU vr_ur=%d (lower edge last segments)", rrc->get_rb_name(lcid).c_str(), vr_ur); + log->info_hex(rx_sdu->msg, rx_sdu->N_bytes, "%s Rx SDU vr_ur=%d (lower edge last segments)", get_rb_name(), vr_ur); rx_sdu->set_timestamp(); if(cfg.is_mrb){ pdcp->write_pdu_mch(lcid, rx_sdu); @@ -592,13 +706,10 @@ void rlc_um::reassemble_rx_sdus() vr_ur = (vr_ur + 1)%cfg.rx_mod; } - // Now update vr_ur until we reach an SN we haven't yet received - while(rx_window.end() != rx_window.find(vr_ur)) - { + while(rx_window.end() != rx_window.find(vr_ur)) { // Handle any SDU segments - for(uint32_t i=0; idebug("Concatenating %d bytes in to current length %d. rx_window remaining bytes=%d, vr_ur_in_rx_sdu=%d, vr_ur=%d, rx_mod=%d, last_mod=%d\n", - len, rx_sdu->N_bytes, rx_window[vr_ur].buf->N_bytes, vr_ur_in_rx_sdu, vr_ur, cfg.rx_mod, (vr_ur_in_rx_sdu+1)%cfg.rx_mod); + len, rx_sdu->N_bytes, rx_window[vr_ur].buf->N_bytes, vr_ur_in_rx_sdu, vr_ur, cfg.rx_mod, (vr_ur_in_rx_sdu+1)%cfg.rx_mod); memcpy(&rx_sdu->msg[rx_sdu->N_bytes], rx_window[vr_ur].buf->msg, len); rx_sdu->N_bytes += len; rx_window[vr_ur].buf->msg += len; @@ -628,7 +739,7 @@ void rlc_um::reassemble_rx_sdus() log->warning("Dropping remainder of lost PDU (update vr_ur middle segments, vr_ur=%d, vr_ur_in_rx_sdu=%d)\n", vr_ur, vr_ur_in_rx_sdu); rx_sdu->reset(); } else { - log->info_hex(rx_sdu->msg, rx_sdu->N_bytes, "%s Rx SDU vr_ur=%d, i=%d, (update vr_ur middle segments)", rb_name().c_str(), vr_ur, i); + log->info_hex(rx_sdu->msg, rx_sdu->N_bytes, "%s Rx SDU vr_ur=%d, i=%d, (update vr_ur middle segments)", get_rb_name(), vr_ur, i); rx_sdu->set_timestamp(); if(cfg.is_mrb){ pdcp->write_pdu_mch(lcid, rx_sdu); @@ -665,13 +776,12 @@ void rlc_um::reassemble_rx_sdus() rx_sdu->N_bytes, rx_window[vr_ur].buf->N_bytes, vr_ur); } vr_ur_in_rx_sdu = vr_ur; - if(rlc_um_end_aligned(rx_window[vr_ur].header.fi)) - { + if(rlc_um_end_aligned(rx_window[vr_ur].header.fi)) { if(pdu_lost && !rlc_um_start_aligned(rx_window[vr_ur].header.fi)) { log->warning("Dropping remainder of lost PDU (update vr_ur last segments)\n"); rx_sdu->reset(); } else { - log->info_hex(rx_sdu->msg, rx_sdu->N_bytes, "%s Rx SDU vr_ur=%d (update vr_ur last segments)", rb_name().c_str(), vr_ur); + log->info_hex(rx_sdu->msg, rx_sdu->N_bytes, "%s Rx SDU vr_ur=%d (update vr_ur last segments)", get_rb_name(), vr_ur); rx_sdu->set_timestamp(); if(cfg.is_mrb){ pdcp->write_pdu_mch(lcid, rx_sdu); @@ -687,7 +797,7 @@ void rlc_um::reassemble_rx_sdus() pdu_lost = false; } -clean_up_rx_window: + clean_up_rx_window: // Clean up rx_window pool->deallocate(rx_window[vr_ur].buf); @@ -697,7 +807,8 @@ clean_up_rx_window: } } -bool rlc_um::inside_reordering_window(uint16_t sn) +// Only called when lock is hold +bool rlc_um::rlc_um_rx::inside_reordering_window(uint16_t sn) { if(cfg.rx_window_size == 0) { return true; @@ -711,23 +822,64 @@ bool rlc_um::inside_reordering_window(uint16_t sn) } } -void rlc_um::debug_state() + +/**************************************************************************** + * Timeout callback interface + ***************************************************************************/ + +void rlc_um::rlc_um_rx::timer_expired(uint32_t timeout_id) { - log->debug("%s vt_us = %d, vr_ur = %d, vr_ux = %d, vr_uh = %d \n", - rb_name().c_str(), vt_us, vr_ur, vr_ux, vr_uh); + if(reordering_timer_id == timeout_id) + { + pthread_mutex_lock(&mutex); -} + // 36.322 v10 Section 5.1.2.2.4 + log->info("%s reordering timeout expiry - updating vr_ur and reassembling\n", + get_rb_name()); -std::string rlc_um::rb_name() { - if(cfg.is_mrb) { - std::stringstream ss; - ss << "MRB" << lcid; - return ss.str(); - } else { - return rrc->get_rb_name(lcid); + log->warning("Lost PDU SN: %d\n", vr_ur); + pdu_lost = true; + rx_sdu->reset(); + while(RX_MOD_BASE(vr_ur) < RX_MOD_BASE(vr_ux)) + { + vr_ur = (vr_ur + 1)%cfg.rx_mod; + log->debug("Entering Reassemble from timeout id=%d\n", timeout_id); + reassemble_rx_sdus(); + log->debug("Finished reassemble from timeout id=%d\n", timeout_id); + } + reordering_timer->stop(); + if(RX_MOD_BASE(vr_uh) > RX_MOD_BASE(vr_ur)) + { + reordering_timer->set(this, cfg.t_reordering); + reordering_timer->run(); + vr_ux = vr_uh; + } + + debug_state(); + pthread_mutex_unlock(&mutex); } } +bool rlc_um::rlc_um_rx::reordering_timeout_running() +{ + return reordering_timer->is_running(); +} + +/**************************************************************************** + * Helper functions + ***************************************************************************/ + +void rlc_um::rlc_um_rx::debug_state() +{ + log->debug("%s vr_ur = %d, vr_ux = %d, vr_uh = %d\n", get_rb_name(), vr_ur, vr_ux, vr_uh); +} + +const char* rlc_um::rlc_um_rx::get_rb_name() +{ + return rb_name.c_str(); +} + + /**************************************************************************** * Header pack/unpack helper functions * Ref: 3GPP TS 36.322 v10.0.0 Section 6.2.1 diff --git a/srsue/src/upper/nas.cc b/srsue/src/upper/nas.cc index 5601896b5..df581e21a 100644 --- a/srsue/src/upper/nas.cc +++ b/srsue/src/upper/nas.cc @@ -209,7 +209,12 @@ bool nas::rrc_connect() { } // Generate service request or attach request message - byte_buffer_t *dedicatedInfoNAS = pool_allocate; + byte_buffer_t *dedicatedInfoNAS = pool_allocate_blocking; + if (!dedicatedInfoNAS) { + nas_log->error("Fatal Error: Couldn't allocate PDU in rrc_connect().\n"); + return false; + } + if (state == EMM_STATE_REGISTERED) { gen_service_request(dedicatedInfoNAS); } else { @@ -1111,7 +1116,7 @@ void nas::gen_pdn_connectivity_request(LIBLTE_BYTE_MSG_STRUCT *msg) { } void nas::send_security_mode_reject(uint8_t cause) { - byte_buffer_t *msg = pool_allocate; + byte_buffer_t *msg = pool_allocate_blocking; if (!msg) { nas_log->error("Fatal Error: Couldn't allocate PDU in send_security_mode_reject().\n"); return; @@ -1129,7 +1134,7 @@ void nas::send_security_mode_reject(uint8_t cause) { void nas::send_authentication_response(const uint8_t* res, const size_t res_len, const uint8_t sec_hdr_type) { - byte_buffer_t *pdu = pool_allocate; + byte_buffer_t *pdu = pool_allocate_blocking; if (!pdu) { nas_log->error("Fatal Error: Couldn't allocate PDU in send_authentication_response().\n"); return; @@ -1164,7 +1169,7 @@ void nas::send_authentication_response(const uint8_t* res, const size_t res_len, void nas::send_authentication_failure(const uint8_t cause, const uint8_t* auth_fail_param) { - byte_buffer_t *msg = pool_allocate; + byte_buffer_t *msg = pool_allocate_blocking; if (!msg) { nas_log->error("Fatal Error: Couldn't allocate PDU in send_authentication_failure().\n"); return; @@ -1192,7 +1197,7 @@ void nas::send_authentication_failure(const uint8_t cause, const uint8_t* auth_f void nas::send_identity_response() {} void nas::send_service_request() { - byte_buffer_t *msg = pool_allocate; + byte_buffer_t *msg = pool_allocate_blocking; if (!msg) { nas_log->error("Fatal Error: Couldn't allocate PDU in send_service_request().\n"); return;