diff --git a/lib/src/phy/rf/rf_zmq_imp.c b/lib/src/phy/rf/rf_zmq_imp.c index c9d4dcd91..af6c19b5e 100644 --- a/lib/src/phy/rf/rf_zmq_imp.c +++ b/lib/src/phy/rf/rf_zmq_imp.c @@ -48,15 +48,15 @@ typedef struct { // Server void* context; - rf_zmq_tx_t transmitter; - rf_zmq_rx_t receiver; + rf_zmq_tx_t transmitter[SRSLTE_MAX_PORTS]; + rf_zmq_rx_t receiver[SRSLTE_MAX_PORTS]; char rx_port[PARAM_LEN]; char tx_port[PARAM_LEN]; char id[PARAM_LEN_SHORT]; // Various sample buffers - cf_t* buffer_decimation; + cf_t* buffer_decimation[SRSLTE_MAX_PORTS]; cf_t* buffer_tx; // Rx timestamp @@ -198,11 +198,6 @@ int rf_zmq_open_multi(char* args, void** h, uint32_t nof_channels) if (h) { *h = NULL; - if (nof_channels != 1) { - printf("rf_zmq only supports single port at the moment.\n"); - return SRSLTE_ERROR; - } - rf_zmq_handler_t* handler = (rf_zmq_handler_t*)malloc(sizeof(rf_zmq_handler_t)); if (!handler) { perror("malloc"); @@ -216,6 +211,7 @@ int rf_zmq_open_multi(char* args, void** h, uint32_t nof_channels) handler->info.min_rx_gain = -INFINITY; handler->info.max_tx_gain = +INFINITY; handler->info.min_tx_gain = -INFINITY; + handler->nof_channels = nof_channels; strcpy(handler->id, "zmq\0"); // parse args @@ -234,36 +230,6 @@ int rf_zmq_open_multi(char* args, void** h, uint32_t nof_channels) } } - // rxport - { - const char config_arg[] = "rx_port="; - char config_str[PARAM_LEN] = {0}; - char* config_ptr = strstr(args, config_arg); - if (config_ptr) { - copy_subdev_string(config_str, config_ptr + strlen(config_arg)); - printf("Using rx_port=%s\n", config_str); - strncpy(handler->rx_port, config_str, PARAM_LEN); - handler->rx_port[PARAM_LEN - 1] = 0; - remove_substring(args, config_arg); - remove_substring(args, config_str); - } - } - - // txport - { - const char config_arg[] = "tx_port="; - char config_str[PARAM_LEN] = {0}; - char* config_ptr = strstr(args, config_arg); - if (config_ptr) { - copy_subdev_string(config_str, config_ptr + strlen(config_arg)); - printf("Using tx_port=%s\n", config_str); - strncpy(handler->tx_port, config_str, PARAM_LEN); - handler->tx_port[PARAM_LEN - 1] = 0; - remove_substring(args, config_arg); - remove_substring(args, config_str); - } - } - // id { const char config_arg[] = "id="; @@ -289,36 +255,83 @@ int rf_zmq_open_multi(char* args, void** h, uint32_t nof_channels) goto clean_exit; } - // initialize transmitter - if (strlen(handler->tx_port) != 0) { - if (rf_zmq_tx_open(&handler->transmitter, handler->id, handler->context, handler->tx_port) != SRSLTE_SUCCESS) { - fprintf(stderr, "[zmq] Error: opening transmitter\n"); - goto clean_exit; + for (int i = 0; i < handler->nof_channels; i++) { + // rxport + { + char config_arg[PARAM_LEN] = "rx_port="; + char config_str[PARAM_LEN] = {0}; + + if (i > 0) { + snprintf(config_arg, PARAM_LEN, "rx_port%d=", i + 1); + } + + char* config_ptr = strstr(args, config_arg); + + if (config_ptr) { + copy_subdev_string(config_str, config_ptr + strlen(config_arg)); + printf("Channel %d. Using rx_port=%s\n", i, config_str); + strncpy(handler->rx_port, config_str, PARAM_LEN); + handler->rx_port[PARAM_LEN - 1] = 0; + remove_substring(args, config_arg); + remove_substring(args, config_str); + } } - } else { - fprintf(stdout, "[zmq] %s Tx port not specified. Disabling transmitter.\n", handler->id); - } - // initialize receiver - if (strlen(handler->rx_port) != 0) { - if (rf_zmq_rx_open(&handler->receiver, handler->id, handler->context, handler->rx_port) != SRSLTE_SUCCESS) { - fprintf(stderr, "[zmq] Error: opening receiver\n"); - goto clean_exit; + // txport + { + char config_arg[PARAM_LEN] = "tx_port="; + char config_str[PARAM_LEN] = {0}; + + if (i > 0) { + snprintf(config_arg, PARAM_LEN, "tx_port%d=", i + 1); + } + + char* config_ptr = strstr(args, config_arg); + + if (config_ptr) { + copy_subdev_string(config_str, config_ptr + strlen(config_arg)); + printf("Channel %d. Using tx_port=%s\n", i, config_str); + strncpy(handler->tx_port, config_str, PARAM_LEN); + handler->tx_port[PARAM_LEN - 1] = 0; + remove_substring(args, config_arg); + remove_substring(args, config_str); + } } - } else { - fprintf(stdout, "[zmq] %s Rx port not specified. Disabling receiver.\n", handler->id); - } - if (!handler->transmitter.running && !handler->receiver.running) { - fprintf(stderr, "[zmq] Error: Neither Tx port nor Rx port specified.\n"); - goto clean_exit; + // initialize transmitter + if (strlen(handler->tx_port) != 0) { + if (rf_zmq_tx_open(&handler->transmitter[i], handler->id, handler->context, handler->tx_port) != + SRSLTE_SUCCESS) { + fprintf(stderr, "[zmq] Error: opening transmitter\n"); + goto clean_exit; + } + } else { + fprintf(stdout, "[zmq] %s Tx port not specified. Disabling transmitter.\n", handler->id); + } + + // initialize receiver + if (strlen(handler->rx_port) != 0) { + if (rf_zmq_rx_open(&handler->receiver[i], handler->id, handler->context, handler->rx_port) != SRSLTE_SUCCESS) { + fprintf(stderr, "[zmq] Error: opening receiver\n"); + goto clean_exit; + } + } else { + fprintf(stdout, "[zmq] %s Rx port not specified. Disabling receiver.\n", handler->id); + } + + if (!handler->transmitter[i].running && !handler->receiver[i].running) { + fprintf(stderr, "[zmq] Error: Neither Tx port nor Rx port specified.\n"); + goto clean_exit; + } } // Create decimation and overflow buffer - handler->buffer_decimation = srslte_vec_malloc(ZMQ_MAX_BUFFER_SIZE); - if (!handler->buffer_decimation) { - fprintf(stderr, "Error: allocating decimation buffer\n"); - goto clean_exit; + for (uint32_t i = 0; i < handler->nof_channels; i++) { + handler->buffer_decimation[i] = srslte_vec_malloc(ZMQ_MAX_BUFFER_SIZE); + if (!handler->buffer_decimation[i]) { + fprintf(stderr, "Error: allocating decimation buffer\n"); + goto clean_exit; + } } handler->buffer_tx = srslte_vec_malloc(ZMQ_MAX_BUFFER_SIZE); @@ -350,15 +363,19 @@ int rf_zmq_close(void* h) pthread_detach(handler->thread); } - rf_zmq_tx_close(&handler->transmitter); - rf_zmq_rx_close(&handler->receiver); + for (int i = 0; i < handler->nof_channels; i++) { + rf_zmq_tx_close(&handler->transmitter[i]); + rf_zmq_rx_close(&handler->receiver[i]); + } if (handler->context) { zmq_ctx_destroy(handler->context); } - if (handler->buffer_decimation) { - free(handler->buffer_decimation); + for (uint32_t i = 0; i < handler->nof_channels; i++) { + if (handler->buffer_decimation[i]) { + free(handler->buffer_decimation[i]); + } } if (handler->buffer_tx) { @@ -512,7 +529,7 @@ int rf_zmq_recv_with_time_multi( } // return if receiver is turned off - if (!handler->receiver.running) { + if (!handler->receiver[0].running) { update_ts(handler, &handler->next_rx_ts, nsamples_baserate, "rx"); return nsamples; } @@ -529,7 +546,7 @@ int rf_zmq_recv_with_time_multi( // receive samples srslte_timestamp_t ts_tx = {}, ts_rx = {}; - srslte_timestamp_init_uint64(&ts_tx, handler->transmitter.nsamples, handler->base_srate); + srslte_timestamp_init_uint64(&ts_tx, handler->transmitter[0].nsamples, handler->base_srate); srslte_timestamp_init_uint64(&ts_rx, handler->next_rx_ts, handler->base_srate); rf_zmq_info(handler->id, " - next rx time: %d + %.3f\n", ts_rx.full_secs, ts_rx.frac_secs); rf_zmq_info(handler->id, " - next tx time: %d + %.3f\n", ts_tx.full_secs, ts_tx.frac_secs); @@ -538,43 +555,66 @@ int rf_zmq_recv_with_time_multi( usleep((1000000 * nsamples) / handler->base_srate); // check for tx gap if we're also transmitting on this radio - if (handler->transmitter.running) { - rf_zmq_tx_align(&handler->transmitter, handler->next_rx_ts + nsamples_baserate); + for (int i = 0; i < handler->nof_channels; i++) { + if (handler->transmitter[i].running) { + rf_zmq_tx_align(&handler->transmitter[i], handler->next_rx_ts + nsamples_baserate); + } } // copy from rx buffer as many samples as requested into provided buffer - cf_t* ptr = (handler->decim_factor != 1) ? handler->buffer_decimation : data[0]; - int32_t count = 0; - while (count < nsamples_baserate && handler->receiver.running) { - int32_t n = rf_zmq_rx_baseband(&handler->receiver, &ptr[count], nsamples_baserate); - if (n > 0) { - // No error - count += n; - } else if (n == SRSLTE_ERROR_TIMEOUT) { - // Timeout, do nothing, keep going - } else if (n > 0) { - // Other error, exit - fprintf(stderr, "Error: receiving data.\n"); - goto clean_exit; + bool completed = false; + int32_t count[SRSLTE_MAX_PORTS] = {}; + while (!completed) { + uint32_t completed_count = 0; + + // Iterate channels + for (uint32_t i = 0; i < handler->nof_channels; i++) { + cf_t* ptr = (handler->decim_factor != 1) ? handler->buffer_decimation[i] : data[i]; + + // Completed condition + if (count[i] < nsamples_baserate && handler->receiver[i].running) { + // Keep receiving + int32_t n = rf_zmq_rx_baseband(&handler->receiver[i], &ptr[count[i]], nsamples_baserate); + if (n > 0) { + // No error + count[i] += n; + } else if (n == SRSLTE_ERROR_TIMEOUT) { + // Timeout, do nothing, keep going + } else if (n > 0) { + // Other error, exit + fprintf(stderr, "Error: receiving data.\n"); + goto clean_exit; + } + } else { + // Completed, count it + completed_count++; + } } + + // Check if all channels are completed + completed = (completed_count == handler->nof_channels); } rf_zmq_info(handler->id, " - read %d samples. %d samples available\n", NBYTES2NSAMPLES(nbytes), - NBYTES2NSAMPLES(srslte_ringbuffer_status(&handler->receiver.ringbuffer))); + NBYTES2NSAMPLES(srslte_ringbuffer_status(&handler->receiver[0].ringbuffer))); // decimate if needed if (handler->decim_factor != 1) { - cf_t* dst = data[0]; - - int n; - for (int i = n = 0; i < nsamples; i++) { - // Averaging decimation - cf_t avg = 0.0f; - for (int j = 0; j < handler->decim_factor; j++, n++) { - avg += ptr[n]; + for (int c = 0; c < handler->nof_channels; c++) { + + cf_t* dst = data[c]; + cf_t* ptr = (handler->decim_factor != 1) ? handler->buffer_decimation[c] : data[c]; + + int n; + for (uint32_t i = n = 0; i < nsamples; i++) { + // Averaging decimation + cf_t avg = 0.0f; + for (int j = 0; j < handler->decim_factor; j++, n++) { + avg += ptr[n]; + } + dst[i] = avg; } - dst[i] = avg; } rf_zmq_info(handler->id, " - re-adjust bytes due to %dx decimation %d --> %d samples)\n", @@ -652,50 +692,60 @@ int rf_zmq_send_timed_multi(void* h, srslte_timestamp_t ts = {}; srslte_timestamp_init(&ts, secs, frac_secs); uint64_t tx_ts = srslte_timestamp_uint64(&ts, handler->base_srate); - int num_tx_gap_samples = rf_zmq_tx_align(&handler->transmitter, tx_ts); + int num_tx_gap_samples = 0; + + for (int i = 0; i < handler->nof_channels; i++) { + if (handler->transmitter[i].running) { + num_tx_gap_samples = rf_zmq_tx_align(&handler->transmitter[i], tx_ts); + } + } if (num_tx_gap_samples < 0) { fprintf(stderr, "[zmq] Error: tx time is %.3f ms in the past (%ld < %ld)\n", -1000.0 * num_tx_gap_samples / handler->base_srate, tx_ts, - handler->transmitter.nsamples); + handler->transmitter[0].nsamples); goto clean_exit; } } - // Select buffer pointer depending on interpolation - cf_t* buf = (handler->decim_factor != 1) ? handler->buffer_tx : data[0]; + // Send base-band samples + for (int i = 0; i < handler->nof_channels; i++) { + // Select buffer pointer depending on interpolation + cf_t* buf = (handler->decim_factor != 1) ? handler->buffer_tx : data[i]; + + // Interpolate if required + if (handler->decim_factor != 1) { + rf_zmq_info(handler->id, + " - re-adjust bytes due to %dx interpolation %d --> %d samples)\n", + handler->decim_factor, + nsamples, + nsamples_baseband); + + int n = 0; + cf_t* src = data[i]; + for (int k = 0; k < nsamples; k++) { + // perform zero order hold + for (int j = 0; j < handler->decim_factor; j++, n++) { + buf[n] = src[k]; + } + } - // Interpolate if required - if (handler->decim_factor != 1) { - rf_zmq_info(handler->id, - " - re-adjust bytes due to %dx interpolation %d --> %d samples)\n", - handler->decim_factor, - nsamples, - nsamples_baseband); - - int n = 0; - cf_t* src = data[0]; - for (int i = 0; i < nsamples; i++) { - // perform zero order hold - for (int j = 0; j < handler->decim_factor; j++, n++) { - buf[n] = src[i]; + if (nsamples_baseband != n) { + fprintf(stderr, + "Number of tx samples (%d) does not match with number of interpolated samples (%d)\n", + nsamples_baseband, + n); + goto clean_exit; } } - if (nsamples_baseband != n) { - fprintf(stderr, "Number of tx samples (%d) does not match with number of interpolated samples (%d)\n", - nsamples_baseband, n); + int n = rf_zmq_tx_baseband(&handler->transmitter[i], buf, nsamples_baseband); + if (n == SRSLTE_ERROR) { goto clean_exit; } } - - // Send base-band samples - int n = rf_zmq_tx_baseband(&handler->transmitter, buf, nsamples_baseband); - if (n == SRSLTE_ERROR) { - goto clean_exit; - } handler->tx_used = true; }