From 125f1e72820fa3cd5514acabb41e1fcc6cd794e4 Mon Sep 17 00:00:00 2001 From: Xavier Arteaga Date: Tue, 9 Jul 2019 14:42:45 +0200 Subject: [PATCH] ZMQ: Split Tx and Rx, bug fixes and clean up --- lib/include/srslte/phy/utils/ringbuffer.h | 5 +- lib/src/phy/rf/CMakeLists.txt | 2 +- lib/src/phy/rf/rf_zmq_imp.c | 393 +++++----------------- lib/src/phy/rf/rf_zmq_imp.h | 2 - lib/src/phy/rf/rf_zmq_imp_rx.c | 198 +++++++++++ lib/src/phy/rf/rf_zmq_imp_trx.h | 91 +++++ lib/src/phy/rf/rf_zmq_imp_tx.c | 176 ++++++++++ lib/src/phy/utils/ringbuffer.c | 103 ++++-- 8 files changed, 635 insertions(+), 335 deletions(-) create mode 100644 lib/src/phy/rf/rf_zmq_imp_rx.c create mode 100644 lib/src/phy/rf/rf_zmq_imp_trx.h create mode 100644 lib/src/phy/rf/rf_zmq_imp_tx.c diff --git a/lib/include/srslte/phy/utils/ringbuffer.h b/lib/include/srslte/phy/utils/ringbuffer.h index 83a9715d2..247369a04 100644 --- a/lib/include/srslte/phy/utils/ringbuffer.h +++ b/lib/include/srslte/phy/utils/ringbuffer.h @@ -35,7 +35,8 @@ typedef struct { int wpm; int rpm; pthread_mutex_t mutex; - pthread_cond_t cvar; + pthread_cond_t write_cvar; + pthread_cond_t read_cvar; } srslte_ringbuffer_t; #ifdef __cplusplus @@ -54,6 +55,8 @@ SRSLTE_API int srslte_ringbuffer_space(srslte_ringbuffer_t *q); SRSLTE_API int srslte_ringbuffer_write(srslte_ringbuffer_t* q, void* ptr, int nof_bytes); +SRSLTE_API int srslte_ringbuffer_write_timed(srslte_ringbuffer_t* q, void* ptr, int nof_bytes, uint32_t timeout_ms); + SRSLTE_API int srslte_ringbuffer_read(srslte_ringbuffer_t* q, void* ptr, int nof_bytes); SRSLTE_API int srslte_ringbuffer_read_timed(srslte_ringbuffer_t* q, void* p, int nof_bytes, uint32_t timeout_ms); diff --git a/lib/src/phy/rf/CMakeLists.txt b/lib/src/phy/rf/CMakeLists.txt index 9ae3494b7..c20a9bd6f 100644 --- a/lib/src/phy/rf/CMakeLists.txt +++ b/lib/src/phy/rf/CMakeLists.txt @@ -44,7 +44,7 @@ if(RF_FOUND) if (ZEROMQ_FOUND) add_definitions(-DENABLE_ZEROMQ) - list(APPEND SOURCES_RF rf_zmq_imp.c) + list(APPEND SOURCES_RF rf_zmq_imp.c rf_zmq_imp_tx.c rf_zmq_imp_rx.c) endif (ZEROMQ_FOUND) add_library(srslte_rf SHARED ${SOURCES_RF}) diff --git a/lib/src/phy/rf/rf_zmq_imp.c b/lib/src/phy/rf/rf_zmq_imp.c index 43d7614d8..d262e67f1 100644 --- a/lib/src/phy/rf/rf_zmq_imp.c +++ b/lib/src/phy/rf/rf_zmq_imp.c @@ -21,17 +21,14 @@ #include "rf_zmq_imp.h" #include "rf_helper.h" +#include "rf_zmq_imp_trx.h" #include -#include #include #include -#include -#include #include #include #include -#include -#include +#include #include typedef struct { @@ -39,11 +36,10 @@ typedef struct { char* devname; srslte_rf_info_t info; uint32_t nof_channels; - bool running; // RF State - double srate; // radio rate configured by upper layers - double base_srate; + uint32_t srate; // radio rate configured by upper layers + uint32_t base_srate; uint32_t decim_factor; // decimation factor between base_srate used on transport on radio's rate double rx_gain; double tx_freq; @@ -52,8 +48,8 @@ typedef struct { // Server void* context; - void* transmitter; - void* receiver; + rf_zmq_tx_t transmitter; + rf_zmq_rx_t receiver; char rx_port[PARAM_LEN]; char tx_port[PARAM_LEN]; @@ -61,31 +57,14 @@ typedef struct { // Various sample buffers cf_t* buffer_decimation; - cf_t* buffer_rx; cf_t* buffer_tx; - // Rx and Tx timestamps + // Rx timestamp uint64_t next_rx_ts; - uint64_t next_tx_ts; - - // Ringbuffer - srslte_ringbuffer_t rx_ringbuffer; pthread_t thread; - pthread_mutex_t mutex; - pthread_mutex_t mutex_tx; } rf_zmq_handler_t; -/* Definitions */ -#define VERBOSE 0 - -#define NSAMPLES2NBYTES(X) (((uint32_t)(X)) * sizeof(cf_t)) -#define NBYTES2NSAMPLES(X) ((X) / sizeof(cf_t)) -#define BUFFER_SIZE (NSAMPLES2NBYTES(3072000)) // 10 subframes at 20 MHz -#define ZMQ_TIMEOUT_MS 1000 -#define ZMQ_MAXTRIALS 3 -#define ZMQ_TRX_MARGIN_MS 1 - void update_rates(rf_zmq_handler_t* handler, double srate); /* @@ -97,14 +76,14 @@ const char zmq_devname[4] = "zmq"; * Static methods */ -static inline void rf_zmq_info(rf_zmq_handler_t* handler, const char* format, ...) +void rf_zmq_info(char* id, const char* format, ...) { #if VERBOSE struct timeval t; gettimeofday(&t, NULL); va_list args; va_start(args, format); - printf("[%s@%02ld.%06ld] ", handler ? handler->id : "zmq", t.tv_sec % 10, t.tv_usec); + printf("[%s@%02ld.%06ld] ", id ? id : "zmq", t.tv_sec % 10, t.tv_usec); vprintf(format, args); va_end(args); #else /* VERBOSE */ @@ -112,7 +91,7 @@ static inline void rf_zmq_info(rf_zmq_handler_t* handler, const char* format, .. #endif /* VERBOSE */ } -static void rf_zmq_error(rf_zmq_handler_t* handler, const char* format, ...) +void rf_zmq_error(char* id, const char* format, ...) { struct timeval t; gettimeofday(&t, NULL); @@ -129,14 +108,12 @@ static inline int update_ts(void* h, uint64_t* ts, int nsamples, const char* dir if (h && nsamples > 0) { rf_zmq_handler_t* handler = (rf_zmq_handler_t*)h; - pthread_mutex_lock(&handler->mutex); (*ts) += nsamples; - pthread_mutex_unlock(&handler->mutex); srslte_timestamp_t _ts = {}; srslte_timestamp_init_uint64(&_ts, *ts, handler->base_srate); - rf_zmq_info(handler, " -> next %s time after %d samples: %d + %.3f\n", dir, nsamples, _ts.full_secs, - _ts.frac_secs); + rf_zmq_info( + handler->id, " -> next %s time after %d samples: %d + %.3f\n", dir, nsamples, _ts.full_secs, _ts.frac_secs); ret = SRSLTE_SUCCESS; } @@ -144,7 +121,7 @@ static inline int update_ts(void* h, uint64_t* ts, int nsamples, const char* dir return ret; } -static inline int rf_zmq_handle_error(rf_zmq_handler_t* handler, const char* text) +int rf_zmq_handle_error(char* id, const char* text) { int ret = SRSLTE_SUCCESS; @@ -154,127 +131,18 @@ static inline int rf_zmq_handle_error(rf_zmq_handler_t* handler, const char* tex // handled errors case EFSM: case EAGAIN: - rf_zmq_info(handler, "Warning %s: %s\n", text, zmq_strerror(err)); + rf_zmq_info(id, "Warning %s: %s\n", text, zmq_strerror(err)); break; - // critical non-handled errors + + // critical non-handled errors default: ret = SRSLTE_ERROR; - rf_zmq_error(handler, "Error %s: %s\n", text, zmq_strerror(err)); + rf_zmq_error(id, "Error %s: %s\n", text, zmq_strerror(err)); } return ret; } -static int rf_zmq_tx(rf_zmq_handler_t* handler, uint8_t* buffer, uint32_t nbytes) -{ - int n, ntrials; - pthread_mutex_lock(&handler->mutex_tx); - - // Receive Transmit request - uint8_t dummy; - - for (ntrials = 0, n = -1; ntrials < ZMQ_MAXTRIALS && n < 0 && handler->running; ntrials++) { - n = zmq_recv(handler->transmitter, &dummy, sizeof(dummy), 0); - if (n < 0) { - if (rf_zmq_handle_error(handler, "tx request receive")) { - n = SRSLTE_ERROR; - goto clean_exit; - } - } else { - rf_zmq_info(handler, " - tx request received\n"); - rf_zmq_info(handler, " - sending %d samples (%d B)\n", NBYTES2NSAMPLES(nbytes), nbytes); - } - } - - // Send zeros - for (ntrials = 0, n = -1; ntrials < ZMQ_MAXTRIALS && n < 0 && handler->running; ntrials++) { - n = zmq_send(handler->transmitter, buffer, nbytes, 0); - if (n < 0) { - if (rf_zmq_handle_error(handler, "tx baseband send")) { - n = SRSLTE_ERROR; - goto clean_exit; - } - } else if (n != nbytes) { - rf_zmq_error(handler, "[zmq] Error: transmitter expected %d bytes and sent %d. %s.\n", nbytes, n, - strerror(zmq_errno())); - n = SRSLTE_ERROR; - goto clean_exit; - } - } - // update both tx timestamp and ringbuffer - update_ts(handler, &handler->next_tx_ts, NBYTES2NSAMPLES(nbytes), "tx"); - -clean_exit: - pthread_mutex_unlock(&handler->mutex_tx); - - return (n > 0) ? nbytes : SRSLTE_ERROR; -} - -static int rf_zmq_tx_zeros(rf_zmq_handler_t* handler, int32_t nsamples) -{ - rf_zmq_info(handler, "Tx %d zero samples\n", nsamples); - - if (NSAMPLES2NBYTES(nsamples) > ZMQ_MAX_RX_BYTES) { - // can't transmit zeros, buffer too small - fprintf(stderr, "[zmq] Error: zero buffer too small (%ld) to transmit %ld samples\n", ZMQ_MAX_RX_BYTES, - NSAMPLES2NBYTES(nsamples)); - return SRSLTE_ERROR; - } - - bzero(handler->buffer_tx, NSAMPLES2NBYTES(nsamples)); - - return rf_zmq_tx(handler, (uint8_t*)handler->buffer_tx, NSAMPLES2NBYTES(nsamples)); -} - -static void* rf_zmq_async_rx_thread(void* h) -{ - rf_zmq_handler_t* handler = (rf_zmq_handler_t*)h; - - while (handler->receiver && handler->running) { - int n = SRSLTE_ERROR; - uint8_t dummy = 0xFF; - int ntrials = 0; - - rf_zmq_info(handler, "-- ASYNC RX wait...\n"); - - // Send request - for (ntrials = 0; n < 0 && ntrials < ZMQ_MAXTRIALS && handler->running; ntrials++) { - rf_zmq_info(handler, " - tx'ing rx request\n"); - n = zmq_send(handler->receiver, &dummy, sizeof(dummy), 0); - if (n < 0) { - if (rf_zmq_handle_error(handler, "synchronous rx request send")) { - return NULL; - } - } - } - - // Receive baseband - for (n = (n < 0) ? 0 : -1; n < 0 && handler->running;) { - n = zmq_recv(handler->receiver, handler->buffer_rx, BUFFER_SIZE, 0); - if (n == -1) { - if (rf_zmq_handle_error(handler, "asynchronous rx baseband receive")) { - return NULL; - } - } else if (n > BUFFER_SIZE) { - fprintf(stderr, "[zmq] Error: receiver expected <= %ld bytes and received %d at channel %d.\n", BUFFER_SIZE, n, - 0); - return NULL; - } - } - - // Write received data in buffer - if (n > 0) { - if (srslte_ringbuffer_write(&handler->rx_ringbuffer, handler->buffer_rx, n) != n) { - rf_zmq_error(handler, "[zmq] error writing asynchronous ring buffer...\n"); - } - rf_zmq_info(handler, " - received %d baseband samples (%d B). %d samples available.\n", NBYTES2NSAMPLES(n), n, - srslte_ringbuffer_status(&handler->rx_ringbuffer)); - } - } - - return NULL; -} - /* * Public methods */ @@ -342,7 +210,7 @@ int rf_zmq_open_multi(char* args, void** h, uint32_t nof_channels) } bzero(handler, sizeof(rf_zmq_handler_t)); *h = handler; - handler->base_srate = 23.04e6; // Sample rate for 100 PRB cell + handler->base_srate = ZMQ_BASERATE_DEFAULT_HZ; // Sample rate for 100 PRB cell handler->rx_gain = 0.0; handler->info.max_rx_gain = +INFINITY; handler->info.min_rx_gain = -INFINITY; @@ -350,9 +218,6 @@ int rf_zmq_open_multi(char* args, void** h, uint32_t nof_channels) handler->info.min_tx_gain = -INFINITY; strcpy(handler->id, "zmq\0"); - pthread_mutex_init(&handler->mutex, NULL); - pthread_mutex_init(&handler->mutex_tx, NULL); - // parse args if (args) { // base_srate @@ -363,7 +228,7 @@ int rf_zmq_open_multi(char* args, void** h, uint32_t nof_channels) if (config_ptr) { copy_subdev_string(config_str, config_ptr + strlen(config_arg)); printf("Using base rate=%s\n", config_str); - handler->base_srate = strtod(config_str, NULL); + handler->base_srate = (uint32_t)strtod(config_str, NULL); remove_substring(args, config_arg); remove_substring(args, config_str); } @@ -424,118 +289,44 @@ int rf_zmq_open_multi(char* args, void** h, uint32_t nof_channels) goto clean_exit; } + // initialize transmitter if (strlen(handler->tx_port) != 0) { - // Initialise transmitter - handler->transmitter = zmq_socket(handler->context, ZMQ_REP); - if (!handler->transmitter) { - fprintf(stderr, "[zmq] Error: creating transmitter socket\n"); - goto clean_exit; - } - - rf_zmq_info(handler, "Binding transmitter: %s\n", handler->tx_port); - - ret = zmq_bind(handler->transmitter, handler->tx_port); - if (ret) { - fprintf(stderr, "Error: connecting transmitter socket: %s\n", zmq_strerror(zmq_errno())); - goto clean_exit; - } - -#if ZMQ_TIMEOUT_MS - // set recv timeout for transmitter - int timeout = ZMQ_TIMEOUT_MS; - if (zmq_setsockopt(handler->transmitter, ZMQ_RCVTIMEO, &timeout, sizeof(timeout)) == -1) { - fprintf(stderr, "Error: setting receive timeout on tx socket\n"); - goto clean_exit; - } - if (zmq_setsockopt(handler->transmitter, ZMQ_SNDTIMEO, &timeout, sizeof(timeout)) == -1) { - fprintf(stderr, "Error: setting receive timeout on tx socket\n"); + if (rf_zmq_tx_open(&handler->transmitter, handler->id, handler->context, handler->tx_port) != SRSLTE_SUCCESS) { + fprintf(stderr, "[zmq] Error: opening transmitter\n"); goto clean_exit; } - - // set linger timeout for transmitter - timeout = 0; - if (zmq_setsockopt(handler->transmitter, ZMQ_LINGER, &timeout, sizeof(timeout)) == -1) { - fprintf(stderr, "Error: setting linger timeout on tx socket\n"); - } -#endif - } else { fprintf(stdout, "[zmq] %s Tx port not specified. Disabling transmitter.\n", handler->id); } // initialize receiver if (strlen(handler->rx_port) != 0) { - handler->receiver = zmq_socket(handler->context, ZMQ_REQ); - if (!handler->receiver) { - fprintf(stderr, "[zmq] Error: creating receiver socket\n"); - goto clean_exit; - } - - rf_zmq_info(handler, "Connecting receiver: %s\n", handler->rx_port); - - ret = zmq_connect(handler->receiver, handler->rx_port); - if (ret) { - fprintf(stderr, "Error: binding receiver socket: %s\n", zmq_strerror(zmq_errno())); - goto clean_exit; - } - -#if ZMQ_TIMEOUT_MS - // set recv timeout for receiver - int timeout = ZMQ_TIMEOUT_MS; - if (zmq_setsockopt(handler->receiver, ZMQ_RCVTIMEO, &timeout, sizeof(timeout)) == -1) { - fprintf(stderr, "Error: setting receive timeout on tx socket\n"); - goto clean_exit; - } - if (zmq_setsockopt(handler->receiver, ZMQ_SNDTIMEO, &timeout, sizeof(timeout)) == -1) { - fprintf(stderr, "Error: setting receive timeout on tx socket\n"); - goto clean_exit; - } - - timeout = 0; - // set linger timeout for receiver - if (zmq_setsockopt(handler->receiver, ZMQ_LINGER, &timeout, sizeof(timeout)) == -1) { - fprintf(stderr, "Error: setting linger timeout on rx socket\n"); - } -#endif - - // init rx ringbuffer - if (srslte_ringbuffer_init(&handler->rx_ringbuffer, BUFFER_SIZE)) { - fprintf(stderr, "Error, initiating rx ringbuffer\n"); + if (rf_zmq_rx_open(&handler->receiver, handler->id, handler->context, handler->rx_port) != SRSLTE_SUCCESS) { + fprintf(stderr, "[zmq] Error: opening receiver\n"); goto clean_exit; } } else { fprintf(stdout, "[zmq] %s Rx port not specified. Disabling receiver.\n", handler->id); } - if (handler->transmitter == NULL && handler->receiver == NULL) { + if (!handler->transmitter.running && !handler->receiver.running) { fprintf(stderr, "[zmq] Error: Neither Tx port nor Rx port specified.\n"); goto clean_exit; } // Create decimation and overflow buffer - handler->buffer_decimation = srslte_vec_malloc(ZMQ_MAX_RX_BYTES); + handler->buffer_decimation = srslte_vec_malloc(ZMQ_MAX_BUFFER_SIZE); if (!handler->buffer_decimation) { fprintf(stderr, "Error: allocating decimation buffer\n"); goto clean_exit; } - handler->buffer_tx = srslte_vec_malloc(ZMQ_MAX_RX_BYTES); + handler->buffer_tx = srslte_vec_malloc(ZMQ_MAX_BUFFER_SIZE); if (!handler->buffer_tx) { fprintf(stderr, "Error: allocating tx buffer\n"); goto clean_exit; } - handler->buffer_rx = srslte_vec_malloc(ZMQ_MAX_RX_BYTES); - if (!handler->buffer_rx) { - fprintf(stderr, "Error: allocating rx buffer\n"); - goto clean_exit; - } - - handler->running = true; - if (handler->receiver) { - pthread_create(&handler->thread, NULL, rf_zmq_async_rx_thread, handler); - } - ret = SRSLTE_SUCCESS; clean_exit: @@ -552,23 +343,15 @@ int rf_zmq_close(void* h) rf_zmq_handler_t* handler = (rf_zmq_handler_t*)h; - rf_zmq_info(handler, "Closing %s ...\n", handler->id); + rf_zmq_info(handler->id, "Closing ...\n"); - handler->running = false; if (handler->thread) { pthread_join(handler->thread, NULL); pthread_detach(handler->thread); } - if (handler->transmitter) { - zmq_close(handler->transmitter); - handler->transmitter = NULL; - } - if (handler->receiver) { - zmq_close(handler->receiver); - handler->receiver = NULL; - srslte_ringbuffer_free(&handler->rx_ringbuffer); - } + rf_zmq_tx_close(&handler->transmitter); + rf_zmq_rx_close(&handler->receiver); if (handler->context) { zmq_ctx_destroy(handler->context); @@ -582,13 +365,6 @@ int rf_zmq_close(void* h) free(handler->buffer_tx); } - if (handler->buffer_rx) { - free(handler->buffer_rx); - } - - pthread_mutex_destroy(&handler->mutex); - pthread_mutex_destroy(&handler->mutex_tx); - // Free all free(handler); @@ -600,7 +376,7 @@ void update_rates(rf_zmq_handler_t* handler, double srate) if (handler) { // Decimation must be full integer if (((uint64_t)handler->base_srate % (uint64_t)srate) == 0) { - handler->srate = srate; + handler->srate = (uint32_t)srate; handler->decim_factor = handler->base_srate / handler->srate; } else { fprintf(stderr, "Error: couldn't update sample rate. %.2f is not divisible by %.2f\n", srate / 1e6, @@ -724,9 +500,8 @@ int rf_zmq_recv_with_time_multi( uint32_t nbytes = NSAMPLES2NBYTES(nsamples * handler->decim_factor); uint32_t nsamples_baserate = nsamples * handler->decim_factor; - uint32_t nbytes_baserate = NSAMPLES2NBYTES(nsamples_baserate); - rf_zmq_info(handler, "Rx %d samples (%d B)\n", nsamples, nbytes); + rf_zmq_info(handler->id, "Rx %d samples (%d B)\n", nsamples, nbytes); // set timestamp for this reception if (secs != NULL && frac_secs != NULL) { @@ -737,48 +512,55 @@ int rf_zmq_recv_with_time_multi( } // return if receiver is turned off - if (handler->receiver == NULL) { + if (!handler->receiver.running) { update_ts(handler, &handler->next_rx_ts, nsamples_baserate, "rx"); return nsamples; } // Check available buffer size - if (nbytes > ZMQ_MAX_RX_BYTES) { - fprintf(stderr, "[zmq] Error: Trying to receive %d B but buffer is only %ld B at channel %d.\n", nbytes, - ZMQ_MAX_RX_BYTES, 0); + if (nbytes > ZMQ_MAX_BUFFER_SIZE) { + fprintf(stderr, + "[zmq] Error: Trying to receive %d B but buffer is only %ld B at channel %d.\n", + nbytes, + ZMQ_MAX_BUFFER_SIZE, + 0); goto clean_exit; } // receive samples srslte_timestamp_t ts_tx = {}, ts_rx = {}; - srslte_timestamp_init_uint64(&ts_tx, handler->next_tx_ts, handler->base_srate); + srslte_timestamp_init_uint64(&ts_tx, handler->transmitter.nsamples, handler->base_srate); srslte_timestamp_init_uint64(&ts_rx, handler->next_rx_ts, handler->base_srate); - rf_zmq_info(handler, " - next rx time: %d + %.3f\n", ts_rx.full_secs, ts_rx.frac_secs); - rf_zmq_info(handler, " - next tx time: %d + %.3f\n", ts_tx.full_secs, ts_tx.frac_secs); + rf_zmq_info(handler->id, " - next rx time: %d + %.3f\n", ts_rx.full_secs, ts_rx.frac_secs); + rf_zmq_info(handler->id, " - next tx time: %d + %.3f\n", ts_tx.full_secs, ts_tx.frac_secs); // check for tx gap if we're also transmitting on this radio - if (handler->transmitter) { - uint32_t margin_nsamples = - (uint32_t)(handler->tx_used ? (0) : (nsamples_baserate + ZMQ_TRX_MARGIN_MS * handler->base_srate / 1000.0)); - int num_tx_gap_samples_base_rate = (int)(handler->next_rx_ts - handler->next_tx_ts + margin_nsamples); - if (num_tx_gap_samples_base_rate > 0) { - rf_zmq_info(handler, " - tx_gap of %d samples\n", num_tx_gap_samples_base_rate); - - // Transmit zero samples - rf_zmq_tx_zeros(handler, num_tx_gap_samples_base_rate); - } else { - rf_zmq_info(handler, " - no tx gap detected\n"); - } + if (handler->transmitter.running) { + rf_zmq_tx_align(&handler->transmitter, handler->next_rx_ts + nsamples_baserate); } + usleep((1000000 * nsamples) / handler->base_srate); + // copy from rx buffer as many samples as requested into provided buffer cf_t* ptr = (handler->decim_factor != 1) ? handler->buffer_decimation : data[0]; - if (srslte_ringbuffer_read(&handler->rx_ringbuffer, ptr, nbytes_baserate) != nbytes) { - fprintf(stderr, "Error: reading from rx ringbuffer.\n"); - goto clean_exit; + int32_t count = 0; + while (count < nsamples_baserate && handler->receiver.running) { + int32_t n = rf_zmq_rx_baseband(&handler->receiver, &ptr[count], nsamples_baserate); + if (n > 0) { + // No error + count += n; + } else if (n == SRSLTE_ERROR_TIMEOUT) { + // Timeout, do nothing, keep going + } else if (n > 0) { + // Other error, exit + fprintf(stderr, "Error: receiving data.\n"); + goto clean_exit; + } } - rf_zmq_info(handler, " - read %d samples. %d samples available\n", NBYTES2NSAMPLES(nbytes), - NBYTES2NSAMPLES(srslte_ringbuffer_status(&handler->rx_ringbuffer))); + rf_zmq_info(handler->id, + " - read %d samples. %d samples available\n", + NBYTES2NSAMPLES(nbytes), + NBYTES2NSAMPLES(srslte_ringbuffer_status(&handler->receiver.ringbuffer))); // decimate if needed if (handler->decim_factor != 1) { @@ -793,8 +575,11 @@ int rf_zmq_recv_with_time_multi( } dst[i] = avg; } - rf_zmq_info(handler, " - re-adjust bytes due to %dx decimation %d --> %d samples)\n", handler->decim_factor, - nsamples_baserate, nsamples); + rf_zmq_info(handler->id, + " - re-adjust bytes due to %dx decimation %d --> %d samples)\n", + handler->decim_factor, + nsamples_baserate, + nsamples); } // update rx time @@ -843,49 +628,47 @@ int rf_zmq_send_timed_multi(void* h, uint32_t nsamples_baseband = nsamples * handler->decim_factor; uint32_t nbytes_baseband = NSAMPLES2NBYTES(nsamples_baseband); - if (nbytes_baseband > ZMQ_MAX_RX_BYTES) { - fprintf(stderr, "Error: trying to transmit too many samples (%d > %ld).\n", nbytes, ZMQ_MAX_RX_BYTES); + if (nbytes_baseband > ZMQ_MAX_BUFFER_SIZE) { + fprintf(stderr, "Error: trying to transmit too many samples (%d > %ld).\n", nbytes, ZMQ_MAX_BUFFER_SIZE); goto clean_exit; } - rf_zmq_info(handler, "Tx %d samples (%d B)\n", nsamples, nbytes); + rf_zmq_info(handler->id, "Tx %d samples (%d B)\n", nsamples, nbytes); // return if transmitter is switched off - if (handler->tx_port == 0) { + if (strlen(handler->tx_port) == 0) { return SRSLTE_SUCCESS; } // check if this is a tx in the future if (has_time_spec) { - rf_zmq_info(handler, " - tx time: %d + %.3f\n", secs, frac_secs); + rf_zmq_info(handler->id, " - tx time: %d + %.3f\n", secs, frac_secs); srslte_timestamp_t ts = {}; srslte_timestamp_init(&ts, secs, frac_secs); uint64_t tx_ts = srslte_timestamp_uint64(&ts, handler->base_srate); - int32_t num_tx_gap_samples = (int32_t)((int64_t)tx_ts - (int64_t)handler->next_tx_ts); + int num_tx_gap_samples = rf_zmq_tx_align(&handler->transmitter, tx_ts); if (num_tx_gap_samples < 0) { - fprintf(stderr, "[zmq] Error: tx time is %.3f ms in the past (%ld < %ld)\n", - -1000.0 * num_tx_gap_samples / handler->base_srate, tx_ts, handler->next_tx_ts); + fprintf(stderr, + "[zmq] Error: tx time is %.3f ms in the past (%ld < %ld)\n", + -1000.0 * num_tx_gap_samples / handler->base_srate, + tx_ts, + handler->transmitter.nsamples); goto clean_exit; - } else if (num_tx_gap_samples > 0) { - rf_zmq_info(handler, " - tx gap of %d baseband samples\n", num_tx_gap_samples); - - // send zero samples - int n = rf_zmq_tx_zeros(handler, num_tx_gap_samples); - if (n == -1) { - goto clean_exit; - } - } else { - rf_zmq_info(handler, " - no tx gap detected\n"); } } + // Select buffer pointer depending on interpolation cf_t* buf = (handler->decim_factor != 1) ? handler->buffer_tx : data[0]; + // Interpolate if required if (handler->decim_factor != 1) { - rf_zmq_info(handler, " - re-adjust bytes due to %dx interpolation %d --> %d samples)\n", handler->decim_factor, - nsamples, nsamples_baseband); + rf_zmq_info(handler->id, + " - re-adjust bytes due to %dx interpolation %d --> %d samples)\n", + handler->decim_factor, + nsamples, + nsamples_baseband); int n = 0; cf_t* src = data[0]; @@ -903,8 +686,8 @@ int rf_zmq_send_timed_multi(void* h, } } - // send baseband samples - int n = rf_zmq_tx(handler, (uint8_t*)buf, nbytes_baseband); + // Send base-band samples + int n = rf_zmq_tx_baseband(&handler->transmitter, buf, nsamples_baseband); if (n == SRSLTE_ERROR) { goto clean_exit; } diff --git a/lib/src/phy/rf/rf_zmq_imp.h b/lib/src/phy/rf/rf_zmq_imp.h index be6ec1608..cab2f68e6 100644 --- a/lib/src/phy/rf/rf_zmq_imp.h +++ b/lib/src/phy/rf/rf_zmq_imp.h @@ -28,8 +28,6 @@ #define DEVNAME_ZMQ "ZeroMQ" #define PARAM_LEN (128) #define PARAM_LEN_SHORT (PARAM_LEN / 2) -#define ZMQ_MAX_RX_BYTES \ - (5 * SRSLTE_SF_LEN_MAX * sizeof(cf_t)) // Five subframes at max LTE rate using default FFT-length SRSLTE_API int rf_zmq_open(char* args, void** handler); diff --git a/lib/src/phy/rf/rf_zmq_imp_rx.c b/lib/src/phy/rf/rf_zmq_imp_rx.c new file mode 100644 index 000000000..9a20bbe0b --- /dev/null +++ b/lib/src/phy/rf/rf_zmq_imp_rx.c @@ -0,0 +1,198 @@ +/** + * + * \section COPYRIGHT + * + * Copyright 2013-2019 Software Radio Systems Limited + * + * \section LICENSE + * + * This file is part of the srsLTE library. + * + * srsLTE is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * srsLTE is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * A copy of the GNU Affero General Public License can be found in + * the LICENSE file in the top-level directory of this distribution + * and at http://www.gnu.org/licenses/. + * + */ + +#include "rf_zmq_imp_trx.h" +#include +#include +#include +#include +#include + +static void* rf_zmq_async_rx_thread(void* h) +{ + rf_zmq_rx_t* q = (rf_zmq_rx_t*)h; + + while (q->sock && q->running) { + int nbytes = 0; + int n = SRSLTE_ERROR; + uint8_t dummy = 0xFF; + + rf_zmq_info(q->id, "-- ASYNC RX wait...\n"); + + // Send request + while (n < 0 && q->running) { + rf_zmq_info(q->id, " - tx'ing rx request\n"); + n = zmq_send(q->sock, &dummy, sizeof(dummy), 0); + if (n < 0) { + if (rf_zmq_handle_error(q->id, "synchronous rx request send")) { + return NULL; + } + } + } + + // Receive baseband + for (n = (n < 0) ? 0 : -1; n < 0 && q->running;) { + n = zmq_recv(q->sock, q->temp_buffer, ZMQ_MAX_BUFFER_SIZE, 0); + if (n == -1) { + if (rf_zmq_handle_error(q->id, "asynchronous rx baseband receive")) { + return NULL; + } + + } else if (n > ZMQ_MAX_BUFFER_SIZE) { + fprintf(stderr, + "[zmq] Error: receiver expected <= %ld bytes and received %d at channel %d.\n", + ZMQ_MAX_BUFFER_SIZE, + n, + 0); + return NULL; + } else { + nbytes = n; + } + } + + // Write received data in buffer + if (nbytes > 0) { + n = -1; + + // Try to write in ring buffer + while (n < 0 && q->running) { + n = srslte_ringbuffer_write_timed(&q->ringbuffer, q->temp_buffer, nbytes, ZMQ_TIMEOUT_MS); + } + + // Check write + if (nbytes == n) { + rf_zmq_info(q->id, + " - received %d baseband samples (%d B). %d samples available.\n", + NBYTES2NSAMPLES(n), + n, + NBYTES2NSAMPLES(srslte_ringbuffer_status(&q->ringbuffer))); + } + } + } + + return NULL; +} + +int rf_zmq_rx_open(rf_zmq_rx_t* q, char* id, void* zmq_ctx, char* sock_args) +{ + int ret = SRSLTE_ERROR; + + if (q) { + // Copy id + strncpy(q->id, id, 16); + + // Zero object + bzero(q, sizeof(rf_zmq_tx_t)); + + // Create socket + q->sock = zmq_socket(zmq_ctx, ZMQ_REQ); + if (!q->sock) { + fprintf(stderr, "[zmq] Error: creating transmitter socket\n"); + goto clean_exit; + } + + rf_zmq_info(q->id, "Connecting receiver: %s\n", sock_args); + + ret = zmq_connect(q->sock, sock_args); + if (ret) { + fprintf(stderr, "Error: connecting receiver socket: %s\n", zmq_strerror(zmq_errno())); + goto clean_exit; + } + +#if ZMQ_TIMEOUT_MS + int timeout = ZMQ_TIMEOUT_MS; + if (zmq_setsockopt(q->sock, ZMQ_RCVTIMEO, &timeout, sizeof(timeout)) == -1) { + fprintf(stderr, "Error: setting receive timeout on rx socket\n"); + goto clean_exit; + } + + if (zmq_setsockopt(q->sock, ZMQ_SNDTIMEO, &timeout, sizeof(timeout)) == -1) { + fprintf(stderr, "Error: setting receive timeout on rx socket\n"); + goto clean_exit; + } + + timeout = 0; + if (zmq_setsockopt(q->sock, ZMQ_LINGER, &timeout, sizeof(timeout)) == -1) { + fprintf(stderr, "Error: setting linger timeout on rx socket\n"); + goto clean_exit; + } +#endif + + if (srslte_ringbuffer_init(&q->ringbuffer, ZMQ_MAX_BUFFER_SIZE)) { + fprintf(stderr, "Error: initiating ringbuffer\n"); + goto clean_exit; + } + + q->temp_buffer = srslte_vec_malloc(ZMQ_MAX_BUFFER_SIZE); + if (!q->temp_buffer) { + fprintf(stderr, "Error: allocating rx buffer\n"); + goto clean_exit; + } + + if (pthread_mutex_init(&q->mutex, NULL)) { + fprintf(stderr, "Error: creating mutex\n"); + goto clean_exit; + } + + if (pthread_create(&q->thread, NULL, rf_zmq_async_rx_thread, q)) { + fprintf(stderr, "Error: creating thread\n"); + goto clean_exit; + } + + q->running = true; + ret = SRSLTE_SUCCESS; + } + +clean_exit: + return ret; +} + +int rf_zmq_rx_baseband(rf_zmq_rx_t* q, cf_t* buffer, uint32_t nsamples) +{ + return srslte_ringbuffer_read_timed(&q->ringbuffer, buffer, NSAMPLES2NBYTES(nsamples), ZMQ_TIMEOUT_MS); +} + +void rf_zmq_rx_close(rf_zmq_rx_t* q) +{ + rf_zmq_info(q->id, "Closing ...\n"); + q->running = false; + + if (q->thread) { + pthread_join(q->thread, NULL); + pthread_detach(q->thread); + } + + srslte_ringbuffer_free(&q->ringbuffer); + + if (q->temp_buffer) { + free(q->temp_buffer); + } + + if (q->sock) { + zmq_close(q->sock); + q->sock = NULL; + } +} diff --git a/lib/src/phy/rf/rf_zmq_imp_trx.h b/lib/src/phy/rf/rf_zmq_imp_trx.h new file mode 100644 index 000000000..2f6f8727f --- /dev/null +++ b/lib/src/phy/rf/rf_zmq_imp_trx.h @@ -0,0 +1,91 @@ +/** + * + * \section COPYRIGHT + * + * Copyright 2013-2019 Software Radio Systems Limited + * + * \section LICENSE + * + * This file is part of the srsLTE library. + * + * srsLTE is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * srsLTE is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * A copy of the GNU Affero General Public License can be found in + * the LICENSE file in the top-level directory of this distribution + * and at http://www.gnu.org/licenses/. + * + */ + +#ifndef SRSLTE_RF_ZMQ_IMP_TRX_H +#define SRSLTE_RF_ZMQ_IMP_TRX_H + +#include +#include +#include + +/* Definitions */ +#define VERBOSE (0) +#define NSAMPLES2NBYTES(X) (((uint32_t)(X)) * sizeof(cf_t)) +#define NBYTES2NSAMPLES(X) ((X) / sizeof(cf_t)) +#define ZMQ_MAX_BUFFER_SIZE (NSAMPLES2NBYTES(3072000)) // 10 subframes at 20 MHz +#define ZMQ_TIMEOUT_MS (1000) +#define ZMQ_BASERATE_DEFAULT_HZ (23040000) + +typedef struct { + char id[16]; + void* sock; + uint64_t nsamples; + bool running; + pthread_mutex_t mutex; + cf_t* zeros; +} rf_zmq_tx_t; + +typedef struct { + char id[16]; + void* sock; + uint64_t nsamples; + bool running; + pthread_t thread; + pthread_mutex_t mutex; + srslte_ringbuffer_t ringbuffer; + cf_t* temp_buffer; +} rf_zmq_rx_t; + +/* + * Common functions + */ +SRSLTE_API void rf_zmq_info(char* id, const char* format, ...); + +SRSLTE_API void rf_zmq_error(char* id, const char* format, ...); + +SRSLTE_API int rf_zmq_handle_error(char* id, const char* text); + +/* + * Transmitter functions + */ +SRSLTE_API int rf_zmq_tx_open(rf_zmq_tx_t* q, const char* id, void* zmq_ctx, char* sock_args); + +SRSLTE_API int rf_zmq_tx_align(rf_zmq_tx_t* q, uint64_t ts); + +SRSLTE_API int rf_zmq_tx_baseband(rf_zmq_tx_t* q, cf_t* buffer, uint32_t nsamples); + +SRSLTE_API void rf_zmq_tx_close(rf_zmq_tx_t* q); + +/* + * Receiver functions + */ +SRSLTE_API int rf_zmq_rx_open(rf_zmq_rx_t* q, char* id, void* zmq_ctx, char* sock_args); + +SRSLTE_API int rf_zmq_rx_baseband(rf_zmq_rx_t* q, cf_t* buffer, uint32_t nsamples); + +SRSLTE_API void rf_zmq_rx_close(rf_zmq_rx_t* q); + +#endif // SRSLTE_RF_ZMQ_IMP_TRX_H diff --git a/lib/src/phy/rf/rf_zmq_imp_tx.c b/lib/src/phy/rf/rf_zmq_imp_tx.c new file mode 100644 index 000000000..6b1360dca --- /dev/null +++ b/lib/src/phy/rf/rf_zmq_imp_tx.c @@ -0,0 +1,176 @@ +/** + * + * \section COPYRIGHT + * + * Copyright 2013-2019 Software Radio Systems Limited + * + * \section LICENSE + * + * This file is part of the srsLTE library. + * + * srsLTE is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * srsLTE is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * A copy of the GNU Affero General Public License can be found in + * the LICENSE file in the top-level directory of this distribution + * and at http://www.gnu.org/licenses/. + * + */ + +#include "rf_zmq_imp_trx.h" +#include +#include +#include +#include +#include + +int rf_zmq_tx_open(rf_zmq_tx_t* q, const char* id, void* zmq_ctx, char* sock_args) +{ + int ret = SRSLTE_ERROR; + + if (q) { + // Copy id + strncpy(q->id, id, 16); + + // Zero object + bzero(q, sizeof(rf_zmq_tx_t)); + + // Create socket + q->sock = zmq_socket(zmq_ctx, ZMQ_REP); + if (!q->sock) { + fprintf(stderr, "[zmq] Error: creating transmitter socket\n"); + goto clean_exit; + } + + rf_zmq_info(q->id, "Binding transmitter: %s\n", sock_args); + + ret = zmq_bind(q->sock, sock_args); + if (ret) { + fprintf(stderr, "Error: connecting transmitter socket: %s\n", zmq_strerror(zmq_errno())); + goto clean_exit; + } + +#if ZMQ_TIMEOUT_MS + int timeout = ZMQ_TIMEOUT_MS; + if (zmq_setsockopt(q->sock, ZMQ_RCVTIMEO, &timeout, sizeof(timeout)) == -1) { + fprintf(stderr, "Error: setting receive timeout on tx socket\n"); + goto clean_exit; + } + + if (zmq_setsockopt(q->sock, ZMQ_SNDTIMEO, &timeout, sizeof(timeout)) == -1) { + fprintf(stderr, "Error: setting receive timeout on tx socket\n"); + goto clean_exit; + } + + timeout = 0; + if (zmq_setsockopt(q->sock, ZMQ_LINGER, &timeout, sizeof(timeout)) == -1) { + fprintf(stderr, "Error: setting linger timeout on tx socket\n"); + goto clean_exit; + } +#endif + + if (pthread_mutex_init(&q->mutex, NULL)) { + fprintf(stderr, "Error: creating mutex\n"); + goto clean_exit; + } + + q->zeros = srslte_vec_malloc(ZMQ_MAX_BUFFER_SIZE); + if (!q->zeros) { + fprintf(stderr, "Error: allocating zeros\n"); + goto clean_exit; + } + bzero(q->zeros, ZMQ_MAX_BUFFER_SIZE); + + q->running = true; + + ret = SRSLTE_SUCCESS; + } + +clean_exit: + return ret; +} + +int rf_zmq_tx_align(rf_zmq_tx_t* q, uint64_t ts) +{ + int64_t nsamples = (int64_t)ts - (int64_t)q->nsamples; + + if (nsamples > 0) { + rf_zmq_info(q->id, " - Detected Tx gap of %d samples.\n", nsamples); + rf_zmq_tx_baseband(q, q->zeros, (uint32_t)nsamples); + } + + return (int)nsamples; +} + +int rf_zmq_tx_baseband(rf_zmq_tx_t* q, cf_t* buffer, uint32_t nsamples) +{ + int n = SRSLTE_ERROR; + pthread_mutex_lock(&q->mutex); + + while (n < 0 && q->running) { + // Receive Transmit request + uint8_t dummy; + n = zmq_recv(q->sock, &dummy, sizeof(dummy), 0); + if (n < 0) { + if (rf_zmq_handle_error(q->id, "tx request receive")) { + n = SRSLTE_ERROR; + goto clean_exit; + } + } else { + // Tx request received successful + rf_zmq_info(q->id, " - tx request received\n"); + rf_zmq_info(q->id, " - sending %d samples (%d B)\n", nsamples, NSAMPLES2NBYTES(nsamples)); + } + + // Send base-band if request was received + if (n > 0) { + n = zmq_send(q->sock, buffer, NSAMPLES2NBYTES(nsamples), 0); + if (n < 0) { + if (rf_zmq_handle_error(q->id, "tx baseband send")) { + n = SRSLTE_ERROR; + goto clean_exit; + } + } else if (n != NSAMPLES2NBYTES(nsamples)) { + rf_zmq_error(q->id, + "[zmq] Error: transmitter expected %d bytes and sent %d. %s.\n", + NSAMPLES2NBYTES(nsamples), + n, + strerror(zmq_errno())); + n = SRSLTE_ERROR; + goto clean_exit; + } + } + + // If failed to receive request or send base-band, keep trying + } + + // Increment sample counter + q->nsamples += nsamples; + n = nsamples; + +clean_exit: + pthread_mutex_unlock(&q->mutex); + + return n; +} + +void rf_zmq_tx_close(rf_zmq_tx_t* q) +{ + q->running = false; + + if (q->zeros) { + free(q->zeros); + } + + if (q->sock) { + zmq_close(q->sock); + q->sock = NULL; + } +} \ No newline at end of file diff --git a/lib/src/phy/utils/ringbuffer.c b/lib/src/phy/utils/ringbuffer.c index 38f6d6f41..22fe6d23c 100644 --- a/lib/src/phy/utils/ringbuffer.c +++ b/lib/src/phy/utils/ringbuffer.c @@ -34,11 +34,12 @@ int srslte_ringbuffer_init(srslte_ringbuffer_t *q, int capacity) } q->active = true; q->capacity = capacity; - pthread_mutex_init(&q->mutex, NULL); - pthread_cond_init(&q->cvar, NULL); + pthread_mutex_init(&q->mutex, NULL); + pthread_cond_init(&q->write_cvar, NULL); + pthread_cond_init(&q->read_cvar, NULL); srslte_ringbuffer_reset(q); - return 0; + return 0; } void srslte_ringbuffer_free(srslte_ringbuffer_t *q) @@ -47,10 +48,11 @@ void srslte_ringbuffer_free(srslte_ringbuffer_t *q) srslte_ringbuffer_stop(q); if (q->buffer) { free(q->buffer); - q->buffer = NULL; + q->buffer = NULL; } - pthread_mutex_destroy(&q->mutex); - pthread_cond_destroy(&q->cvar); + pthread_mutex_destroy(&q->mutex); + pthread_cond_destroy(&q->write_cvar); + pthread_cond_destroy(&q->read_cvar); } } @@ -90,20 +92,64 @@ int srslte_ringbuffer_write(srslte_ringbuffer_t *q, void *p, int nof_bytes) ERROR("Buffer overrun: lost %d bytes\n", nof_bytes - w_bytes); } if (w_bytes > q->capacity - q->wpm) { - int x = q->capacity - q->wpm; - memcpy(&q->buffer[q->wpm], ptr, x); - memcpy(q->buffer, &ptr[x], w_bytes - x); + int x = q->capacity - q->wpm; + memcpy(&q->buffer[q->wpm], ptr, x); + memcpy(q->buffer, &ptr[x], w_bytes - x); } else { - memcpy(&q->buffer[q->wpm], ptr, w_bytes); + memcpy(&q->buffer[q->wpm], ptr, w_bytes); } - q->wpm += w_bytes; + q->wpm += w_bytes; if (q->wpm >= q->capacity) { - q->wpm -= q->capacity; + q->wpm -= q->capacity; } - q->count += w_bytes; - pthread_cond_broadcast(&q->cvar); + q->count += w_bytes; + pthread_cond_broadcast(&q->write_cvar); pthread_mutex_unlock(&q->mutex); - return w_bytes; + return w_bytes; +} + +int srslte_ringbuffer_write_timed(srslte_ringbuffer_t* q, void* p, int nof_bytes, uint32_t timeout_ms) +{ + int ret = SRSLTE_SUCCESS; + uint8_t* ptr = (uint8_t*)p; + int w_bytes = nof_bytes; + struct timespec towait; + struct timeval now; + + // Get current time and update timeout + gettimeofday(&now, NULL); + towait.tv_sec = now.tv_sec + timeout_ms / 1000U; + towait.tv_nsec = (now.tv_usec + 1000UL * (timeout_ms % 1000U)) * 1000UL; + pthread_mutex_lock(&q->mutex); + + // Wait to have enough space in the buffer + while (q->count + w_bytes > q->capacity && q->active && ret == SRSLTE_SUCCESS) { + ret = pthread_cond_timedwait(&q->read_cvar, &q->mutex, &towait); + } + + if (ret == ETIMEDOUT) { + ret = SRSLTE_ERROR_TIMEOUT; + } else if (!q->active) { + ret = SRSLTE_SUCCESS; + } else if (ret == SRSLTE_SUCCESS) { + if (w_bytes > q->capacity - q->wpm) { + int x = q->capacity - q->wpm; + memcpy(&q->buffer[q->wpm], ptr, x); + memcpy(q->buffer, &ptr[x], w_bytes - x); + } else { + memcpy(&q->buffer[q->wpm], ptr, w_bytes); + } + q->wpm += w_bytes; + if (q->wpm >= q->capacity) { + q->wpm -= q->capacity; + } + q->count += w_bytes; + } else { + ret = SRSLTE_ERROR; + } + pthread_cond_broadcast(&q->write_cvar); + pthread_mutex_unlock(&q->mutex); + return w_bytes; } int srslte_ringbuffer_read(srslte_ringbuffer_t *q, void *p, int nof_bytes) @@ -111,26 +157,27 @@ int srslte_ringbuffer_read(srslte_ringbuffer_t *q, void *p, int nof_bytes) uint8_t *ptr = (uint8_t*) p; pthread_mutex_lock(&q->mutex); while(q->count < nof_bytes && q->active) { - pthread_cond_wait(&q->cvar, &q->mutex); + pthread_cond_wait(&q->write_cvar, &q->mutex); } if (!q->active) { pthread_mutex_unlock(&q->mutex); return 0; } if (nof_bytes + q->rpm > q->capacity) { - int x = q->capacity - q->rpm; + int x = q->capacity - q->rpm; memcpy(ptr, &q->buffer[q->rpm], x); memcpy(&ptr[x], q->buffer, nof_bytes - x); - } else { + } else { memcpy(ptr, &q->buffer[q->rpm], nof_bytes); } - q->rpm += nof_bytes; + q->rpm += nof_bytes; if (q->rpm >= q->capacity) { - q->rpm -= q->capacity; + q->rpm -= q->capacity; } - q->count -= nof_bytes; + q->count -= nof_bytes; + pthread_cond_broadcast(&q->read_cvar); pthread_mutex_unlock(&q->mutex); - return nof_bytes; + return nof_bytes; } int srslte_ringbuffer_read_timed(srslte_ringbuffer_t* q, void* p, int nof_bytes, uint32_t timeout_ms) @@ -150,7 +197,7 @@ int srslte_ringbuffer_read_timed(srslte_ringbuffer_t* q, void* p, int nof_bytes, // Wait for having enough samples while (q->count < nof_bytes && q->active && ret == SRSLTE_SUCCESS) { - ret = pthread_cond_timedwait(&q->cvar, &q->mutex, &towait); + ret = pthread_cond_timedwait(&q->write_cvar, &q->mutex, &towait); } if (ret == ETIMEDOUT) { @@ -176,6 +223,7 @@ int srslte_ringbuffer_read_timed(srslte_ringbuffer_t* q, void* p, int nof_bytes, } // Unlock mutex + pthread_cond_broadcast(&q->read_cvar); pthread_mutex_unlock(&q->mutex); return ret; @@ -184,7 +232,8 @@ int srslte_ringbuffer_read_timed(srslte_ringbuffer_t* q, void* p, int nof_bytes, void srslte_ringbuffer_stop(srslte_ringbuffer_t *q) { pthread_mutex_lock(&q->mutex); q->active = false; - pthread_cond_broadcast(&q->cvar); + pthread_cond_broadcast(&q->write_cvar); + pthread_cond_broadcast(&q->read_cvar); pthread_mutex_unlock(&q->mutex); } @@ -195,7 +244,7 @@ int srslte_ringbuffer_read_convert_conj(srslte_ringbuffer_t* q, cf_t* dst_ptr, f pthread_mutex_lock(&q->mutex); while (q->count < nof_bytes && q->active) { - pthread_cond_wait(&q->cvar, &q->mutex); + pthread_cond_wait(&q->write_cvar, &q->mutex); } if (!q->active) { pthread_mutex_unlock(&q->mutex); @@ -218,6 +267,7 @@ int srslte_ringbuffer_read_convert_conj(srslte_ringbuffer_t* q, cf_t* dst_ptr, f q->rpm -= q->capacity; } q->count -= nof_bytes; + pthread_cond_broadcast(&q->read_cvar); pthread_mutex_unlock(&q->mutex); return nof_samples; } @@ -230,7 +280,7 @@ int srslte_ringbuffer_read_block(srslte_ringbuffer_t* q, void** p, int nof_bytes /* Wait until enough data is in the buffer */ while (q->count < nof_bytes && q->active) { - pthread_cond_wait(&q->cvar, &q->mutex); + pthread_cond_wait(&q->write_cvar, &q->mutex); } if (!q->active) { @@ -245,6 +295,7 @@ int srslte_ringbuffer_read_block(srslte_ringbuffer_t* q, void** p, int nof_bytes q->rpm -= q->capacity; } } + pthread_cond_broadcast(&q->read_cvar); pthread_mutex_unlock(&q->mutex); return ret; }