1 #include <errno.h>
2 #include <stdint.h>
3 #include <string.h>
4 #include <pthread.h>
5
6 #if !defined(WIN32)
7 #include <sys/types.h>
8 #include <sys/socket.h>
9 #include <sys/un.h>
10 #include <unistd.h>
11 #endif // !defined(WIN32)
12
13 #include <glib.h>
14 #include <glib/gstdio.h>
15 #include <jansson.h>
16
17 #include "searpc-utils.h"
18 #include "searpc-client.h"
19 #include "searpc-server.h"
20 #include "searpc-named-pipe-transport.h"
21
22 #if defined(WIN32)
23 static const int kPipeBufSize = 1024;
24 static char* formatErrorMessage();
25
26 #define G_WARNING_WITH_LAST_ERROR(fmt) \
27 do { \
28 char *error_msg__ = formatErrorMessage(); \
29 g_warning(fmt ": %s\n", error_msg__); \
30 g_free (error_msg__); \
31 } while(0);
32
33 #endif // defined(WIN32)
34
35 static void* named_pipe_listen(void *arg);
36 static void* handle_named_pipe_client_with_thread (void *arg);
37 static void handle_named_pipe_client_with_threadpool(void *data, void *user_data);
38 static void named_pipe_client_handler (void *data);
39 static char* searpc_named_pipe_send(void *arg, const gchar *fcall_str, size_t fcall_len, size_t *ret_len);
40
41 static char * request_to_json(const char *service, const char *fcall_str, size_t fcall_len);
42 static int request_from_json (const char *content, size_t len, char **service, char **fcall_str);
43 static void json_object_set_string_member (json_t *object, const char *key, const char *value);
44 static const char * json_object_get_string_member (json_t *object, const char *key);
45
46 static gssize pipe_write_n(SearpcNamedPipe fd, const void *vptr, size_t n);
47 static gssize pipe_read_n(SearpcNamedPipe fd, void *vptr, size_t n);
48
49 typedef struct {
50 SearpcNamedPipeClient* client;
51 char *service;
52 } ClientTransportData;
53
54 SearpcClient*
searpc_client_with_named_pipe_transport(SearpcNamedPipeClient * pipe_client,const char * service)55 searpc_client_with_named_pipe_transport(SearpcNamedPipeClient *pipe_client,
56 const char *service)
57 {
58 SearpcClient *client= searpc_client_new();
59 client->send = searpc_named_pipe_send;
60
61 ClientTransportData *data = g_malloc(sizeof(ClientTransportData));
62 data->client = pipe_client;
63 data->service = g_strdup(service);
64
65 client->arg = data;
66 return client;
67 }
68
searpc_create_named_pipe_client(const char * path)69 SearpcNamedPipeClient* searpc_create_named_pipe_client(const char *path)
70 {
71 SearpcNamedPipeClient *client = g_malloc0(sizeof(SearpcNamedPipeClient));
72 memcpy(client->path, path, strlen(path) + 1);
73 return client;
74 }
75
searpc_create_named_pipe_server(const char * path)76 SearpcNamedPipeServer* searpc_create_named_pipe_server(const char *path)
77 {
78 SearpcNamedPipeServer *server = g_malloc0(sizeof(SearpcNamedPipeServer));
79 memcpy(server->path, path, strlen(path) + 1);
80
81 return server;
82 }
83
searpc_create_named_pipe_server_with_threadpool(const char * path,int named_pipe_server_thread_pool_size)84 SearpcNamedPipeServer* searpc_create_named_pipe_server_with_threadpool (const char *path, int named_pipe_server_thread_pool_size)
85 {
86 GError *error = NULL;
87
88 SearpcNamedPipeServer *server = g_malloc0(sizeof(SearpcNamedPipeServer));
89 memcpy(server->path, path, strlen(path) + 1);
90 server->named_pipe_server_thread_pool = g_thread_pool_new (handle_named_pipe_client_with_threadpool,
91 NULL,
92 named_pipe_server_thread_pool_size,
93 FALSE,
94 &error);
95 if (!server->named_pipe_server_thread_pool) {
96 if (error) {
97 g_warning ("Falied to create named pipe server thread pool : %s\n", error->message);
98 g_clear_error (&error);
99 } else {
100 g_warning ("Falied to create named pipe server thread pool.\n");
101 }
102 g_free (server);
103 return NULL;
104 }
105
106 return server;
107 }
108
searpc_named_pipe_server_start(SearpcNamedPipeServer * server)109 int searpc_named_pipe_server_start(SearpcNamedPipeServer *server)
110 {
111 #if !defined(WIN32)
112 int pipe_fd = socket (AF_UNIX, SOCK_STREAM, 0);
113 const char *un_path = server->path;
114 if (pipe_fd < 0) {
115 g_warning ("Failed to create unix socket fd : %s\n",
116 strerror(errno));
117 return -1;
118 }
119
120 struct sockaddr_un saddr;
121 saddr.sun_family = AF_UNIX;
122
123 if (strlen(server->path) > sizeof(saddr.sun_path)-1) {
124 g_warning ("Unix socket path %s is too long."
125 "Please set or modify UNIX_SOCKET option in ccnet.conf.\n",
126 un_path);
127 goto failed;
128 }
129
130 if (g_file_test (un_path, G_FILE_TEST_EXISTS)) {
131 g_message ("socket file exists, delete it anyway\n");
132 if (g_unlink (un_path) < 0) {
133 g_warning ("delete socket file failed : %s\n", strerror(errno));
134 goto failed;
135 }
136 }
137
138 g_strlcpy (saddr.sun_path, un_path, sizeof(saddr.sun_path));
139 if (bind(pipe_fd, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) {
140 g_warning ("failed to bind unix socket fd to %s : %s\n",
141 un_path, strerror(errno));
142 goto failed;
143 }
144
145 if (listen(pipe_fd, 10) < 0) {
146 g_warning ("failed to listen to unix socket: %s\n", strerror(errno));
147 goto failed;
148 }
149
150 if (chmod(un_path, 0700) < 0) {
151 g_warning ("failed to set permisson for unix socket %s: %s\n",
152 un_path, strerror(errno));
153 goto failed;
154 }
155
156 server->pipe_fd = pipe_fd;
157
158 #endif // !defined(WIN32)
159
160 /* TODO: use glib thread pool */
161 pthread_create(&server->listener_thread, NULL, named_pipe_listen, server);
162 return 0;
163
164 #if !defined(WIN32)
165 failed:
166 close(pipe_fd);
167 return -1;
168 #endif
169 }
170
171 typedef struct {
172 SearpcNamedPipe connfd;
173 } ServerHandlerData;
174
named_pipe_listen(void * arg)175 static void* named_pipe_listen(void *arg)
176 {
177 SearpcNamedPipeServer *server = arg;
178 #if !defined(WIN32)
179 while (1) {
180 int connfd = accept (server->pipe_fd, NULL, 0);
181 ServerHandlerData *data = g_malloc(sizeof(ServerHandlerData));
182 data->connfd = connfd;
183 if (server->named_pipe_server_thread_pool)
184 g_thread_pool_push (server->named_pipe_server_thread_pool, data, NULL);
185 else {
186 pthread_t handler;
187 pthread_attr_t attr;
188 pthread_attr_init(&attr);
189 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
190 pthread_create(&handler, &attr, handle_named_pipe_client_with_thread, data);
191 }
192 }
193
194 #else // !defined(WIN32)
195 while (1) {
196 HANDLE connfd = INVALID_HANDLE_VALUE;
197 BOOL connected = FALSE;
198
199 connfd = CreateNamedPipe(
200 server->path, // pipe name
201 PIPE_ACCESS_DUPLEX, // read/write access
202 PIPE_TYPE_MESSAGE | // message type pipe
203 PIPE_READMODE_MESSAGE | // message-read mode
204 PIPE_WAIT, // blocking mode
205 PIPE_UNLIMITED_INSTANCES, // max. instances
206 kPipeBufSize, // output buffer size
207 kPipeBufSize, // input buffer size
208 0, // client time-out
209 NULL); // default security attribute
210
211 if (connfd == INVALID_HANDLE_VALUE) {
212 G_WARNING_WITH_LAST_ERROR ("Failed to create named pipe");
213 break;
214 }
215
216 /* listening on this pipe */
217 connected = ConnectNamedPipe(connfd, NULL) ?
218 TRUE : (GetLastError() == ERROR_PIPE_CONNECTED);
219
220 if (!connected) {
221 G_WARNING_WITH_LAST_ERROR ("failed to ConnectNamedPipe()");
222 CloseHandle(connfd);
223 break;
224 }
225
226 /* g_debug ("Accepted a named pipe client\n"); */
227
228 ServerHandlerData *data = g_malloc(sizeof(ServerHandlerData));
229 data->connfd = connfd;
230 if (server->named_pipe_server_thread_pool)
231 g_thread_pool_push (server->named_pipe_server_thread_pool, data, NULL);
232 else {
233 pthread_t handler;
234 pthread_attr_t attr;
235 pthread_attr_init(&attr);
236 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
237 pthread_create(&handler, &attr, handle_named_pipe_client_with_thread, data);
238 }
239 }
240 #endif // !defined(WIN32)
241 return NULL;
242 }
243
handle_named_pipe_client_with_thread(void * arg)244 static void* handle_named_pipe_client_with_thread(void *arg)
245 {
246 named_pipe_client_handler(arg);
247
248 return NULL;
249 }
250
handle_named_pipe_client_with_threadpool(void * data,void * user_data)251 static void handle_named_pipe_client_with_threadpool(void *data, void *user_data)
252 {
253 named_pipe_client_handler(data);
254 }
255
named_pipe_client_handler(void * data)256 static void named_pipe_client_handler(void *data)
257 {
258 ServerHandlerData *handler_data = data;
259 SearpcNamedPipe connfd = handler_data->connfd;
260
261 guint32 len;
262 guint32 bufsize = 4096;
263 char *buf = g_malloc(bufsize);
264
265 g_message ("start to serve on pipe client\n");
266
267 while (1) {
268 len = 0;
269 if (pipe_read_n(connfd, &len, sizeof(guint32)) < 0) {
270 g_warning("failed to read rpc request size: %s\n", strerror(errno));
271 break;
272 }
273
274 if (len == 0) {
275 /* g_debug("EOF reached, pipe connection lost"); */
276 break;
277 }
278
279 while (bufsize < len) {
280 bufsize *= 2;
281 buf = realloc(buf, bufsize);
282 }
283
284 if (pipe_read_n(connfd, buf, len) < 0 || len == 0) {
285 g_warning("failed to read rpc request: %s\n", strerror(errno));
286 break;
287 }
288
289 char *service, *body;
290 if (request_from_json (buf, len, &service, &body) < 0) {
291 break;
292 }
293
294 gsize ret_len;
295 char *ret_str = searpc_server_call_function (service, body, strlen(body), &ret_len);
296 g_free (service);
297 g_free (body);
298
299 len = (guint32)ret_len;
300 if (pipe_write_n(connfd, &len, sizeof(guint32)) < 0) {
301 g_warning("failed to send rpc response(%s): %s\n", ret_str, strerror(errno));
302 g_free (ret_str);
303 break;
304 }
305
306 if (pipe_write_n(connfd, ret_str, ret_len) < 0) {
307 g_warning("failed to send rpc response: %s\n", strerror(errno));
308 g_free (ret_str);
309 break;
310 }
311
312 g_free (ret_str);
313 }
314
315 #if !defined(WIN32)
316 close(connfd);
317 #else // !defined(WIN32)
318 DisconnectNamedPipe(connfd);
319 CloseHandle(connfd);
320 #endif // !defined(WIN32)
321 g_free (data);
322 g_free (buf);
323 }
324
325
searpc_named_pipe_client_connect(SearpcNamedPipeClient * client)326 int searpc_named_pipe_client_connect(SearpcNamedPipeClient *client)
327 {
328 #if !defined(WIN32)
329 client->pipe_fd = socket(AF_UNIX, SOCK_STREAM, 0);
330 struct sockaddr_un servaddr;
331 servaddr.sun_family = AF_UNIX;
332
333 g_strlcpy (servaddr.sun_path, client->path, sizeof(servaddr.sun_path));
334 if (connect(client->pipe_fd, (struct sockaddr *)&servaddr, (socklen_t)sizeof(servaddr)) < 0) {
335 g_warning ("pipe client failed to connect to server: %s\n", strerror(errno));
336 return -1;
337 }
338
339 #else // !defined(WIN32)
340 SearpcNamedPipe pipe_fd;
341 pipe_fd = CreateFile(
342 client->path, // pipe name
343 GENERIC_READ | // read and write access
344 GENERIC_WRITE,
345 0, // no sharing
346 NULL, // default security attributes
347 OPEN_EXISTING, // opens existing pipe
348 0, // default attributes
349 NULL); // no template file
350
351 if (pipe_fd == INVALID_HANDLE_VALUE) {
352 G_WARNING_WITH_LAST_ERROR("Failed to connect to named pipe");
353 return -1;
354 }
355
356 DWORD mode = PIPE_READMODE_MESSAGE;
357 if (!SetNamedPipeHandleState(pipe_fd, &mode, NULL, NULL)) {
358 G_WARNING_WITH_LAST_ERROR("Failed to set named pipe mode");
359 return -1;
360 }
361
362 client->pipe_fd = pipe_fd;
363
364 #endif // !defined(WIN32)
365
366 /* g_debug ("pipe client connected to server\n"); */
367 return 0;
368 }
369
searpc_free_client_with_pipe_transport(SearpcClient * client)370 void searpc_free_client_with_pipe_transport (SearpcClient *client)
371 {
372 ClientTransportData *data = (ClientTransportData *)(client->arg);
373 SearpcNamedPipeClient *pipe_client = data->client;
374 #if defined(WIN32)
375 CloseHandle(pipe_client->pipe_fd);
376 #else
377 close(pipe_client->pipe_fd);
378 #endif
379 g_free (pipe_client);
380 g_free (data->service);
381 g_free (data);
382 searpc_client_free (client);
383 }
384
searpc_named_pipe_send(void * arg,const gchar * fcall_str,size_t fcall_len,size_t * ret_len)385 char *searpc_named_pipe_send(void *arg, const gchar *fcall_str,
386 size_t fcall_len, size_t *ret_len)
387 {
388 /* g_debug ("searpc_named_pipe_send is called\n"); */
389 ClientTransportData *data = arg;
390 SearpcNamedPipeClient *client = data->client;
391
392 char *json_str = request_to_json(data->service, fcall_str, fcall_len);
393 guint32 len = (guint32)strlen(json_str);
394
395 if (pipe_write_n(client->pipe_fd, &len, sizeof(guint32)) < 0) {
396 g_warning("failed to send rpc call: %s\n", strerror(errno));
397 free (json_str);
398 return NULL;
399 }
400
401 if (pipe_write_n(client->pipe_fd, json_str, len) < 0) {
402 g_warning("failed to send rpc call: %s\n", strerror(errno));
403 free (json_str);
404 return NULL;
405 }
406
407 free (json_str);
408
409 if (pipe_read_n(client->pipe_fd, &len, sizeof(guint32)) < 0) {
410 g_warning("failed to read rpc response: %s\n", strerror(errno));
411 return NULL;
412 }
413
414 char *buf = g_malloc(len);
415
416 if (pipe_read_n(client->pipe_fd, buf, len) < 0) {
417 g_warning("failed to read rpc response: %s\n", strerror(errno));
418 g_free (buf);
419 return NULL;
420 }
421
422 *ret_len = len;
423 return buf;
424 }
425
426 static char *
request_to_json(const char * service,const char * fcall_str,size_t fcall_len)427 request_to_json (const char *service, const char *fcall_str, size_t fcall_len)
428 {
429 json_t *object = json_object ();
430
431 char *temp_request = g_malloc0(fcall_len + 1);
432 memcpy(temp_request, fcall_str, fcall_len);
433
434 json_object_set_string_member (object, "service", service);
435 json_object_set_string_member (object, "request", temp_request);
436
437 g_free (temp_request);
438
439 char *str = json_dumps (object, 0);
440 json_decref (object);
441 return str;
442 }
443
444 static int
request_from_json(const char * content,size_t len,char ** service,char ** fcall_str)445 request_from_json (const char *content, size_t len, char **service, char **fcall_str)
446 {
447 json_error_t jerror;
448 json_t *object = json_loadb(content, len, 0, &jerror);
449 if (!object) {
450 g_warning ("Failed to parse request body: %s.\n", strlen(jerror.text) > 0 ? jerror.text : "");
451 return -1;
452 }
453
454 *service = g_strdup(json_object_get_string_member (object, "service"));
455 *fcall_str = g_strdup(json_object_get_string_member(object, "request"));
456
457 json_decref (object);
458
459 if (!*service || !*fcall_str) {
460 g_free (*service);
461 g_free (*fcall_str);
462 return -1;
463 }
464
465 return 0;
466 }
467
json_object_set_string_member(json_t * object,const char * key,const char * value)468 static void json_object_set_string_member (json_t *object, const char *key, const char *value)
469 {
470 json_object_set_new (object, key, json_string (value));
471 }
472
473 static const char *
json_object_get_string_member(json_t * object,const char * key)474 json_object_get_string_member (json_t *object, const char *key)
475 {
476 json_t *string = json_object_get (object, key);
477 if (!string)
478 return NULL;
479 return json_string_value (string);
480 }
481
482 #if !defined(WIN32)
483
484 // Write "n" bytes to a descriptor.
485 gssize
pipe_write_n(int fd,const void * vptr,size_t n)486 pipe_write_n(int fd, const void *vptr, size_t n)
487 {
488 size_t nleft;
489 gssize nwritten;
490 const char *ptr;
491
492 ptr = vptr;
493 nleft = n;
494 while (nleft > 0) {
495 if ( (nwritten = write(fd, ptr, nleft)) <= 0)
496 {
497 if (nwritten < 0 && errno == EINTR)
498 nwritten = 0; /* and call write() again */
499 else
500 return(-1); /* error */
501 }
502
503 nleft -= nwritten;
504 ptr += nwritten;
505 }
506 return(n);
507 }
508
509 // Read "n" bytes from a descriptor.
510 gssize
pipe_read_n(int fd,void * vptr,size_t n)511 pipe_read_n(int fd, void *vptr, size_t n)
512 {
513 size_t nleft;
514 gssize nread;
515 char *ptr;
516
517 ptr = vptr;
518 nleft = n;
519 while (nleft > 0) {
520 if ( (nread = read(fd, ptr, nleft)) < 0) {
521 if (errno == EINTR)
522 nread = 0; /* and call read() again */
523 else
524 return(-1);
525 } else if (nread == 0)
526 break; /* EOF */
527
528 nleft -= nread;
529 ptr += nread;
530 }
531 return(n - nleft); /* return >= 0 */
532 }
533
534 #else // !defined(WIN32)
535
pipe_read_n(SearpcNamedPipe fd,void * vptr,size_t n)536 gssize pipe_read_n (SearpcNamedPipe fd, void *vptr, size_t n)
537 {
538 DWORD bytes_read;
539 BOOL success = ReadFile(
540 fd, // handle to pipe
541 vptr, // buffer to receive data
542 (DWORD)n, // size of buffer
543 &bytes_read, // number of bytes read
544 NULL); // not overlapped I/O
545
546 if (!success || bytes_read != (DWORD)n) {
547 if (GetLastError() == ERROR_BROKEN_PIPE) {
548 return 0;
549 }
550 G_WARNING_WITH_LAST_ERROR("failed to read from pipe");
551 return -1;
552 }
553
554 return n;
555 }
556
pipe_write_n(SearpcNamedPipe fd,const void * vptr,size_t n)557 gssize pipe_write_n(SearpcNamedPipe fd, const void *vptr, size_t n)
558 {
559 DWORD bytes_written;
560 BOOL success = WriteFile(
561 fd, // handle to pipe
562 vptr, // buffer to receive data
563 (DWORD)n, // size of buffer
564 &bytes_written, // number of bytes written
565 NULL); // not overlapped I/O
566
567 if (!success || bytes_written != (DWORD)n) {
568 G_WARNING_WITH_LAST_ERROR("failed to write to named pipe");
569 return -1;
570 }
571
572 FlushFileBuffers(fd);
573 return 0;
574 }
575
576 // http://stackoverflow.com/questions/3006229/get-a-text-from-the-error-code-returns-from-the-getlasterror-function
577 // The caller is responsible to free the returned message.
formatErrorMessage()578 char* formatErrorMessage()
579 {
580 DWORD error_code = GetLastError();
581 if (error_code == 0) {
582 return g_strdup("no error");
583 }
584 char buf[256] = {0};
585 FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM,
586 NULL,
587 error_code,
588 /* EN_US */
589 MAKELANGID(LANG_ENGLISH, 0x01),
590 buf,
591 sizeof(buf) - 1,
592 NULL);
593 return g_strdup(buf);
594 }
595
596 #endif // !defined(WIN32)
597