1 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
2 /**
3  * What is a library without an example to show you how to use the library?
4  * This example uses both interfaces to implement a small memcached server.
5  * Please note that this is an example of how to use the library, not
6  * an implementation of a scalable memcached server. If you look closely
7  * at the example it isn't even multithreaded ;-)
8  *
9  * With that in mind, let me give you some pointers into the source:
10  *   storage.c/h       - Implements the item store for this server and not really
11  *                       interesting for this example.
12  *   interface_v0.cc   - Shows an implementation of the memcached server by using
13  *                       the "raw" access to the packets as they arrive
14  *   interface_v1.cc   - Shows an implementation of the memcached server by using
15  *                       the more "logical" interface.
16  *   memcached_light.cc- This file sets up all of the sockets and runs the main
17  *                       message loop.
18  *
19  *
20  * config.h is included so that I can use ntohll/htonll on platforms that
21  * don't have them (these are private functions inside libmemcached, so you
22  * cannot use them directly from libmemcached without special modifications to
23  * the library)
24  */
25 
26 #include <mem_config.h>
27 
28 #include <libmemcachedprotocol-0.0/handler.h>
29 #include <libmemcached/socket.hpp>
30 #include <example/byteorder.h>
31 #include "example/storage.h"
32 #include "example/memcached_light.h"
33 
34 #include "util/daemon.hpp"
35 #include "util/log.hpp"
36 #include "util/pidfile.hpp"
37 
38 using namespace datadifferential;
39 
40 #include <event.h>
41 
42 #include <cassert>
43 #include <cerrno>
44 #include <cstdio>
45 #include <cstdlib>
46 #include <cstring>
47 #include <fcntl.h>
48 #include <getopt.h>
49 #include <iostream>
50 #include <sys/types.h>
51 #include <unistd.h>
52 
/*
 * Callback tables for the two protocol interfaces. They are only declared
 * here; main() picks one of them and installs it on the protocol handle.
 */
extern memcached_binary_protocol_callback_st interface_v0_impl;
extern memcached_binary_protocol_callback_st interface_v1_impl;

/* Listening sockets opened by server_socket(); room for 1024 of them. */
static memcached_socket_t server_sockets[1024];
/* Number of entries of server_sockets that are in use. */
static int num_server_sockets= 0;
58 
/*
 * Per-socket bookkeeping record. The records live in socket_userdata_map,
 * which is indexed by socket descriptor.
 */
struct connection
{
  /*
   * For a listening socket: the protocol handle (set in main()).
   * For an accepted socket: the protocol client (set in accept_handler()).
   */
  void *userdata;
  /* The libevent event registered for this socket. */
  struct event event;
};
64 
/* The default maximum number of connections... (change with --max-connections) */
static int maxconns= 1024;

/* Array of maxconns connection records, allocated in main() and indexed by socket. */
static struct connection *socket_userdata_map;
/* The libevent base that every event created in this file is attached to. */
static struct event_base *event_base= NULL;
70 
/**
 * Settings gathered from the command line.
 *
 * A default-constructed instance listens on service "9999", is not verbose,
 * does not daemonize, and has empty pid-file and log-file names.
 */
struct options_st
{
  /* Name handed to util::Pidfile (--pid-file) */
  std::string pid_file;
  /* Service/port handed to getaddrinfo() (--port) */
  std::string service;
  /* Name handed to util::log_info_st (--log-file) */
  std::string log_file;
  /* Print diagnostics about connections and commands (--verbose) */
  bool is_verbose;
  /* Detach from the terminal through util::daemonize() (--daemon) */
  bool opt_daemon;

  options_st() :
    pid_file(),
    service("9999"),
    log_file(),
    is_verbose(false),
    opt_daemon(false)
  { }
};

