1 #include <errno.h>
2 #include <stdint.h>
3 #include <string.h>
4 #include <pthread.h>
5 
6 #if !defined(WIN32)
7   #include <sys/types.h>
8   #include <sys/socket.h>
9   #include <sys/un.h>
10   #include <unistd.h>
11 #endif // !defined(WIN32)
12 
13 #include <glib.h>
14 #include <glib/gstdio.h>
15 #include <jansson.h>
16 
17 #include "searpc-utils.h"
18 #include "searpc-client.h"
19 #include "searpc-server.h"
20 #include "searpc-named-pipe-transport.h"
21 
22 #if defined(WIN32)
23 static const int kPipeBufSize = 1024;
24 static char* formatErrorMessage();
25 
26 #define G_WARNING_WITH_LAST_ERROR(fmt)                        \
27     do {                                                      \
28         char *error_msg__ = formatErrorMessage();             \
29         g_warning(fmt ": %s\n", error_msg__);                 \
30         g_free (error_msg__);                                 \
31     } while(0);
32 
33 #endif // defined(WIN32)
34 
35 static void* named_pipe_listen(void *arg);
36 static void* handle_named_pipe_client_with_thread (void *arg);
37 static void handle_named_pipe_client_with_threadpool(void *data, void *user_data);
38 static void named_pipe_client_handler (void *data);
39 static char* searpc_named_pipe_send(void *arg, const gchar *fcall_str, size_t fcall_len, size_t *ret_len);
40 
41 static char * request_to_json(const char *service, const char *fcall_str, size_t fcall_len);
42 static int request_from_json (const char *content, size_t len, char **service, char **fcall_str);
43 static void json_object_set_string_member (json_t *object, const char *key, const char *value);
44 static const char * json_object_get_string_member (json_t *object, const char *key);
45 
46 static gssize pipe_write_n(SearpcNamedPipe fd, const void *vptr, size_t n);
47 static gssize pipe_read_n(SearpcNamedPipe fd, void *vptr, size_t n);
48 
49 typedef struct {
50     SearpcNamedPipeClient* client;
51     char *service;
52 } ClientTransportData;
53 
54 SearpcClient*
searpc_client_with_named_pipe_transport(SearpcNamedPipeClient * pipe_client,const char * service)55 searpc_client_with_named_pipe_transport(SearpcNamedPipeClient *pipe_client,
56                                         const char *service)
57 {
58     SearpcClient *client= searpc_client_new();
59     client->send = searpc_named_pipe_send;
60 
61     ClientTransportData *data = g_malloc(sizeof(ClientTransportData));
62     data->client = pipe_client;
63     data->service = g_strdup(service);
64 
65     client->arg = data;
66     return client;
67 }
68 
searpc_create_named_pipe_client(const char * path)69 SearpcNamedPipeClient* searpc_create_named_pipe_client(const char *path)
70 {
71     SearpcNamedPipeClient *client = g_malloc0(sizeof(SearpcNamedPipeClient));
72     memcpy(client->path, path, strlen(path) + 1);
73     return client;
74 }
75 
searpc_create_named_pipe_server(const char * path)76 SearpcNamedPipeServer* searpc_create_named_pipe_server(const char *path)
77 {
78     SearpcNamedPipeServer *server = g_malloc0(sizeof(SearpcNamedPipeServer));
79     memcpy(server->path, path, strlen(path) + 1);
80 
81     return server;
82 }
83 
searpc_create_named_pipe_server_with_threadpool(const char * path,int named_pipe_server_thread_pool_size)84 SearpcNamedPipeServer* searpc_create_named_pipe_server_with_threadpool (const char *path, int named_pipe_server_thread_pool_size)
85 {
86     GError *error = NULL;
87 
88     SearpcNamedPipeServer *server = g_malloc0(sizeof(SearpcNamedPipeServer));
89     memcpy(server->path, path, strlen(path) + 1);
90     server->named_pipe_server_thread_pool = g_thread_pool_new (handle_named_pipe_client_with_threadpool,
91                                                                NULL,
92                                                                named_pipe_server_thread_pool_size,
93                                                                FALSE,
94                                                                &error);
95     if (!server->named_pipe_server_thread_pool) {
96         if (error) {
97             g_warning ("Falied to create named pipe server thread pool : %s\n", error->message);
98             g_clear_error (&error);
99         } else {
100             g_warning ("Falied to create named pipe server thread pool.\n");
101         }
102         g_free (server);
103         return NULL;
104     }
105 
106     return server;
107 }
108 
searpc_named_pipe_server_start(SearpcNamedPipeServer * server)109 int searpc_named_pipe_server_start(SearpcNamedPipeServer *server)
110 {
111 #if !defined(WIN32)
112     int pipe_fd = socket (AF_UNIX, SOCK_STREAM, 0);
113     const char *un_path = server->path;
114     if (pipe_fd < 0) {
115         g_warning ("Failed to create unix socket fd : %s\n",
116                    strerror(errno));
117         return -1;
118     }
119 
120     struct sockaddr_un saddr;
121     saddr.sun_family = AF_UNIX;
122 
123     if (strlen(server->path) > sizeof(saddr.sun_path)-1) {
124         g_warning ("Unix socket path %s is too long."
125                        "Please set or modify UNIX_SOCKET option in ccnet.conf.\n",
126                        un_path);
127         goto failed;
128     }
129 
130     if (g_file_test (un_path, G_FILE_TEST_EXISTS)) {
131         g_message ("socket file exists, delete it anyway\n");
132         if (g_unlink (un_path) < 0) {
133             g_warning ("delete socket file failed : %s\n", strerror(errno));
134             goto failed;
135         }
136     }
137 
138     g_strlcpy (saddr.sun_path, un_path, sizeof(saddr.sun_path));
139     if (bind(pipe_fd, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) {
140         g_warning ("failed to bind unix socket fd to %s : %s\n",
141                       un_path, strerror(errno));
142         goto failed;
143     }
144 
145     if (listen(pipe_fd, 10) < 0) {
146         g_warning ("failed to listen to unix socket: %s\n", strerror(errno));
147         goto failed;
148     }
149 
150     if (chmod(un_path, 0700) < 0) {
151         g_warning ("failed to set permisson for unix socket %s: %s\n",
152                       un_path, strerror(errno));
153         goto failed;
154     }
155 
156     server->pipe_fd = pipe_fd;
157 
158 #endif // !defined(WIN32)
159 
160     /* TODO: use glib thread pool */
161     pthread_create(&server->listener_thread, NULL, named_pipe_listen, server);
162     return 0;
163 
164 #if !defined(WIN32)
165 failed:
166     close(pipe_fd);
167     return -1;
168 #endif
169 }
170 
171 typedef struct {
172     SearpcNamedPipe connfd;
173 } ServerHandlerData;
174 
named_pipe_listen(void * arg)175 static void* named_pipe_listen(void *arg)
176 {
177     SearpcNamedPipeServer *server = arg;
178 #if !defined(WIN32)
179     while (1) {
180         int connfd = accept (server->pipe_fd, NULL, 0);
181         ServerHandlerData *data = g_malloc(sizeof(ServerHandlerData));
182         data->connfd = connfd;
183         if (server->named_pipe_server_thread_pool)
184             g_thread_pool_push (server->named_pipe_server_thread_pool, data, NULL);
185         else {
186             pthread_t handler;
187             pthread_attr_t attr;
188             pthread_attr_init(&attr);
189             pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
190             pthread_create(&handler, &attr, handle_named_pipe_client_with_thread, data);
191         }
192     }
193 
194 #else // !defined(WIN32)
195     while (1) {
196         HANDLE connfd = INVALID_HANDLE_VALUE;
197         BOOL connected = FALSE;
198 
199         connfd = CreateNamedPipe(
200             server->path,             // pipe name
201             PIPE_ACCESS_DUPLEX,       // read/write access
202             PIPE_TYPE_MESSAGE |       // message type pipe
203             PIPE_READMODE_MESSAGE |   // message-read mode
204             PIPE_WAIT,                // blocking mode
205             PIPE_UNLIMITED_INSTANCES, // max. instances
206             kPipeBufSize,             // output buffer size
207             kPipeBufSize,             // input buffer size
208             0,                        // client time-out
209             NULL);                    // default security attribute
210 
211         if (connfd == INVALID_HANDLE_VALUE) {
212             G_WARNING_WITH_LAST_ERROR ("Failed to create named pipe");
213             break;
214         }
215 
216         /* listening on this pipe */
217         connected = ConnectNamedPipe(connfd, NULL) ?
218             TRUE : (GetLastError() == ERROR_PIPE_CONNECTED);
219 
220         if (!connected) {
221             G_WARNING_WITH_LAST_ERROR ("failed to ConnectNamedPipe()");
222             CloseHandle(connfd);
223             break;
224         }
225 
226         /* g_debug ("Accepted a named pipe client\n"); */
227 
228         ServerHandlerData *data = g_malloc(sizeof(ServerHandlerData));
229         data->connfd = connfd;
230         if (server->named_pipe_server_thread_pool)
231             g_thread_pool_push (server->named_pipe_server_thread_pool, data, NULL);
232         else {
233             pthread_t handler;
234             pthread_attr_t attr;
235             pthread_attr_init(&attr);
236             pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
237             pthread_create(&handler, &attr, handle_named_pipe_client_with_thread, data);
238         }
239     }
240 #endif // !defined(WIN32)
241     return NULL;
242 }
243 
handle_named_pipe_client_with_thread(void * arg)244 static void* handle_named_pipe_client_with_thread(void *arg)
245 {
246     named_pipe_client_handler(arg);
247 
248     return NULL;
249 }
250 
handle_named_pipe_client_with_threadpool(void * data,void * user_data)251 static void handle_named_pipe_client_with_threadpool(void *data, void *user_data)
252 {
253     named_pipe_client_handler(data);
254 }
255 
named_pipe_client_handler(void * data)256 static void named_pipe_client_handler(void *data)
257 {
258     ServerHandlerData *handler_data = data;
259     SearpcNamedPipe connfd = handler_data->connfd;
260 
261     guint32 len;
262     guint32 bufsize = 4096;
263     char *buf = g_malloc(bufsize);
264 
265     g_message ("start to serve on pipe client\n");
266 
267     while (1) {
268         len = 0;
269         if (pipe_read_n(connfd, &len, sizeof(guint32)) < 0) {
270             g_warning("failed to read rpc request size: %s\n", strerror(errno));
271             break;
272         }
273 
274         if (len == 0) {
275             /* g_debug("EOF reached, pipe connection lost"); */
276             break;
277         }
278 
279         while (bufsize < len) {
280             bufsize *= 2;
281             buf = realloc(buf, bufsize);
282         }
283 
284         if (pipe_read_n(connfd, buf, len) < 0 || len == 0) {
285             g_warning("failed to read rpc request: %s\n", strerror(errno));
286             break;
287         }
288 
289         char *service, *body;
290         if (request_from_json (buf, len, &service, &body) < 0) {
291             break;
292         }
293 
294         gsize ret_len;
295         char *ret_str = searpc_server_call_function (service, body, strlen(body), &ret_len);
296         g_free (service);
297         g_free (body);
298 
299         len = (guint32)ret_len;
300         if (pipe_write_n(connfd, &len, sizeof(guint32)) < 0) {
301             g_warning("failed to send rpc response(%s): %s\n", ret_str, strerror(errno));
302             g_free (ret_str);
303             break;
304         }
305 
306         if (pipe_write_n(connfd, ret_str, ret_len) < 0) {
307             g_warning("failed to send rpc response: %s\n", strerror(errno));
308             g_free (ret_str);
309             break;
310         }
311 
312         g_free (ret_str);
313     }
314 
315 #if !defined(WIN32)
316     close(connfd);
317 #else // !defined(WIN32)
318     DisconnectNamedPipe(connfd);
319     CloseHandle(connfd);
320 #endif // !defined(WIN32)
321     g_free (data);
322     g_free (buf);
323 }
324 
325 
searpc_named_pipe_client_connect(SearpcNamedPipeClient * client)326 int searpc_named_pipe_client_connect(SearpcNamedPipeClient *client)
327 {
328 #if !defined(WIN32)
329     client->pipe_fd = socket(AF_UNIX, SOCK_STREAM, 0);
330     struct sockaddr_un servaddr;
331     servaddr.sun_family = AF_UNIX;
332 
333     g_strlcpy (servaddr.sun_path, client->path, sizeof(servaddr.sun_path));
334     if (connect(client->pipe_fd, (struct sockaddr *)&servaddr, (socklen_t)sizeof(servaddr)) < 0) {
335         g_warning ("pipe client failed to connect to server: %s\n", strerror(errno));
336         return -1;
337     }
338 
339 #else // !defined(WIN32)
340     SearpcNamedPipe pipe_fd;
341     pipe_fd = CreateFile(
342         client->path,           // pipe name
343         GENERIC_READ |          // read and write access
344         GENERIC_WRITE,
345         0,                      // no sharing
346         NULL,                   // default security attributes
347         OPEN_EXISTING,          // opens existing pipe
348         0,                      // default attributes
349         NULL);                  // no template file
350 
351     if (pipe_fd == INVALID_HANDLE_VALUE) {
352         G_WARNING_WITH_LAST_ERROR("Failed to connect to named pipe");
353         return -1;
354     }
355 
356     DWORD mode = PIPE_READMODE_MESSAGE;
357     if (!SetNamedPipeHandleState(pipe_fd, &mode, NULL, NULL)) {
358         G_WARNING_WITH_LAST_ERROR("Failed to set named pipe mode");
359         return -1;
360     }
361 
362     client->pipe_fd = pipe_fd;
363 
364 #endif // !defined(WIN32)
365 
366     /* g_debug ("pipe client connected to server\n"); */
367     return 0;
368 }
369 
searpc_free_client_with_pipe_transport(SearpcClient * client)370 void searpc_free_client_with_pipe_transport (SearpcClient *client)
371 {
372     ClientTransportData *data = (ClientTransportData *)(client->arg);
373     SearpcNamedPipeClient *pipe_client = data->client;
374 #if defined(WIN32)
375     CloseHandle(pipe_client->pipe_fd);
376 #else
377     close(pipe_client->pipe_fd);
378 #endif
379     g_free (pipe_client);
380     g_free (data->service);
381     g_free (data);
382     searpc_client_free (client);
383 }
384 
searpc_named_pipe_send(void * arg,const gchar * fcall_str,size_t fcall_len,size_t * ret_len)385 char *searpc_named_pipe_send(void *arg, const gchar *fcall_str,
386                              size_t fcall_len, size_t *ret_len)
387 {
388     /* g_debug ("searpc_named_pipe_send is called\n"); */
389     ClientTransportData *data = arg;
390     SearpcNamedPipeClient *client = data->client;
391 
392     char *json_str = request_to_json(data->service, fcall_str, fcall_len);
393     guint32 len = (guint32)strlen(json_str);
394 
395     if (pipe_write_n(client->pipe_fd, &len, sizeof(guint32)) < 0) {
396         g_warning("failed to send rpc call: %s\n", strerror(errno));
397         free (json_str);
398         return NULL;
399     }
400 
401     if (pipe_write_n(client->pipe_fd, json_str, len) < 0) {
402         g_warning("failed to send rpc call: %s\n", strerror(errno));
403         free (json_str);
404         return NULL;
405     }
406 
407     free (json_str);
408 
409     if (pipe_read_n(client->pipe_fd, &len, sizeof(guint32)) < 0) {
410         g_warning("failed to read rpc response: %s\n", strerror(errno));
411         return NULL;
412     }
413 
414     char *buf = g_malloc(len);
415 
416     if (pipe_read_n(client->pipe_fd, buf, len) < 0) {
417         g_warning("failed to read rpc response: %s\n", strerror(errno));
418         g_free (buf);
419         return NULL;
420     }
421 
422     *ret_len = len;
423     return buf;
424 }
425 
426 static char *
request_to_json(const char * service,const char * fcall_str,size_t fcall_len)427 request_to_json (const char *service, const char *fcall_str, size_t fcall_len)
428 {
429     json_t *object = json_object ();
430 
431     char *temp_request = g_malloc0(fcall_len + 1);
432     memcpy(temp_request, fcall_str, fcall_len);
433 
434     json_object_set_string_member (object, "service", service);
435     json_object_set_string_member (object, "request", temp_request);
436 
437     g_free (temp_request);
438 
439     char *str = json_dumps (object, 0);
440     json_decref (object);
441     return str;
442 }
443 
444 static int
request_from_json(const char * content,size_t len,char ** service,char ** fcall_str)445 request_from_json (const char *content, size_t len, char **service, char **fcall_str)
446 {
447     json_error_t jerror;
448     json_t *object = json_loadb(content, len, 0, &jerror);
449     if (!object) {
450         g_warning ("Failed to parse request body: %s.\n", strlen(jerror.text) > 0 ? jerror.text : "");
451         return -1;
452     }
453 
454     *service = g_strdup(json_object_get_string_member (object, "service"));
455     *fcall_str = g_strdup(json_object_get_string_member(object, "request"));
456 
457     json_decref (object);
458 
459     if (!*service || !*fcall_str) {
460         g_free (*service);
461         g_free (*fcall_str);
462         return -1;
463     }
464 
465     return 0;
466 }
467 
json_object_set_string_member(json_t * object,const char * key,const char * value)468 static void json_object_set_string_member (json_t *object, const char *key, const char *value)
469 {
470     json_object_set_new (object, key, json_string (value));
471 }
472 
473 static const char *
json_object_get_string_member(json_t * object,const char * key)474 json_object_get_string_member (json_t *object, const char *key)
475 {
476     json_t *string = json_object_get (object, key);
477     if (!string)
478         return NULL;
479     return json_string_value (string);
480 }
481 
482 #if !defined(WIN32)
483 
484 // Write "n" bytes to a descriptor.
485 gssize
pipe_write_n(int fd,const void * vptr,size_t n)486 pipe_write_n(int fd, const void *vptr, size_t n)
487 {
488     size_t      nleft;
489     gssize     nwritten;
490     const char  *ptr;
491 
492     ptr = vptr;
493     nleft = n;
494     while (nleft > 0) {
495         if ( (nwritten = write(fd, ptr, nleft)) <= 0)
496         {
497             if (nwritten < 0 && errno == EINTR)
498                 nwritten = 0;       /* and call write() again */
499             else
500                 return(-1);         /* error */
501         }
502 
503         nleft -= nwritten;
504         ptr   += nwritten;
505     }
506     return(n);
507 }
508 
509 // Read "n" bytes from a descriptor.
510 gssize
pipe_read_n(int fd,void * vptr,size_t n)511 pipe_read_n(int fd, void *vptr, size_t n)
512 {
513     size_t  nleft;
514     gssize nread;
515     char    *ptr;
516 
517     ptr = vptr;
518     nleft = n;
519     while (nleft > 0) {
520         if ( (nread = read(fd, ptr, nleft)) < 0) {
521             if (errno == EINTR)
522                 nread = 0;      /* and call read() again */
523             else
524                 return(-1);
525         } else if (nread == 0)
526             break;              /* EOF */
527 
528         nleft -= nread;
529         ptr   += nread;
530     }
531     return(n - nleft);      /* return >= 0 */
532 }
533 
534 #else // !defined(WIN32)
535 
pipe_read_n(SearpcNamedPipe fd,void * vptr,size_t n)536 gssize pipe_read_n (SearpcNamedPipe fd, void *vptr, size_t n)
537 {
538     DWORD bytes_read;
539     BOOL success = ReadFile(
540         fd,                     // handle to pipe
541         vptr,                   // buffer to receive data
542         (DWORD)n,               // size of buffer
543         &bytes_read,            // number of bytes read
544         NULL);                  // not overlapped I/O
545 
546     if (!success || bytes_read != (DWORD)n) {
547         if (GetLastError() == ERROR_BROKEN_PIPE) {
548             return 0;
549         }
550         G_WARNING_WITH_LAST_ERROR("failed to read from pipe");
551         return -1;
552     }
553 
554     return n;
555 }
556 
pipe_write_n(SearpcNamedPipe fd,const void * vptr,size_t n)557 gssize pipe_write_n(SearpcNamedPipe fd, const void *vptr, size_t n)
558 {
559     DWORD bytes_written;
560     BOOL success = WriteFile(
561         fd,                     // handle to pipe
562         vptr,                   // buffer to receive data
563         (DWORD)n,               // size of buffer
564         &bytes_written,         // number of bytes written
565         NULL);                  // not overlapped I/O
566 
567     if (!success || bytes_written != (DWORD)n) {
568         G_WARNING_WITH_LAST_ERROR("failed to write to named pipe");
569         return -1;
570     }
571 
572     FlushFileBuffers(fd);
573     return 0;
574 }
575 
576 // http://stackoverflow.com/questions/3006229/get-a-text-from-the-error-code-returns-from-the-getlasterror-function
577 // The caller is responsible to free the returned message.
formatErrorMessage()578 char* formatErrorMessage()
579 {
580     DWORD error_code = GetLastError();
581     if (error_code == 0) {
582         return g_strdup("no error");
583     }
584     char buf[256] = {0};
585     FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM,
586                    NULL,
587                    error_code,
588                    /* EN_US */
589                    MAKELANGID(LANG_ENGLISH, 0x01),
590                    buf,
591                    sizeof(buf) - 1,
592                    NULL);
593     return g_strdup(buf);
594 }
595 
596 #endif // !defined(WIN32)
597