ZMQ: Split Tx and Rx, bug fixes and clean up

master
Xavier Arteaga 5 years ago committed by Xavier Arteaga
parent cf550f6e56
commit 125f1e7282

@ -35,7 +35,8 @@ typedef struct {
int wpm;
int rpm;
pthread_mutex_t mutex;
pthread_cond_t cvar;
pthread_cond_t write_cvar;
pthread_cond_t read_cvar;
} srslte_ringbuffer_t;
#ifdef __cplusplus
@ -54,6 +55,8 @@ SRSLTE_API int srslte_ringbuffer_space(srslte_ringbuffer_t *q);
SRSLTE_API int srslte_ringbuffer_write(srslte_ringbuffer_t* q, void* ptr, int nof_bytes);
SRSLTE_API int srslte_ringbuffer_write_timed(srslte_ringbuffer_t* q, void* ptr, int nof_bytes, uint32_t timeout_ms);
SRSLTE_API int srslte_ringbuffer_read(srslte_ringbuffer_t* q, void* ptr, int nof_bytes);
SRSLTE_API int srslte_ringbuffer_read_timed(srslte_ringbuffer_t* q, void* p, int nof_bytes, uint32_t timeout_ms);

@ -44,7 +44,7 @@ if(RF_FOUND)
if (ZEROMQ_FOUND)
add_definitions(-DENABLE_ZEROMQ)
list(APPEND SOURCES_RF rf_zmq_imp.c)
list(APPEND SOURCES_RF rf_zmq_imp.c rf_zmq_imp_tx.c rf_zmq_imp_rx.c)
endif (ZEROMQ_FOUND)
add_library(srslte_rf SHARED ${SOURCES_RF})