/* The one set of options shared by everything in this file. */
static options_st global_options;
87 
88 /**
89  * Callback for driving a client connection
90  * @param fd the socket for the client socket
91  * @param which identifying the event that occurred (not used)
92  * @param arg the connection structure for the client
93  */
drive_client(memcached_socket_t fd,short,void * arg)94 static void drive_client(memcached_socket_t fd, short, void *arg)
95 {
96   struct connection *client= (struct connection*)arg;
97   struct memcached_protocol_client_st* c= (struct memcached_protocol_client_st*)client->userdata;
98   assert(c != NULL);
99 
100   memcached_protocol_event_t events= memcached_protocol_client_work(c);
101   if (events & MEMCACHED_PROTOCOL_ERROR_EVENT)
102   {
103     if (global_options.is_verbose)
104     {
105       struct sockaddr_in sin;
106       socklen_t addrlen= sizeof(sin);
107 
108       if (getsockname(fd, (struct sockaddr *)&sin, &addrlen) != -1)
109       {
110         std::cout << __FILE__ << ":" << __LINE__
111           << " close(MEMCACHED_PROTOCOL_ERROR_EVENT)"
112           << " " << inet_ntoa(sin.sin_addr) << ":" << sin.sin_port
113           << " fd:" << fd
114           << std::endl;
115       }
116       else
117       {
118         std::cout << __FILE__ << ":" << __LINE__ << "close() MEMCACHED_PROTOCOL_ERROR_EVENT" << std::endl;
119       }
120     }
121 
122     memcached_protocol_client_destroy(c);
123     closesocket(fd);
124   }
125   else
126   {
127     short flags = 0;
128     if (events & MEMCACHED_PROTOCOL_WRITE_EVENT)
129     {
130       flags= EV_WRITE;
131     }
132 
133     if (events & MEMCACHED_PROTOCOL_READ_EVENT)
134     {
135       flags|= EV_READ;
136     }
137 
138     event_set(&client->event, int(fd), flags, drive_client, client);
139     event_base_set(event_base, &client->event);
140 
141     if (event_add(&client->event, 0) == -1)
142     {
143       memcached_protocol_client_destroy(c);
144       closesocket(fd);
145     }
146   }
147 }
148 
149 /**
150  * Callback for accepting new connections
151  * @param fd the socket for the server socket
152  * @param which identifying the event that occurred (not used)
153  * @param arg the connection structure for the server
154  */
accept_handler(memcached_socket_t fd,short,void * arg)155 static void accept_handler(memcached_socket_t fd, short, void *arg)
156 {
157   struct connection *server= (struct connection *)arg;
158   /* accept new client */
159   struct sockaddr_storage addr;
160   socklen_t addrlen= sizeof(addr);
161   memcached_socket_t sock= accept(fd, (struct sockaddr *)&addr, &addrlen);
162 
163   if (sock == INVALID_SOCKET)
164   {
165     perror("Failed to accept client");
166   }
167 
168 #ifndef WIN32
169   if (sock >= maxconns)
170   {
171     closesocket(sock);
172     return ;
173   }
174 #endif
175 
176   struct memcached_protocol_client_st* c= memcached_protocol_create_client((memcached_protocol_st*)server->userdata, sock);
177   if (c == NULL)
178   {
179     closesocket(sock);
180   }
181   else
182   {
183     memcached_protocol_client_set_verbose(c, global_options.is_verbose);
184     struct connection *client = &socket_userdata_map[sock];
185     client->userdata= c;
186 
187     event_set(&client->event, int(sock), EV_READ, drive_client, client);
188     event_base_set(event_base, &client->event);
189     if (event_add(&client->event, 0) == -1)
190     {
191       std::cerr << "Failed to add event for " << sock << std::endl;
192       memcached_protocol_client_destroy(c);
193       closesocket(sock);
194     }
195   }
196 }
197 
server_socket(util::log_info_st & log_file,const std::string & service)198 static bool server_socket(util::log_info_st& log_file, const std::string& service)
199 {
200   struct addrinfo *ai;
201   struct addrinfo hints;
202   memset(&hints, 0, sizeof(struct addrinfo));
203 
204   hints.ai_flags= AI_PASSIVE;
205   hints.ai_family= AF_UNSPEC;
206   hints.ai_socktype= SOCK_STREAM;
207 
208   int error= getaddrinfo("127.0.0.1", service.c_str(), &hints, &ai);
209   if (error != 0)
210   {
211     if (error != EAI_SYSTEM)
212     {
213       std::string buffer("getaddrinfo: ");
214       buffer+= gai_strerror(error);
215       log_file.write(util::VERBOSE_ERROR, buffer.c_str());
216     }
217     else
218     {
219       std::string buffer("getaddrinfo: ");
220       buffer+= strerror(errno);
221       log_file.write(util::VERBOSE_ERROR, buffer.c_str());
222     }
223 
224     return false;
225   }
226 
227   struct linger ling= {0, 0};
228 
229   for (struct addrinfo *next= ai; next; next= next->ai_next)
230   {
231     memcached_socket_t sock= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
232     if (sock == INVALID_SOCKET)
233     {
234       std::string buffer("Failed to create socket: ");
235       buffer+= strerror(errno);
236       log_file.write(util::VERBOSE_ERROR, buffer.c_str());
237       continue;
238     }
239 
240     int flags;
241 #ifdef WIN32
242     u_long arg = 1;
243     if (ioctlsocket(sock, FIONBIO, &arg) == SOCKET_ERROR)
244     {
245       std::cerr << "Failed to set nonblocking io: " << strerror(errno) << std::endl;
246       closesocket(sock);
247       continue;
248     }
249 #else
250     flags= fcntl(sock, F_GETFL, 0);
251     if (flags == -1)
252     {
253       std::string buffer("Failed to get socket flags: ");
254       buffer+= strerror(errno);
255       log_file.write(util::VERBOSE_ERROR, buffer.c_str());
256       closesocket(sock);
257       continue;
258     }
259 
260     if ((flags & O_NONBLOCK) != O_NONBLOCK)
261     {
262       if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1)
263       {
264         std::string buffer("Failed to set socket to nonblocking mode: ");
265         buffer+= strerror(errno);
266         log_file.write(util::VERBOSE_ERROR, buffer.c_str());
267         closesocket(sock);
268         continue;
269       }
270     }
271 #endif
272 
273     flags= 1;
274     if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)) != 0)
275     {
276       std::cerr << "Failed to set SO_REUSEADDR: " << strerror(errno) << std::endl;
277     }
278 
279     if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)) != 0)
280     {
281       std::cerr << "Failed to set SO_KEEPALIVE: " << strerror(errno) << std::endl;
282     }
283 
284     if (setsockopt(sock, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)) != 0)
285     {
286       std::cerr << "Failed to set SO_LINGER: " << strerror(errno) << std::endl;
287     }
288 
289     if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)) != 0)
290     {
291       std::cerr << "Failed to set TCP_NODELAY: " << strerror(errno) << std::endl;
292     }
293 
294     if (bind(sock, next->ai_addr, next->ai_addrlen) == SOCKET_ERROR)
295     {
296       if (get_socket_errno() != EADDRINUSE)
297       {
298         std::cerr << "bind(): " << strerror(errno) << std::endl;
299         freeaddrinfo(ai);
300       }
301       closesocket(sock);
302       continue;
303     }
304 
305     if (listen(sock, 1024) == SOCKET_ERROR)
306     {
307       std::string buffer("listen(): ");
308       buffer+= strerror(errno);
309       log_file.write(util::VERBOSE_ERROR, buffer.c_str());
310       closesocket(sock);
311       continue;
312     }
313 
314     if (global_options.is_verbose)
315     {
316       std::string buffer("Listening to: ");
317       buffer+= global_options.service;
318       log_file.write(util::VERBOSE_NOTICE, buffer.c_str());
319     }
320 
321     server_sockets[num_server_sockets++]= sock;
322   }
323 
324   freeaddrinfo(ai);
325 
326   return (num_server_sockets > 0) ? true : false;
327 }
328 
/**
 * Convert a command code to a textual string
 *
 * The table is indexed directly by the command code and covers the codes
 * 0x00 (GET) through 0x1a (PREPENDQ). The lookup is bounded by the size of
 * the table itself, so the bound can never disagree with the table.
 *
 * @param cmd the comcode to convert
 * @return a textual string with the command or NULL for unknown commands
 */
