1 // ------------------------------------------------------------------------
2 // eca-neteci-server.c: NetECI server implementation.
3 // Copyright (C) 2002,2004,2009 Kai Vehmanen
4 //
5 // Attributes:
6 //     eca-style-version: 3
7 //
8 // This program is free software; you can redistribute it and/or modify
9 // it under the terms of the GNU General Public License as published by
10 // the Free Software Foundation; either version 2 of the License, or
11 // (at your option) any later version.
12 //
13 // This program is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 // GNU General Public License for more details.
17 //
18 // You should have received a copy of the GNU General Public License
19 // along with this program; if not, write to the Free Software
20 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
21 // ------------------------------------------------------------------------
22 
23 #include <cassert>
24 #include <cstring>        /* memcpy() */
25 #include <iostream>
26 #include <string>
27 
28 #include <fcntl.h>        /* POSIX: fcntl() */
29 #include <pthread.h>      /* POSIX: pthread_* */
30 #include <unistd.h>       /* POSIX: fcntl() */
31 #include <arpa/inet.h>    /* BSD: inet_ntoa() */
32 #include <netinet/in.h>   /* BSD: inet_ntoa() */
33 #include <sys/poll.h>     /* POSIX: poll() */
34 #include <sys/socket.h>   /* BSD: getpeername() */
35 #include <sys/types.h>    /* OSX: u_int32_t (INADDR_ANY) */
36 
37 #include <kvu_dbc.h>
38 #include <kvu_fd_io.h>
39 #include <kvu_numtostr.h>
40 #include <kvu_utils.h>
41 
42 #include <eca-control-mt.h>
43 #include <eca-logger.h>
44 #include <eca-logger-wellformed.h>
45 
46 #include "ecasound.h"
47 #include "eca-neteci-server.h"
48 
49 /**
50  * Options
51  */
52 // #define NETECI_DEBUG_ENABLED
53 
54 #define ECA_NETECI_START_BUFFER_SIZE    128
55 #define ECA_NETECI_MAX_BUFFER_SIZE      65536
56 
57 /**
58  * Macro definitions
59  */
60 
61 #ifdef NETECI_DEBUG_ENABLED
62 #define NETECI_DEBUG(x) x
63 #else
64 #define NETECI_DEBUG(x) ((void) 0)
65 #endif
66 
67 /**
68  * Import namespaces
69  */
70 
71 using namespace std;
72 
ECA_NETECI_SERVER(ECASOUND_RUN_STATE * state)73 ECA_NETECI_SERVER::ECA_NETECI_SERVER(ECASOUND_RUN_STATE* state)
74   : state_repp(state),
75     srvfd_rep(-1),
76     server_listening_rep(false),
77     unix_sockets_rep(false),
78     cleanup_request_rep(false)
79 {
80 }
81 
~ECA_NETECI_SERVER(void)82 ECA_NETECI_SERVER::~ECA_NETECI_SERVER(void)
83 {
84   if (server_listening_rep == true) {
85     close_server_socket();
86   }
87 }
88 
89 /**
90  * Launches the server thread.
91  *
92  * @param arg pointer to a ECA_NETECI_SERVER object
93  */
launch_server_thread(void * arg)94 void* ECA_NETECI_SERVER::launch_server_thread(void* arg)
95 {
96   ECA_LOG_MSG(ECA_LOGGER::user_objects, "Server thread started");
97 
98   ECA_NETECI_SERVER* self =
99     reinterpret_cast<ECA_NETECI_SERVER*>(arg);
100   self->run();
101   return 0;
102 }
103 
104 /**
105  * Starts running the NetECI server.
106  *
107  * After calling this function, the ECA_CONTROL_MAIN object
108  * may be used at any time from the NetECI server thread.
109  */
run(void)110 void ECA_NETECI_SERVER::run(void)
111 {
112   create_server_socket();
113   open_server_socket();
114   if (server_listening_rep == true) {
115     listen_for_events();
116   }
117   else {
118     ECA_LOG_MSG(ECA_LOGGER::info,
119 		"Unable to start NetECI server. Please check that no other program is using the TCP port "
120 		+ kvu_numtostr(state_repp->neteci_tcp_port)
121 		+ ".");
122   }
123   close_server_socket();
124 
125   ECA_LOG_MSG(ECA_LOGGER::user_objects,
126 	      "server thread exiting");
127 }
128 
129 /**
130  * Creates a server socket with 'socket()'. Depending on
131  * object configuration either UNIX or IP socket is
132  * created.
133  */
create_server_socket(void)134 void ECA_NETECI_SERVER::create_server_socket(void)
135 {
136   DBC_REQUIRE(server_listening_rep != true);
137   DBC_REQUIRE(srvfd_rep <= 0);
138 
139   if (unix_sockets_rep == true) {
140     srvfd_rep = socket(AF_UNIX, SOCK_STREAM, 0);
141     if (srvfd_rep >= 0) {
142       /* create a temporary filename for the socket in a secure way */
143       socketpath_rep = "/tmp/neteci_server_1";
144       addr_un_rep.sun_family = AF_UNIX;
145       memcpy(addr_un_rep.sun_path, socketpath_rep.c_str(), socketpath_rep.size() + 1);
146       addr_repp = reinterpret_cast<struct sockaddr*>(&addr_un_rep);
147     }
148   }
149   else {
150     srvfd_rep = socket(PF_INET, SOCK_STREAM, 0);
151     if (srvfd_rep >= 0) {
152       addr_in_rep.sin_family = AF_INET;
153       addr_in_rep.sin_port = htons(state_repp->neteci_tcp_port);
154       addr_in_rep.sin_addr.s_addr = INADDR_ANY;
155 
156       addr_repp = reinterpret_cast<struct sockaddr*>(&addr_in_rep);
157     }
158   }
159 }
160 
161 /**
162  * Opens the server socket for listening. If succesful,
163  * 'server_listening_rep' will be true after the call.
164  */
open_server_socket(void)165 void ECA_NETECI_SERVER::open_server_socket(void)
166 {
167   DBC_REQUIRE(server_listening_rep != true);
168   DBC_REQUIRE(srvfd_rep > 0);
169 
170   int val = 1;
171   int ret = setsockopt(srvfd_rep, SOL_SOCKET, SO_REUSEADDR, (void *)&val, sizeof(val));
172   if (ret < 0)
173     std::cerr << "setsockopt() failed." << endl;
174 
175   // int res = bind(srvfd_rep, (struct sockaddr*)addr_repp, sizeof(*addr_repp));
176 
177   int res = 0;
178   if (unix_sockets_rep == true)
179     res = ::bind(srvfd_rep, (struct sockaddr*)&addr_un_rep, sizeof(addr_un_rep));
180   else
181     res = ::bind(srvfd_rep, (struct sockaddr*)&addr_in_rep, sizeof(addr_in_rep));
182 
183   if (res == 0) {
184     res = listen(srvfd_rep, 5);
185     if (res == 0) {
186       int res = fcntl(srvfd_rep, F_SETFL, O_NONBLOCK);
187       if (res == -1)
188 	std::cerr << "fcntl() failed." << endl;
189 
190       NETECI_DEBUG(std::cout << "server socket created." << endl);
191       server_listening_rep = true;
192     }
193     else
194       std::cerr << "listen() failed." << endl;
195   }
196   else {
197     if (unix_sockets_rep == true) {
198       unlink(socketpath_rep.c_str());
199     }
200     socketpath_rep.resize(0);
201     std::cerr << "bind() failed." << endl;
202   }
203 
204   DBC_ENSURE((unix_sockets_rep == true &&
205 	     (((server_listening_rep == true && socketpath_rep.size() > 0) ||
206 	       (server_listening_rep != true && socketpath_rep.size() == 0)))) ||
207 	     (unix_sockets_rep != true));
208 }
209 
210 /**
211  * Closes the server socket.
212  */
close_server_socket(void)213 void ECA_NETECI_SERVER::close_server_socket(void)
214 {
215   DBC_REQUIRE(srvfd_rep > 0);
216   DBC_REQUIRE(server_listening_rep == true);
217 
218   NETECI_DEBUG(cerr << "closing socket " << kvu_numtostr(srvfd_rep) << "." << endl);
219   close(srvfd_rep);
220   srvfd_rep = -1;
221   server_listening_rep = false;
222 
223   DBC_ENSURE(srvfd_rep == -1);
224   DBC_ENSURE(server_listening_rep != true);
225 }
226 
227 /**
228  * Listens for and accepts incoming connections.
229  */
listen_for_events(void)230 void ECA_NETECI_SERVER::listen_for_events(void)
231 {
232   /*
233    * - loop until we get an exit request from network or from
234    *   ecasound_state
235    */
236 
237   /* - enter poll
238    * - if new connections, accept them and add the new client to
239    *   client list
240    * - if incoming bytes, grab ecasound_state lock, send command,
241    *   store retval, release lock, send the reply to client
242    * - return to poll
243    */
244   while(state_repp->exit_requested() != true) {
245     // NETECI_DEBUG(cerr << "checking for events" << endl);
246     check_for_events(2000);
247   }
248 
249   if (state_repp->exit_requested() == true) {
250     NETECI_DEBUG(cerr << "exit_request received" << endl);
251   }
252 }
253 
254 /**
255  * Checks for new connections and messages from
256  * clients.
257  *
258  * @param timeout upper-limit in ms for how long
259  *        function waits for events; if -1,
260  *        call will return immediately
261  *        (ie. is non-blocking)
262  */
check_for_events(int timeout)263 void ECA_NETECI_SERVER::check_for_events(int timeout)
264 {
265   int nfds = clients_rep.size() + 1;
266   struct pollfd* ufds = new struct pollfd [nfds];
267 
268   ufds[0].fd = srvfd_rep;
269   ufds[0].events = POLLIN;
270   ufds[0].revents = 0;
271 
272   std::list<struct ecasound_neteci_server_client*>::iterator p = clients_rep.begin();
273   for(int n = 1; n < nfds; n++) {
274     ufds[n].fd = (*p)->fd;
275     ufds[n].events = POLLIN;
276     ufds[n].revents = 0;
277     ++p;
278   }
279   DBC_CHECK(nfds == 1 || p == clients_rep.end());
280 
281   int ret = poll(ufds, nfds, timeout);
282   if (ret > 0) {
283     if (ufds[0].revents & POLLIN) {
284       /* 1. new incoming connection */
285       handle_connection(srvfd_rep);
286     }
287     p = clients_rep.begin();
288     for(int n = 1; n < nfds; n++) {
289       if (ufds[n].revents & POLLIN) {
290 	/* 2. client has sent a message */
291 	handle_client_messages(*p);
292       }
293       else if (ufds[n].revents == POLLERR ||
294 	       ufds[n].revents == POLLHUP ||
295 	       ufds[n].revents == POLLNVAL) {
296 	/* 3. error, remove client */
297 	remove_client(*p);
298       }
299       if (p != clients_rep.end()) ++p;
300     }
301   }
302 
303   if (cleanup_request_rep == true) {
304     clean_removed_clients();
305   }
306 
307   delete[] ufds;
308 }
309 
handle_connection(int fd)310 void ECA_NETECI_SERVER::handle_connection(int fd)
311 {
312   socklen_t bytes = 0;
313   string peername;
314   int connfd = 0;
315 
316   if (unix_sockets_rep == true) {
317     bytes = static_cast<socklen_t>(sizeof(addr_un_rep));
318     connfd = accept(fd, reinterpret_cast<struct sockaddr*>(&addr_un_rep), &bytes);
319     peername = "UNIX:" + socketpath_rep;
320   }
321   else {
322     bytes = static_cast<socklen_t>(sizeof(addr_in_rep));
323     connfd = accept(fd, reinterpret_cast<struct sockaddr*>(&addr_in_rep), &bytes);
324 
325     if (connfd > 0) {
326       struct sockaddr_in peeraddr;
327       socklen_t peernamelen;
328       // struct in_addr peerip;
329       peername = "TCP/IP:";
330       int res = getpeername(connfd,
331 			    reinterpret_cast<struct sockaddr*>(&peeraddr),
332 			    reinterpret_cast<socklen_t*>(&peernamelen));
333       if (res == 0)
334 	peername += string(inet_ntoa(peeraddr.sin_addr));
335       else
336 	peername += string(inet_ntoa(addr_in_rep.sin_addr));
337     }
338   }
339 
340   ECA_LOG_MSG(ECA_LOGGER::info,
341 	      "New connection from " +
342 	      peername + ".");
343 
344 
345   if (connfd >= 0) {
346     NETECI_DEBUG(cerr << "incoming connection accepted" << endl);
347     struct ecasound_neteci_server_client* client = new struct ecasound_neteci_server_client; /* add a new client */
348     client->fd = connfd;
349     client->buffer_length = ECA_NETECI_START_BUFFER_SIZE;
350     client->buffer = new char [client->buffer_length];
351     client->buffer_current_ptr = 0;
352     client->peername = peername;
353     clients_rep.push_back(client);
354   }
355 }
356 
357 /**
358  * Handle incoming messages for client 'client'.
359  */
handle_client_messages(struct ecasound_neteci_server_client * client)360 void ECA_NETECI_SERVER::handle_client_messages(struct ecasound_neteci_server_client* client)
361 {
362   char* buf[128];
363   int connfd = client->fd;
364 
365   NETECI_DEBUG(cerr << "handle_client_messages for fd "
366 	       << connfd << endl);
367 
368   ssize_t c = kvu_fd_read(connfd, buf, 128, 5000);
369   if (c > 0) {
370     parse_raw_incoming_data(reinterpret_cast<char*>(buf), c, client);
371     while(parsed_cmd_queue_rep.size() > 0) {
372       const string& nextcmd = parsed_cmd_queue_rep.front();
373       if (nextcmd == "quit" || nextcmd == "q") {
374 	NETECI_DEBUG(cerr << "client initiated quit, removing client-fd " << connfd << "." << endl);
375 	remove_client(client);
376       }
377       else {
378 	handle_eci_command(nextcmd, client);
379       }
380       parsed_cmd_queue_rep.pop_front();
381     }
382     /* ... */
383   }
384   else {
385     /* read() <= 0 */
386     NETECI_DEBUG(cerr << "read error, removing client-fd " << connfd << "." << endl);
387     remove_client(client);
388   }
389 }
390 
parse_raw_incoming_data(const char * buffer,ssize_t bytes,struct ecasound_neteci_server_client * client)391 void ECA_NETECI_SERVER::parse_raw_incoming_data(const char* buffer,
392 						ssize_t bytes,
393 						struct ecasound_neteci_server_client* client)
394 {
395   DBC_REQUIRE(buffer != 0);
396   DBC_REQUIRE(bytes >= 0);
397   DBC_REQUIRE(client != 0);
398   DBC_DECLARE(int old_client_ptr = client->buffer_current_ptr);
399   DBC_DECLARE(unsigned int old_cmd_queue_size = parsed_cmd_queue_rep.size());
400 
401   NETECI_DEBUG(cerr << "parse incoming data; "
402 	       << bytes << " bytes. Buffer length is "
403 	       << client->buffer_length << endl);
404 
405   for(int n = 0; n < bytes; n++) {
406     DBC_CHECK(client->buffer_current_ptr <= client->buffer_length);
407     if (client->buffer_current_ptr == client->buffer_length) {
408       int new_buffer_length = client->buffer_length * 2;
409       char *new_buffer = new char [new_buffer_length];
410 
411       if (new_buffer_length > ECA_NETECI_MAX_BUFFER_SIZE) {
412 	cerr << "client buffer overflow, unable to increase buffer size. flushing..." << endl;
413 	client->buffer_current_ptr = 0;
414       }
415       else {
416 	NETECI_DEBUG(cerr << "client buffer overflow, increasing buffer size from "
417 		     << client->buffer_length << " to " << new_buffer_length << " bytes." << endl);
418 
419 	for(int i = 0; i < client->buffer_length; i++) new_buffer[i] = client->buffer[i];
420 
421 	delete[] client->buffer;
422 	client->buffer = new_buffer;
423 	client->buffer_length = new_buffer_length;
424       }
425     }
426 
427     NETECI_DEBUG(cerr << "copying '" << buffer[n] << "'\n");
428     client->buffer[client->buffer_current_ptr] = buffer[n];
429     if (client->buffer_current_ptr > 0 &&
430 	client->buffer[client->buffer_current_ptr] == '\n' &&
431 	client->buffer[client->buffer_current_ptr - 1] == '\r') {
432 
433       string cmd (client->buffer, client->buffer_current_ptr - 1);
434       NETECI_DEBUG(cerr << "storing command '" <<	cmd << "'" << endl);
435       parsed_cmd_queue_rep.push_back(cmd);
436 
437       NETECI_DEBUG(cerr << "copying "
438 		   << client->buffer_length - client->buffer_current_ptr - 1
439 		   << " bytes from " << client->buffer_current_ptr + 1
440 		   << " to the beginning of the buffer."
441 		   << " Index is " << client->buffer_current_ptr << endl);
442 
443       DBC_CHECK(client->buffer_current_ptr < client->buffer_length);
444 
445 #if 0
446       /* must not use memcpy() as the
447 	 affected areas may overlap! */
448       for(int o = 0, p = index + 1;
449 	  p < client->buffer_length; o++, p++) {
450 	client->buffer[o] = client->buffer[p];
451       }
452 #endif
453       client->buffer_current_ptr = 0;
454     }
455     else {
456       // NETECI_DEBUG(cerr << "crlf not found, index=" << index << ", n=" << n << "cur_ptr=" << client->buffer_current_ptr << ".\n");
457       client->buffer_current_ptr++;
458     }
459   }
460 
461   DBC_ENSURE(client->buffer_current_ptr > old_client_ptr ||
462 	     parsed_cmd_queue_rep.size() > old_cmd_queue_size);
463 }
464 
handle_eci_command(const string & cmd,struct ecasound_neteci_server_client * client)465 void ECA_NETECI_SERVER::handle_eci_command(const string& cmd, struct ecasound_neteci_server_client* client)
466 {
467   ECA_CONTROL_MT* ctrl = state_repp->control;
468 
469   NETECI_DEBUG(cerr << "handle eci command: " << cmd << endl);
470 
471   assert(ctrl != 0);
472 
473   struct eci_return_value retval;
474   ctrl->command(cmd, &retval);
475 
476   string strtosend =
477     ECA_LOGGER_WELLFORMED::create_wellformed_message(ECA_LOGGER::eiam_return_values,
478       std::string(ECA_CONTROL_MAIN::return_value_type_to_string(&retval))
479       + " " +
480       ECA_CONTROL_MAIN::return_value_to_string(&retval));
481 
482   int bytes_to_send = strtosend.size();
483   while(bytes_to_send > 0) {
484     int ret = kvu_fd_write(client->fd, strtosend.c_str(), strtosend.size(), 5000);
485     if (ret < 0) {
486       cerr << "error in kvu_fd_write(), removing client.\n";
487       remove_client(client);
488       break;
489     }
490     else {
491       bytes_to_send -= ret;
492     }
493   }
494 }
495 
496 /**
497  * Removes 'client' from list of clients.
498  *
499  * Note! Internally, the 'fd' field of the deleted client
500  * is marked to be -1.
501  *
502  * @see clean_removed_clients()
503  */
remove_client(struct ecasound_neteci_server_client * client)504 void ECA_NETECI_SERVER::remove_client(struct ecasound_neteci_server_client* client)
505 {
506   NETECI_DEBUG(std::cout << "removing client." << std::endl);
507 
508   if (client != 0 && client->fd > 0) {
509     ECA_LOG_MSG(ECA_LOGGER::info,
510 		"Closing connection " +
511 		client->peername + ".");
512     close(client->fd);
513     client->fd = -1;
514   }
515 
516   cleanup_request_rep = true;
517 }
518 
519 /**
520  * Cleans the list of clients from removed objects.
521  *
522  * @see remove_client()
523  */
clean_removed_clients(void)524 void ECA_NETECI_SERVER::clean_removed_clients(void)
525 {
526   DBC_DECLARE(size_t oldsize = clients_rep.size());
527   DBC_DECLARE(size_t counter = 0);
528 
529   NETECI_DEBUG(std::cerr << "cleaning removed clients." << std::endl);
530 
531   list<struct ecasound_neteci_server_client*>::iterator p = clients_rep.begin();
532   while(p != clients_rep.end()) {
533     NETECI_DEBUG(std::cerr << "checking for delete, client " << *p << std::endl);
534     if (*p != 0 && (*p)->fd == -1) {
535       if ((*p)->buffer != 0) {
536 	delete[] (*p)->buffer;
537 	(*p)->buffer = 0;
538       }
539       std::list<struct ecasound_neteci_server_client*>::iterator q = p;
540       ++q;
541       NETECI_DEBUG(std::cerr << "deleting client " << *p << std::endl);
542       delete *p;
543       NETECI_DEBUG(std::cerr << "erasing client " << *p << std::endl);
544       *p = 0;
545       clients_rep.erase(p);
546       p = q;
547       DBC_DECLARE(++counter);
548     }
549     else {
550       ++p;
551     }
552   }
553 
554   cleanup_request_rep = false;
555 
556   DBC_ENSURE(clients_rep.size() == oldsize - counter);
557 }
558