1 // ------------------------------------------------------------------------
2 // eca-neteci-server.c: NetECI server implementation.
3 // Copyright (C) 2002,2004,2009 Kai Vehmanen
4 //
5 // Attributes:
6 // eca-style-version: 3
7 //
8 // This program is free software; you can redistribute it and/or modify
9 // it under the terms of the GNU General Public License as published by
10 // the Free Software Foundation; either version 2 of the License, or
11 // (at your option) any later version.
12 //
13 // This program is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 // GNU General Public License for more details.
17 //
18 // You should have received a copy of the GNU General Public License
19 // along with this program; if not, write to the Free Software
20 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
21 // ------------------------------------------------------------------------
22
23 #include <cassert>
24 #include <cstring> /* memcpy() */
25 #include <iostream>
26 #include <string>
27
28 #include <fcntl.h> /* POSIX: fcntl() */
29 #include <pthread.h> /* POSIX: pthread_* */
30 #include <unistd.h> /* POSIX: fcntl() */
31 #include <arpa/inet.h> /* BSD: inet_ntoa() */
32 #include <netinet/in.h> /* BSD: inet_ntoa() */
33 #include <sys/poll.h> /* POSIX: poll() */
34 #include <sys/socket.h> /* BSD: getpeername() */
35 #include <sys/types.h> /* OSX: u_int32_t (INADDR_ANY) */
36
37 #include <kvu_dbc.h>
38 #include <kvu_fd_io.h>
39 #include <kvu_numtostr.h>
40 #include <kvu_utils.h>
41
42 #include <eca-control-mt.h>
43 #include <eca-logger.h>
44 #include <eca-logger-wellformed.h>
45
46 #include "ecasound.h"
47 #include "eca-neteci-server.h"
48
49 /**
50 * Options
51 */
52 // #define NETECI_DEBUG_ENABLED
53
54 #define ECA_NETECI_START_BUFFER_SIZE 128
55 #define ECA_NETECI_MAX_BUFFER_SIZE 65536
56
57 /**
58 * Macro definitions
59 */
60
61 #ifdef NETECI_DEBUG_ENABLED
62 #define NETECI_DEBUG(x) x
63 #else
64 #define NETECI_DEBUG(x) ((void) 0)
65 #endif
66
67 /**
68 * Import namespaces
69 */
70
71 using namespace std;
72
ECA_NETECI_SERVER(ECASOUND_RUN_STATE * state)73 ECA_NETECI_SERVER::ECA_NETECI_SERVER(ECASOUND_RUN_STATE* state)
74 : state_repp(state),
75 srvfd_rep(-1),
76 server_listening_rep(false),
77 unix_sockets_rep(false),
78 cleanup_request_rep(false)
79 {
80 }
81
~ECA_NETECI_SERVER(void)82 ECA_NETECI_SERVER::~ECA_NETECI_SERVER(void)
83 {
84 if (server_listening_rep == true) {
85 close_server_socket();
86 }
87 }
88
89 /**
90 * Launches the server thread.
91 *
92 * @param arg pointer to a ECA_NETECI_SERVER object
93 */
launch_server_thread(void * arg)94 void* ECA_NETECI_SERVER::launch_server_thread(void* arg)
95 {
96 ECA_LOG_MSG(ECA_LOGGER::user_objects, "Server thread started");
97
98 ECA_NETECI_SERVER* self =
99 reinterpret_cast<ECA_NETECI_SERVER*>(arg);
100 self->run();
101 return 0;
102 }
103
104 /**
105 * Starts running the NetECI server.
106 *
107 * After calling this function, the ECA_CONTROL_MAIN object
108 * may be used at any time from the NetECI server thread.
109 */
run(void)110 void ECA_NETECI_SERVER::run(void)
111 {
112 create_server_socket();
113 open_server_socket();
114 if (server_listening_rep == true) {
115 listen_for_events();
116 }
117 else {
118 ECA_LOG_MSG(ECA_LOGGER::info,
119 "Unable to start NetECI server. Please check that no other program is using the TCP port "
120 + kvu_numtostr(state_repp->neteci_tcp_port)
121 + ".");
122 }
123 close_server_socket();
124
125 ECA_LOG_MSG(ECA_LOGGER::user_objects,
126 "server thread exiting");
127 }
128
129 /**
130 * Creates a server socket with 'socket()'. Depending on
131 * object configuration either UNIX or IP socket is
132 * created.
133 */
create_server_socket(void)134 void ECA_NETECI_SERVER::create_server_socket(void)
135 {
136 DBC_REQUIRE(server_listening_rep != true);
137 DBC_REQUIRE(srvfd_rep <= 0);
138
139 if (unix_sockets_rep == true) {
140 srvfd_rep = socket(AF_UNIX, SOCK_STREAM, 0);
141 if (srvfd_rep >= 0) {
142 /* create a temporary filename for the socket in a secure way */
143 socketpath_rep = "/tmp/neteci_server_1";
144 addr_un_rep.sun_family = AF_UNIX;
145 memcpy(addr_un_rep.sun_path, socketpath_rep.c_str(), socketpath_rep.size() + 1);
146 addr_repp = reinterpret_cast<struct sockaddr*>(&addr_un_rep);
147 }
148 }
149 else {
150 srvfd_rep = socket(PF_INET, SOCK_STREAM, 0);
151 if (srvfd_rep >= 0) {
152 addr_in_rep.sin_family = AF_INET;
153 addr_in_rep.sin_port = htons(state_repp->neteci_tcp_port);
154 addr_in_rep.sin_addr.s_addr = INADDR_ANY;
155
156 addr_repp = reinterpret_cast<struct sockaddr*>(&addr_in_rep);
157 }
158 }
159 }
160
161 /**
162 * Opens the server socket for listening. If succesful,
163 * 'server_listening_rep' will be true after the call.
164 */
open_server_socket(void)165 void ECA_NETECI_SERVER::open_server_socket(void)
166 {
167 DBC_REQUIRE(server_listening_rep != true);
168 DBC_REQUIRE(srvfd_rep > 0);
169
170 int val = 1;
171 int ret = setsockopt(srvfd_rep, SOL_SOCKET, SO_REUSEADDR, (void *)&val, sizeof(val));
172 if (ret < 0)
173 std::cerr << "setsockopt() failed." << endl;
174
175 // int res = bind(srvfd_rep, (struct sockaddr*)addr_repp, sizeof(*addr_repp));
176
177 int res = 0;
178 if (unix_sockets_rep == true)
179 res = ::bind(srvfd_rep, (struct sockaddr*)&addr_un_rep, sizeof(addr_un_rep));
180 else
181 res = ::bind(srvfd_rep, (struct sockaddr*)&addr_in_rep, sizeof(addr_in_rep));
182
183 if (res == 0) {
184 res = listen(srvfd_rep, 5);
185 if (res == 0) {
186 int res = fcntl(srvfd_rep, F_SETFL, O_NONBLOCK);
187 if (res == -1)
188 std::cerr << "fcntl() failed." << endl;
189
190 NETECI_DEBUG(std::cout << "server socket created." << endl);
191 server_listening_rep = true;
192 }
193 else
194 std::cerr << "listen() failed." << endl;
195 }
196 else {
197 if (unix_sockets_rep == true) {
198 unlink(socketpath_rep.c_str());
199 }
200 socketpath_rep.resize(0);
201 std::cerr << "bind() failed." << endl;
202 }
203
204 DBC_ENSURE((unix_sockets_rep == true &&
205 (((server_listening_rep == true && socketpath_rep.size() > 0) ||
206 (server_listening_rep != true && socketpath_rep.size() == 0)))) ||
207 (unix_sockets_rep != true));
208 }
209
210 /**
211 * Closes the server socket.
212 */
close_server_socket(void)213 void ECA_NETECI_SERVER::close_server_socket(void)
214 {
215 DBC_REQUIRE(srvfd_rep > 0);
216 DBC_REQUIRE(server_listening_rep == true);
217
218 NETECI_DEBUG(cerr << "closing socket " << kvu_numtostr(srvfd_rep) << "." << endl);
219 close(srvfd_rep);
220 srvfd_rep = -1;
221 server_listening_rep = false;
222
223 DBC_ENSURE(srvfd_rep == -1);
224 DBC_ENSURE(server_listening_rep != true);
225 }
226
227 /**
228 * Listens for and accepts incoming connections.
229 */
listen_for_events(void)230 void ECA_NETECI_SERVER::listen_for_events(void)
231 {
232 /*
233 * - loop until we get an exit request from network or from
234 * ecasound_state
235 */
236
237 /* - enter poll
238 * - if new connections, accept them and add the new client to
239 * client list
240 * - if incoming bytes, grab ecasound_state lock, send command,
241 * store retval, release lock, send the reply to client
242 * - return to poll
243 */
244 while(state_repp->exit_requested() != true) {
245 // NETECI_DEBUG(cerr << "checking for events" << endl);
246 check_for_events(2000);
247 }
248
249 if (state_repp->exit_requested() == true) {
250 NETECI_DEBUG(cerr << "exit_request received" << endl);
251 }
252 }
253
254 /**
255 * Checks for new connections and messages from
256 * clients.
257 *
258 * @param timeout upper-limit in ms for how long
259 * function waits for events; if -1,
260 * call will return immediately
261 * (ie. is non-blocking)
262 */
check_for_events(int timeout)263 void ECA_NETECI_SERVER::check_for_events(int timeout)
264 {
265 int nfds = clients_rep.size() + 1;
266 struct pollfd* ufds = new struct pollfd [nfds];
267
268 ufds[0].fd = srvfd_rep;
269 ufds[0].events = POLLIN;
270 ufds[0].revents = 0;
271
272 std::list<struct ecasound_neteci_server_client*>::iterator p = clients_rep.begin();
273 for(int n = 1; n < nfds; n++) {
274 ufds[n].fd = (*p)->fd;
275 ufds[n].events = POLLIN;
276 ufds[n].revents = 0;
277 ++p;
278 }
279 DBC_CHECK(nfds == 1 || p == clients_rep.end());
280
281 int ret = poll(ufds, nfds, timeout);
282 if (ret > 0) {
283 if (ufds[0].revents & POLLIN) {
284 /* 1. new incoming connection */
285 handle_connection(srvfd_rep);
286 }
287 p = clients_rep.begin();
288 for(int n = 1; n < nfds; n++) {
289 if (ufds[n].revents & POLLIN) {
290 /* 2. client has sent a message */
291 handle_client_messages(*p);
292 }
293 else if (ufds[n].revents == POLLERR ||
294 ufds[n].revents == POLLHUP ||
295 ufds[n].revents == POLLNVAL) {
296 /* 3. error, remove client */
297 remove_client(*p);
298 }
299 if (p != clients_rep.end()) ++p;
300 }
301 }
302
303 if (cleanup_request_rep == true) {
304 clean_removed_clients();
305 }
306
307 delete[] ufds;
308 }
309
handle_connection(int fd)310 void ECA_NETECI_SERVER::handle_connection(int fd)
311 {
312 socklen_t bytes = 0;
313 string peername;
314 int connfd = 0;
315
316 if (unix_sockets_rep == true) {
317 bytes = static_cast<socklen_t>(sizeof(addr_un_rep));
318 connfd = accept(fd, reinterpret_cast<struct sockaddr*>(&addr_un_rep), &bytes);
319 peername = "UNIX:" + socketpath_rep;
320 }
321 else {
322 bytes = static_cast<socklen_t>(sizeof(addr_in_rep));
323 connfd = accept(fd, reinterpret_cast<struct sockaddr*>(&addr_in_rep), &bytes);
324
325 if (connfd > 0) {
326 struct sockaddr_in peeraddr;
327 socklen_t peernamelen;
328 // struct in_addr peerip;
329 peername = "TCP/IP:";
330 int res = getpeername(connfd,
331 reinterpret_cast<struct sockaddr*>(&peeraddr),
332 reinterpret_cast<socklen_t*>(&peernamelen));
333 if (res == 0)
334 peername += string(inet_ntoa(peeraddr.sin_addr));
335 else
336 peername += string(inet_ntoa(addr_in_rep.sin_addr));
337 }
338 }
339
340 ECA_LOG_MSG(ECA_LOGGER::info,
341 "New connection from " +
342 peername + ".");
343
344
345 if (connfd >= 0) {
346 NETECI_DEBUG(cerr << "incoming connection accepted" << endl);
347 struct ecasound_neteci_server_client* client = new struct ecasound_neteci_server_client; /* add a new client */
348 client->fd = connfd;
349 client->buffer_length = ECA_NETECI_START_BUFFER_SIZE;
350 client->buffer = new char [client->buffer_length];
351 client->buffer_current_ptr = 0;
352 client->peername = peername;
353 clients_rep.push_back(client);
354 }
355 }
356
357 /**
358 * Handle incoming messages for client 'client'.
359 */
handle_client_messages(struct ecasound_neteci_server_client * client)360 void ECA_NETECI_SERVER::handle_client_messages(struct ecasound_neteci_server_client* client)
361 {
362 char* buf[128];
363 int connfd = client->fd;
364
365 NETECI_DEBUG(cerr << "handle_client_messages for fd "
366 << connfd << endl);
367
368 ssize_t c = kvu_fd_read(connfd, buf, 128, 5000);
369 if (c > 0) {
370 parse_raw_incoming_data(reinterpret_cast<char*>(buf), c, client);
371 while(parsed_cmd_queue_rep.size() > 0) {
372 const string& nextcmd = parsed_cmd_queue_rep.front();
373 if (nextcmd == "quit" || nextcmd == "q") {
374 NETECI_DEBUG(cerr << "client initiated quit, removing client-fd " << connfd << "." << endl);
375 remove_client(client);
376 }
377 else {
378 handle_eci_command(nextcmd, client);
379 }
380 parsed_cmd_queue_rep.pop_front();
381 }
382 /* ... */
383 }
384 else {
385 /* read() <= 0 */
386 NETECI_DEBUG(cerr << "read error, removing client-fd " << connfd << "." << endl);
387 remove_client(client);
388 }
389 }
390
parse_raw_incoming_data(const char * buffer,ssize_t bytes,struct ecasound_neteci_server_client * client)391 void ECA_NETECI_SERVER::parse_raw_incoming_data(const char* buffer,
392 ssize_t bytes,
393 struct ecasound_neteci_server_client* client)
394 {
395 DBC_REQUIRE(buffer != 0);
396 DBC_REQUIRE(bytes >= 0);
397 DBC_REQUIRE(client != 0);
398 DBC_DECLARE(int old_client_ptr = client->buffer_current_ptr);
399 DBC_DECLARE(unsigned int old_cmd_queue_size = parsed_cmd_queue_rep.size());
400
401 NETECI_DEBUG(cerr << "parse incoming data; "
402 << bytes << " bytes. Buffer length is "
403 << client->buffer_length << endl);
404
405 for(int n = 0; n < bytes; n++) {
406 DBC_CHECK(client->buffer_current_ptr <= client->buffer_length);
407 if (client->buffer_current_ptr == client->buffer_length) {
408 int new_buffer_length = client->buffer_length * 2;
409 char *new_buffer = new char [new_buffer_length];
410
411 if (new_buffer_length > ECA_NETECI_MAX_BUFFER_SIZE) {
412 cerr << "client buffer overflow, unable to increase buffer size. flushing..." << endl;
413 client->buffer_current_ptr = 0;
414 }
415 else {
416 NETECI_DEBUG(cerr << "client buffer overflow, increasing buffer size from "
417 << client->buffer_length << " to " << new_buffer_length << " bytes." << endl);
418
419 for(int i = 0; i < client->buffer_length; i++) new_buffer[i] = client->buffer[i];
420
421 delete[] client->buffer;
422 client->buffer = new_buffer;
423 client->buffer_length = new_buffer_length;
424 }
425 }
426
427 NETECI_DEBUG(cerr << "copying '" << buffer[n] << "'\n");
428 client->buffer[client->buffer_current_ptr] = buffer[n];
429 if (client->buffer_current_ptr > 0 &&
430 client->buffer[client->buffer_current_ptr] == '\n' &&
431 client->buffer[client->buffer_current_ptr - 1] == '\r') {
432
433 string cmd (client->buffer, client->buffer_current_ptr - 1);
434 NETECI_DEBUG(cerr << "storing command '" << cmd << "'" << endl);
435 parsed_cmd_queue_rep.push_back(cmd);
436
437 NETECI_DEBUG(cerr << "copying "
438 << client->buffer_length - client->buffer_current_ptr - 1
439 << " bytes from " << client->buffer_current_ptr + 1
440 << " to the beginning of the buffer."
441 << " Index is " << client->buffer_current_ptr << endl);
442
443 DBC_CHECK(client->buffer_current_ptr < client->buffer_length);
444
445 #if 0
446 /* must not use memcpy() as the
447 affected areas may overlap! */
448 for(int o = 0, p = index + 1;
449 p < client->buffer_length; o++, p++) {
450 client->buffer[o] = client->buffer[p];
451 }
452 #endif
453 client->buffer_current_ptr = 0;
454 }
455 else {
456 // NETECI_DEBUG(cerr << "crlf not found, index=" << index << ", n=" << n << "cur_ptr=" << client->buffer_current_ptr << ".\n");
457 client->buffer_current_ptr++;
458 }
459 }
460
461 DBC_ENSURE(client->buffer_current_ptr > old_client_ptr ||
462 parsed_cmd_queue_rep.size() > old_cmd_queue_size);
463 }
464
handle_eci_command(const string & cmd,struct ecasound_neteci_server_client * client)465 void ECA_NETECI_SERVER::handle_eci_command(const string& cmd, struct ecasound_neteci_server_client* client)
466 {
467 ECA_CONTROL_MT* ctrl = state_repp->control;
468
469 NETECI_DEBUG(cerr << "handle eci command: " << cmd << endl);
470
471 assert(ctrl != 0);
472
473 struct eci_return_value retval;
474 ctrl->command(cmd, &retval);
475
476 string strtosend =
477 ECA_LOGGER_WELLFORMED::create_wellformed_message(ECA_LOGGER::eiam_return_values,
478 std::string(ECA_CONTROL_MAIN::return_value_type_to_string(&retval))
479 + " " +
480 ECA_CONTROL_MAIN::return_value_to_string(&retval));
481
482 int bytes_to_send = strtosend.size();
483 while(bytes_to_send > 0) {
484 int ret = kvu_fd_write(client->fd, strtosend.c_str(), strtosend.size(), 5000);
485 if (ret < 0) {
486 cerr << "error in kvu_fd_write(), removing client.\n";
487 remove_client(client);
488 break;
489 }
490 else {
491 bytes_to_send -= ret;
492 }
493 }
494 }
495
496 /**
497 * Removes 'client' from list of clients.
498 *
499 * Note! Internally, the 'fd' field of the deleted client
500 * is marked to be -1.
501 *
502 * @see clean_removed_clients()
503 */
remove_client(struct ecasound_neteci_server_client * client)504 void ECA_NETECI_SERVER::remove_client(struct ecasound_neteci_server_client* client)
505 {
506 NETECI_DEBUG(std::cout << "removing client." << std::endl);
507
508 if (client != 0 && client->fd > 0) {
509 ECA_LOG_MSG(ECA_LOGGER::info,
510 "Closing connection " +
511 client->peername + ".");
512 close(client->fd);
513 client->fd = -1;
514 }
515
516 cleanup_request_rep = true;
517 }
518
519 /**
520 * Cleans the list of clients from removed objects.
521 *
522 * @see remove_client()
523 */
clean_removed_clients(void)524 void ECA_NETECI_SERVER::clean_removed_clients(void)
525 {
526 DBC_DECLARE(size_t oldsize = clients_rep.size());
527 DBC_DECLARE(size_t counter = 0);
528
529 NETECI_DEBUG(std::cerr << "cleaning removed clients." << std::endl);
530
531 list<struct ecasound_neteci_server_client*>::iterator p = clients_rep.begin();
532 while(p != clients_rep.end()) {
533 NETECI_DEBUG(std::cerr << "checking for delete, client " << *p << std::endl);
534 if (*p != 0 && (*p)->fd == -1) {
535 if ((*p)->buffer != 0) {
536 delete[] (*p)->buffer;
537 (*p)->buffer = 0;
538 }
539 std::list<struct ecasound_neteci_server_client*>::iterator q = p;
540 ++q;
541 NETECI_DEBUG(std::cerr << "deleting client " << *p << std::endl);
542 delete *p;
543 NETECI_DEBUG(std::cerr << "erasing client " << *p << std::endl);
544 *p = 0;
545 clients_rep.erase(p);
546 p = q;
547 DBC_DECLARE(++counter);
548 }
549 else {
550 ++p;
551 }
552 }
553
554 cleanup_request_rep = false;
555
556 DBC_ENSURE(clients_rep.size() == oldsize - counter);
557 }
558