static const char* comcode2str(uint8_t cmd)
{
  static const char * const text[] = {
    "GET", "SET", "ADD", "REPLACE", "DELETE",
    "INCREMENT", "DECREMENT", "QUIT", "FLUSH",
    "GETQ", "NOOP", "VERSION", "GETK", "GETKQ",
    "APPEND", "PREPEND", "STAT", "SETQ", "ADDQ",
    "REPLACEQ", "DELETEQ", "INCREMENTQ", "DECREMENTQ",
    "QUITQ", "FLUSHQ", "APPENDQ", "PREPENDQ"
  };
  const size_t entries= sizeof(text) / sizeof(text[0]);

  if (cmd < entries)
  {
    return text[cmd];
  }

  return NULL;
}
352 
353 /**
354  * Print out the command we are about to execute
355  */
pre_execute(const void * cookie,protocol_binary_request_header * header)356 static void pre_execute(const void *cookie,
357                         protocol_binary_request_header *header)
358 {
359   if (global_options.is_verbose)
360   {
361     if (header)
362     {
363       const char *cmd= comcode2str(header->request.opcode);
364       if (cmd != NULL)
365       {
366         std::cout << "pre_execute from " << cookie << ": " << cmd << std::endl;
367       }
368       else
369       {
370         std::cout << "pre_execute from " << cookie << ": " << header->request.opcode << std::endl;
371       }
372     }
373     else
374     {
375       std::cout << "pre_execute from " << cookie << std::endl;
376     }
377   }
378 }
379 
380 /**
381  * Print out the command we just executed
382  */
post_execute(const void * cookie,protocol_binary_request_header * header)383 static void post_execute(const void *cookie,
384                          protocol_binary_request_header *header)
385 {
386   if (global_options.is_verbose)
387   {
388     if (header)
389     {
390       const char *cmd= comcode2str(header->request.opcode);
391       if (cmd != NULL)
392       {
393         std::cout << "post_execute from " << cookie << ": " << cmd << std::endl;
394       }
395       else
396       {
397         std::cout << "post_execute from " << cookie << ": " << header->request.opcode << std::endl;
398       }
399     }
400     else
401     {
402       std::cout << "post_execute from " << cookie << std::endl;
403     }
404   }
405 }
406 
407 /**
408  * Callback handler for all unknown commands.
409  * Send an unknown command back to the client
410  */
unknown(const void * cookie,protocol_binary_request_header * header,memcached_binary_protocol_raw_response_handler response_handler)411 static protocol_binary_response_status unknown(const void *cookie,
412                                                protocol_binary_request_header *header,
413                                                memcached_binary_protocol_raw_response_handler response_handler)
414 {
415   protocol_binary_response_no_extras response;
416   memset(&response, 0, sizeof(protocol_binary_response_no_extras));
417 
418   response.message.header.response.magic= PROTOCOL_BINARY_RES;
419   response.message.header.response.opcode= header->request.opcode;
420   response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND);
421   response.message.header.response.opaque= header->request.opaque;
422 
423   return response_handler(cookie, header, (protocol_binary_response_header*)&response);
424 }
425 
426 /**
427  * Program entry point. Bind to the specified port(s) and serve clients
428  *
429  * @param argc number of items in the argument vector
430  * @param argv argument vector
431  * @return EXIT_SUCCESS on success, 1 otherwise
432  */
main(int argc,char ** argv)433 int main(int argc, char **argv)
434 {
435   memcached_binary_protocol_callback_st *interface= &interface_v0_impl;
436 
437   {
438     enum long_option_t {
439       OPT_HELP,
440       OPT_VERBOSE,
441       OPT_DAEMON,
442       OPT_PROTOCOL_VERSION,
443       OPT_VERSION,
444       OPT_PORT,
445       OPT_MAX_CONNECTIONS,
446       OPT_LOGFILE,
447       OPT_PIDFILE
448     };
449 
450     static struct option long_options[]=
451     {
452       { "help", no_argument, NULL, OPT_HELP },
453       { "port", required_argument, NULL, OPT_PORT },
454       { "verbose", no_argument, NULL, OPT_VERBOSE },
455       { "daemon", no_argument, NULL, OPT_DAEMON },
456       { "protocol", no_argument, NULL, OPT_PROTOCOL_VERSION },
457       { "version", no_argument, NULL, OPT_VERSION },
458       { "max-connections", required_argument, NULL, OPT_MAX_CONNECTIONS },
459       { "pid-file", required_argument, NULL, OPT_PIDFILE },
460       { "log-file", required_argument, NULL, OPT_LOGFILE },
461       {0, 0, 0, 0}
462     };
463 
464     bool opt_help= false;
465     int option_index;
466     bool done= false;
467     while (done == false)
468     {
469       switch (getopt_long(argc, argv, "", long_options, &option_index))
470       {
471       case -1:
472         done= true;
473         break;
474 
475       case OPT_PROTOCOL_VERSION:
476         interface= &interface_v1_impl;
477         break;
478 
479       case OPT_PIDFILE:
480         global_options.pid_file= optarg;
481         break;
482 
483       case OPT_LOGFILE:
484         global_options.log_file= optarg;
485         break;
486 
487       case OPT_VERBOSE:
488         global_options.is_verbose= true;
489         break;
490 
491       case OPT_VERSION:
492         break;
493 
494       case OPT_DAEMON:
495         global_options.opt_daemon= true;
496         break;
497 
498       case OPT_PORT:
499         global_options.service= optarg;
500         break;
501 
502       case OPT_MAX_CONNECTIONS:
503         maxconns= atoi(optarg);
504         break;
505 
506       case OPT_HELP:  /* FALLTHROUGH */
507         opt_help= true;
508         break;
509 
510       default:
511         {
512           std::cerr << "Unknown option: " << optarg << std::endl;
513           return EXIT_FAILURE;
514         }
515       }
516     }
517 
518     if (opt_help)
519     {
520       std::cout << "Usage: " << argv[0] << std::endl;
521       for (struct option *ptr_option= long_options; ptr_option->name; ptr_option++)
522       {
523         std::cout << "\t" << ptr_option->name << std::endl;
524       }
525       return EXIT_SUCCESS;
526     }
527   }
528 
529   if (global_options.opt_daemon)
530   {
531     util::daemonize(false, true);
532   }
533 
534   if (initialize_storage() == false)
535   {
536     /* Error message already printed */
537     return EXIT_FAILURE;
538   }
539 
540   util::Pidfile _pid_file(global_options.pid_file);
541 
542   if (_pid_file.create() == false)
543   {
544     std::cerr << "Failed to create pid-file" <<  _pid_file.error_message() << std::endl;
545     return EXIT_FAILURE;
546   }
547 
548   util::log_info_st log_file(argv[0], global_options.log_file, false);
549   log_file.write(util::VERBOSE_NOTICE, "starting log");
550 
551   /*
552    * We need to initialize the handlers manually due to a bug in the
553    * warnings generated by struct initialization in gcc (all the way up to 4.4)
554    */
555   initialize_interface_v0_handler(log_file);
556   initialize_interface_v1_handler(log_file);
557 
558 
559   if (server_socket(log_file, global_options.service) == false)
560   {
561     return EXIT_FAILURE;
562   }
563 
564   if (num_server_sockets == 0)
565   {
566     log_file.write(util::VERBOSE_ERROR, "No server sockets are available.");
567     return EXIT_FAILURE;
568   }
569 
570   /*
571    * Create and initialize the handles to the protocol handlers. I want
572    * to be able to trace the traffic throught the pre/post handlers, and
573    * set up a common handler for unknown messages
574    */
575   interface->pre_execute= pre_execute;
576   interface->post_execute= post_execute;
577   interface->unknown= unknown;
578 
579   struct memcached_protocol_st *protocol_handle;
580   if ((protocol_handle= memcached_protocol_create_instance()) == NULL)
581   {
582     log_file.write(util::VERBOSE_ERROR, "No server sockets are available.");
583     return EXIT_FAILURE;
584   }
585 
586   socket_userdata_map= (struct connection*)calloc((size_t)(maxconns), sizeof(struct connection));
587   if (socket_userdata_map == NULL)
588   {
589     log_file.write(util::VERBOSE_ERROR, "Failed to allocate room for connections");
590     return EXIT_FAILURE;
591   }
592 
593   memcached_binary_protocol_set_callbacks(protocol_handle, interface);
594   memcached_binary_protocol_set_pedantic(protocol_handle, true);
595 
596   event_base= event_init();
597   if (event_base == NULL)
598   {
599     std::cerr << "Failed to create an instance of libevent" << std::endl;
600     return EXIT_FAILURE;
601   }
602 
603   for (int xx= 0; xx < num_server_sockets; ++xx)
604   {
605     struct connection *conn= &socket_userdata_map[server_sockets[xx]];
606     conn->userdata= protocol_handle;
607 
608     event_set(&conn->event, int(server_sockets[xx]), EV_READ | EV_PERSIST, accept_handler, conn);
609 
610     event_base_set(event_base, &conn->event);
611     if (event_add(&conn->event, 0) == -1)
612     {
613       log_file.write(util::VERBOSE_ERROR, "Failed to add event");
614       closesocket(server_sockets[xx]);
615     }
616   }
617 
618   if (global_options.opt_daemon)
619   {
620     if (util::daemon_is_ready(true) == false)
621     {
622       log_file.write(util::VERBOSE_ERROR, "Failed for util::daemon_is_ready()");
623       return EXIT_FAILURE;
624     }
625   }
626 
627 
628   /* Serve all of the clients */
629   switch (event_base_loop(event_base, 0))
630   {
631   case -1:
632     log_file.write(util::VERBOSE_ERROR, "event_base_loop() failed");
633     break;
634 
635   case 1:
636     log_file.write(util::VERBOSE_ERROR, "event_base_loop(), no events were registered");
637     break;
638 
639   default:
640     break;
641   }
642   log_file.write(util::VERBOSE_NOTICE, "exiting");
643 
644   /* NOTREACHED */
645   return EXIT_SUCCESS;
646 }
647