diff --git a/lte/examples/pdsch_enodeb.c b/lte/examples/pdsch_enodeb.c index 25bf52f1c..a40dd75db 100644 --- a/lte/examples/pdsch_enodeb.c +++ b/lte/examples/pdsch_enodeb.c @@ -31,6 +31,8 @@ #include #include #include +#include +#include #include "liblte/phy/phy.h" #include "liblte/rrc/rrc.h" @@ -56,16 +58,16 @@ lte_cell_t cell = { PHICH_NORM // PHICH length }; -int udp_port = -1; // -1 generates random data +int net_port = -1; // -1 generates random data uint32_t cfi=1; -uint32_t mcs_idx = 12, last_mcs_idx = 12; +uint32_t mcs_idx = 1, last_mcs_idx = 1; int nof_frames = -1; char *uhd_args = ""; float uhd_amp = 0.1, uhd_gain = 70.0, uhd_freq = 2400000000; -udpsource_t udp_source; +bool null_file_sink=false; filesink_t fsink; lte_fft_t ifft; pbch_t pbch; @@ -74,10 +76,22 @@ pdcch_t pdcch; pdsch_t pdsch; pdsch_harq_t harq_process; regs_t regs; +ra_pdsch_t ra_dl; + cf_t *sf_buffer = NULL, *output_buffer = NULL; int sf_n_re, sf_n_samples; +pthread_t net_thread; +void *net_thread_fnc(void *arg); +sem_t net_sem; +bool net_packet_ready = false; +netsource_t net_source; +netsink_t net_sink; + +int prbset_num = 1, last_prbset_num = 1; +int prbset_orig = 0; + void usage(char *prog) { printf("Usage: %s [agmfoncvpu]\n", prog); #ifndef DISABLE_UHD @@ -93,7 +107,7 @@ void usage(char *prog) { printf("\t-n number of frames [Default %d]\n", nof_frames); printf("\t-c cell id [Default %d]\n", cell.id); printf("\t-p nof_prb [Default %d]\n", cell.nof_prb); - printf("\t-u listen UDP port for input data (-1 is random) [Default %d]\n", udp_port); + printf("\t-u listen UDP port for input data (-1 is random) [Default %d]\n", net_port); printf("\t-v [set verbose to debug, default none]\n"); } @@ -120,7 +134,7 @@ void parse_args(int argc, char **argv) { mcs_idx = atoi(argv[optind]); break; case 'u': - udp_port = atoi(argv[optind]); + net_port = atoi(argv[optind]); break; case 'n': nof_frames = atoi(argv[optind]); @@ -162,9 +176,14 @@ void base_init() { } /* open file or USRP */ if (output_file_name) { - if (filesink_init(&fsink, output_file_name, COMPLEX_FLOAT_BIN)) { - fprintf(stderr, "Error opening file %s\n", output_file_name); - exit(-1); + if (strcmp(output_file_name, "NULL")) { + if (filesink_init(&fsink, output_file_name, COMPLEX_FLOAT_BIN)) { + fprintf(stderr, "Error opening file %s\n", output_file_name); + exit(-1); + } + null_file_sink = false; + } else { + null_file_sink = true; } } else { #ifndef DISABLE_UHD @@ -179,18 +198,21 @@ void base_init() { #endif } - if (udp_port > 0) { - if (udpsource_init(&udp_source, "0.0.0.0", udp_port)) { - fprintf(stderr, "Error creating input UDP socket at port %d\n", udp_port); + if (net_port > 0) { + if (netsource_init(&net_source, "0.0.0.0", net_port, NETSOURCE_TCP)) { + fprintf(stderr, "Error creating input UDP socket at port %d\n", net_port); exit(-1); } - - if (udpsource_set_timeout(&udp_source, 5)) { - fprintf(stderr, "Error setting UDP socket timeout\n"); + if (null_file_sink) { + if (netsink_init(&net_sink, "127.0.0.1", net_port+1, NETSINK_TCP)) { + fprintf(stderr, "Error sink\n"); + exit(-1); + } + } + if (sem_init(&net_sem, 0, 1)) { + perror("sem_init"); exit(-1); } - - printf("Opened UDP socket at port %d\n", udp_port); } /* create ifft object */ @@ -253,21 +275,21 @@ void base_free() { free(output_buffer); } if (output_file_name) { - filesink_free(&fsink); + if (!null_file_sink) { + filesink_free(&fsink); + } } else { #ifndef DISABLE_UHD cuhd_close(&uhd); #endif } - if (udp_port > 0) { - udpsource_free(&udp_source); + if (net_port > 0) { + netsource_free(&net_source); + sem_close(&net_sem); } } -int prbset_num = 1, last_prbset_num = 1; -int prbset_orig = 0; - unsigned int reverse(register unsigned int x) { @@ -289,33 +311,33 @@ uint32_t prbset_to_bitmask() { return reverse(mask)>>(32-cell.nof_prb); } -int update_radl(ra_pdsch_t *ra_dl) { +int update_radl() { ra_prb_t prb_alloc; - bzero(ra_dl, sizeof(ra_pdsch_t)); - ra_dl->harq_process = 0; - ra_dl->mcs_idx = mcs_idx; - ra_dl->ndi = 0; - ra_dl->rv_idx = 0; - ra_dl->alloc_type = alloc_type0; - ra_dl->type0_alloc.rbg_bitmask = prbset_to_bitmask(); + bzero(&ra_dl, sizeof(ra_pdsch_t)); + ra_dl.harq_process = 0; + ra_dl.mcs_idx = mcs_idx; + ra_dl.ndi = 0; + ra_dl.rv_idx = 0; + ra_dl.alloc_type = alloc_type0; + ra_dl.type0_alloc.rbg_bitmask = prbset_to_bitmask(); - ra_prb_get_dl(&prb_alloc, ra_dl, cell.nof_prb); + ra_prb_get_dl(&prb_alloc, &ra_dl, cell.nof_prb); ra_prb_get_re_dl(&prb_alloc, cell.nof_prb, 1, cell.nof_prb<10?(cfi+1):cfi, CPNORM); - ra_mcs_from_idx_dl(mcs_idx, cell.nof_prb, &ra_dl->mcs); + ra_mcs_from_idx_dl(mcs_idx, cell.nof_prb, &ra_dl.mcs); - ra_pdsch_fprint(stdout, ra_dl, cell.nof_prb); + ra_pdsch_fprint(stdout, &ra_dl, cell.nof_prb); - if (pdsch_harq_setup(&harq_process, ra_dl->mcs, &prb_alloc)) { + if (pdsch_harq_setup(&harq_process, ra_dl.mcs, &prb_alloc)) { fprintf(stderr, "Error configuring HARQ process\n"); return -1; } - + return 0; } /* Read new MCS from stdin */ -int update_control(ra_pdsch_t *ra_dl) { +int update_control() { char input[128]; fd_set set; @@ -355,11 +377,11 @@ int update_control(ra_pdsch_t *ra_dl) { mcs_idx = atoi(input); } bzero(input,sizeof(input)); - if (update_radl(ra_dl)) { + if (update_radl()) { printf("Trying with last known MCS index\n"); mcs_idx = last_mcs_idx; prbset_num = last_prbset_num; - return update_radl(ra_dl); + return update_radl(); } } return 0; @@ -372,7 +394,42 @@ int update_control(ra_pdsch_t *ra_dl) { } } -uint8_t data[10000], data_unpacked[10000]; +#define DATA_BUFF_SZ 10000 +uint8_t data[DATA_BUFF_SZ], data_unpacked[DATA_BUFF_SZ]; +uint8_t data_tmp[DATA_BUFF_SZ]; + +/** Function run in a separate thread to receive UDP data */ +void *net_thread_fnc(void *arg) { + int n; + int rpm = 0, wpm=0; + + do { + n = netsource_read(&net_source, &data_unpacked[rpm], DATA_BUFF_SZ-rpm); + if (n > 0) { + int nbytes = 1+(ra_dl.mcs.tbs-1)/8; + rpm += n; + printf("received %d bytes. rpm=%d/%d\n",n,rpm,nbytes); + wpm = 0; + while (rpm >= nbytes) { + // wait for packet to be transmitted + sem_wait(&net_sem); + bit_pack_vector(&data_unpacked[wpm], data, nbytes*8); + printf("Sent %d/%d bytes ready\n", nbytes, rpm); + rpm -= nbytes; + wpm += nbytes; + net_packet_ready = true; + } + if (wpm > 0) { + INFO("%d bytes left in buffer for next packet\n", rpm); + memcpy(data_unpacked, &data_unpacked[wpm], rpm * sizeof(uint8_t)); + } + } else if (n < 0) { + fprintf(stderr, "Error receiving from network\n"); + exit(-1); + } + } while(n >= 0); + return NULL; +} int main(int argc, char **argv) { int nf, sf_idx, N_id_2; @@ -380,7 +437,6 @@ int main(int argc, char **argv) { float sss_signal0[SSS_LEN]; // for subframe 0 float sss_signal5[SSS_LEN]; // for subframe 5 uint8_t bch_payload[BCH_PAYLOAD_LEN], bch_payload_packed[BCH_PAYLOAD_LEN/8]; - ra_pdsch_t ra_dl; int i; cf_t *sf_symbols[MAX_PORTS]; cf_t *slot1_symbols[MAX_PORTS]; @@ -406,6 +462,9 @@ int main(int argc, char **argv) { cell.phich_resources = R_1; sfn = 0; + prbset_num = cell.nof_prb; + last_prbset_num = cell.nof_prb; + /* this *must* be called after setting slot_len_* */ base_init(); @@ -434,10 +493,17 @@ int main(int argc, char **argv) { } #endif - if (update_radl(&ra_dl)) { + if (update_radl()) { exit(-1); } + if (net_port > 0) { + if (pthread_create(&net_thread, NULL, net_thread_fnc, NULL)) { + perror("pthread_create"); + exit(-1); + } + } + /* Initiate valid DCI locations */ for (i=0;i 0) { - int n = udpsource_read(&udp_source, data_unpacked, 1+(ra_dl.mcs.tbs-1)/8); - if (n > 0) { - bit_pack_vector(data_unpacked, data, n*8); - send_data = true; - } else if (n == 0) { - send_data = false; - } else { - fprintf(stderr, "Error receiving from UDP socket\n"); - exit(-1); + if (net_port > 0) { + send_data = net_packet_ready; + if (net_packet_ready) { + INFO("Transmitting packet\n",0); } } else { INFO("SF: %d, Generating %d random bits\n", sf_idx, ra_dl.mcs.tbs); @@ -508,15 +568,27 @@ int main(int argc, char **argv) { fprintf(stderr, "Error encoding PDSCH\n"); exit(-1); } + if (net_port > 0 && net_packet_ready) { + if (null_file_sink) { + bit_unpack_vector(data, data_tmp, ra_dl.mcs.tbs); + if (netsink_write(&net_sink, data_tmp, 1+(ra_dl.mcs.tbs-1)/8) < 0) { + fprintf(stderr, "Error sending data through UDP socket\n"); + } + } + net_packet_ready = false; + sem_post(&net_sem); + } } - + /* Transform to OFDM symbols */ lte_ifft_run_sf(&ifft, sf_buffer, output_buffer); /* send to file or usrp */ if (output_file_name) { - filesink_write(&fsink, output_buffer, sf_n_samples); - usleep(5000); + if (!null_file_sink) { + filesink_write(&fsink, output_buffer, sf_n_samples); + } + usleep(1000); } else { #ifndef DISABLE_UHD vec_sc_prod_cfc(output_buffer, uhd_amp, output_buffer, sf_n_samples); diff --git a/lte/examples/pdsch_ue.c b/lte/examples/pdsch_ue.c index 30c405ff7..f1778210b 100644 --- a/lte/examples/pdsch_ue.c +++ b/lte/examples/pdsch_ue.c @@ -70,8 +70,8 @@ typedef struct { char *uhd_args; float uhd_freq; float uhd_gain; - int udp_port; - char *udp_address; + int net_port; + char *net_address; }prog_args_t; void args_default(prog_args_t *args) { @@ -81,8 +81,8 @@ void args_default(prog_args_t *args) { args->uhd_args = ""; args->uhd_freq = -1.0; args->uhd_gain = 60.0; - args->udp_port = -1; - args->udp_address = "127.0.0.1"; + args->net_port = -1; + args->net_address = "127.0.0.1"; } void usage(prog_args_t *args, char *prog) { @@ -97,8 +97,8 @@ void usage(prog_args_t *args, char *prog) { printf("\t plots are disabled. Graphics library not available\n"); #endif printf("\t-n nof_subframes [Default %d]\n", args->nof_subframes); - printf("\t-u remote UDP port to send data (-1 does nothing with it) [Default %d]\n", args->udp_port); - printf("\t-U remote UDP address to send data [Default %s]\n", args->udp_address); + printf("\t-u remote UDP port to send data (-1 does nothing with it) [Default %d]\n", args->net_port); + printf("\t-U remote UDP address to send data [Default %s]\n", args->net_address); printf("\t-v [set verbose to debug, default none]\n"); } @@ -126,10 +126,10 @@ void parse_args(prog_args_t *args, int argc, char **argv) { args->force_N_id_2 = atoi(argv[optind]); break; case 'u': - args->udp_port = atoi(argv[optind]); + args->net_port = atoi(argv[optind]); break; case 'U': - args->udp_address = argv[optind]; + args->net_address = argv[optind]; break; case 'd': args->disable_plots = true; @@ -186,13 +186,13 @@ int main(int argc, char **argv) { int n; uint8_t bch_payload[BCH_PAYLOAD_LEN], bch_payload_unpacked[BCH_PAYLOAD_LEN]; uint32_t sfn_offset; - udpsink_t udp_sink; + netsink_t net_sink; parse_args(&prog_args, argc, argv); - if (prog_args.udp_port > 0) { - if (udpsink_init(&udp_sink, prog_args.udp_address, prog_args.udp_port)) { - fprintf(stderr, "Error initiating UDP socket to %s:%d\n", prog_args.udp_address, prog_args.udp_port); + if (prog_args.net_port > 0) { + if (netsink_init(&net_sink, prog_args.net_address, prog_args.net_port, NETSINK_TCP)) { + fprintf(stderr, "Error initiating UDP socket to %s:%d\n", prog_args.net_address, prog_args.net_port); exit(-1); } } @@ -228,7 +228,6 @@ int main(int argc, char **argv) { return LIBLTE_ERROR; } - INFO("Stopping UHD and flushing buffer...\r",0); cuhd_stop_rx_stream(uhd); cuhd_flush_buffer(uhd); @@ -318,9 +317,9 @@ int main(int argc, char **argv) { exit(-1); } else if (n > 0) { /* Send data if socket active */ - if (prog_args.udp_port > 0) { + if (prog_args.net_port > 0) { bit_unpack_vector(data_packed, data, n); - if (udpsink_write(&udp_sink, data, 1+(n-1)/8) < 0) { + if (netsink_write(&net_sink, data, 1+(n-1)/8) < 0) { fprintf(stderr, "Error sending data through UDP socket\n"); } } diff --git a/lte/phy/include/liblte/phy/io/udpsink.h b/lte/phy/include/liblte/phy/io/netsink.h similarity index 73% rename from lte/phy/include/liblte/phy/io/udpsink.h rename to lte/phy/include/liblte/phy/io/netsink.h index 61b2f000c..ef36dd81e 100644 --- a/lte/phy/include/liblte/phy/io/udpsink.h +++ b/lte/phy/include/liblte/phy/io/netsink.h @@ -26,8 +26,8 @@ */ -#ifndef UDPSINK_ -#define UDPSINK_ +#ifndef NETSINK_ +#define NETSINK_ #include #include @@ -41,23 +41,26 @@ typedef struct LIBLTE_API { int sockfd; struct sockaddr_in servaddr; -}udpsink_t; +}netsink_t; -LIBLTE_API int udpsink_init(udpsink_t *q, +typedef enum {NETSINK_UDP, NETSINK_TCP} netsink_type_t; + +LIBLTE_API int netsink_init(netsink_t *q, char *address, - int port); + int port, + netsink_type_t type); -LIBLTE_API void udpsink_free(udpsink_t *q); +LIBLTE_API void netsink_free(netsink_t *q); -LIBLTE_API int udpsink_write(udpsink_t *q, +LIBLTE_API int netsink_write(netsink_t *q, void *buffer, int nof_bytes); /* High-level API */ typedef struct LIBLTE_API { - udpsink_t obj; - struct udpsink_init { + netsink_t obj; + struct netsink_init { char *address; int port; int block_length; @@ -65,10 +68,10 @@ typedef struct LIBLTE_API { } init; void* input; int in_len; -}udpsink_hl; +}netsink_hl; -LIBLTE_API int udpsink_initialize(udpsink_hl* h); -LIBLTE_API int udpsink_work( udpsink_hl* hl); -LIBLTE_API int udpsink_stop(udpsink_hl* h); +LIBLTE_API int netsink_initialize(netsink_hl* h); +LIBLTE_API int netsink_work( netsink_hl* hl); +LIBLTE_API int netsink_stop(netsink_hl* h); #endif // UDPSINK_ diff --git a/lte/phy/include/liblte/phy/io/udpsource.h b/lte/phy/include/liblte/phy/io/netsource.h similarity index 67% rename from lte/phy/include/liblte/phy/io/udpsource.h rename to lte/phy/include/liblte/phy/io/netsource.h index e245e33fd..5a12f7081 100644 --- a/lte/phy/include/liblte/phy/io/udpsource.h +++ b/lte/phy/include/liblte/phy/io/netsource.h @@ -26,8 +26,8 @@ */ -#ifndef UDPSOURCE_ -#define UDPSOURCE_ +#ifndef NETSOURCE_ +#define NETSOURCE_ #include #include @@ -38,45 +38,51 @@ #include "liblte/config.h" +typedef enum {NETSOURCE_UDP, NETSOURCE_TCP} netsource_type_t; + /* Low-level API */ typedef struct LIBLTE_API { int sockfd; + int connfd; struct sockaddr_in servaddr; -}udpsource_t; + netsource_type_t type; + struct sockaddr_in cliaddr; +}netsource_t; -LIBLTE_API int udpsource_init(udpsource_t *q, +LIBLTE_API int netsource_init(netsource_t *q, char *address, - int port); + int port, + netsource_type_t type); -LIBLTE_API void udpsource_free(udpsource_t *q); +LIBLTE_API void netsource_free(netsource_t *q); -LIBLTE_API int udpsource_set_nonblocking(udpsource_t *q); +LIBLTE_API int netsource_set_nonblocking(netsource_t *q); -LIBLTE_API int udpsource_read(udpsource_t *q, +LIBLTE_API int netsource_read(netsource_t *q, void *buffer, int nof_bytes); -LIBLTE_API int udpsource_set_timeout(udpsource_t *q, +LIBLTE_API int netsource_set_timeout(netsource_t *q, uint32_t microseconds); /* High-level API */ typedef struct LIBLTE_API { - udpsource_t obj; - struct udpsource_init { + netsource_t obj; + struct netsource_init { char *address; int port; int data_type; } init; - struct udpsource_ctrl_in { + struct netsource_ctrl_in { int nsamples; // Number of samples to read } ctrl_in; void* output; int out_len; -}udpsource_hl; +}netsource_hl; -LIBLTE_API int udpsource_initialize(udpsource_hl* h); -LIBLTE_API int udpsource_work( udpsource_hl* hl); -LIBLTE_API int udpsource_stop(udpsource_hl* h); +LIBLTE_API int netsource_initialize(netsource_hl* h); +LIBLTE_API int netsource_work( netsource_hl* hl); +LIBLTE_API int netsource_stop(netsource_hl* h); #endif // UDPSOURCE_ diff --git a/lte/phy/include/liblte/phy/phy.h b/lte/phy/include/liblte/phy/phy.h index 4e145614a..7d96fcb85 100644 --- a/lte/phy/include/liblte/phy/phy.h +++ b/lte/phy/include/liblte/phy/phy.h @@ -74,8 +74,8 @@ #include "liblte/phy/io/binsource.h" #include "liblte/phy/io/filesink.h" #include "liblte/phy/io/filesource.h" -#include "liblte/phy/io/udpsink.h" -#include "liblte/phy/io/udpsource.h" +#include "liblte/phy/io/netsink.h" +#include "liblte/phy/io/netsource.h" #include "liblte/phy/modem/demod_hard.h" #include "liblte/phy/modem/demod_soft.h" diff --git a/lte/phy/lib/io/src/udpsink.c b/lte/phy/lib/io/src/netsink.c similarity index 63% rename from lte/phy/lib/io/src/udpsink.c rename to lte/phy/lib/io/src/netsink.c index 74bf7b9cf..80fe5a22d 100644 --- a/lte/phy/lib/io/src/udpsink.c +++ b/lte/phy/lib/io/src/netsink.c @@ -35,12 +35,12 @@ #include -#include "liblte/phy/io/udpsink.h" +#include "liblte/phy/io/netsink.h" -int udpsink_init(udpsink_t *q, char *address, int port) { - bzero(q, sizeof(udpsink_t)); +int netsink_init(netsink_t *q, char *address, int port, netsink_type_t type) { + bzero(q, sizeof(netsink_t)); - q->sockfd=socket(AF_INET,SOCK_DGRAM,0); + q->sockfd=socket(AF_INET, type==NETSINK_TCP?SOCK_STREAM:SOCK_DGRAM,0); if (q->sockfd < 0) { perror("socket"); @@ -50,36 +50,41 @@ int udpsink_init(udpsink_t *q, char *address, int port) { q->servaddr.sin_family = AF_INET; q->servaddr.sin_addr.s_addr=inet_addr(address); q->servaddr.sin_port=htons(port); + + printf("Connecting to %s:%d\n", address, port); + if (connect(q->sockfd,&q->servaddr,sizeof(q->servaddr)) < 0) { + perror("connect"); + return -1; + } return 0; } -void udpsink_free(udpsink_t *q) { +void netsink_free(netsink_t *q) { if (q->sockfd) { close(q->sockfd); } - bzero(q, sizeof(udpsink_t)); + bzero(q, sizeof(netsink_t)); } -int udpsink_write(udpsink_t *q, void *buffer, int nof_bytes) { - return sendto(q->sockfd, buffer, nof_bytes, 0, - &q->servaddr, sizeof(struct sockaddr_in)); +int netsink_write(netsink_t *q, void *buffer, int nof_bytes) { + return write(q->sockfd, buffer, nof_bytes); } -int udpsink_initialize(udpsink_hl* h) { - return udpsink_init(&h->obj, h->init.address, h->init.port); +int netsink_initialize(netsink_hl* h) { + return netsink_init(&h->obj, h->init.address, h->init.port, NETSINK_UDP); } -int udpsink_work(udpsink_hl* h) { - if (udpsink_write(&h->obj, h->input, h->in_len)<0) { +int netsink_work(netsink_hl* h) { + if (netsink_write(&h->obj, h->input, h->in_len)<0) { return -1; } return 0; } -int udpsink_stop(udpsink_hl* h) { - udpsink_free(&h->obj); +int netsink_stop(netsink_hl* h) { + netsink_free(&h->obj); return 0; } diff --git a/lte/phy/lib/io/src/udpsource.c b/lte/phy/lib/io/src/netsource.c similarity index 54% rename from lte/phy/lib/io/src/udpsource.c rename to lte/phy/lib/io/src/netsource.c index cd2bf59ba..b28d5f035 100644 --- a/lte/phy/lib/io/src/udpsource.c +++ b/lte/phy/lib/io/src/netsource.c @@ -35,17 +35,18 @@ #include #include -#include "liblte/phy/io/udpsource.h" +#include "liblte/phy/io/netsource.h" -int udpsource_init(udpsource_t *q, char *address, int port) { - bzero(q, sizeof(udpsource_t)); +int netsource_init(netsource_t *q, char *address, int port, netsource_type_t type) { + bzero(q, sizeof(netsource_t)); - q->sockfd=socket(AF_INET,SOCK_DGRAM,0); + q->sockfd=socket(AF_INET,type==NETSOURCE_TCP?SOCK_STREAM:SOCK_DGRAM,0); if (q->sockfd < 0) { perror("socket"); return -1; } + q->type = type; q->servaddr.sin_family = AF_INET; q->servaddr.sin_addr.s_addr=inet_addr(address); @@ -55,32 +56,58 @@ int udpsource_init(udpsource_t *q, char *address, int port) { perror("bind"); return -1; } + q->connfd = 0; return 0; } -void udpsource_free(udpsource_t *q) { +void netsource_free(netsource_t *q) { if (q->sockfd) { close(q->sockfd); } - bzero(q, sizeof(udpsource_t)); + bzero(q, sizeof(netsource_t)); } -int udpsource_read(udpsource_t *q, void *buffer, int nbytes) { - int n = recv(q->sockfd, buffer, nbytes, 0); - - if (n == -1) { - if (errno == EAGAIN) { - return 0; +int netsource_read(netsource_t *q, void *buffer, int nbytes) { + if (q->type == NETSOURCE_UDP) { + int n = recv(q->sockfd, buffer, nbytes, 0); + + if (n == -1) { + if (errno == EAGAIN) { + return 0; + } else { + return -1; + } } else { - return -1; - } + return n; + } } else { + if (q->connfd == 0) { + printf("Waiting for TCP connection\n"); + listen(q->sockfd, 1); + socklen_t clilen = sizeof(q->cliaddr); + q->connfd = accept(q->sockfd, (struct sockaddr *)&q->cliaddr, &clilen); + if (q->connfd < 0) { + perror("accept"); + return -1; + } + } + int n = read(q->connfd, buffer, nbytes); + if (n == -1) { + if (errno == ECONNRESET) { + printf("Connection closed\n"); + close(q->connfd); + q->connfd = 0; + return 0; + } else { + perror("read"); + } + } return n; } } -int udpsource_set_nonblocking(udpsource_t *q) { +int netsource_set_nonblocking(netsource_t *q) { if (fcntl(q->sockfd, F_SETFL, O_NONBLOCK)) { perror("fcntl"); return -1; @@ -88,7 +115,7 @@ int udpsource_set_nonblocking(udpsource_t *q) { return 0; } -int udpsource_set_timeout(udpsource_t *q, uint32_t microseconds) { +int netsource_set_timeout(netsource_t *q, uint32_t microseconds) { struct timeval t; t.tv_sec = 0; t.tv_usec = microseconds; @@ -99,19 +126,19 @@ int udpsource_set_timeout(udpsource_t *q, uint32_t microseconds) { return 0; } -int udpsource_initialize(udpsource_hl* h) { - return udpsource_init(&h->obj, h->init.address, h->init.port); +int netsource_initialize(netsource_hl* h) { + return netsource_init(&h->obj, h->init.address, h->init.port, NETSOURCE_UDP); } -int udpsource_work(udpsource_hl* h) { - h->out_len = udpsource_read(&h->obj, h->output, h->ctrl_in.nsamples); +int netsource_work(netsource_hl* h) { + h->out_len = netsource_read(&h->obj, h->output, h->ctrl_in.nsamples); if (h->out_len < 0) { return -1; } return 0; } -int udpsource_stop(udpsource_hl* h) { - udpsource_free(&h->obj); +int netsource_stop(netsource_hl* h) { + netsource_free(&h->obj); return 0; }