diff --git a/lib/src/phy/rf/rf_zmq_imp.c b/lib/src/phy/rf/rf_zmq_imp.c index 29237b570..80cd55705 100644 --- a/lib/src/phy/rf/rf_zmq_imp.c +++ b/lib/src/phy/rf/rf_zmq_imp.c @@ -657,6 +657,47 @@ void rf_zmq_get_time(void* h, time_t* secs, double* frac_secs) } } +#if ZMQ_MONITOR +static int rf_zmq_rx_get_monitor_event(void* monitor, int* value, char** address) +{ + // First frame in message contains event number and value + zmq_msg_t msg; + zmq_msg_init(&msg); + if (zmq_msg_recv(&msg, monitor, 0) == -1) { + printf("zmq_msg_recv failed!\n"); + return -1; // Interruped, presumably + } + + if (zmq_msg_more(&msg)) { + printf("more to read\n"); + } + + uint8_t* data = (uint8_t*)zmq_msg_data(&msg); + uint16_t event = *(uint16_t*)(data); + if (value) { + *value = *(uint32_t*)(data + 2); + } + + // Second frame in message contains event address + zmq_msg_init(&msg); + if (zmq_msg_recv(&msg, monitor, 0) == -1) { + return -1; // Interruped, presumably + } + if (zmq_msg_more(&msg)) { + printf("error in msg_more \n"); + } + + if (address) { + uint8_t* data = (uint8_t*)zmq_msg_data(&msg); + size_t size = zmq_msg_size(&msg); + *address = (char*)malloc(size + 1); + memcpy(*address, data, size); + *address[size] = 0; + } + return event; +} +#endif // ZMQ_MONITOR + int rf_zmq_recv_with_time(void* h, void* data, uint32_t nsamples, bool blocking, time_t* secs, double* frac_secs) { return rf_zmq_recv_with_time_multi(h, &data, nsamples, blocking, secs, frac_secs); @@ -757,6 +798,23 @@ int rf_zmq_recv_with_time_multi(void* h, if (count[i] < nsamples_baserate && handler->receiver[i].running) { // Keep receiving int32_t n = rf_zmq_rx_baseband(&handler->receiver[i], &ptr[count[i]], nsamples_baserate); +#if ZMQ_MONITOR + // handle socket events + int event = rf_zmq_rx_get_monitor_event(handler->receiver[i].socket_monitor, NULL, NULL); + if (event != -1) { + printf("event=0x%X\n", event); + switch (event) { + case ZMQ_EVENT_CONNECTED: + handler->receiver[i].tx_connected = true; + break; + case ZMQ_EVENT_CLOSED: + handler->receiver[i].tx_connected = false; + break; + default: + break; + } + } +#endif // ZMQ_MONITOR if (n > 0) { // No error count[i] += n; diff --git a/lib/src/phy/rf/rf_zmq_imp_rx.c b/lib/src/phy/rf/rf_zmq_imp_rx.c index 26c296208..fe54b8c1b 100644 --- a/lib/src/phy/rf/rf_zmq_imp_rx.c +++ b/lib/src/phy/rf/rf_zmq_imp_rx.c @@ -118,14 +118,32 @@ int rf_zmq_rx_open(rf_zmq_rx_t* q, rf_zmq_opts_t opts, void* zmq_ctx, char* sock fprintf(stderr, "[zmq] Error: creating transmitter socket\n"); goto clean_exit; } - q->socket_type = opts.socket_type; + q->socket_type = opts.socket_type; q->sample_format = opts.sample_format; q->frequency_mhz = opts.frequency_mhz; + q->fail_on_disconnect = opts.fail_on_disconnect; if (opts.socket_type == ZMQ_SUB) { zmq_setsockopt(q->sock, ZMQ_SUBSCRIBE, "", 0); } +#if ZMQ_MONITOR + // Monitor all events (monitoring only works over inproc://) + ret = zmq_socket_monitor(q->sock, "inproc://monitor-client", ZMQ_EVENT_ALL); + if (ret == -1) { + fprintf(stderr, "Error: creating socket monitor: %s\n", zmq_strerror(zmq_errno())); + goto clean_exit; + } + + // create socket socket for monitoring and connect monitor + q->socket_monitor = zmq_socket(zmq_ctx, ZMQ_PAIR); + ret = zmq_connect(q->socket_monitor, "inproc://monitor-client"); + if (ret) { + fprintf(stderr, "Error: connecting monitor socket: %s\n", zmq_strerror(zmq_errno())); + goto clean_exit; + } +#endif // ZMQ_MONITOR + rf_zmq_info(q->id, "Connecting receiver: %s\n", sock_args); ret = zmq_connect(q->sock, sock_args); @@ -242,4 +260,11 @@ void rf_zmq_rx_close(rf_zmq_rx_t* q) zmq_close(q->sock); q->sock = NULL; } + +#if ZMQ_MONITOR + if (q->socket_monitor) { + zmq_close(q->socket_monitor); + q->socket_monitor = NULL; + } +#endif // ZMQ_MONITOR } diff --git a/lib/src/phy/rf/rf_zmq_imp_trx.h b/lib/src/phy/rf/rf_zmq_imp_trx.h index 732912c97..65cfdb03b 100644 --- a/lib/src/phy/rf/rf_zmq_imp_trx.h +++ b/lib/src/phy/rf/rf_zmq_imp_trx.h @@ -33,6 +33,7 @@ /* Definitions */ #define VERBOSE (0) +#define ZMQ_MONITOR (0) #define NSAMPLES2NBYTES(X) (((uint32_t)(X)) * sizeof(cf_t)) #define NBYTES2NSAMPLES(X) ((X) / sizeof(cf_t)) #define ZMQ_MAX_BUFFER_SIZE (NSAMPLES2NBYTES(3072000)) // 10 subframes at 20 MHz @@ -62,6 +63,10 @@ typedef struct { uint32_t socket_type; rf_zmq_format_t sample_format; void* sock; +#if ZMQ_MONITOR + void* socket_monitor; + bool tx_connected; +#endif uint64_t nsamples; bool running; pthread_t thread;