zmq: add monitor code for sockets but disable it

according to the ZMQ dev guide, one can use a socket monitor
to get informed about changes to a socket, e.g. when a client disconnected.

This is useful to detect when a UE dropped the connection to reinitialize
the socket and timers in a eNB.

This commit adds code to create such a socket monitor but leaves
it disabled because it still doesn't work as expected.
master
Andre Puschmann 5 years ago
parent 995774c830
commit 6a50fe3233

@ -657,6 +657,47 @@ void rf_zmq_get_time(void* h, time_t* secs, double* frac_secs)
} }
} }
#if ZMQ_MONITOR
static int rf_zmq_rx_get_monitor_event(void* monitor, int* value, char** address)
{
// First frame in message contains event number and value
zmq_msg_t msg;
zmq_msg_init(&msg);
if (zmq_msg_recv(&msg, monitor, 0) == -1) {
printf("zmq_msg_recv failed!\n");
return -1; // Interruped, presumably
}
if (zmq_msg_more(&msg)) {
printf("more to read\n");
}
uint8_t* data = (uint8_t*)zmq_msg_data(&msg);
uint16_t event = *(uint16_t*)(data);
if (value) {
*value = *(uint32_t*)(data + 2);
}
// Second frame in message contains event address
zmq_msg_init(&msg);
if (zmq_msg_recv(&msg, monitor, 0) == -1) {
return -1; // Interruped, presumably
}
if (zmq_msg_more(&msg)) {
printf("error in msg_more \n");
}
if (address) {
uint8_t* data = (uint8_t*)zmq_msg_data(&msg);
size_t size = zmq_msg_size(&msg);
*address = (char*)malloc(size + 1);
memcpy(*address, data, size);
*address[size] = 0;
}
return event;
}
#endif // ZMQ_MONITOR
int rf_zmq_recv_with_time(void* h, void* data, uint32_t nsamples, bool blocking, time_t* secs, double* frac_secs) int rf_zmq_recv_with_time(void* h, void* data, uint32_t nsamples, bool blocking, time_t* secs, double* frac_secs)
{ {
return rf_zmq_recv_with_time_multi(h, &data, nsamples, blocking, secs, frac_secs); return rf_zmq_recv_with_time_multi(h, &data, nsamples, blocking, secs, frac_secs);
@ -757,6 +798,23 @@ int rf_zmq_recv_with_time_multi(void* h,
if (count[i] < nsamples_baserate && handler->receiver[i].running) { if (count[i] < nsamples_baserate && handler->receiver[i].running) {
// Keep receiving // Keep receiving
int32_t n = rf_zmq_rx_baseband(&handler->receiver[i], &ptr[count[i]], nsamples_baserate); int32_t n = rf_zmq_rx_baseband(&handler->receiver[i], &ptr[count[i]], nsamples_baserate);
#if ZMQ_MONITOR
// handle socket events
int event = rf_zmq_rx_get_monitor_event(handler->receiver[i].socket_monitor, NULL, NULL);
if (event != -1) {
printf("event=0x%X\n", event);
switch (event) {
case ZMQ_EVENT_CONNECTED:
handler->receiver[i].tx_connected = true;
break;
case ZMQ_EVENT_CLOSED:
handler->receiver[i].tx_connected = false;
break;
default:
break;
}
}
#endif // ZMQ_MONITOR
if (n > 0) { if (n > 0) {
// No error // No error
count[i] += n; count[i] += n;

@ -121,11 +121,29 @@ int rf_zmq_rx_open(rf_zmq_rx_t* q, rf_zmq_opts_t opts, void* zmq_ctx, char* sock
q->socket_type = opts.socket_type; q->socket_type = opts.socket_type;
q->sample_format = opts.sample_format; q->sample_format = opts.sample_format;
q->frequency_mhz = opts.frequency_mhz; q->frequency_mhz = opts.frequency_mhz;
q->fail_on_disconnect = opts.fail_on_disconnect;
if (opts.socket_type == ZMQ_SUB) { if (opts.socket_type == ZMQ_SUB) {
zmq_setsockopt(q->sock, ZMQ_SUBSCRIBE, "", 0); zmq_setsockopt(q->sock, ZMQ_SUBSCRIBE, "", 0);
} }
#if ZMQ_MONITOR
// Monitor all events (monitoring only works over inproc://)
ret = zmq_socket_monitor(q->sock, "inproc://monitor-client", ZMQ_EVENT_ALL);
if (ret == -1) {
fprintf(stderr, "Error: creating socket monitor: %s\n", zmq_strerror(zmq_errno()));
goto clean_exit;
}
// create socket socket for monitoring and connect monitor
q->socket_monitor = zmq_socket(zmq_ctx, ZMQ_PAIR);
ret = zmq_connect(q->socket_monitor, "inproc://monitor-client");
if (ret) {
fprintf(stderr, "Error: connecting monitor socket: %s\n", zmq_strerror(zmq_errno()));
goto clean_exit;
}
#endif // ZMQ_MONITOR
rf_zmq_info(q->id, "Connecting receiver: %s\n", sock_args); rf_zmq_info(q->id, "Connecting receiver: %s\n", sock_args);
ret = zmq_connect(q->sock, sock_args); ret = zmq_connect(q->sock, sock_args);
@ -242,4 +260,11 @@ void rf_zmq_rx_close(rf_zmq_rx_t* q)
zmq_close(q->sock); zmq_close(q->sock);
q->sock = NULL; q->sock = NULL;
} }
#if ZMQ_MONITOR
if (q->socket_monitor) {
zmq_close(q->socket_monitor);
q->socket_monitor = NULL;
}
#endif // ZMQ_MONITOR
} }

@ -33,6 +33,7 @@
/* Definitions */ /* Definitions */
#define VERBOSE (0) #define VERBOSE (0)
#define ZMQ_MONITOR (0)
#define NSAMPLES2NBYTES(X) (((uint32_t)(X)) * sizeof(cf_t)) #define NSAMPLES2NBYTES(X) (((uint32_t)(X)) * sizeof(cf_t))
#define NBYTES2NSAMPLES(X) ((X) / sizeof(cf_t)) #define NBYTES2NSAMPLES(X) ((X) / sizeof(cf_t))
#define ZMQ_MAX_BUFFER_SIZE (NSAMPLES2NBYTES(3072000)) // 10 subframes at 20 MHz #define ZMQ_MAX_BUFFER_SIZE (NSAMPLES2NBYTES(3072000)) // 10 subframes at 20 MHz
@ -62,6 +63,10 @@ typedef struct {
uint32_t socket_type; uint32_t socket_type;
rf_zmq_format_t sample_format; rf_zmq_format_t sample_format;
void* sock; void* sock;
#if ZMQ_MONITOR
void* socket_monitor;
bool tx_connected;
#endif
uint64_t nsamples; uint64_t nsamples;
bool running; bool running;
pthread_t thread; pthread_t thread;

Loading…
Cancel
Save