1 /*  RetroArch - A frontend for libretro.
2  *  Copyright (C) 2010-2014 - Hans-Kristian Arntzen
3  *  Copyright (C) 2011-2017 - Daniel De Matteis
4  *
5  *  RetroArch is free software: you can redistribute it and/or modify it under the terms
6  *  of the GNU General Public License as published by the Free Software Found-
7  *  ation, either version 3 of the License, or (at your option) any later version.
8  *
9  *  RetroArch is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
10  *  without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
11  *  PURPOSE.  See the GNU General Public License for more details.
12  *
13  *  You should have received a copy of the GNU General Public License along with RetroArch.
14  *  If not, see <http://www.gnu.org/licenses/>.
15  */
16 
17 /*  RSound - A PCM audio client/server
18  *  Copyright (C) 2010 - Hans-Kristian Arntzen
19  *
20  *  RSound is free software: you can redistribute it and/or modify it under the terms
21  *  of the GNU General Public License as published by the Free Software Found-
22  *  ation, either version 3 of the License, or (at your option) any later version.
23  *
24  *  RSound is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
25  *  without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
26  *  PURPOSE.  See the GNU General Public License for more details.
27  *
28  *  You should have received a copy of the GNU General Public License along with RSound.
29  *  If not, see <http://www.gnu.org/licenses/>.
30  */
31 
32 #include "drivers/rsound.h"
33 
34 #ifdef __PS3__
35 #ifdef __PSL1GHT__
36 #include <sysmodule/sysmodule.h>
37 #include <sys/systime.h>
38 #include <net/net.h>
39 #else
40 #include <cell/sysmodule.h>
41 #include <sys/timer.h>
42 #include <sys/sys_time.h>
43 #include <netex/net.h>
44 #include <netex/errno.h>
45 #endif
46 #endif
47 
48 #if defined(GEKKO)
49 #include <network.h>
50 #else
51 #define NETWORK_COMPAT_HEADERS 1
52 #endif
53 
54 #ifdef NETWORK_COMPAT_HEADERS
55 #include <sys/socket.h>
56 #include <netdb.h>
57 #include <netinet/in.h>
58 #include <netinet/tcp.h>
59 #include <arpa/inet.h>
60 #ifdef __PS3__
61 #ifdef __PSL1GHT__
62 #include <net/poll.h>
63 #else
64 #include <sys/poll.h>
65 #endif
66 #else
67 #include <poll.h>
68 #endif
69 #endif
70 #include <fcntl.h>
71 #ifdef _WIN32
72 #include <direct.h>
73 #else
74 #include <unistd.h>
75 #endif
76 
77 #include <ctype.h>
78 #include <stdlib.h>
79 #include <string.h>
80 #include <stdarg.h>
81 #include <time.h>
82 #include <errno.h>
83 
84 #include <compat/strl.h>
85 #include <retro_inline.h>
86 #include <retro_assert.h>
87 #include <retro_miscellaneous.h>
88 #include <retro_timers.h>
89 
90 /*
91  ****************************************************************************
92  Naming convention. Functions for use in API are called rsd_*(),         *
93  internal function are called rsnd_*()                                   *
94  ****************************************************************************
95  */
96 
97 // Internal enumerations
98 enum rsd_logtype
99 {
100    RSD_LOG_DEBUG = 0,
101    RSD_LOG_WARN,
102    RSD_LOG_ERR
103 };
104 
105 enum rsd_conn_type
106 {
107    RSD_CONN_TCP = 0x0000,
108    RSD_CONN_UNIX = 0x0001,
109    RSD_CONN_DECNET = 0x0002,
110 
111    RSD_CONN_PROTO = 0x100
112 };
113 
114 // Some logging macros.
115 #define RSD_WARN(fmt, args...)
116 #define RSD_ERR(fmt, args...)
117 #define RSD_DEBUG(fmt, args...)
118 
119 #if defined(__PS3__)
120 static int init_count = 0;
121 #define pollfd_fd(x) x.fd
122 #define net_send(a,b,c,d) send(a,b,c,d)
123 #define net_socket(a,b,c) socket(a,b,c)
124 #define net_connect(a,b,c) connect(a,b,c)
125 #define net_shutdown(a,b) shutdown(a,b)
126 #define net_socketclose(x) socketclose(x)
127 #define net_recv(a,b,c,d) recv(a,b,c,d)
128 #elif defined(GEKKO)
129 #define SHUT_RD 0
130 
131 #define socketpoll(x, y, z)  net_poll(x, y, z)
132 #define pollfd pollsd
133 #define pollfd_fd(x) x.socket
134 #define gethostbyname net_gethostbyname
135 #define getsockopt net_getsockopt
136 #define setsockopt net_setsockopt
137 #define net_send(a,b,c,d) net_send(a,b,c,d)
138 #define net_socket(a,b,c) net_socket(a,b,c)
139 #define net_connect(a,b,c) net_connect(a,b,c)
140 #define net_shutdown(a,b) net_shutdown(a,b)
141 #define net_socketclose(x) net_close(x)
142 #define net_recv(a,b,c,d) net_recv(a,b,c,d)
143 #else
144 #define pollfd_fd(x) x.fd
145 #define net_socket(a,b,c) socket(a,b,c)
146 #define socketpoll(x, y, z)  poll(x, y, z)
147 #define net_send(a,b,c,d) send(a,b,c,d)
148 #define net_connect(a,b,c) connect(a,b,c)
149 #define net_shutdown(a,b) shutdown(a,b)
150 #define net_socketclose(x) close(x)
151 #define net_recv(a,b,c,d) recv(a,b,c,d)
152 #endif
153 
154 static ssize_t rsnd_send_chunk(int socket, const void *buf, size_t size, int blocking);
155 static ssize_t rsnd_recv_chunk(int socket, void *buf, size_t size, int blocking);
156 static int rsnd_start_thread(rsound_t *rd);
157 static int rsnd_stop_thread(rsound_t *rd);
158 static size_t rsnd_get_delay(rsound_t *rd);
159 static size_t rsnd_get_ptr(rsound_t *rd);
160 static int rsnd_reset(rsound_t *rd);
161 
162 // Protocol functions
163 static int rsnd_send_identity_info(rsound_t *rd);
164 static int rsnd_close_ctl(rsound_t *rd);
165 static int rsnd_send_info_query(rsound_t *rd);
166 static int rsnd_update_server_info(rsound_t *rd);
167 
168 static int rsnd_poll(struct pollfd *fd, int numfd, int timeout);
169 
170 static void rsnd_cb_thread(void *thread_data);
171 static void rsnd_thread(void *thread_data);
172 
173 /* Determine whether we're running big- or little endian */
rsnd_is_little_endian(void)174 static INLINE int rsnd_is_little_endian(void)
175 {
176    uint16_t i = 1;
177    return *((uint8_t*)&i);
178 }
179 
180 /* Simple functions for swapping bytes */
rsnd_swap_endian_16(uint16_t * x)181 static INLINE void rsnd_swap_endian_16 ( uint16_t * x )
182 {
183    *x = (*x>>8) | (*x<<8);
184 }
185 
rsnd_swap_endian_32(uint32_t * x)186 static INLINE void rsnd_swap_endian_32 ( uint32_t * x )
187 {
188    *x =  (*x >> 24 ) |
189       ((*x<<8) & 0x00FF0000) |
190       ((*x>>8) & 0x0000FF00) |
191       (*x << 24);
192 }
193 
rsnd_format_to_samplesize(uint16_t fmt)194 static INLINE int rsnd_format_to_samplesize ( uint16_t fmt )
195 {
196    switch(fmt)
197    {
198       case RSD_S32_LE:
199       case RSD_S32_BE:
200       case RSD_S32_NE:
201       case RSD_U32_LE:
202       case RSD_U32_BE:
203       case RSD_U32_NE:
204          return 4;
205 
206       case RSD_S16_LE:
207       case RSD_U16_LE:
208       case RSD_S16_BE:
209       case RSD_U16_BE:
210       case RSD_S16_NE:
211       case RSD_U16_NE:
212          return 2;
213 
214       case RSD_U8:
215       case RSD_S8:
216       case RSD_ALAW:
217       case RSD_MULAW:
218          return 1;
219 
220       default:
221          return -1;
222    }
223 }
224 
rsd_samplesize(rsound_t * rd)225 int rsd_samplesize( rsound_t *rd )
226 {
227    retro_assert(rd != NULL);
228    return rd->samplesize;
229 }
230 
231 /* Creates sockets and attempts to connect to the server. Returns -1 when failed, and 0 when success. */
rsnd_connect_server(rsound_t * rd)232 static int rsnd_connect_server( rsound_t *rd )
233 {
234    struct sockaddr_in addr;
235    struct pollfd fd;
236    int i = 1;
237    (void)i;
238 
239    memset(&addr, 0, sizeof(addr));
240    addr.sin_family = AF_INET;
241    addr.sin_port = htons(atoi(rd->port));
242 
243    if (!isdigit(rd->host[0]))
244    {
245       struct hostent *host = gethostbyname(rd->host);
246       if (!host)
247          return -1;
248 
249       addr.sin_addr.s_addr = inet_addr(host->h_addr_list[0]);
250    }
251    else
252       addr.sin_addr.s_addr = inet_addr(rd->host);
253 
254    rd->conn_type = RSD_CONN_TCP;
255 
256    rd->conn.socket = net_socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
257    if ( rd->conn.socket < 0 )
258       goto error;
259 
260    rd->conn.ctl_socket = net_socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
261    if ( rd->conn.ctl_socket < 0 )
262       goto error;
263 
264    /* Uses non-blocking IO since it performed more deterministic with poll()/send() */
265 
266 #ifdef __PS3__
267    setsockopt(rd->conn.socket, SOL_SOCKET, SO_NBIO, &i, sizeof(int));
268    setsockopt(rd->conn.ctl_socket, SOL_SOCKET, SO_NBIO, &i, sizeof(int));
269 #else
270    fcntl(rd->conn.socket, F_SETFL, O_NONBLOCK);
271    fcntl(rd->conn.ctl_socket, F_SETFL, O_NONBLOCK);
272 #endif
273 
274    /* Nonblocking connect with 3 second timeout */
275    net_connect(rd->conn.socket, (struct sockaddr*)&addr, sizeof(addr));
276 
277    pollfd_fd(fd) = rd->conn.socket;
278    fd.events = POLLOUT;
279 
280    rsnd_poll(&fd, 1, 3000);
281    if (!(fd.revents & POLLOUT))
282       goto error;
283 
284    net_connect(rd->conn.ctl_socket, (struct sockaddr*)&addr, sizeof(addr));
285 
286    pollfd_fd(fd) = rd->conn.ctl_socket;
287    rsnd_poll(&fd, 1, 3000);
288    if (!(fd.revents & POLLOUT))
289       goto error;
290 
291    return 0;
292 
293    /* Cleanup for errors. */
294 error:
295    RSD_ERR("[RSound] Connecting to server failed. \"%s\"", rd->host);
296 
297    return -1;
298 }
299 
300 /* Conjures a WAV-header and sends this to server. Returns -1 when failed, and 0 when success. */
rsnd_send_header_info(rsound_t * rd)301 static int rsnd_send_header_info(rsound_t *rd)
302 {
303 
304    /* Defines the size of a wave header */
305 #define HEADER_SIZE 44
306    char *header = calloc(1, HEADER_SIZE);
307    if (!header)
308    {
309       RSD_ERR("[RSound] Could not allocate memory.");
310       return -1;
311    }
312    uint16_t temp16;
313    uint32_t temp32;
314 
315    /* These magic numbers represent the position of the elements in the wave header.
316       We can't simply send a wave struct over the network since the compiler is allowed to
317       pad our structs as they like, so sizeof(waveheader) might not be similar on two different
318       systems. */
319 
320 #define RATE 24
321 #define CHANNEL 22
322 #define FRAMESIZE 34
323 #define FORMAT 42
324 
325    uint32_t temp_rate = rd->rate;
326    uint16_t temp_channels = rd->channels;
327 
328    uint16_t temp_bits = 8 * rsnd_format_to_samplesize(rd->format);
329    uint16_t temp_format = rd->format;
330 
331    // Checks the format for native endian which will need to be set properly.
332    switch ( temp_format )
333    {
334       case RSD_S16_NE:
335          if ( rsnd_is_little_endian() )
336             temp_format = RSD_S16_LE;
337          else
338             temp_format = RSD_S16_BE;
339          break;
340 
341       case RSD_U16_NE:
342          if ( rsnd_is_little_endian() )
343             temp_format = RSD_U16_LE;
344          else
345             temp_format = RSD_U16_BE;
346          break;
347       case RSD_S32_NE:
348          if ( rsnd_is_little_endian() )
349             temp_format = RSD_S32_LE;
350          else
351             temp_format = RSD_S32_BE;
352          break;
353       case RSD_U32_NE:
354          if ( rsnd_is_little_endian() )
355             temp_format = RSD_U32_LE;
356          else
357             temp_format = RSD_U32_BE;
358          break;
359 
360       default:
361          break;
362    }
363 
364    /* Since the values in the wave header we are interested in, are little endian (>_<), we need
365       to determine whether we're running it or not, so we can byte swap accordingly.
366       Could determine this compile time, but it was simpler to do it this way. */
367 
368    // Fancy macros for embedding little endian values into the header.
369 #define SET32(buf,offset,x) (*((uint32_t*)(buf+offset)) = x)
370 #define SET16(buf,offset,x) (*((uint16_t*)(buf+offset)) = x)
371 
372 #define LSB16(x) if ( !rsnd_is_little_endian() ) { rsnd_swap_endian_16(&(x)); }
373 #define LSB32(x) if ( !rsnd_is_little_endian() ) { rsnd_swap_endian_32(&(x)); }
374 
375    /* Here we embed in the rest of the WAV header for it to be somewhat valid */
376 
377    strlcpy(header, "RIFF", sizeof(header));
378    SET32(header, 4, 0);
379    strlcpy(header+8, "WAVE", sizeof(header));
380    strlcpy(header+12, "fmt ", sizeof(header));
381 
382    temp32 = 16;
383    LSB32(temp32);
384    SET32(header, 16, temp32);
385 
386    temp16 = 0; // PCM data
387 
388    switch( rd->format )
389    {
390       case RSD_S16_LE:
391       case RSD_U8:
392          temp16 = 1;
393          break;
394 
395       case RSD_ALAW:
396          temp16 = 6;
397          break;
398 
399       case RSD_MULAW:
400          temp16 = 7;
401          break;
402    }
403 
404    LSB16(temp16);
405    SET16(header, 20, temp16);
406 
407    // Channels here
408    LSB16(temp_channels);
409    SET16(header, CHANNEL, temp_channels);
410    // Samples per sec
411    LSB32(temp_rate);
412    SET32(header, RATE, temp_rate);
413 
414    temp32 = rd->rate * rd->channels * rsnd_format_to_samplesize(rd->format);
415    LSB32(temp32);
416    SET32(header, 28, temp32);
417 
418    temp16 = rd->channels * rsnd_format_to_samplesize(rd->format);
419    LSB16(temp16);
420    SET16(header, 32, temp16);
421 
422    // Bits per sample
423    LSB16(temp_bits);
424    SET16(header, FRAMESIZE, temp_bits);
425 
426    strlcpy(header+36, "data", sizeof(header));
427 
428    /* Do not care about cksize here (impossible to know beforehand).
429     * It is used by the server for format. */
430 
431    LSB16(temp_format);
432    SET16(header, FORMAT, temp_format);
433 
434    /* End static header */
435 
436    if ( rsnd_send_chunk(rd->conn.socket, header, HEADER_SIZE, 1) != HEADER_SIZE )
437    {
438       free(header);
439       return -1;
440    }
441 
442    free(header);
443    return 0;
444 }
445 
446 /* Recieves backend info from server that is of interest to the client. (This mini-protocol might be extended later on.) */
rsnd_get_backend_info(rsound_t * rd)447 static int rsnd_get_backend_info ( rsound_t *rd )
448 {
449 #define RSND_HEADER_SIZE 8
450 #define LATENCY 0
451 #define CHUNKSIZE 1
452 
453    // Header is 2 uint32_t's. = 8 bytes.
454    uint32_t rsnd_header[2] = {0};
455 
456    if ( rsnd_recv_chunk(rd->conn.socket, rsnd_header, RSND_HEADER_SIZE, 1) != RSND_HEADER_SIZE )
457    {
458       RSD_ERR("[RSound] Couldn't receive chunk.\n");
459       return -1;
460    }
461 
462    /* Again, we can't be 100% certain that sizeof(backend_info_t) is equal on every system */
463 
464    if ( rsnd_is_little_endian() )
465    {
466       rsnd_swap_endian_32(&rsnd_header[LATENCY]);
467       rsnd_swap_endian_32(&rsnd_header[CHUNKSIZE]);
468    }
469 
470    rd->backend_info.latency = rsnd_header[LATENCY];
471    rd->backend_info.chunk_size = rsnd_header[CHUNKSIZE];
472 
473 #define MAX_CHUNK_SIZE 1024 // We do not want larger chunk sizes than this.
474    if ( rd->backend_info.chunk_size > MAX_CHUNK_SIZE || rd->backend_info.chunk_size <= 0 )
475       rd->backend_info.chunk_size = MAX_CHUNK_SIZE;
476 
477    /* Assumes a default buffer size should it cause problems of being too small */
478    if ( rd->buffer_size == 0 || rd->buffer_size < rd->backend_info.chunk_size * 2 )
479       rd->buffer_size = rd->backend_info.chunk_size * 32;
480 
481    if ( rd->fifo_buffer != NULL )
482       fifo_free(rd->fifo_buffer);
483    rd->fifo_buffer = fifo_new (rd->buffer_size);
484    if (!rd->fifo_buffer)
485    {
486       RSD_ERR("[RSound] Failed to create FIFO buffer.\n");
487       return -1;
488    }
489 
490    // Only bother with setting network buffer size if we're doing TCP.
491    if ( rd->conn_type & RSD_CONN_TCP )
492    {
493 #define MAX_TCP_BUFSIZE (1 << 14)
494       int bufsiz = rd->buffer_size;
495       if (bufsiz > MAX_TCP_BUFSIZE)
496          bufsiz = MAX_TCP_BUFSIZE;
497 
498       setsockopt(rd->conn.socket, SOL_SOCKET, SO_SNDBUF, &bufsiz, sizeof(int));
499       bufsiz = rd->buffer_size;
500       setsockopt(rd->conn.ctl_socket, SOL_SOCKET, SO_SNDBUF, &bufsiz, sizeof(int));
501       bufsiz = rd->buffer_size;
502       setsockopt(rd->conn.ctl_socket, SOL_SOCKET, SO_RCVBUF, &bufsiz, sizeof(int));
503 
504       int flag = 1;
505       setsockopt(rd->conn.socket, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(int));
506       flag = 1;
507       setsockopt(rd->conn.ctl_socket, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(int));
508    }
509 
510    // Can we read the last 8 bytes so we can use the protocol interface?
511    // This is non-blocking.
512    if ( rsnd_recv_chunk(rd->conn.socket, rsnd_header, RSND_HEADER_SIZE, 0) == RSND_HEADER_SIZE )
513       rd->conn_type |= RSD_CONN_PROTO;
514    else
515    {  RSD_DEBUG("[RSound] Failed to get new proto.\n"); }
516 
517    // We no longer want to read from this socket.
518 #ifdef _WIN32
519    net_shutdown(rd->conn.socket, SD_RECEIVE);
520 #elif !defined(__APPLE__) // OSX doesn't seem to like shutdown()
521    net_shutdown(rd->conn.socket, SHUT_RD);
522 #endif
523 
524    return 0;
525 }
526 
527 /* Makes sure that we're connected and done with wave header handshaking. Returns -1 on error, and 0 on success.
528    This goes for all other functions in use. */
rsnd_create_connection(rsound_t * rd)529 static int rsnd_create_connection(rsound_t *rd)
530 {
531    int rc;
532 
533    /* Are we connected to the server? If not, these values have been set to <0, so we make sure that we connect */
534    if ( rd->conn.socket <= 0 && rd->conn.ctl_socket <= 0 )
535    {
536       rc = rsnd_connect_server(rd);
537       if (rc < 0)
538       {
539          RSD_ERR("[RSound] connect server failed.\n");
540          rsd_stop(rd);
541          return -1;
542       }
543 
544       /* After connecting, makes really sure that we have a working connection. */
545       struct pollfd fd;
546       pollfd_fd(fd) = rd->conn.socket;
547       fd.events = POLLOUT;
548 
549       if ( rsnd_poll(&fd, 1, 2000) < 0 )
550       {
551          RSD_ERR("[RSound] rsnd_poll failed.\n");
552          rsd_stop(rd);
553          return -1;
554       }
555 
556       if ( !(fd.revents & POLLOUT) )
557       {
558          RSD_ERR("[RSound] Poll didn't return what we wanted.\n");
559          rsd_stop(rd);
560          return -1;
561       }
562    }
563    /* Is the server ready for data? The first thing it expects is the wave header */
564    if ( !rd->ready_for_data )
565    {
566       /* Part of the uber simple protocol.
567          1. Send wave header.
568          2. Recieve backend info like latency and preferred packet size.
569          3. Starts the playback thread. */
570 
571       rc = rsnd_send_header_info(rd);
572       if (rc < 0)
573       {
574          RSD_ERR("[RSound] Send header failed.\n");
575          rsd_stop(rd);
576          return -1;
577       }
578 
579       rc = rsnd_get_backend_info(rd);
580       if (rc < 0)
581       {
582          RSD_ERR("[RSound] Get backend info failed.\n");
583          rsd_stop(rd);
584          return -1;
585       }
586 
587       rc = rsnd_start_thread(rd);
588       if (rc < 0)
589       {
590          RSD_ERR("[RSound] Starting thread failed.\n");
591          rsd_stop(rd);
592          return -1;
593       }
594 
595       if ( (rd->conn_type & RSD_CONN_PROTO) && strlen(rd->identity) > 0 )
596       {
597          rsnd_send_identity_info(rd);
598       }
599 
600       rd->ready_for_data = 1;
601    }
602 
603    return 0;
604 }
605 
606 /* Sends a chunk over the network. Makes sure that everything is sent if blocking. Returns -1 if connection is lost, non-negative if success.
607  * If blocking, and not enough data is recieved, it will return -1. */
rsnd_send_chunk(int socket,const void * buf,size_t size,int blocking)608 static ssize_t rsnd_send_chunk(int socket, const void* buf, size_t size, int blocking)
609 {
610    ssize_t rc = 0;
611    size_t wrote = 0;
612    ssize_t send_size = 0;
613    struct pollfd fd;
614    pollfd_fd(fd) = socket;
615    fd.events = POLLOUT;
616 
617    int sleep_time = (blocking) ? 10000 : 0;
618 
619 #define MAX_PACKET_SIZE 1024
620 
621    while ( wrote < size )
622    {
623       if ( rsnd_poll(&fd, 1, sleep_time) < 0 )
624          return -1;
625 
626       if ( fd.revents & POLLHUP )
627       {
628          RSD_WARN("*** Remote side hung up! ***");
629          return -1;
630       }
631 
632       if ( fd.revents & POLLOUT )
633       {
634          /* We try to limit ourselves to 1KiB packet sizes. */
635          send_size = (size - wrote) > MAX_PACKET_SIZE ? MAX_PACKET_SIZE : size - wrote;
636          rc = net_send(socket, (const char*)buf + wrote, send_size, 0);
637          if ( rc < 0 )
638          {
639             RSD_ERR("[RSound] Error sending chunk, %s.\n", strerror(errno));
640             return rc;
641          }
642          wrote += rc;
643       }
644       else
645       {
646          /* If server hasn't stopped blocking after 10 secs, then we should probably shut down the stream. */
647          if ( blocking )
648             return -1;
649          else
650             return wrote;
651       }
652 
653    }
654    return (ssize_t)wrote;
655 }
656 
657 /* Recieved chunk. Makes sure that everything is recieved if blocking. Returns -1 if connection is lost, non-negative if success.
658  * If blocking, and not enough data is recieved, it will return -1. */
rsnd_recv_chunk(int socket,void * buf,size_t size,int blocking)659 static ssize_t rsnd_recv_chunk(int socket, void *buf, size_t size, int blocking)
660 {
661    ssize_t rc = 0;
662    size_t has_read = 0;
663    ssize_t read_size = 0;
664    struct pollfd fd;
665    pollfd_fd(fd) = socket;
666    fd.events = POLLIN;
667 
668    int sleep_time = (blocking) ? 5000 : 0;
669 
670    while ( has_read < size )
671    {
672       if ( rsnd_poll(&fd, 1, sleep_time) < 0 )
673       {
674          RSD_ERR("[RSound] Poll failed.\n");
675          return -1;
676       }
677 
678       if ( fd.revents & POLLHUP )
679       {
680          RSD_ERR("[RSound] Server hung up.\n");
681          return -1;
682       }
683 
684       if ( fd.revents & POLLIN )
685       {
686          read_size = (size - has_read) > MAX_PACKET_SIZE ? MAX_PACKET_SIZE : size - has_read;
687          rc = net_recv(socket, (char*)buf + has_read, read_size, 0);
688          if ( rc <= 0 )
689          {
690             RSD_ERR("[RSound] Error receiving chunk, %s.\n", strerror(errno));
691             return rc;
692          }
693          has_read += rc;
694       }
695       else
696       {
697          if ( blocking )
698          {
699             RSD_ERR("[RSound] Block fail.\n");
700             return -1;
701          }
702          else
703             return has_read;
704       }
705    }
706 
707    return (ssize_t)has_read;
708 }
709 
rsnd_poll(struct pollfd * fd,int numfd,int timeout)710 static int rsnd_poll(struct pollfd *fd, int numfd, int timeout)
711 {
712    for (;;)
713    {
714       if (socketpoll(fd, numfd, timeout) < 0)
715       {
716          if (errno == EINTR)
717             continue;
718 
719          perror("poll");
720          return -1;
721       }
722       return 0;
723    }
724 }
725 
rsnd_get_time_usec(void)726 static int64_t rsnd_get_time_usec(void)
727 {
728 #if defined(_WIN32)
729    static LARGE_INTEGER freq;
730    if (!freq.QuadPart && !QueryPerformanceFrequency(&freq)) // Frequency is guaranteed to not change.
731       return 0;
732 
733    LARGE_INTEGER count;
734    if (!QueryPerformanceCounter(&count))
735       return 0;
736    return count.QuadPart * 1000000 / freq.QuadPart;
737 #elif defined(__PS3__)
738    return sysGetSystemTime();
739 #elif defined(GEKKO)
740    return ticks_to_microsecs(gettime());
741 #elif defined(__MACH__) // OSX doesn't have clock_gettime ...
742    clock_serv_t cclock;
743    mach_timespec_t mts;
744    host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock);
745    clock_get_time(cclock, &mts);
746    mach_port_deallocate(mach_task_self(), cclock);
747    return mts.tv_sec * INT64_C(1000000) + (mts.tv_nsec + 500) / 1000;
748 #elif defined(_POSIX_MONOTONIC_CLOCK) || defined(__QNX__) || defined(ANDROID)
749    struct timespec tv;
750    if (clock_gettime(CLOCK_MONOTONIC, &tv) < 0)
751       return 0;
752    return tv.tv_sec * INT64_C(1000000) + (tv.tv_nsec + 500) / 1000;
753 #elif defined(EMSCRIPTEN)
754    return emscripten_get_now() * 1000;
755 #else
756 #error "Your platform does not have a timer function implemented in rsnd_get_time_usec(). Cannot continue."
757 #endif
758 }
759 
760 /* Calculates how many bytes there are in total in the virtual buffer. This is calculated client side.
761    It should be accurate enough unless we have big problems with buffer underruns.
762    This function is called by rsd_delay() to determine the latency.
763    This function might be changed in the future to correctly determine latency from server. */
rsnd_drain(rsound_t * rd)764 static void rsnd_drain(rsound_t *rd)
765 {
766    /* If the audio playback has started on the server we need to use timers. */
767    if ( rd->has_written )
768    {
769       /* Calculates the amount of bytes that the server has consumed. */
770       int64_t time = rsnd_get_time_usec();
771 
772       int64_t delta = time - rd->start_time;
773       delta *= rd->rate * rd->channels * rd->samplesize;
774       delta /= 1000000;
775       /* Calculates the amount of data we have in our virtual buffer. Only used to calculate delay. */
776       slock_lock(rd->thread.mutex);
777       rd->bytes_in_buffer = (int)((int64_t)rd->total_written + (int64_t)FIFO_READ_AVAIL(rd->fifo_buffer) - delta);
778       slock_unlock(rd->thread.mutex);
779    }
780    else
781    {
782       slock_lock(rd->thread.mutex);
783       rd->bytes_in_buffer = FIFO_READ_AVAIL(rd->fifo_buffer);
784       slock_unlock(rd->thread.mutex);
785    }
786 }
787 
788 /* Tries to fill the buffer. Uses signals to determine when the buffer is ready to be filled. Should the thread not be active
789    it will treat this as an error. Crude implementation of a blocking FIFO. */
rsnd_fill_buffer(rsound_t * rd,const char * buf,size_t size)790 static size_t rsnd_fill_buffer(rsound_t *rd, const char *buf, size_t size)
791 {
792 
793    /* Wait until we have a ready buffer */
794    for (;;)
795    {
796       /* Should the thread be shut down while we're running, return with error */
797       if ( !rd->thread_active )
798          return 0;
799 
800       slock_lock(rd->thread.mutex);
801       if (FIFO_WRITE_AVAIL(rd->fifo_buffer) >= size)
802       {
803          slock_unlock(rd->thread.mutex);
804          break;
805       }
806       slock_unlock(rd->thread.mutex);
807 
808       /* Sleeps until we can write to the FIFO. */
809       slock_lock(rd->thread.cond_mutex);
810       scond_signal(rd->thread.cond);
811 
812       RSD_DEBUG("[RSound] rsnd_fill_buffer: Going to sleep.\n");
813       scond_wait(rd->thread.cond, rd->thread.cond_mutex);
814       RSD_DEBUG("[RSound] rsnd_fill_buffer: Woke up.\n");
815       slock_unlock(rd->thread.cond_mutex);
816    }
817 
818    slock_lock(rd->thread.mutex);
819    fifo_write(rd->fifo_buffer, buf, size);
820    slock_unlock(rd->thread.mutex);
821    //RSD_DEBUG("[RSound] fill_buffer: Wrote to buffer.\n");
822 
823    /* Send signal to thread that buffer has been updated */
824    //RSD_DEBUG("[RSound] fill_buffer: Waking up thread.\n");
825    scond_signal(rd->thread.cond);
826 
827    return size;
828 }
829 
rsnd_start_thread(rsound_t * rd)830 static int rsnd_start_thread(rsound_t *rd)
831 {
832    if ( !rd->thread_active )
833    {
834       rd->thread_active = 1;
835       rd->thread.thread = (sthread_t*)sthread_create(rd->audio_callback ? rsnd_cb_thread : rsnd_thread, rd);
836       if ( !rd->thread.thread )
837       {
838          rd->thread_active = 0;
839          RSD_ERR("[RSound] Failed to create thread.");
840          return -1;
841       }
842       return 0;
843    }
844    else
845       return 0;
846 }
847 
848 /* Makes sure that the playback thread has been correctly shut down */
rsnd_stop_thread(rsound_t * rd)849 static int rsnd_stop_thread(rsound_t *rd)
850 {
851    if ( rd->thread_active )
852    {
853 
854       RSD_DEBUG("[RSound] Shutting down thread.\n");
855 
856       slock_lock(rd->thread.cond_mutex);
857       rd->thread_active = 0;
858       scond_signal(rd->thread.cond);
859       slock_unlock(rd->thread.cond_mutex);
860 
861       sthread_join(rd->thread.thread);
862       RSD_DEBUG("[RSound] Thread joined successfully.\n");
863 
864       return 0;
865    }
866    else
867    {
868       RSD_DEBUG("Thread is already shut down.\n");
869       return 0;
870    }
871 }
872 
873 /* Calculates audio delay in bytes */
rsnd_get_delay(rsound_t * rd)874 static size_t rsnd_get_delay(rsound_t *rd)
875 {
876    int ptr;
877    rsnd_drain(rd);
878    ptr = rd->bytes_in_buffer;
879 
880    /* Adds the backend latency to the calculated latency. */
881    ptr += (int)rd->backend_info.latency;
882 
883    slock_lock(rd->thread.mutex);
884    ptr += rd->delay_offset;
885    RSD_DEBUG("Offset: %d.\n", rd->delay_offset);
886    slock_unlock(rd->thread.mutex);
887 
888    if ( ptr < 0 )
889       ptr = 0;
890 
891    return (size_t)ptr;
892 }
893 
rsnd_get_ptr(rsound_t * rd)894 static size_t rsnd_get_ptr(rsound_t *rd)
895 {
896    int ptr;
897    slock_lock(rd->thread.mutex);
898    ptr = FIFO_READ_AVAIL(rd->fifo_buffer);
899    slock_unlock(rd->thread.mutex);
900 
901    return ptr;
902 }
903 
rsnd_send_identity_info(rsound_t * rd)904 static int rsnd_send_identity_info(rsound_t *rd)
905 {
906 #define RSD_PROTO_MAXSIZE 256
907 #define RSD_PROTO_CHUNKSIZE 8
908 
909    char tmpbuf[RSD_PROTO_MAXSIZE];
910    char sendbuf[RSD_PROTO_MAXSIZE];
911 
912    snprintf(tmpbuf, RSD_PROTO_MAXSIZE - 1, " IDENTITY %s", rd->identity);
913    tmpbuf[RSD_PROTO_MAXSIZE - 1] = '\0';
914    snprintf(sendbuf, RSD_PROTO_MAXSIZE - 1, "RSD%5d%s", (int)strlen(tmpbuf), tmpbuf);
915    sendbuf[RSD_PROTO_MAXSIZE - 1] = '\0';
916 
917    if ( rsnd_send_chunk(rd->conn.ctl_socket, sendbuf, strlen(sendbuf), 0) != (ssize_t)strlen(sendbuf) )
918       return -1;
919 
920    return 0;
921 }
922 
rsnd_close_ctl(rsound_t * rd)923 static int rsnd_close_ctl(rsound_t *rd)
924 {
925    if ( !(rd->conn_type & RSD_CONN_PROTO) )
926       return -1;
927 
928    struct pollfd fd;
929    pollfd_fd(fd) = rd->conn.ctl_socket;
930    fd.events = POLLOUT;
931 
932    if ( rsnd_poll(&fd, 1, 0) < 0 )
933       return -1;
934 
935    if ( fd.revents & POLLOUT )
936    {
937       const char *sendbuf = "RSD    9 CLOSECTL";
938       if (net_send(rd->conn.ctl_socket, sendbuf, strlen(sendbuf), 0) < 0 )
939          return -1;
940    }
941    else if ( fd.revents & POLLHUP )
942       return 0;
943 
944    // Let's wait for reply (or POLLHUP)
945 
946    fd.events = POLLIN;
947    int index = 0;
948    char buf[RSD_PROTO_MAXSIZE*2] = {0};
949 
950    for (;;)
951    {
952       if (rsnd_poll(&fd, 1, 2000) < 0)
953          return -1;
954 
955       if (fd.revents & POLLHUP)
956          break;
957 
958       if (fd.revents & POLLIN)
959       {
960          const char *subchar;
961 
962          // We just read everything in large chunks until we find what we're looking for
963          int rc = net_recv(rd->conn.ctl_socket, buf + index, RSD_PROTO_MAXSIZE*2 - 1 - index, 0);
964 
965          if (rc  <= 0 )
966             return -1;
967 
968          // Can we find it directly?
969          if ( strstr(buf, "RSD   12 CLOSECTL OK") != NULL )
970             break;
971          else if ( strstr(buf, "RSD   15 CLOSECTL ERROR") != NULL )
972             return -1;
973 
974          subchar = strrchr(buf, 'R');
975          if (!subchar)
976             index = 0;
977          else
978          {
979             memmove(buf, subchar, strlen(subchar) + 1);
980             index = strlen(buf);
981          }
982 
983       }
984       else
985          return -1;
986    }
987 
988    net_socketclose(rd->conn.ctl_socket);
989    return 0;
990 }
991 
992 // Sends delay info request to server on the ctl socket. This code section isn't critical, and will work if it works.
993 // It will never block.
rsnd_send_info_query(rsound_t * rd)994 static int rsnd_send_info_query(rsound_t *rd)
995 {
996    char tmpbuf[RSD_PROTO_MAXSIZE];
997    char sendbuf[RSD_PROTO_MAXSIZE];
998 
999    snprintf(tmpbuf, RSD_PROTO_MAXSIZE - 1, " INFO %lld", (long long int)rd->total_written);
1000    tmpbuf[RSD_PROTO_MAXSIZE - 1] = '\0';
1001    snprintf(sendbuf, RSD_PROTO_MAXSIZE - 1, "RSD%5d%s", (int)strlen(tmpbuf), tmpbuf);
1002    sendbuf[RSD_PROTO_MAXSIZE - 1] = '\0';
1003 
1004    if ( rsnd_send_chunk(rd->conn.ctl_socket, sendbuf, strlen(sendbuf), 0) != (ssize_t)strlen(sendbuf) )
1005       return -1;
1006 
1007    return 0;
1008 }
1009 
1010 // We check if there's any pending delay information from the server.
1011 // In that case, we read the packet.
rsnd_update_server_info(rsound_t * rd)1012 static int rsnd_update_server_info(rsound_t *rd)
1013 {
1014    long long int client_ptr = -1;
1015    long long int serv_ptr = -1;
1016    char temp[RSD_PROTO_MAXSIZE + 1] = {0};
1017 
1018    // We read until we have the last (most recent) data in the network buffer.
1019    for (;;)
1020    {
1021       ssize_t rc;
1022       const char *substr;
1023       char *tmpstr;
1024       memset(temp, 0, sizeof(temp));
1025 
1026       // We first recieve the small header. We just use the larger buffer as it is disposable.
1027       rc = rsnd_recv_chunk(rd->conn.ctl_socket, temp, RSD_PROTO_CHUNKSIZE, 0);
1028       if ( rc == 0 )
1029          break;
1030       else if ( rc < RSD_PROTO_CHUNKSIZE )
1031          return -1;
1032 
1033       temp[RSD_PROTO_CHUNKSIZE] = '\0';
1034 
1035       if (!(substr = strstr(temp, "RSD")))
1036          return -1;
1037 
1038       // Jump over "RSD" in header
1039       substr += 3;
1040 
1041       // The length of the argument message is stored in the small 8 byte header.
1042       long int len = strtol(substr, NULL, 0);
1043 
1044       // Recieve the rest of the data.
1045       if ( rsnd_recv_chunk(rd->conn.ctl_socket, temp, len, 0) < len )
1046          return -1;
1047 
1048       // We only bother if this is an INFO message.
1049       substr = strstr(temp, "INFO");
1050       if (!substr)
1051          continue;
1052 
1053       // Jump over "INFO" in header
1054       substr += 4;
1055 
1056       client_ptr = strtoull(substr, &tmpstr, 0);
1057       if ( client_ptr == 0 || *tmpstr == '\0' )
1058          return -1;
1059 
1060       substr = tmpstr;
1061       serv_ptr = strtoull(substr, NULL, 0);
1062       if ( serv_ptr <= 0 )
1063          return -1;
1064    }
1065 
1066    if ( client_ptr > 0 && serv_ptr > 0 )
1067    {
1068 
1069       int delay = rsd_delay(rd);
1070       int delta = (int)(client_ptr - serv_ptr);
1071       slock_lock(rd->thread.mutex);
1072       delta += FIFO_READ_AVAIL(rd->fifo_buffer);
1073       slock_unlock(rd->thread.mutex);
1074 
1075       RSD_DEBUG("[RSound] Delay: %d, Delta: %d.\n", delay, delta);
1076 
1077       // We only update the pointer if the data we got is quite recent.
1078       if ( rd->total_written - client_ptr < 4 * rd->backend_info.chunk_size && rd->total_written > client_ptr )
1079       {
1080          int offset_delta = delta - delay;
1081          int max_offset = rd->backend_info.chunk_size;
1082          if ( offset_delta < -max_offset )
1083             offset_delta = -max_offset;
1084          else if ( offset_delta > max_offset )
1085             offset_delta = max_offset;
1086 
1087          slock_lock(rd->thread.mutex);
1088          rd->delay_offset += offset_delta;
1089          slock_unlock(rd->thread.mutex);
1090          RSD_DEBUG("[RSound] Changed offset-delta: %d.\n", offset_delta);
1091       }
1092    }
1093 
1094    return 0;
1095 }
1096 
1097 // Sort of simulates the behavior of pthread_cancel()
1098 #define _TEST_CANCEL() \
1099    if ( !rd->thread_active ) \
1100       break
1101 
1102 /* The blocking thread */
rsnd_thread(void * thread_data)1103 static void rsnd_thread ( void * thread_data )
1104 {
1105    /* We share data between thread and callable functions */
1106    rsound_t *rd = thread_data;
1107    int rc;
1108    char buffer[rd->backend_info.chunk_size];
1109 
1110    /* Plays back data as long as there is data in the buffer. Else, sleep until it can. */
1111    /* Two (;;) for loops! :3 Beware! */
1112    for (;;)
1113    {
1114       for (;;)
1115       {
1116          _TEST_CANCEL();
1117 
1118          // We ask the server to send its latest backend data. Do not really care about errors atm.
1119          // We only bother to check after 1 sec of audio has been played, as it might be quite inaccurate in the start of the stream.
1120          if ( (rd->conn_type & RSD_CONN_PROTO) && (rd->total_written > rd->channels * rd->rate * rd->samplesize) )
1121          {
1122             rsnd_send_info_query(rd);
1123             rsnd_update_server_info(rd);
1124          }
1125 
1126          /* If the buffer is empty or we've stopped the stream, jump out of this for loop */
1127          slock_lock(rd->thread.mutex);
1128          if (FIFO_READ_AVAIL(rd->fifo_buffer) < rd->backend_info.chunk_size || !rd->thread_active)
1129          {
1130             slock_unlock(rd->thread.mutex);
1131             break;
1132          }
1133          slock_unlock(rd->thread.mutex);
1134 
1135          _TEST_CANCEL();
1136          slock_lock(rd->thread.mutex);
1137          fifo_read(rd->fifo_buffer, buffer, sizeof(buffer));
1138          slock_unlock(rd->thread.mutex);
1139          rc = rsnd_send_chunk(rd->conn.socket, buffer, sizeof(buffer), 1);
1140 
1141          /* If this happens, we should make sure that subsequent and current calls to rsd_write() will fail. */
1142          if ( rc != (int)rd->backend_info.chunk_size )
1143          {
1144             _TEST_CANCEL();
1145             rsnd_reset(rd);
1146 
1147             /* Wakes up a potentially sleeping fill_buffer() */
1148             scond_signal(rd->thread.cond);
1149 
1150             /* This thread will not be joined, so detach. */
1151             sthread_detach(rd->thread.thread);
1152             return;
1153          }
1154 
1155          /* If this was the first write, set the start point for the timer. */
1156          if ( !rd->has_written )
1157          {
1158             slock_lock(rd->thread.mutex);
1159             rd->start_time = rsnd_get_time_usec();
1160             rd->has_written = 1;
1161             slock_unlock(rd->thread.mutex);
1162          }
1163 
1164          /* Increase the total_written counter. Used in rsnd_drain() */
1165          slock_lock(rd->thread.mutex);
1166          rd->total_written += rc;
1167          slock_unlock(rd->thread.mutex);
1168 
1169          /* Buffer has decreased, signal fill_buffer() */
1170          scond_signal(rd->thread.cond);
1171 
1172       }
1173 
1174       /* If we're still good to go, sleep. We are waiting for fill_buffer() to fill up some data. */
1175 
1176       if ( rd->thread_active )
1177       {
1178          // There is a very slim change of getting a deadlock using the cond_wait scheme.
1179          // This solution is rather dirty, but avoids complete deadlocks at the very least.
1180 
1181          slock_lock(rd->thread.cond_mutex);
1182          scond_signal(rd->thread.cond);
1183 
1184          if ( rd->thread_active )
1185          {
1186             RSD_DEBUG("[RSound] Thread going to sleep.\n");
1187             scond_wait(rd->thread.cond, rd->thread.cond_mutex);
1188             RSD_DEBUG("[RSound] Thread woke up.\n");
1189          }
1190 
1191          slock_unlock(rd->thread.cond_mutex);
1192          RSD_DEBUG("[RSound] Thread unlocked cond_mutex.\n");
1193       }
1194       /* Abort request, chap. */
1195       else
1196       {
1197          scond_signal(rd->thread.cond);
1198          return;
1199       }
1200 
1201    }
1202 }
1203 
1204 /* Callback thread */
rsnd_cb_thread(void * thread_data)1205 static void rsnd_cb_thread(void *thread_data)
1206 {
1207    rsound_t *rd = thread_data;
1208    size_t read_size = rd->backend_info.chunk_size;
1209    if (rd->cb_max_size != 0 && rd->cb_max_size < read_size)
1210       read_size = rd->cb_max_size;
1211 
1212    uint8_t buffer[rd->backend_info.chunk_size];
1213 
1214    while (rd->thread_active)
1215    {
1216       size_t has_read = 0;
1217 
1218       while (has_read < rd->backend_info.chunk_size)
1219       {
1220          size_t will_read = read_size < rd->backend_info.chunk_size - has_read ? read_size : rd->backend_info.chunk_size - has_read;
1221 
1222          rsd_callback_lock(rd);
1223          ssize_t ret = rd->audio_callback(buffer + has_read, will_read, rd->cb_data);
1224          rsd_callback_unlock(rd);
1225 
1226          if (ret < 0)
1227          {
1228             rsnd_reset(rd);
1229             sthread_detach(rd->thread.thread);
1230             rd->error_callback(rd->cb_data);
1231             return;
1232          }
1233 
1234          has_read += ret;
1235 
1236          if (ret < (ssize_t)will_read)
1237          {
1238             if ((int)rsd_delay_ms(rd) < rd->max_latency / 2)
1239             {
1240                RSD_DEBUG("[RSound] Callback thread: Requested %d bytes, got %d.\n", (int)will_read, (int)ret);
1241                memset(buffer + has_read, 0, will_read - ret);
1242                has_read += will_read - ret;
1243             }
1244             else
1245             {
1246                // The network might do things in large chunks, so it may request large amounts of data in short periods of time.
1247                // This breaks when the caller cannot buffer up big buffers beforehand, so do short sleeps inbetween.
1248                // This is somewhat dirty, but I cannot see a better solution
1249                retro_sleep(1);
1250             }
1251          }
1252       }
1253 
1254       ssize_t ret = rsnd_send_chunk(rd->conn.socket, buffer, rd->backend_info.chunk_size, 1);
1255       if (ret != (ssize_t)rd->backend_info.chunk_size)
1256       {
1257          rsnd_reset(rd);
1258          sthread_detach(rd->thread.thread);
1259          rd->error_callback(rd->cb_data);
1260          return;
1261       }
1262 
1263       /* If this was the first write, set the start point for the timer. */
1264       if (!rd->has_written)
1265       {
1266          rd->start_time = rsnd_get_time_usec();
1267          rd->has_written = 1;
1268       }
1269 
1270       rd->total_written += rd->backend_info.chunk_size;
1271 
1272       if ( (rd->conn_type & RSD_CONN_PROTO) && (rd->total_written > rd->channels * rd->rate * rd->samplesize) )
1273       {
1274          rsnd_send_info_query(rd);
1275          rsnd_update_server_info(rd);
1276       }
1277 
1278       if (rd->has_written)
1279          rsd_delay_wait(rd);
1280    }
1281 }
1282 
rsnd_reset(rsound_t * rd)1283 static int rsnd_reset(rsound_t *rd)
1284 {
1285    if ( rd->conn.socket != -1 )
1286       net_socketclose(rd->conn.socket);
1287 
1288    if ( rd->conn.socket != 1 )
1289       net_socketclose(rd->conn.ctl_socket);
1290 
1291    /* Pristine stuff, baby! */
1292    slock_lock(rd->thread.mutex);
1293    rd->conn.socket = -1;
1294    rd->conn.ctl_socket = -1;
1295    rd->total_written = 0;
1296    rd->ready_for_data = 0;
1297    rd->has_written = 0;
1298    rd->bytes_in_buffer = 0;
1299    rd->thread_active = 0;
1300    rd->delay_offset = 0;
1301    slock_unlock(rd->thread.mutex);
1302    scond_signal(rd->thread.cond);
1303 
1304    return 0;
1305 }
1306 
rsd_stop(rsound_t * rd)1307 int rsd_stop(rsound_t *rd)
1308 {
1309    retro_assert(rd != NULL);
1310    rsnd_stop_thread(rd);
1311 
1312    const char buf[] = "RSD    5 STOP";
1313 
1314    // Do not really care about errors here.
1315    // The socket will be closed down in any case in rsnd_reset().
1316    rsnd_send_chunk(rd->conn.ctl_socket, buf, strlen(buf), 0);
1317 
1318    rsnd_reset(rd);
1319    return 0;
1320 }
1321 
rsd_write(rsound_t * rsound,const void * buf,size_t size)1322 size_t rsd_write( rsound_t *rsound, const void* buf, size_t size)
1323 {
1324    size_t max_write, written = 0;
1325    retro_assert(rsound != NULL);
1326    if ( !rsound->ready_for_data )
1327       return 0;
1328 
1329    max_write = (rsound->buffer_size - rsound->backend_info.chunk_size)/2;
1330 
1331    /* Makes sure that we can handle arbitrary large write sizes */
1332 
1333    while ( written < size )
1334    {
1335       size_t write_size = (size - written) > max_write ? max_write : (size - written);
1336       size_t     result = rsnd_fill_buffer(rsound, (const char*)buf + written, write_size);
1337 
1338       if (result == 0)
1339       {
1340          rsd_stop(rsound);
1341          return 0;
1342       }
1343       written += result;
1344    }
1345    return written;
1346 }
1347 
rsd_start(rsound_t * rsound)1348 int rsd_start(rsound_t *rsound)
1349 {
1350    retro_assert(rsound != NULL);
1351    retro_assert(rsound->rate > 0);
1352    retro_assert(rsound->channels > 0);
1353    retro_assert(rsound->host != NULL);
1354    retro_assert(rsound->port != NULL);
1355 
1356    if ( rsnd_create_connection(rsound) < 0 )
1357       return -1;
1358 
1359    return 0;
1360 }
1361 
rsd_exec(rsound_t * rsound)1362 int rsd_exec(rsound_t *rsound)
1363 {
1364    retro_assert(rsound != NULL);
1365    RSD_DEBUG("[RSound] rsd_exec().\n");
1366 
1367    // Makes sure we have a working connection
1368    if ( rsound->conn.socket < 0 )
1369    {
1370       RSD_DEBUG("[RSound] Calling rsd_start().\n");
1371       if ( rsd_start(rsound) < 0 )
1372       {
1373          RSD_ERR("[RSound] rsd_start() failed.\n");
1374          return -1;
1375       }
1376    }
1377 
1378    RSD_DEBUG("[RSound] Closing ctl.\n");
1379    if ( rsnd_close_ctl(rsound) < 0 )
1380       return -1;
1381 
1382    int fd = rsound->conn.socket;
1383    RSD_DEBUG("[RSound] Socket: %d.\n", fd);
1384 
1385    rsnd_stop_thread(rsound);
1386 
1387 #ifdef __PS3__
1388    int i = 0;
1389    setsockopt(rsound->conn.socket, SOL_SOCKET, SO_NBIO, &i, sizeof(int));
1390 #else
1391    fcntl(rsound->conn.socket, F_SETFL, O_NONBLOCK);
1392 #endif
1393 
1394    /* Flush the buffer */
1395    if (FIFO_READ_AVAIL(rsound->fifo_buffer) > 0 )
1396    {
1397       char buffer[FIFO_READ_AVAIL(rsound->fifo_buffer)];
1398       fifo_read(rsound->fifo_buffer, buffer, sizeof(buffer));
1399       if ( rsnd_send_chunk(fd, buffer, sizeof(buffer), 1) != (ssize_t)sizeof(buffer) )
1400       {
1401          RSD_DEBUG("[RSound] Failed flushing buffer.\n");
1402          net_socketclose(fd);
1403          return -1;
1404       }
1405    }
1406 
1407    RSD_DEBUG("[RSound] Returning from rsd_exec().\n");
1408    rsd_free(rsound);
1409    return fd;
1410 }
1411 
1412 /* ioctl()-ish param setting :D */
rsd_set_param(rsound_t * rd,enum rsd_settings option,void * param)1413 int rsd_set_param(rsound_t *rd, enum rsd_settings option, void* param)
1414 {
1415 	retro_assert(rd != NULL);
1416 	retro_assert(param != NULL);
1417 	int retval = 0;
1418 
1419 	switch(option)
1420 	{
1421 		case RSD_SAMPLERATE:
1422 			if ( *(int*)param > 0 )
1423 			{
1424 				rd->rate = *((int*)param);
1425 				break;
1426 			}
1427 			else
1428 				retval = -1;
1429 			break;
1430 		case RSD_CHANNELS:
1431 			if ( *(int*)param > 0 )
1432 			{
1433 				rd->channels = *((int*)param);
1434 				break;
1435 			}
1436 			else
1437 				retval = -1;
1438 			break;
1439 		case RSD_HOST:
1440 			if ( rd->host != NULL )
1441 				free(rd->host);
1442 			rd->host = strdup((char*)param);
1443 			break;
1444 		case RSD_PORT:
1445 			if ( rd->port != NULL )
1446 				free(rd->port);
1447 			rd->port = strdup((char*)param);
1448 			break;
1449 		case RSD_BUFSIZE:
1450 			if ( *(int*)param > 0 )
1451 			{
1452 				rd->buffer_size = *((int*)param);
1453 				break;
1454 			}
1455 			else
1456 				retval = -1;
1457 			break;
1458 		case RSD_LATENCY:
1459 			rd->max_latency = *((int*)param);
1460 			break;
1461 
1462 			// Checks if format is valid.
1463 		case RSD_FORMAT:
1464 			rd->format = (uint16_t)(*((int*)param));
1465 			rd->samplesize = rsnd_format_to_samplesize(rd->format);
1466 
1467 			if ( rd->samplesize == -1 )
1468 			{
1469 				rd->format = RSD_S16_LE;
1470 				rd->samplesize = rsnd_format_to_samplesize(RSD_S16_LE);
1471 				*((int*)param) = (int)RSD_S16_LE;
1472 			}
1473 			break;
1474 
1475 		case RSD_IDENTITY:
1476 			strlcpy(rd->identity, param, sizeof(rd->identity));
1477 			rd->identity[sizeof(rd->identity)-1] = '\0';
1478 			break;
1479 
1480 		default:
1481 			retval = -1;
1482 	}
1483 
1484 	return retval;
1485 }
1486 
rsd_delay_wait(rsound_t * rd)1487 void rsd_delay_wait(rsound_t *rd)
1488 {
1489 
1490    /* When called, we make sure that the latency never goes over the time designated in RSD_LATENCY.
1491       Useful for certain blocking I/O designs where the latency still needs to be quite low.
1492       Without this, the latency of the stream will depend on how big the network buffers are.
1493       ( We simulate that we're a low latency sound card ) */
1494 
1495    /* Should we bother with checking latency at all? */
1496    if ( rd->max_latency > 0 )
1497    {
1498       /* Latency of stream in ms */
1499       int latency_ms = rsd_delay_ms(rd);
1500 
1501       /* Should we sleep for a while to keep the latency low? */
1502       if ( rd->max_latency < latency_ms )
1503       {
1504          int64_t sleep_ms = latency_ms - rd->max_latency;
1505          RSD_DEBUG("[RSound] Delay wait: %d ms.\n", (int)sleep_ms);
1506          retro_sleep((int)sleep_ms);
1507       }
1508    }
1509 }
1510 
rsd_pointer(rsound_t * rsound)1511 size_t rsd_pointer(rsound_t *rsound)
1512 {
1513    retro_assert(rsound != NULL);
1514    int ptr;
1515 
1516    ptr = rsnd_get_ptr(rsound);
1517 
1518    return ptr;
1519 }
1520 
rsd_get_avail(rsound_t * rd)1521 size_t rsd_get_avail(rsound_t *rd)
1522 {
1523    retro_assert(rd != NULL);
1524    int ptr;
1525    ptr = rsnd_get_ptr(rd);
1526    return rd->buffer_size - ptr;
1527 }
1528 
rsd_delay(rsound_t * rd)1529 size_t rsd_delay(rsound_t *rd)
1530 {
1531    retro_assert(rd != NULL);
1532    int ptr = rsnd_get_delay(rd);
1533    if ( ptr < 0 )
1534       ptr = 0;
1535 
1536    return ptr;
1537 }
1538 
rsd_delay_ms(rsound_t * rd)1539 size_t rsd_delay_ms(rsound_t* rd)
1540 {
1541    retro_assert(rd);
1542    retro_assert(rd->rate > 0 && rd->channels > 0);
1543 
1544    return (rsd_delay(rd) * 1000) / ( rd->rate * rd->channels * rd->samplesize );
1545 }
1546 
rsd_pause(rsound_t * rsound,int enable)1547 int rsd_pause(rsound_t* rsound, int enable)
1548 {
1549    retro_assert(rsound != NULL);
1550    if ( enable )
1551       return rsd_stop(rsound);
1552 
1553    return rsd_start(rsound);
1554 }
1555 
rsd_init(rsound_t ** rsound)1556 int rsd_init(rsound_t** rsound)
1557 {
1558    *rsound = calloc(1, sizeof(rsound_t));
1559    if (*rsound == NULL)
1560       return -1;
1561 
1562    retro_assert(rsound != NULL);
1563 
1564    (*rsound)->conn.socket = -1;
1565    (*rsound)->conn.ctl_socket = -1;
1566 
1567    (*rsound)->thread.mutex = slock_new();
1568    (*rsound)->thread.cond_mutex = slock_new();
1569    (*rsound)->cb_lock = slock_new();
1570    (*rsound)->thread.cond = scond_new();
1571 
1572    // Assumes default of S16_LE samples.
1573    int format = RSD_S16_LE;
1574    rsd_set_param(*rsound, RSD_FORMAT, &format);
1575 
1576    rsd_set_param(*rsound, RSD_HOST, RSD_DEFAULT_HOST);
1577    rsd_set_param(*rsound, RSD_PORT, RSD_DEFAULT_PORT);
1578 
1579 #ifdef __PS3__
1580    if (init_count == 0)
1581    {
1582       sysModuleLoad(SYSMODULE_NET);
1583       netInitialize();
1584       init_count++;
1585    }
1586 #endif
1587 
1588    return 0;
1589 }
1590 
rsd_simple_start(rsound_t ** rsound,const char * host,const char * port,const char * ident,int rate,int channels,enum rsd_format format)1591 int rsd_simple_start(rsound_t** rsound, const char* host, const char* port, const char* ident,
1592                      int rate, int channels, enum rsd_format format)
1593 {
1594    if ( rsd_init(rsound) < 0 )
1595       return -1;
1596 
1597    int fmt = format;
1598 
1599    if ( host != NULL )
1600       rsd_set_param(*rsound, RSD_HOST, (void*)host);
1601    if ( port != NULL )
1602       rsd_set_param(*rsound, RSD_PORT, (void*)port);
1603    if ( ident != NULL )
1604       rsd_set_param(*rsound, RSD_IDENTITY, (void*)ident);
1605 
1606    if (  rsd_set_param(*rsound, RSD_SAMPLERATE, &rate) < 0 ||
1607          rsd_set_param(*rsound, RSD_CHANNELS, &channels) < 0 ||
1608          rsd_set_param(*rsound, RSD_FORMAT, &fmt) < 0 )
1609    {
1610       rsd_free(*rsound);
1611       return -1;
1612    }
1613 
1614    if ( rsd_start(*rsound) < 0 )
1615    {
1616       rsd_free(*rsound);
1617       return -1;
1618    }
1619 
1620    return 0;
1621 }
1622 
rsd_set_callback(rsound_t * rsound,rsd_audio_callback_t audio_cb,rsd_error_callback_t err_cb,size_t max_size,void * userdata)1623 void rsd_set_callback(rsound_t *rsound, rsd_audio_callback_t audio_cb, rsd_error_callback_t err_cb, size_t max_size, void *userdata)
1624 {
1625    retro_assert(rsound != NULL);
1626 
1627    rsound->audio_callback = audio_cb;
1628    rsound->error_callback = err_cb;
1629    rsound->cb_max_size = max_size;
1630    rsound->cb_data = userdata;
1631 
1632    if (rsound->audio_callback)
1633    {
1634       retro_assert(rsound->error_callback);
1635    }
1636 }
1637 
rsd_callback_lock(rsound_t * rsound)1638 void rsd_callback_lock(rsound_t *rsound)
1639 {
1640    slock_lock(rsound->cb_lock);
1641 }
1642 
rsd_callback_unlock(rsound_t * rsound)1643 void rsd_callback_unlock(rsound_t *rsound)
1644 {
1645    slock_unlock(rsound->cb_lock);
1646 }
1647 
rsd_free(rsound_t * rsound)1648 int rsd_free(rsound_t *rsound)
1649 {
1650    retro_assert(rsound != NULL);
1651    if (rsound->fifo_buffer)
1652       fifo_free(rsound->fifo_buffer);
1653    if (rsound->host)
1654       free(rsound->host);
1655    if (rsound->port)
1656       free(rsound->port);
1657 
1658    slock_free(rsound->thread.mutex);
1659    slock_free(rsound->thread.cond_mutex);
1660    slock_free(rsound->cb_lock);
1661    scond_free(rsound->thread.cond);
1662 
1663    free(rsound);
1664 
1665    return 0;
1666 }
1667