diff --git a/lib/examples/CMakeLists.txt b/lib/examples/CMakeLists.txt index e17b3b2b8..05e38dece 100644 --- a/lib/examples/CMakeLists.txt +++ b/lib/examples/CMakeLists.txt @@ -52,6 +52,10 @@ if(SRSGUI_FOUND) add_definitions(-DENABLE_GUI) endif(SRSGUI_FOUND) +if (ZEROMQ_FOUND) + add_executable(zmq_remote_rx zmq_remote_rx.c) + target_link_libraries(zmq_remote_rx srslte_phy srslte_rf) +endif (ZEROMQ_FOUND) ################################################################# # These examples need the UHD driver diff --git a/lib/examples/zmq_remote_rx.c b/lib/examples/zmq_remote_rx.c new file mode 100644 index 000000000..6ce7bdb71 --- /dev/null +++ b/lib/examples/zmq_remote_rx.c @@ -0,0 +1,259 @@ +/* + * Copyright 2013-2019 Software Radio Systems Limited + * + * This file is part of srsLTE. + * + * srsLTE is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * srsLTE is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * A copy of the GNU Affero General Public License can be found in + * the LICENSE file in the top-level directory of this distribution + * and at http://www.gnu.org/licenses/. + * + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "srslte/srslte.h" +#include "srslte/phy/rf/rf.h" + +static bool keep_running = true; +static uint32_t nof_rx_antennas = 1; +static const uint32_t max_rx_antennas = 1; + +static void int_handler(int dummy); +static void usage(char *prog); +static void parse_args(int argc, char **argv); +static int init_radio(uint32_t *buf_len); +static int rx_radio(void **buffer, uint32_t buff_len); +static void close_radio(); + +/* Example function to initialize ZMQ socket */ +static void *zmq_ctx = NULL; +static void *zmq_sock = NULL; +static const char *zmq_args = "tcp://*:5550"; +static int init_zmq() +{ + zmq_ctx = zmq_ctx_new(); + + // Create socket + zmq_sock = zmq_socket(zmq_ctx, ZMQ_PUB); + if (!zmq_sock) { + fprintf(stderr, "Error: creating transmitter socket\n"); + return -1; + } + + // The transmitter starts first and creates the socket + if (zmq_bind(zmq_sock, zmq_args)) { + fprintf(stderr, "Error: connecting transmitter socket: %s\n", zmq_strerror(zmq_errno())); + return -1; + } + return 0; +} + +/* Example function to write samples to ZMQ socket */ +static int tx_zmq(void **buffer, uint32_t buffer_len) +{ + // wait for request + uint8_t dummy; + zmq_recv(zmq_sock, &dummy, sizeof(dummy), 0); + return zmq_send(zmq_sock, buffer[0], buffer_len, 0); +} + +int main(int argc, char **argv) { + void *buffer[max_rx_antennas]; + int n = 0; + uint32_t buflen = 0; // in samples + uint32_t sample_size = 8; + + // Sets signal handlers + signal(SIGINT, int_handler); + sigset_t sigset; + sigemptyset(&sigset); + sigaddset(&sigset, SIGINT); + sigprocmask(SIG_UNBLOCK, &sigset, NULL); + + // Parse args + parse_args(argc, argv); + + // Initializes ZMQ + if (init_zmq()) { + ERROR("Initializing ZMQ\n"); + exit(-1); + } + + if (init_radio(&buflen)) { + ERROR("Initializing Radio\n"); + exit(-1); + } + + // Initializes memory for input buffer + bzero(buffer, sizeof(void*)*max_rx_antennas); + for (int i = 0; i < nof_rx_antennas; i++) { + buffer[i] = malloc(buflen*sizeof(cf_t)); + if (!buffer[i]) { + perror("malloc"); + exit(-1); + } + } + + printf("Streaming samples...\n"); + uint32_t print_cnt = 0; + while(keep_running) { + n = rx_radio(buffer, buflen); + if (n < 0) { + ERROR("Error receiving samples\n"); + exit(-1); + } + if (srslte_verbose == SRSLTE_VERBOSE_INFO) { + printf("Received %d samples from radio\n", n); + } + + n = tx_zmq((void**) buffer, n*sample_size); + + if (n == -1) { + print_cnt++; + if (print_cnt == 1000) { + printf("ZMQ socket not connected\n"); + print_cnt = 0; + } + } else { + if (srslte_verbose == SRSLTE_VERBOSE_INFO) { + printf("Transmitted %d bytes to ZMQ\n", n); + } + } + } + + // Cleanup memory and close RF device + for (int i = 0; i < nof_rx_antennas; i++) { + if (buffer[i]) { + free(buffer[i]); + } + } + close_radio(); + + printf("Exit Ok\n"); + exit(0); +} + +/* Example function to initialize the Radio frontend. In this case, we use srsLTE RF API to open a device, + * which automatically picks UHD, bladeRF, limeSDR, etc. + */ +static srslte_rf_t rf = {}; +static char *rf_args = "fastpath"; +static float rf_gain = 40.0, rf_freq = -1.0, rf_rate = 11.52e6; +static uint32_t rf_recv_frame_size_ms = 1; +static int init_radio(uint32_t *buffer_len) +{ + // Uses srsLTE RF API to open a device, could use other code here + printf("Opening RF device...\n"); + if (srslte_rf_open_multi(&rf, rf_args, nof_rx_antennas)) { + ERROR("Error opening rf\n"); + return -1; + } + + printf("Set RX freq: %.2f MHz\n", srslte_rf_set_rx_freq(&rf, nof_rx_antennas, rf_freq) / 1000000); + printf("Set RX gain: %.2f dB\n", srslte_rf_set_rx_gain(&rf, rf_gain)); + float srate = srslte_rf_set_rx_srate(&rf, rf_rate); + if (srate != rf_rate) { + ERROR("Error setting samplign frequency %.2f MHz\n", rf_rate * 1e-6); + return -1; + } + + if (buffer_len) { + *buffer_len = srate*rf_recv_frame_size_ms*1e-3; + } + + printf("Set RX rate: %.2f MHz\n", srate*1e-6); + srslte_rf_start_rx_stream(&rf, false); + return 0; +} + +/* Example implementation to receive from Radio frontend. In this case we use srsLTE + */ +static int rx_radio(void **buffer, uint32_t buf_len) +{ + return srslte_rf_recv_with_time_multi(&rf, buffer, buf_len, true, NULL, NULL); +} + +static void close_radio() +{ + srslte_rf_close(&rf); +} + +static void int_handler(int dummy) { + keep_running = false; +} + +static void usage(char *prog) { + printf("Usage: %s [agrAzv] -f rx_frequency_hz\n", prog); + printf("\t-a RF args [Default %s]\n", rf_args); + printf("\t-g RF Gain [Default %.2f dB]\n", rf_gain); + printf("\t-r RF Rate [Default %.6f Hz]\n", rf_rate); + printf("\t-m RF receive frame size in ms [Default %d ms]\n", rf_recv_frame_size_ms); + printf("\t-A Number of antennas [Max %d, Default %d]\n", max_rx_antennas, nof_rx_antennas); + printf("\t-z ZMQ args [Default %s]\n", zmq_args); + printf("\t-v srslte_verbose\n"); +} + +static void parse_args(int argc, char **argv) { + int opt; + while ((opt = getopt(argc, argv, "agrfvmzA")) != -1) { + switch (opt) { + case 'a': + rf_args = argv[optind]; + break; + case 'g': + rf_gain = strtof(argv[optind], NULL); + break; + case 'm': + rf_recv_frame_size_ms = strtol(argv[optind], NULL); + break; + case 'r': + rf_rate = strtof(argv[optind], NULL); + break; + case 'f': + rf_freq = strtof(argv[optind], NULL); + break; + case 'v': + srslte_verbose++; + break; + case 'z': + zmq_args = argv[optind]; + break; + case 'A': + nof_rx_antennas = strtol(argv[optind], NULL); + break; + default: + usage(argv[0]); + exit(-1); + } + } + if (nof_rx_antennas > max_rx_antennas || nof_rx_antennas < 1) { + fprintf(stderr, "Invalid number of antennas\n"); + usage(argv[0]); + exit(-1); + } + if (rf_freq < 0) { + usage(argv[0]); + exit(-1); + } +} + diff --git a/lib/src/phy/rf/rf_zmq_imp.c b/lib/src/phy/rf/rf_zmq_imp.c index b0ad437dd..2721f2974 100644 --- a/lib/src/phy/rf/rf_zmq_imp.c +++ b/lib/src/phy/rf/rf_zmq_imp.c @@ -214,6 +214,13 @@ int rf_zmq_open_multi(char* args, void** h, uint32_t nof_channels) handler->nof_channels = nof_channels; strcpy(handler->id, "zmq\0"); + rf_zmq_opts_t rx_opts = {}; + rf_zmq_opts_t tx_opts = {}; + rx_opts.socket_type = ZMQ_REQ; + tx_opts.socket_type = ZMQ_REP; + tx_opts.id = handler->id; + rx_opts.id = handler->id; + // parse args if (args && strlen(args)) { // base_srate @@ -244,6 +251,77 @@ int rf_zmq_open_multi(char* args, void** h, uint32_t nof_channels) remove_substring(args, config_str); } } + // rx_type + { + const char config_arg[] = "rx_type="; + char config_str[PARAM_LEN_SHORT] = {0}; + char* config_ptr = strstr(args, config_arg); + if (config_ptr) { + copy_subdev_string(config_str, config_ptr + strlen(config_arg)); + if (!strcmp(config_str, "sub")) { + rx_opts.socket_type = ZMQ_SUB; + printf("Using ZMQ_SUB for rx socket\n"); + } else { + printf("Unsupported socket type %s. Using ZMQ_REQ for rx socket\n", config_str); + } + remove_substring(args, config_arg); + remove_substring(args, config_str); + } + } + // rx_format + { + const char config_arg[] = "rx_format="; + char config_str[PARAM_LEN_SHORT] = {0}; + char* config_ptr = strstr(args, config_arg); + if (config_ptr) { + copy_subdev_string(config_str, config_ptr + strlen(config_arg)); + rx_opts.sample_format = ZMQ_TYPE_FC32; + if (!strcmp(config_str, "sc16")) { + rx_opts.sample_format = ZMQ_TYPE_SC16; + printf("Using sc16 format for rx socket\n"); + } else { + printf("Unsupported sample format %s. Using fc32 for rx socket\n", config_str); + } + remove_substring(args, config_arg); + remove_substring(args, config_str); + } + } + // tx_type + { + const char config_arg[] = "tx_type="; + char config_str[PARAM_LEN_SHORT] = {0}; + char* config_ptr = strstr(args, config_arg); + if (config_ptr) { + copy_subdev_string(config_str, config_ptr + strlen(config_arg)); + if (!strcmp(config_str, "pub")) { + tx_opts.socket_type = ZMQ_PUB; + printf("Using ZMQ_PUB for tx socket\n"); + } else { + printf("Unsupported socket type %s. Using ZMQ_REP for tx socket\n", config_str); + } + remove_substring(args, config_arg); + remove_substring(args, config_str); + } + } + // tx_format + { + const char config_arg[] = "tx_format="; + char config_str[PARAM_LEN_SHORT] = {0}; + char* config_ptr = strstr(args, config_arg); + if (config_ptr) { + copy_subdev_string(config_str, config_ptr + strlen(config_arg)); + tx_opts.sample_format = ZMQ_TYPE_FC32; + if (!strcmp(config_str, "sc16")) { + tx_opts.sample_format = ZMQ_TYPE_SC16; + printf("Using sc16 format for tx socket\n"); + } else { + printf("Unsupported sample format %s. Using fc32 for tx socket\n", config_str); + } + remove_substring(args, config_arg); + remove_substring(args, config_str); + } + } + } else { fprintf(stderr, "[zmq] Error: RF device args are required for ZMQ no-RF module\n"); goto clean_exit; @@ -303,7 +381,7 @@ int rf_zmq_open_multi(char* args, void** h, uint32_t nof_channels) // initialize transmitter if (strlen(handler->tx_port) != 0) { - if (rf_zmq_tx_open(&handler->transmitter[i], handler->id, handler->context, handler->tx_port) != + if (rf_zmq_tx_open(&handler->transmitter[i], tx_opts, handler->context, handler->tx_port) != SRSLTE_SUCCESS) { fprintf(stderr, "[zmq] Error: opening transmitter\n"); goto clean_exit; @@ -314,7 +392,7 @@ int rf_zmq_open_multi(char* args, void** h, uint32_t nof_channels) // initialize receiver if (strlen(handler->rx_port) != 0) { - if (rf_zmq_rx_open(&handler->receiver[i], handler->id, handler->context, handler->rx_port) != SRSLTE_SUCCESS) { + if (rf_zmq_rx_open(&handler->receiver[i], rx_opts, handler->context, handler->rx_port) != SRSLTE_SUCCESS) { fprintf(stderr, "[zmq] Error: opening receiver\n"); goto clean_exit; } diff --git a/lib/src/phy/rf/rf_zmq_imp_rx.c b/lib/src/phy/rf/rf_zmq_imp_rx.c index 93aadc612..e927febc8 100644 --- a/lib/src/phy/rf/rf_zmq_imp_rx.c +++ b/lib/src/phy/rf/rf_zmq_imp_rx.c @@ -29,6 +29,7 @@ #include #include #include +#include static void* rf_zmq_async_rx_thread(void* h) { @@ -41,15 +42,19 @@ static void* rf_zmq_async_rx_thread(void* h) rf_zmq_info(q->id, "-- ASYNC RX wait...\n"); - // Send request - while (n < 0 && q->running) { - rf_zmq_info(q->id, " - tx'ing rx request\n"); - n = zmq_send(q->sock, &dummy, sizeof(dummy), 0); - if (n < 0) { - if (rf_zmq_handle_error(q->id, "synchronous rx request send")) { - return NULL; + // Send request if socket type is REQUEST + if (q->socket_type == ZMQ_REQ) { + while (n < 0 && q->running) { + rf_zmq_info(q->id, " - tx'ing rx request\n"); + n = zmq_send(q->sock, &dummy, sizeof(dummy), 0); + if (n < 0) { + if (rf_zmq_handle_error(q->id, "synchronous rx request send")) { + return NULL; + } } } + } else { + n = 0; } // Receive baseband @@ -95,7 +100,7 @@ static void* rf_zmq_async_rx_thread(void* h) return NULL; } -int rf_zmq_rx_open(rf_zmq_rx_t* q, char* id, void* zmq_ctx, char* sock_args) +int rf_zmq_rx_open(rf_zmq_rx_t* q, rf_zmq_opts_t opts, void* zmq_ctx, char* sock_args) { int ret = SRSLTE_ERROR; @@ -104,15 +109,21 @@ int rf_zmq_rx_open(rf_zmq_rx_t* q, char* id, void* zmq_ctx, char* sock_args) bzero(q, sizeof(rf_zmq_rx_t)); // Copy id - strncpy(q->id, id, ZMQ_ID_STRLEN - 1); + strncpy(q->id, opts.id, ZMQ_ID_STRLEN - 1); q->id[ZMQ_ID_STRLEN - 1] = '\0'; // Create socket - q->sock = zmq_socket(zmq_ctx, ZMQ_REQ); + q->sock = zmq_socket(zmq_ctx, opts.socket_type); if (!q->sock) { fprintf(stderr, "[zmq] Error: creating transmitter socket\n"); goto clean_exit; } + q->socket_type = opts.socket_type; + q->sample_format = opts.sample_format; + + if (opts.socket_type == ZMQ_SUB) { + zmq_setsockopt(q->sock, ZMQ_SUBSCRIBE, "", 0); + } rf_zmq_info(q->id, "Connecting receiver: %s\n", sock_args); @@ -152,6 +163,12 @@ int rf_zmq_rx_open(rf_zmq_rx_t* q, char* id, void* zmq_ctx, char* sock_args) goto clean_exit; } + q->temp_buffer_convert = srslte_vec_malloc(ZMQ_MAX_BUFFER_SIZE); + if (!q->temp_buffer_convert) { + fprintf(stderr, "Error: allocating rx buffer\n"); + goto clean_exit; + } + if (pthread_mutex_init(&q->mutex, NULL)) { fprintf(stderr, "Error: creating mutex\n"); goto clean_exit; @@ -172,7 +189,23 @@ clean_exit: int rf_zmq_rx_baseband(rf_zmq_rx_t* q, cf_t* buffer, uint32_t nsamples) { - return srslte_ringbuffer_read_timed(&q->ringbuffer, buffer, NSAMPLES2NBYTES(nsamples), ZMQ_TIMEOUT_MS); + void *dst_buffer = buffer; + uint32_t sample_sz = sizeof(cf_t); + if (q->sample_format != ZMQ_TYPE_FC32) { + dst_buffer = q->temp_buffer_convert; + sample_sz = 2*sizeof(short); + } + + int n = srslte_ringbuffer_read_timed(&q->ringbuffer, dst_buffer, sample_sz*nsamples, ZMQ_TIMEOUT_MS); + if (n < 0) { + return n; + } + + if (q->sample_format == ZMQ_TYPE_SC16) { + srslte_vec_convert_if(dst_buffer, INT16_MAX, (float*) buffer, 2*nsamples); + } + + return n; } void rf_zmq_rx_close(rf_zmq_rx_t* q) @@ -191,6 +224,10 @@ void rf_zmq_rx_close(rf_zmq_rx_t* q) free(q->temp_buffer); } + if (q->temp_buffer_convert) { + free(q->temp_buffer_convert); + } + if (q->sock) { zmq_close(q->sock); q->sock = NULL; diff --git a/lib/src/phy/rf/rf_zmq_imp_trx.h b/lib/src/phy/rf/rf_zmq_imp_trx.h index f73961d35..178ab8824 100644 --- a/lib/src/phy/rf/rf_zmq_imp_trx.h +++ b/lib/src/phy/rf/rf_zmq_imp_trx.h @@ -40,17 +40,27 @@ #define ZMQ_BASERATE_DEFAULT_HZ (23040000) #define ZMQ_ID_STRLEN 16 +typedef enum { + ZMQ_TYPE_FC32 = 0, + ZMQ_TYPE_SC16 +} rf_zmq_format_t; + typedef struct { char id[ZMQ_ID_STRLEN]; + uint32_t socket_type; + rf_zmq_format_t sample_format; void* sock; uint64_t nsamples; bool running; pthread_mutex_t mutex; cf_t* zeros; + void* temp_buffer_convert; } rf_zmq_tx_t; typedef struct { char id[ZMQ_ID_STRLEN]; + uint32_t socket_type; + rf_zmq_format_t sample_format; void* sock; uint64_t nsamples; bool running; @@ -58,8 +68,15 @@ typedef struct { pthread_mutex_t mutex; srslte_ringbuffer_t ringbuffer; cf_t* temp_buffer; + void* temp_buffer_convert; } rf_zmq_rx_t; +typedef struct { + const char *id; + uint32_t socket_type; + rf_zmq_format_t sample_format; +} rf_zmq_opts_t; + /* * Common functions */ @@ -72,7 +89,7 @@ SRSLTE_API int rf_zmq_handle_error(char* id, const char* text); /* * Transmitter functions */ -SRSLTE_API int rf_zmq_tx_open(rf_zmq_tx_t* q, const char* id, void* zmq_ctx, char* sock_args); +SRSLTE_API int rf_zmq_tx_open(rf_zmq_tx_t* q, rf_zmq_opts_t opts, void* zmq_ctx, char* sock_args); SRSLTE_API int rf_zmq_tx_align(rf_zmq_tx_t* q, uint64_t ts); @@ -83,7 +100,7 @@ SRSLTE_API void rf_zmq_tx_close(rf_zmq_tx_t* q); /* * Receiver functions */ -SRSLTE_API int rf_zmq_rx_open(rf_zmq_rx_t* q, char* id, void* zmq_ctx, char* sock_args); +SRSLTE_API int rf_zmq_rx_open(rf_zmq_rx_t* q, rf_zmq_opts_t opts, void* zmq_ctx, char* sock_args); SRSLTE_API int rf_zmq_rx_baseband(rf_zmq_rx_t* q, cf_t* buffer, uint32_t nsamples); diff --git a/lib/src/phy/rf/rf_zmq_imp_tx.c b/lib/src/phy/rf/rf_zmq_imp_tx.c index 71320fc08..ff0a871af 100644 --- a/lib/src/phy/rf/rf_zmq_imp_tx.c +++ b/lib/src/phy/rf/rf_zmq_imp_tx.c @@ -30,8 +30,9 @@ #include #include #include +#include -int rf_zmq_tx_open(rf_zmq_tx_t* q, const char* id, void* zmq_ctx, char* sock_args) +int rf_zmq_tx_open(rf_zmq_tx_t* q, rf_zmq_opts_t opts, void* zmq_ctx, char* sock_args) { int ret = SRSLTE_ERROR; @@ -40,15 +41,17 @@ int rf_zmq_tx_open(rf_zmq_tx_t* q, const char* id, void* zmq_ctx, char* sock_arg bzero(q, sizeof(rf_zmq_tx_t)); // Copy id - strncpy(q->id, id, ZMQ_ID_STRLEN - 1); + strncpy(q->id, opts.id, ZMQ_ID_STRLEN - 1); q->id[ZMQ_ID_STRLEN - 1] = '\0'; // Create socket - q->sock = zmq_socket(zmq_ctx, ZMQ_REP); + q->sock = zmq_socket(zmq_ctx, opts.socket_type); if (!q->sock) { fprintf(stderr, "[zmq] Error: creating transmitter socket\n"); goto clean_exit; } + q->socket_type = opts.socket_type; + q->sample_format = opts.sample_format; rf_zmq_info(q->id, "Binding transmitter: %s\n", sock_args); @@ -82,6 +85,12 @@ int rf_zmq_tx_open(rf_zmq_tx_t* q, const char* id, void* zmq_ctx, char* sock_arg goto clean_exit; } + q->temp_buffer_convert = srslte_vec_malloc(ZMQ_MAX_BUFFER_SIZE); + if (!q->temp_buffer_convert) { + fprintf(stderr, "Error: allocating rx buffer\n"); + goto clean_exit; + } + q->zeros = srslte_vec_malloc(ZMQ_MAX_BUFFER_SIZE); if (!q->zeros) { fprintf(stderr, "Error: allocating zeros\n"); @@ -103,23 +112,37 @@ static int _rf_zmq_tx_baseband(rf_zmq_tx_t* q, cf_t* buffer, uint32_t nsamples) int n = SRSLTE_ERROR; while (n < 0 && q->running) { - // Receive Transmit request - uint8_t dummy; - n = zmq_recv(q->sock, &dummy, sizeof(dummy), 0); - if (n < 0) { - if (rf_zmq_handle_error(q->id, "tx request receive")) { - n = SRSLTE_ERROR; - goto clean_exit; + // Receive Transmit request is socket type is REPLY + if (q->socket_type == ZMQ_REP) { + uint8_t dummy; + n = zmq_recv(q->sock, &dummy, sizeof(dummy), 0); + if (n < 0) { + if (rf_zmq_handle_error(q->id, "tx request receive")) { + n = SRSLTE_ERROR; + goto clean_exit; + } + } else { + // Tx request received successful + rf_zmq_info(q->id, " - tx request received\n"); + rf_zmq_info(q->id, " - sending %d samples (%d B)\n", nsamples, NSAMPLES2NBYTES(nsamples)); } } else { - // Tx request received successful - rf_zmq_info(q->id, " - tx request received\n"); - rf_zmq_info(q->id, " - sending %d samples (%d B)\n", nsamples, NSAMPLES2NBYTES(nsamples)); + n = 1; + } + + // convert samples if necessary + void *buf = buffer; + uint32_t sample_sz = sizeof(cf_t); + + if (q->sample_format == ZMQ_TYPE_SC16) { + buf = q->temp_buffer_convert; + sample_sz = 2*sizeof(short); + srslte_vec_convert_fi((float*) buffer, INT16_MAX, (short*) q->temp_buffer_convert, 2*nsamples); } // Send base-band if request was received if (n > 0) { - n = zmq_send(q->sock, buffer, NSAMPLES2NBYTES(nsamples), 0); + n = zmq_send(q->sock, buf, sample_sz*nsamples, 0); if (n < 0) { if (rf_zmq_handle_error(q->id, "tx baseband send")) { n = SRSLTE_ERROR; @@ -184,6 +207,10 @@ void rf_zmq_tx_close(rf_zmq_tx_t* q) free(q->zeros); } + if (q->temp_buffer_convert) { + free(q->temp_buffer_convert); + } + if (q->sock) { zmq_close(q->sock); q->sock = NULL;