@ -21,17 +21,14 @@
#include "rf_zmq_imp.h"
#include "rf_helper.h"
#include "rf_zmq_imp_trx.h"
#include <math.h>
#include <signal.h>
#include <srslte/phy/common/phy_common.h>
#include <srslte/phy/common/timestamp.h>
#include <srslte/phy/rf/rf.h>
#include <srslte/phy/utils/ringbuffer.h>
#include <srslte/phy/utils/vector.h>
#include <stdarg.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <zconf.h>
#include <zmq.h>
typedef struct {
@ -39,11 +36,10 @@ typedef struct {
char* devname;
srslte_rf_info_t info;
uint32_t nof_channels;
bool running;
// RF State
double srate; // radio rate configured by upper layers
double base_srate;
uint32_t srate; // radio rate configured by upper layers
uint32_t base_srate;
uint32_t decim_factor; // decimation factor between base_srate used on transport on radio's rate
double rx_gain;
double tx_freq;
@ -52,8 +48,8 @@ typedef struct {
// Server
void* context;
void* transmitter;
void* receiver;
rf_zmq_tx_t transmitter;
rf_zmq_rx_t receiver;
char rx_port[PARAM_LEN];
char tx_port[PARAM_LEN];
@ -61,31 +57,14 @@ typedef struct {
// Various sample buffers
cf_t* buffer_decimation;
cf_t* buffer_rx;
cf_t* buffer_tx;
// Rx and Tx timestamps
// Rx timestamp
uint64_t next_rx_ts;
uint64_t next_tx_ts;
// Ringbuffer
srslte_ringbuffer_t rx_ringbuffer;
pthread_t thread;
pthread_mutex_t mutex;
pthread_mutex_t mutex_tx;
} rf_zmq_handler_t;
/* Definitions */
#define VERBOSE 0
#define NSAMPLES2NBYTES(X) (((uint32_t)(X)) * sizeof(cf_t))
#define NBYTES2NSAMPLES(X) ((X) / sizeof(cf_t))
#define BUFFER_SIZE (NSAMPLES2NBYTES(3072000)) // 10 subframes at 20 MHz
#define ZMQ_TIMEOUT_MS 1000
#define ZMQ_MAXTRIALS 3
#define ZMQ_TRX_MARGIN_MS 1
void update_rates(rf_zmq_handler_t* handler, double srate);
/*
@ -97,14 +76,14 @@ const char zmq_devname[4] = "zmq";
* Static methods
*/
static inline void rf_zmq_info(rf_zmq_handler_t* handler, const char* format, ...)
void rf_zmq_info(char* id, const char* format, ...)
{
#if VERBOSE
struct timeval t;
gettimeofday(&t, NULL);
va_list args;
va_start(args, format);
printf("[%s@%02ld.%06ld] ", handler ? handler->id : "zmq", t.tv_sec % 10, t.tv_usec);
printf("[%s@%02ld.%06ld] ", id ? id : "zmq", t.tv_sec % 10, t.tv_usec);
vprintf(format, args);
va_end(args);
#else /* VERBOSE */
@ -112,7 +91,7 @@ static inline void rf_zmq_info(rf_zmq_handler_t* handler, const char* format, ..
#endif /* VERBOSE */
}
static void rf_zmq_error(rf_zmq_handler_t* handler, const char* format, ...)
void rf_zmq_error(char* id, const char* format, ...)
{
struct timeval t;
gettimeofday(&t, NULL);
@ -129,14 +108,12 @@ static inline int update_ts(void* h, uint64_t* ts, int nsamples, const char* dir
if (h && nsamples > 0) {
rf_zmq_handler_t* handler = (rf_zmq_handler_t*)h;
pthread_mutex_lock(&handler->mutex);
(*ts) += nsamples;
pthread_mutex_unlock(&handler->mutex);
srslte_timestamp_t _ts = {};
srslte_timestamp_init_uint64(&_ts, *ts, handler->base_srate);
rf_zmq_info(handler, " -> next %s time after %d samples: %d + %.3f\n", dir, nsamples, _ts.full_secs,
_ts.frac_secs);
rf_zmq_info(
handler->id, " -> next %s time after %d samples: %d + %.3f\n", dir, nsamples, _ts.full_secs, _ts.frac_secs);
ret = SRSLTE_SUCCESS;
}
@ -144,7 +121,7 @@ static inline int update_ts(void* h, uint64_t* ts, int nsamples, const char* dir
return ret;
}
static inline int rf_zmq_handle_error(rf_zmq_handler_t* handler, const char* text)
int rf_zmq_handle_error(char* id, const char* text)
{
int ret = SRSLTE_SUCCESS;
@ -154,127 +131,18 @@ static inline int rf_zmq_handle_error(rf_zmq_handler_t* handler, const char* tex
// handled errors
case EFSM:
case EAGAIN:
rf_zmq_info(handler, "Warning %s: %s\n", text, zmq_strerror(err));
rf_zmq_info(id, "Warning %s: %s\n", text, zmq_strerror(err));
break;
// critical non-handled errors
// critical non-handled errors
default:
ret = SRSLTE_ERROR;
rf_zmq_error(handler, "Error %s: %s\n", text, zmq_strerror(err));
rf_zmq_error(id, "Error %s: %s\n", text, zmq_strerror(err));
}
return ret;
}
static int rf_zmq_tx(rf_zmq_handler_t* handler, uint8_t* buffer, uint32_t nbytes)
{
int n, ntrials;
pthread_mutex_lock(&handler->mutex_tx);
// Receive Transmit request
uint8_t dummy;
for (ntrials = 0, n = -1; ntrials < ZMQ_MAXTRIALS && n < 0 && handler->running; ntrials++) {
n = zmq_recv(handler->transmitter, &dummy, sizeof(dummy), 0);
if (n < 0) {
if (rf_zmq_handle_error(handler, "tx request receive")) {
n = SRSLTE_ERROR;
goto clean_exit;
}
} else {
rf_zmq_info(handler, " - tx request received\n");
rf_zmq_info(handler, " - sending %d samples (%d B)\n", NBYTES2NSAMPLES(nbytes), nbytes);
}
}
// Send zeros
for (ntrials = 0, n = -1; ntrials < ZMQ_MAXTRIALS && n < 0 && handler->running; ntrials++) {
n = zmq_send(handler->transmitter, buffer, nbytes, 0);
if (n < 0) {
if (rf_zmq_handle_error(handler, "tx baseband send")) {
n = SRSLTE_ERROR;
goto clean_exit;
}
} else if (n != nbytes) {
rf_zmq_error(handler, "[zmq] Error: transmitter expected %d bytes and sent %d. %s.\n", nbytes, n,
strerror(zmq_errno()));
n = SRSLTE_ERROR;
goto clean_exit;
}
}
// update both tx timestamp and ringbuffer
update_ts(handler, &handler->next_tx_ts, NBYTES2NSAMPLES(nbytes), "tx");
clean_exit:
pthread_mutex_unlock(&handler->mutex_tx);
return (n > 0) ? nbytes : SRSLTE_ERROR;
}
static int rf_zmq_tx_zeros(rf_zmq_handler_t* handler, int32_t nsamples)
{
rf_zmq_info(handler, "Tx %d zero samples\n", nsamples);
if (NSAMPLES2NBYTES(nsamples) > ZMQ_MAX_RX_BYTES) {
// can't transmit zeros, buffer too small
fprintf(stderr, "[zmq] Error: zero buffer too small (%ld) to transmit %ld samples\n", ZMQ_MAX_RX_BYTES,
NSAMPLES2NBYTES(nsamples));
return SRSLTE_ERROR;
}
bzero(handler->buffer_tx, NSAMPLES2NBYTES(nsamples));
return rf_zmq_tx(handler, (uint8_t*)handler->buffer_tx, NSAMPLES2NBYTES(nsamples));
}
static void* rf_zmq_async_rx_thread(void* h)
{
rf_zmq_handler_t* handler = (rf_zmq_handler_t*)h;
while (handler->receiver && handler->running) {
int n = SRSLTE_ERROR;
uint8_t dummy = 0xFF;
int ntrials = 0;
rf_zmq_info(handler, "-- ASYNC RX wait...\n");
// Send request
for (ntrials = 0; n < 0 && ntrials < ZMQ_MAXTRIALS && handler->running; ntrials++) {
rf_zmq_info(handler, " - tx'ing rx request\n");
n = zmq_send(handler->receiver, &dummy, sizeof(dummy), 0);
if (n < 0) {
if (rf_zmq_handle_error(handler, "synchronous rx request send")) {
return NULL;
}
}
}
// Receive baseband
for (n = (n < 0) ? 0 : -1; n < 0 && handler->running;) {
n = zmq_recv(handler->receiver, handler->buffer_rx, BUFFER_SIZE, 0);
if (n == -1) {
if (rf_zmq_handle_error(handler, "asynchronous rx baseband receive")) {
return NULL;
}
} else if (n > BUFFER_SIZE) {
fprintf(stderr, "[zmq] Error: receiver expected <= %ld bytes and received %d at channel %d.\n", BUFFER_SIZE, n,
0);
return NULL;
}
}
// Write received data in buffer
if (n > 0) {
if (srslte_ringbuffer_write(&handler->rx_ringbuffer, handler->buffer_rx, n) != n) {
rf_zmq_error(handler, "[zmq] error writing asynchronous ring buffer...\n");
}
rf_zmq_info(handler, " - received %d baseband samples (%d B). %d samples available.\n", NBYTES2NSAMPLES(n), n,
srslte_ringbuffer_status(&handler->rx_ringbuffer));
}
}
return NULL;
}
/*
* Public methods
*/
@ -342,7 +210,7 @@ int rf_zmq_open_multi(char* args, void** h, uint32_t nof_channels)
}
bzero(handler, sizeof(rf_zmq_handler_t));
*h = handler;
handler->base_srate = 23.04e6; // Sample rate for 100 PRB cell
handler->base_srate = ZMQ_BASERATE_DEFAULT_HZ; // Sample rate for 100 PRB cell
handler->rx_gain = 0.0;
handler->info.max_rx_gain = +INFINITY;
handler->info.min_rx_gain = -INFINITY;
@ -350,9 +218,6 @@ int rf_zmq_open_multi(char* args, void** h, uint32_t nof_channels)
handler->info.min_tx_gain = -INFINITY;
strcpy(handler->id, "zmq\0");
pthread_mutex_init(&handler->mutex, NULL);
pthread_mutex_init(&handler->mutex_tx, NULL);
// parse args
if (args) {
// base_srate
@ -363,7 +228,7 @@ int rf_zmq_open_multi(char* args, void** h, uint32_t nof_channels)
if (config_ptr) {
copy_subdev_string(config_str, config_ptr + strlen(config_arg));
printf("Using base rate=%s\n", config_str);
handler->base_srate = strtod(config_str, NULL);
handler->base_srate = (uint32_t)strtod(config_str, NULL);
remove_substring(args, config_arg);
remove_substring(args, config_str);
}
@ -424,118 +289,44 @@ int rf_zmq_open_multi(char* args, void** h, uint32_t nof_channels)
goto clean_exit;
}
// initialize transmitter
if (strlen(handler->tx_port) != 0) {
// Initialise transmitter
handler->transmitter = zmq_socket(handler->context, ZMQ_REP);
if (!handler->transmitter) {
fprintf(stderr, "[zmq] Error: creating transmitter socket\n");
goto clean_exit;
}
rf_zmq_info(handler, "Binding transmitter: %s\n", handler->tx_port);
ret = zmq_bind(handler->transmitter, handler->tx_port);
if (ret) {
fprintf(stderr, "Error: connecting transmitter socket: %s\n", zmq_strerror(zmq_errno()));
goto clean_exit;
}
#if ZMQ_TIMEOUT_MS
// set recv timeout for transmitter
int timeout = ZMQ_TIMEOUT_MS;
if (zmq_setsockopt(handler->transmitter, ZMQ_RCVTIMEO, &timeout, sizeof(timeout)) == -1) {
fprintf(stderr, "Error: setting receive timeout on tx socket\n");
goto clean_exit;
}
if (zmq_setsockopt(handler->transmitter, ZMQ_SNDTIMEO, &timeout, sizeof(timeout)) == -1) {
fprintf(stderr, "Error: setting receive timeout on tx socket\n");
if (rf_zmq_tx_open(&handler->transmitter, handler->id, handler->context, handler->tx_port) != SRSLTE_SUCCESS) {
fprintf(stderr, "[zmq] Error: opening transmitter\n");
goto clean_exit;
}
// set linger timeout for transmitter
timeout = 0;
if (zmq_setsockopt(handler->transmitter, ZMQ_LINGER, &timeout, sizeof(timeout)) == -1) {
fprintf(stderr, "Error: setting linger timeout on tx socket\n");
}
#endif
} else {
fprintf(stdout, "[zmq] %s Tx port not specified. Disabling transmitter.\n", handler->id);
}
// initialize receiver
if (strlen(handler->rx_port) != 0) {
handler->receiver = zmq_socket(handler->context, ZMQ_REQ);
if (!handler->receiver) {
fprintf(stderr, "[zmq] Error: creating receiver socket\n");
goto clean_exit;
}
rf_zmq_info(handler, "Connecting receiver: %s\n", handler->rx_port);
ret = zmq_connect(handler->receiver, handler->rx_port);
if (ret) {
fprintf(stderr, "Error: binding receiver socket: %s\n", zmq_strerror(zmq_errno()));
goto clean_exit;
}
#if ZMQ_TIMEOUT_MS
// set recv timeout for receiver
int timeout = ZMQ_TIMEOUT_MS;
if (zmq_setsockopt(handler->receiver, ZMQ_RCVTIMEO, &timeout, sizeof(timeout)) == -1) {
fprintf(stderr, "Error: setting receive timeout on tx socket\n");
goto clean_exit;
}
if (zmq_setsockopt(handler->receiver, ZMQ_SNDTIMEO, &timeout, sizeof(timeout)) == -1) {
fprintf(stderr, "Error: setting receive timeout on tx socket\n");
goto clean_exit;
}
timeout = 0;
// set linger timeout for receiver
if (zmq_setsockopt(handler->receiver, ZMQ_LINGER, &timeout, sizeof(timeout)) == -1) {
fprintf(stderr, "Error: setting linger timeout on rx socket\n");
}
#endif
// init rx ringbuffer
if (srslte_ringbuffer_init(&handler->rx_ringbuffer, BUFFER_SIZE)) {
fprintf(stderr, "Error, initiating rx ringbuffer\n");
if (rf_zmq_rx_open(&handler->receiver, handler->id, handler->context, handler->rx_port) != SRSLTE_SUCCESS) {
fprintf(stderr, "[zmq] Error: opening receiver\n");
goto clean_exit;
}
} else {
fprintf(stdout, "[zmq] %s Rx port not specified. Disabling receiver.\n", handler->id);
}
if (handler->transmitter == NULL && handler->receiver == NULL) {
if (!handler->transmitter.running && !handler->receiver.running) {
fprintf(stderr, "[zmq] Error: Neither Tx port nor Rx port specified.\n");
goto clean_exit;
}
// Create decimation and overflow buffer
handler->buffer_decimation = srslte_vec_malloc(ZMQ_MAX_RX_BYTES);
handler->buffer_decimation = srslte_vec_malloc(ZMQ_MAX_BUFFER_SIZE);
if (!handler->buffer_decimation) {
fprintf(stderr, "Error: allocating decimation buffer\n");
goto clean_exit;
}
handler->buffer_tx = srslte_vec_malloc(ZMQ_MAX_RX_BYTES);
handler->buffer_tx = srslte_vec_malloc(ZMQ_MAX_BUFFER_SIZE);
if (!handler->buffer_tx) {
fprintf(stderr, "Error: allocating tx buffer\n");
goto clean_exit;
}
handler->buffer_rx = srslte_vec_malloc(ZMQ_MAX_RX_BYTES);
if (!handler->buffer_rx) {
fprintf(stderr, "Error: allocating rx buffer\n");
goto clean_exit;
}
handler->running = true;
if (handler->receiver) {
pthread_create(&handler->thread, NULL, rf_zmq_async_rx_thread, handler);
}
ret = SRSLTE_SUCCESS;
clean_exit:
@ -552,23 +343,15 @@ int rf_zmq_close(void* h)
rf_zmq_handler_t* handler = (rf_zmq_handler_t*)h;
rf_zmq_info(handler, "Closing %s ...\n", handler->id);
rf_zmq_info(handler->id, "Closing ...\n");
handler->running = false;
if (handler->thread) {
pthread_join(handler->thread, NULL);
pthread_detach(handler->thread);
}
if (handler->transmitter) {
zmq_close(handler->transmitter);
handler->transmitter = NULL;
}
if (handler->receiver) {
zmq_close(handler->receiver);
handler->receiver = NULL;
srslte_ringbuffer_free(&handler->rx_ringbuffer);
}
rf_zmq_tx_close(&handler->transmitter);
rf_zmq_rx_close(&handler->receiver);
if (handler->context) {
zmq_ctx_destroy(handler->context);
@ -582,13 +365,6 @@ int rf_zmq_close(void* h)
free(handler->buffer_tx);
}
if (handler->buffer_rx) {
free(handler->buffer_rx);
}
pthread_mutex_destroy(&handler->mutex);
pthread_mutex_destroy(&handler->mutex_tx);
// Free all
free(handler);
@ -600,7 +376,7 @@ void update_rates(rf_zmq_handler_t* handler, double srate)
if (handler) {
// Decimation must be full integer
if (((uint64_t)handler->base_srate % (uint64_t)srate) == 0) {
handler->srate = srate;
handler->srate = (uint32_t)srate;
handler->decim_factor = handler->base_srate / handler->srate;
} else {
fprintf(stderr, "Error: couldn't update sample rate. %.2f is not divisible by %.2f\n", srate / 1e6,
@ -724,9 +500,8 @@ int rf_zmq_recv_with_time_multi(
uint32_t nbytes = NSAMPLES2NBYTES(nsamples * handler->decim_factor);
uint32_t nsamples_baserate = nsamples * handler->decim_factor;
uint32_t nbytes_baserate = NSAMPLES2NBYTES(nsamples_baserate);
rf_zmq_info(handler, "Rx %d samples (%d B)\n", nsamples, nbytes);
rf_zmq_info(handler->id, "Rx %d samples (%d B)\n", nsamples, nbytes);
// set timestamp for this reception
if (secs != NULL && frac_secs != NULL) {
@ -737,48 +512,55 @@ int rf_zmq_recv_with_time_multi(
}
// return if receiver is turned off
if (handler->receiver == NULL) {
if (!handler->receiver.running) {
update_ts(handler, &handler->next_rx_ts, nsamples_baserate, "rx");
return nsamples;
}
// Check available buffer size
if (nbytes > ZMQ_MAX_RX_BYTES) {
fprintf(stderr, "[zmq] Error: Trying to receive %d B but buffer is only %ld B at channel %d.\n", nbytes,
ZMQ_MAX_RX_BYTES, 0);
if (nbytes > ZMQ_MAX_BUFFER_SIZE) {
fprintf(stderr,
"[zmq] Error: Trying to receive %d B but buffer is only %ld B at channel %d.\n",
nbytes,
ZMQ_MAX_BUFFER_SIZE,
0);
goto clean_exit;
}
// receive samples
srslte_timestamp_t ts_tx = {}, ts_rx = {};
srslte_timestamp_init_uint64(&ts_tx, handler->next_tx_ts, handler->base_srate);
srslte_timestamp_init_uint64(&ts_tx, handler->transmitter.nsamples, handler->base_srate);
srslte_timestamp_init_uint64(&ts_rx, handler->next_rx_ts, handler->base_srate);
rf_zmq_info(handler, " - next rx time: %d + %.3f\n", ts_rx.full_secs, ts_rx.frac_secs);
rf_zmq_info(handler, " - next tx time: %d + %.3f\n", ts_tx.full_secs, ts_tx.frac_secs);
rf_zmq_info(handler->id, " - next rx time: %d + %.3f\n", ts_rx.full_secs, ts_rx.frac_secs);
rf_zmq_info(handler->id, " - next tx time: %d + %.3f\n", ts_tx.full_secs, ts_tx.frac_secs);
// check for tx gap if we're also transmitting on this radio
if (handler->transmitter) {
uint32_t margin_nsamples =
(uint32_t)(handler->tx_used ? (0) : (nsamples_baserate + ZMQ_TRX_MARGIN_MS * handler->base_srate / 1000.0));
int num_tx_gap_samples_base_rate = (int)(handler->next_rx_ts - handler->next_tx_ts + margin_nsamples);
if (num_tx_gap_samples_base_rate > 0) {
rf_zmq_info(handler, " - tx_gap of %d samples\n", num_tx_gap_samples_base_rate);
// Transmit zero samples
rf_zmq_tx_zeros(handler, num_tx_gap_samples_base_rate);
} else {
rf_zmq_info(handler, " - no tx gap detected\n");
}
if (handler->transmitter.running) {
rf_zmq_tx_align(&handler->transmitter, handler->next_rx_ts + nsamples_baserate);
}
usleep((1000000 * nsamples) / handler->base_srate);
// copy from rx buffer as many samples as requested into provided buffer
cf_t* ptr = (handler->decim_factor != 1) ? handler->buffer_decimation : data[0];
if (srslte_ringbuffer_read(&handler->rx_ringbuffer, ptr, nbytes_baserate) != nbytes) {
fprintf(stderr, "Error: reading from rx ringbuffer.\n");
goto clean_exit;
int32_t count = 0;
while (count < nsamples_baserate && handler->receiver.running) {
int32_t n = rf_zmq_rx_baseband(&handler->receiver, &ptr[count], nsamples_baserate);
if (n > 0) {
// No error
count += n;
} else if (n == SRSLTE_ERROR_TIMEOUT) {
// Timeout, do nothing, keep going
} else if (n > 0) {
// Other error, exit
fprintf(stderr, "Error: receiving data.\n");
goto clean_exit;
}
}
rf_zmq_info(handler, " - read %d samples. %d samples available\n", NBYTES2NSAMPLES(nbytes),
NBYTES2NSAMPLES(srslte_ringbuffer_status(&handler->rx_ringbuffer)));
rf_zmq_info(handler->id,
" - read %d samples. %d samples available\n",
NBYTES2NSAMPLES(nbytes),
NBYTES2NSAMPLES(srslte_ringbuffer_status(&handler->receiver.ringbuffer)));
// decimate if needed
if (handler->decim_factor != 1) {
@ -793,8 +575,11 @@ int rf_zmq_recv_with_time_multi(
}
dst[i] = avg;
}
rf_zmq_info(handler, " - re-adjust bytes due to %dx decimation %d --> %d samples)\n", handler->decim_factor,
nsamples_baserate, nsamples);
rf_zmq_info(handler->id,
" - re-adjust bytes due to %dx decimation %d --> %d samples)\n",
handler->decim_factor,
nsamples_baserate,
nsamples);
}
// update rx time
@ -843,49 +628,47 @@ int rf_zmq_send_timed_multi(void* h,
uint32_t nsamples_baseband = nsamples * handler->decim_factor;
uint32_t nbytes_baseband = NSAMPLES2NBYTES(nsamples_baseband);
if (nbytes_baseband > ZMQ_MAX_RX_BYTES) {
fprintf(stderr, "Error: trying to transmit too many samples (%d > %ld).\n", nbytes, ZMQ_MAX_RX_BYTES);
if (nbytes_baseband > ZMQ_MAX_BUFFER_SIZE) {
fprintf(stderr, "Error: trying to transmit too many samples (%d > %ld).\n", nbytes, ZMQ_MAX_BUFFER_SIZE);
goto clean_exit;
}
rf_zmq_info(handler, "Tx %d samples (%d B)\n", nsamples, nbytes);
rf_zmq_info(handler->id, "Tx %d samples (%d B)\n", nsamples, nbytes);
// return if transmitter is switched off
if (handler->tx_port == 0) {
if (strlen(handler->tx_port) == 0) {
return SRSLTE_SUCCESS;
}
// check if this is a tx in the future
if (has_time_spec) {
rf_zmq_info(handler, " - tx time: %d + %.3f\n", secs, frac_secs);
rf_zmq_info(handler->id, " - tx time: %d + %.3f\n", secs, frac_secs);
srslte_timestamp_t ts = {};
srslte_timestamp_init(&ts, secs, frac_secs);
uint64_t tx_ts = srslte_timestamp_uint64(&ts, handler->base_srate);
int32_t num_tx_gap_samples = (int32_t)((int64_t)tx_ts - (int64_t)handler->next_tx_ts);
int num_tx_gap_samples = rf_zmq_tx_align(&handler->transmitter, tx_ts);
if (num_tx_gap_samples < 0) {
fprintf(stderr, "[zmq] Error: tx time is %.3f ms in the past (%ld < %ld)\n",
-1000.0 * num_tx_gap_samples / handler->base_srate, tx_ts, handler->next_tx_ts);
fprintf(stderr,
"[zmq] Error: tx time is %.3f ms in the past (%ld < %ld)\n",
-1000.0 * num_tx_gap_samples / handler->base_srate,
tx_ts,
handler->transmitter.nsamples);
goto clean_exit;
} else if (num_tx_gap_samples > 0) {
rf_zmq_info(handler, " - tx gap of %d baseband samples\n", num_tx_gap_samples);
// send zero samples
int n = rf_zmq_tx_zeros(handler, num_tx_gap_samples);
if (n == -1) {
goto clean_exit;
}
} else {
rf_zmq_info(handler, " - no tx gap detected\n");
}
}
// Select buffer pointer depending on interpolation
cf_t* buf = (handler->decim_factor != 1) ? handler->buffer_tx : data[0];
// Interpolate if required
if (handler->decim_factor != 1) {
rf_zmq_info(handler, " - re-adjust bytes due to %dx interpolation %d --> %d samples)\n", handler->decim_factor,
nsamples, nsamples_baseband);
rf_zmq_info(handler->id,
" - re-adjust bytes due to %dx interpolation %d --> %d samples)\n",
handler->decim_factor,
nsamples,
nsamples_baseband);
int n = 0;
cf_t* src = data[0];
@ -903,8 +686,8 @@ int rf_zmq_send_timed_multi(void* h,
}
}
// send baseband samples
int n = rf_zmq_tx(handler, (uint8_t*)buf, nbytes_baseband);
// Send base-band samples
int n = rf_zmq_tx_baseband(&handler->transmitter, buf, nsamples_baseband);
if (n == SRSLTE_ERROR) {
goto clean_exit;
}

@ -28,8 +28,6 @@
#define DEVNAME_ZMQ "ZeroMQ"
#define PARAM_LEN (128)
#define PARAM_LEN_SHORT (PARAM_LEN / 2)
#define ZMQ_MAX_RX_BYTES \
(5 * SRSLTE_SF_LEN_MAX * sizeof(cf_t)) // Five subframes at max LTE rate using default FFT-length
SRSLTE_API int rf_zmq_open(char* args, void** handler);

@ -0,0 +1,198 @@
/**
*
* \section COPYRIGHT
*
* Copyright 2013-2019 Software Radio Systems Limited
*
* \section LICENSE
*
* This file is part of the srsLTE library.
*
* srsLTE is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* srsLTE is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* A copy of the GNU Affero General Public License can be found in
* the LICENSE file in the top-level directory of this distribution
* and at http://www.gnu.org/licenses/.
*
*/
#include "rf_zmq_imp_trx.h"
#include <srslte/phy/utils/vector.h>
#include <stdlib.h>
#include <string.h>
#include <zconf.h>
#include <zmq.h>
static void* rf_zmq_async_rx_thread(void* h)
{
rf_zmq_rx_t* q = (rf_zmq_rx_t*)h;
while (q->sock && q->running) {
int nbytes = 0;
int n = SRSLTE_ERROR;
uint8_t dummy = 0xFF;
rf_zmq_info(q->id, "-- ASYNC RX wait...\n");
// Send request
while (n < 0 && q->running) {
rf_zmq_info(q->id, " - tx'ing rx request\n");
n = zmq_send(q->sock, &dummy, sizeof(dummy), 0);
if (n < 0) {
if (rf_zmq_handle_error(q->id, "synchronous rx request send")) {
return NULL;
}
}
}
// Receive baseband
for (n = (n < 0) ? 0 : -1; n < 0 && q->running;) {
n = zmq_recv(q->sock, q->temp_buffer, ZMQ_MAX_BUFFER_SIZE, 0);
if (n == -1) {
if (rf_zmq_handle_error(q->id, "asynchronous rx baseband receive")) {
return NULL;
}
} else if (n > ZMQ_MAX_BUFFER_SIZE) {
fprintf(stderr,
"[zmq] Error: receiver expected <= %ld bytes and received %d at channel %d.\n",
ZMQ_MAX_BUFFER_SIZE,
n,
0);
return NULL;
} else {
nbytes = n;
}
}
// Write received data in buffer
if (nbytes > 0) {
n = -1;
// Try to write in ring buffer
while (n < 0 && q->running) {
n = srslte_ringbuffer_write_timed(&q->ringbuffer, q->temp_buffer, nbytes, ZMQ_TIMEOUT_MS);
}
// Check write
if (nbytes == n) {
rf_zmq_info(q->id,
" - received %d baseband samples (%d B). %d samples available.\n",
NBYTES2NSAMPLES(n),
n,
NBYTES2NSAMPLES(srslte_ringbuffer_status(&q->ringbuffer)));
}
}
}
return NULL;
}
int rf_zmq_rx_open(rf_zmq_rx_t* q, char* id, void* zmq_ctx, char* sock_args)
{
int ret = SRSLTE_ERROR;
if (q) {
// Copy id
strncpy(q->id, id, 16);
// Zero object
bzero(q, sizeof(rf_zmq_tx_t));
// Create socket
q->sock = zmq_socket(zmq_ctx, ZMQ_REQ);
if (!q->sock) {
fprintf(stderr, "[zmq] Error: creating transmitter socket\n");
goto clean_exit;
}
rf_zmq_info(q->id, "Connecting receiver: %s\n", sock_args);
ret = zmq_connect(q->sock, sock_args);
if (ret) {
fprintf(stderr, "Error: connecting receiver socket: %s\n", zmq_strerror(zmq_errno()));
goto clean_exit;
}
#if ZMQ_TIMEOUT_MS
int timeout = ZMQ_TIMEOUT_MS;
if (zmq_setsockopt(q->sock, ZMQ_RCVTIMEO, &timeout, sizeof(timeout)) == -1) {
fprintf(stderr, "Error: setting receive timeout on rx socket\n");
goto clean_exit;
}
if (zmq_setsockopt(q->sock, ZMQ_SNDTIMEO, &timeout, sizeof(timeout)) == -1) {
fprintf(stderr, "Error: setting receive timeout on rx socket\n");
goto clean_exit;
}
timeout = 0;
if (zmq_setsockopt(q->sock, ZMQ_LINGER, &timeout, sizeof(timeout)) == -1) {
fprintf(stderr, "Error: setting linger timeout on rx socket\n");
goto clean_exit;
}
#endif
if (srslte_ringbuffer_init(&q->ringbuffer, ZMQ_MAX_BUFFER_SIZE)) {
fprintf(stderr, "Error: initiating ringbuffer\n");
goto clean_exit;
}
q->temp_buffer = srslte_vec_malloc(ZMQ_MAX_BUFFER_SIZE);
if (!q->temp_buffer) {
fprintf(stderr, "Error: allocating rx buffer\n");
goto clean_exit;
}
if (pthread_mutex_init(&q->mutex, NULL)) {
fprintf(stderr, "Error: creating mutex\n");
goto clean_exit;
}
if (pthread_create(&q->thread, NULL, rf_zmq_async_rx_thread, q)) {
fprintf(stderr, "Error: creating thread\n");
goto clean_exit;
}
q->running = true;
ret = SRSLTE_SUCCESS;
}
clean_exit:
return ret;
}
int rf_zmq_rx_baseband(rf_zmq_rx_t* q, cf_t* buffer, uint32_t nsamples)
{
return srslte_ringbuffer_read_timed(&q->ringbuffer, buffer, NSAMPLES2NBYTES(nsamples), ZMQ_TIMEOUT_MS);
}
void rf_zmq_rx_close(rf_zmq_rx_t* q)
{
rf_zmq_info(q->id, "Closing ...\n");
q->running = false;
if (q->thread) {
pthread_join(q->thread, NULL);
pthread_detach(q->thread);
}
srslte_ringbuffer_free(&q->ringbuffer);
if (q->temp_buffer) {
free(q->temp_buffer);
}
if (q->sock) {
zmq_close(q->sock);
q->sock = NULL;
}
}

@ -0,0 +1,91 @@
/**
*
* \section COPYRIGHT
*
* Copyright 2013-2019 Software Radio Systems Limited
*
* \section LICENSE
*
* This file is part of the srsLTE library.
*
* srsLTE is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* srsLTE is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* A copy of the GNU Affero General Public License can be found in
* the LICENSE file in the top-level directory of this distribution
* and at http://www.gnu.org/licenses/.
*
*/
#ifndef SRSLTE_RF_ZMQ_IMP_TRX_H
#define SRSLTE_RF_ZMQ_IMP_TRX_H
#include <pthread.h>
#include <srslte/phy/utils/ringbuffer.h>
#include <stdbool.h>
/* Definitions */
#define VERBOSE (0)
#define NSAMPLES2NBYTES(X) (((uint32_t)(X)) * sizeof(cf_t))
#define NBYTES2NSAMPLES(X) ((X) / sizeof(cf_t))
#define ZMQ_MAX_BUFFER_SIZE (NSAMPLES2NBYTES(3072000)) // 10 subframes at 20 MHz
#define ZMQ_TIMEOUT_MS (1000)
#define ZMQ_BASERATE_DEFAULT_HZ (23040000)
typedef struct {
char id[16];
void* sock;
uint64_t nsamples;
bool running;
pthread_mutex_t mutex;
cf_t* zeros;
} rf_zmq_tx_t;
typedef struct {
char id[16];
void* sock;
uint64_t nsamples;
bool running;
pthread_t thread;
pthread_mutex_t mutex;
srslte_ringbuffer_t ringbuffer;
cf_t* temp_buffer;
} rf_zmq_rx_t;
/*
* Common functions
*/
SRSLTE_API void rf_zmq_info(char* id, const char* format, ...);
SRSLTE_API void rf_zmq_error(char* id, const char* format, ...);
SRSLTE_API int rf_zmq_handle_error(char* id, const char* text);
/*
* Transmitter functions
*/
SRSLTE_API int rf_zmq_tx_open(rf_zmq_tx_t* q, const char* id, void* zmq_ctx, char* sock_args);
SRSLTE_API int rf_zmq_tx_align(rf_zmq_tx_t* q, uint64_t ts);
SRSLTE_API int rf_zmq_tx_baseband(rf_zmq_tx_t* q, cf_t* buffer, uint32_t nsamples);
SRSLTE_API void rf_zmq_tx_close(rf_zmq_tx_t* q);
/*
* Receiver functions
*/
SRSLTE_API int rf_zmq_rx_open(rf_zmq_rx_t* q, char* id, void* zmq_ctx, char* sock_args);
SRSLTE_API int rf_zmq_rx_baseband(rf_zmq_rx_t* q, cf_t* buffer, uint32_t nsamples);
SRSLTE_API void rf_zmq_rx_close(rf_zmq_rx_t* q);
#endif // SRSLTE_RF_ZMQ_IMP_TRX_H

@ -0,0 +1,176 @@
/**
*
* \section COPYRIGHT
*
* Copyright 2013-2019 Software Radio Systems Limited
*
* \section LICENSE
*
* This file is part of the srsLTE library.
*
* srsLTE is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* srsLTE is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* A copy of the GNU Affero General Public License can be found in
* the LICENSE file in the top-level directory of this distribution
* and at http://www.gnu.org/licenses/.
*
*/
#include "rf_zmq_imp_trx.h"
#include <srslte/config.h>
#include <srslte/phy/utils/vector.h>
#include <stdlib.h>
#include <string.h>
#include <zmq.h>
int rf_zmq_tx_open(rf_zmq_tx_t* q, const char* id, void* zmq_ctx, char* sock_args)
{
int ret = SRSLTE_ERROR;
if (q) {
// Copy id
strncpy(q->id, id, 16);
// Zero object
bzero(q, sizeof(rf_zmq_tx_t));
// Create socket
q->sock = zmq_socket(zmq_ctx, ZMQ_REP);
if (!q->sock) {
fprintf(stderr, "[zmq] Error: creating transmitter socket\n");
goto clean_exit;
}
rf_zmq_info(q->id, "Binding transmitter: %s\n", sock_args);
ret = zmq_bind(q->sock, sock_args);
if (ret) {
fprintf(stderr, "Error: connecting transmitter socket: %s\n", zmq_strerror(zmq_errno()));
goto clean_exit;
}
#if ZMQ_TIMEOUT_MS
int timeout = ZMQ_TIMEOUT_MS;
if (zmq_setsockopt(q->sock, ZMQ_RCVTIMEO, &timeout, sizeof(timeout)) == -1) {
fprintf(stderr, "Error: setting receive timeout on tx socket\n");
goto clean_exit;
}
if (zmq_setsockopt(q->sock, ZMQ_SNDTIMEO, &timeout, sizeof(timeout)) == -1) {
fprintf(stderr, "Error: setting receive timeout on tx socket\n");
goto clean_exit;
}
timeout = 0;
if (zmq_setsockopt(q->sock, ZMQ_LINGER, &timeout, sizeof(timeout)) == -1) {
fprintf(stderr, "Error: setting linger timeout on tx socket\n");
goto clean_exit;
}
#endif
if (pthread_mutex_init(&q->mutex, NULL)) {
fprintf(stderr, "Error: creating mutex\n");
goto clean_exit;
}
q->zeros = srslte_vec_malloc(ZMQ_MAX_BUFFER_SIZE);
if (!q->zeros) {
fprintf(stderr, "Error: allocating zeros\n");
goto clean_exit;
}
bzero(q->zeros, ZMQ_MAX_BUFFER_SIZE);
q->running = true;
ret = SRSLTE_SUCCESS;
}
clean_exit:
return ret;
}
int rf_zmq_tx_align(rf_zmq_tx_t* q, uint64_t ts)
{
int64_t nsamples = (int64_t)ts - (int64_t)q->nsamples;
if (nsamples > 0) {
rf_zmq_info(q->id, " - Detected Tx gap of %d samples.\n", nsamples);
rf_zmq_tx_baseband(q, q->zeros, (uint32_t)nsamples);
}
return (int)nsamples;
}
int rf_zmq_tx_baseband(rf_zmq_tx_t* q, cf_t* buffer, uint32_t nsamples)
{
int n = SRSLTE_ERROR;
pthread_mutex_lock(&q->mutex);
while (n < 0 && q->running) {
// Receive Transmit request
uint8_t dummy;
n = zmq_recv(q->sock, &dummy, sizeof(dummy), 0);
if (n < 0) {
if (rf_zmq_handle_error(q->id, "tx request receive")) {
n = SRSLTE_ERROR;
goto clean_exit;
}
} else {
// Tx request received successful
rf_zmq_info(q->id, " - tx request received\n");
rf_zmq_info(q->id, " - sending %d samples (%d B)\n", nsamples, NSAMPLES2NBYTES(nsamples));
}
// Send base-band if request was received
if (n > 0) {
n = zmq_send(q->sock, buffer, NSAMPLES2NBYTES(nsamples), 0);
if (n < 0) {
if (rf_zmq_handle_error(q->id, "tx baseband send")) {
n = SRSLTE_ERROR;
goto clean_exit;
}
} else if (n != NSAMPLES2NBYTES(nsamples)) {
rf_zmq_error(q->id,
"[zmq] Error: transmitter expected %d bytes and sent %d. %s.\n",
NSAMPLES2NBYTES(nsamples),
n,
strerror(zmq_errno()));
n = SRSLTE_ERROR;
goto clean_exit;
}
}
// If failed to receive request or send base-band, keep trying
}
// Increment sample counter
q->nsamples += nsamples;
n = nsamples;
clean_exit:
pthread_mutex_unlock(&q->mutex);
return n;
}
void rf_zmq_tx_close(rf_zmq_tx_t* q)
{
q->running = false;
if (q->zeros) {
free(q->zeros);
}
if (q->sock) {
zmq_close(q->sock);
q->sock = NULL;
}
}

@ -34,11 +34,12 @@ int srslte_ringbuffer_init(srslte_ringbuffer_t *q, int capacity)
}
q->active = true;
q->capacity = capacity;
pthread_mutex_init(&q->mutex, NULL);
pthread_cond_init(&q->cvar, NULL);
pthread_mutex_init(&q->mutex, NULL);
pthread_cond_init(&q->write_cvar, NULL);
pthread_cond_init(&q->read_cvar, NULL);
srslte_ringbuffer_reset(q);
return 0;
return 0;
}
void srslte_ringbuffer_free(srslte_ringbuffer_t *q)
@ -47,10 +48,11 @@ void srslte_ringbuffer_free(srslte_ringbuffer_t *q)
srslte_ringbuffer_stop(q);
if (q->buffer) {
free(q->buffer);
q->buffer = NULL;
q->buffer = NULL;
}
pthread_mutex_destroy(&q->mutex);
pthread_cond_destroy(&q->cvar);
pthread_mutex_destroy(&q->mutex);
pthread_cond_destroy(&q->write_cvar);
pthread_cond_destroy(&q->read_cvar);
}
}
@ -90,20 +92,64 @@ int srslte_ringbuffer_write(srslte_ringbuffer_t *q, void *p, int nof_bytes)
ERROR("Buffer overrun: lost %d bytes\n", nof_bytes - w_bytes);
}
if (w_bytes > q->capacity - q->wpm) {
int x = q->capacity - q->wpm;
memcpy(&q->buffer[q->wpm], ptr, x);
memcpy(q->buffer, &ptr[x], w_bytes - x);
int x = q->capacity - q->wpm;
memcpy(&q->buffer[q->wpm], ptr, x);
memcpy(q->buffer, &ptr[x], w_bytes - x);
} else {
memcpy(&q->buffer[q->wpm], ptr, w_bytes);
memcpy(&q->buffer[q->wpm], ptr, w_bytes);
}
q->wpm += w_bytes;
q->wpm += w_bytes;
if (q->wpm >= q->capacity) {
q->wpm -= q->capacity;
q->wpm -= q->capacity;
}
q->count += w_bytes;
pthread_cond_broadcast(&q->cvar);
q->count += w_bytes;
pthread_cond_broadcast(&q->write_cvar);
pthread_mutex_unlock(&q->mutex);
return w_bytes;
return w_bytes;
}
int srslte_ringbuffer_write_timed(srslte_ringbuffer_t* q, void* p, int nof_bytes, uint32_t timeout_ms)
{
int ret = SRSLTE_SUCCESS;
uint8_t* ptr = (uint8_t*)p;
int w_bytes = nof_bytes;
struct timespec towait;
struct timeval now;
// Get current time and update timeout
gettimeofday(&now, NULL);
towait.tv_sec = now.tv_sec + timeout_ms / 1000U;
towait.tv_nsec = (now.tv_usec + 1000UL * (timeout_ms % 1000U)) * 1000UL;
pthread_mutex_lock(&q->mutex);
// Wait to have enough space in the buffer
while (q->count + w_bytes > q->capacity && q->active && ret == SRSLTE_SUCCESS) {
ret = pthread_cond_timedwait(&q->read_cvar, &q->mutex, &towait);
}
if (ret == ETIMEDOUT) {
ret = SRSLTE_ERROR_TIMEOUT;
} else if (!q->active) {
ret = SRSLTE_SUCCESS;
} else if (ret == SRSLTE_SUCCESS) {
if (w_bytes > q->capacity - q->wpm) {
int x = q->capacity - q->wpm;
memcpy(&q->buffer[q->wpm], ptr, x);
memcpy(q->buffer, &ptr[x], w_bytes - x);
} else {
memcpy(&q->buffer[q->wpm], ptr, w_bytes);
}
q->wpm += w_bytes;
if (q->wpm >= q->capacity) {
q->wpm -= q->capacity;
}
q->count += w_bytes;
} else {
ret = SRSLTE_ERROR;
}
pthread_cond_broadcast(&q->write_cvar);
pthread_mutex_unlock(&q->mutex);
return w_bytes;
}
int srslte_ringbuffer_read(srslte_ringbuffer_t *q, void *p, int nof_bytes)
@ -111,26 +157,27 @@ int srslte_ringbuffer_read(srslte_ringbuffer_t *q, void *p, int nof_bytes)
uint8_t *ptr = (uint8_t*) p;
pthread_mutex_lock(&q->mutex);
while(q->count < nof_bytes && q->active) {
pthread_cond_wait(&q->cvar, &q->mutex);
pthread_cond_wait(&q->write_cvar, &q->mutex);
}
if (!q->active) {
pthread_mutex_unlock(&q->mutex);
return 0;
}
if (nof_bytes + q->rpm > q->capacity) {
int x = q->capacity - q->rpm;
int x = q->capacity - q->rpm;
memcpy(ptr, &q->buffer[q->rpm], x);
memcpy(&ptr[x], q->buffer, nof_bytes - x);
} else {
} else {
memcpy(ptr, &q->buffer[q->rpm], nof_bytes);
}
q->rpm += nof_bytes;
q->rpm += nof_bytes;
if (q->rpm >= q->capacity) {
q->rpm -= q->capacity;
q->rpm -= q->capacity;
}
q->count -= nof_bytes;
q->count -= nof_bytes;
pthread_cond_broadcast(&q->read_cvar);
pthread_mutex_unlock(&q->mutex);
return nof_bytes;
return nof_bytes;
}
int srslte_ringbuffer_read_timed(srslte_ringbuffer_t* q, void* p, int nof_bytes, uint32_t timeout_ms)
@ -150,7 +197,7 @@ int srslte_ringbuffer_read_timed(srslte_ringbuffer_t* q, void* p, int nof_bytes,
// Wait for having enough samples
while (q->count < nof_bytes && q->active && ret == SRSLTE_SUCCESS) {
ret = pthread_cond_timedwait(&q->cvar, &q->mutex, &towait);
ret = pthread_cond_timedwait(&q->write_cvar, &q->mutex, &towait);
}
if (ret == ETIMEDOUT) {
@ -176,6 +223,7 @@ int srslte_ringbuffer_read_timed(srslte_ringbuffer_t* q, void* p, int nof_bytes,
}
// Unlock mutex
pthread_cond_broadcast(&q->read_cvar);
pthread_mutex_unlock(&q->mutex);
return ret;
@ -184,7 +232,8 @@ int srslte_ringbuffer_read_timed(srslte_ringbuffer_t* q, void* p, int nof_bytes,
void srslte_ringbuffer_stop(srslte_ringbuffer_t *q) {
pthread_mutex_lock(&q->mutex);
q->active = false;
pthread_cond_broadcast(&q->cvar);
pthread_cond_broadcast(&q->write_cvar);
pthread_cond_broadcast(&q->read_cvar);
pthread_mutex_unlock(&q->mutex);
}
@ -195,7 +244,7 @@ int srslte_ringbuffer_read_convert_conj(srslte_ringbuffer_t* q, cf_t* dst_ptr, f
pthread_mutex_lock(&q->mutex);
while (q->count < nof_bytes && q->active) {
pthread_cond_wait(&q->cvar, &q->mutex);
pthread_cond_wait(&q->write_cvar, &q->mutex);
}
if (!q->active) {
pthread_mutex_unlock(&q->mutex);
@ -218,6 +267,7 @@ int srslte_ringbuffer_read_convert_conj(srslte_ringbuffer_t* q, cf_t* dst_ptr, f
q->rpm -= q->capacity;
}
q->count -= nof_bytes;
pthread_cond_broadcast(&q->read_cvar);
pthread_mutex_unlock(&q->mutex);
return nof_samples;
}
@ -230,7 +280,7 @@ int srslte_ringbuffer_read_block(srslte_ringbuffer_t* q, void** p, int nof_bytes
/* Wait until enough data is in the buffer */
while (q->count < nof_bytes && q->active) {
pthread_cond_wait(&q->cvar, &q->mutex);
pthread_cond_wait(&q->write_cvar, &q->mutex);
}
if (!q->active) {
@ -245,6 +295,7 @@ int srslte_ringbuffer_read_block(srslte_ringbuffer_t* q, void** p, int nof_bytes
q->rpm -= q->capacity;
}
}
pthread_cond_broadcast(&q->read_cvar);
pthread_mutex_unlock(&q->mutex);
return ret;
}

Loading…
Cancel
Save