1 /*  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2  *
3  *  Gearmand client and server library.
4  *
5  *  Copyright (C) 2012 Data Differential, http://datadifferential.com/
6  *  All rights reserved.
7  *
8  *  Redistribution and use in source and binary forms, with or without
9  *  modification, are permitted provided that the following conditions are
10  *  met:
11  *
12  *      * Redistributions of source code must retain the above copyright
13  *  notice, this list of conditions and the following disclaimer.
14  *
15  *      * Redistributions in binary form must reproduce the above
16  *  copyright notice, this list of conditions and the following disclaimer
17  *  in the documentation and/or other materials provided with the
18  *  distribution.
19  *
20  *      * The names of its contributors may not be used to endorse or
21  *  promote products derived from this software without specific prior
22  *  written permission.
23  *
24  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27  *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28  *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29  *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31  *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32  *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33  *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34  *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35  *
36  */
37 
38 
39 /**
40  * @file
41  * @brief Gear Protocol Definitions
42  */
43 
44 #include "gear_config.h"
45 
46 #include <libgearman-server/common.h>
47 #include <libgearman/strcommand.h>
48 #include <libgearman-server/packet.h>
49 
50 #include <cstdio>
51 #include <cstdlib>
52 
53 #include <libgearman-server/plugins/protocol/gear/protocol.h>
54 #include "libgearman/command.h"
55 
gearmand_packet_unpack_header(gearmand_packet_st * packet)56 static gearmand_error_t gearmand_packet_unpack_header(gearmand_packet_st *packet)
57 {
58   uint32_t tmp;
59 
60   if (memcmp(packet->args, "\0REQ", 4) == 0)
61   {
62     packet->magic= GEARMAN_MAGIC_REQUEST;
63   }
64   else if (memcmp(packet->args, "\0RES", 4) == 0)
65   {
66     packet->magic= GEARMAN_MAGIC_RESPONSE;
67   }
68   else
69   {
70     gearmand_warning("invalid magic value");
71     return GEARMAN_INVALID_MAGIC;
72   }
73 
74   memcpy(&tmp, packet->args + 4, 4);
75   packet->command= static_cast<gearman_command_t>(ntohl(tmp));
76 
77   if (packet->command == GEARMAN_COMMAND_TEXT ||
78       packet->command >= GEARMAN_COMMAND_MAX)
79   {
80     gearmand_error("invalid command value");
81     return GEARMAN_INVALID_COMMAND;
82   }
83 
84   memcpy(&tmp, packet->args + 8, 4);
85   packet->data_size= ntohl(tmp);
86 
87   return GEARMAN_SUCCESS;
88 }
89 
90 class Geartext : public gearmand::protocol::Context {
91 
92 public:
~Geartext()93   ~Geartext()
94   { }
95 
is_owner()96   bool is_owner()
97   {
98     return false;
99   }
100 
notify(gearman_server_con_st *)101   void notify(gearman_server_con_st *)
102   {
103     gearmand_info("Gear connection disconnected");
104   }
105 
unpack(gearmand_packet_st * packet,gearman_server_con_st *,const void * data,const size_t data_size,gearmand_error_t & ret_ptr)106   size_t unpack(gearmand_packet_st *packet,
107                 gearman_server_con_st *,
108                 const void *data, const size_t data_size,
109                 gearmand_error_t& ret_ptr)
110   {
111     size_t used_size;
112     gearmand_info("Gear unpack");
113 
114     if (packet->args_size == 0)
115     {
116       if (data_size > 0 && ((uint8_t *)data)[0] != 0)
117       {
118         /* Try to parse a text-based command. */
119         uint8_t* ptr= (uint8_t *)memchr(data, '\n', data_size);
120         if (ptr == NULL)
121         {
122           ret_ptr= GEARMAN_IO_WAIT;
123           return 0;
124         }
125 
126         packet->magic= GEARMAN_MAGIC_TEXT;
127         packet->command= GEARMAN_COMMAND_TEXT;
128 
129         used_size= size_t(ptr - ((uint8_t *)data)) +1;
130         *ptr= 0;
131         if (used_size > 1 && *(ptr - 1) == '\r')
132         {
133           *(ptr - 1)= 0;
134         }
135 
136         size_t arg_size;
137         for (arg_size= used_size, ptr= (uint8_t *)data; ptr != NULL; data= ptr)
138         {
139           ptr= (uint8_t *)memchr(data, ' ', arg_size);
140           if (ptr != NULL)
141           {
142             *ptr= 0;
143             ptr++;
144             while (*ptr == ' ')
145             {
146               ptr++;
147             }
148 
149             arg_size-= size_t(ptr - ((uint8_t *)data));
150           }
151 
152           ret_ptr= gearmand_packet_create(packet, data, ptr == NULL ? arg_size :
153                                           size_t(ptr - ((uint8_t *)data)));
154           if (ret_ptr != GEARMAN_SUCCESS)
155           {
156             return used_size;
157           }
158         }
159 
160         return used_size;
161       }
162       else if (data_size < GEARMAN_PACKET_HEADER_SIZE)
163       {
164         ret_ptr= GEARMAN_IO_WAIT;
165         return 0;
166       }
167 
168       packet->args= packet->args_buffer;
169       packet->args_size= GEARMAN_PACKET_HEADER_SIZE;
170       memcpy(packet->args, data, GEARMAN_PACKET_HEADER_SIZE);
171 
172       if (gearmand_failed(ret_ptr= gearmand_packet_unpack_header(packet)))
173       {
174         return 0;
175       }
176 
177       used_size= GEARMAN_PACKET_HEADER_SIZE;
178     }
179     else
180     {
181       used_size= 0;
182     }
183 
184     while (packet->argc != gearman_command_info(packet->command)->argc)
185     {
186       if (packet->argc != (gearman_command_info(packet->command)->argc - 1) or
187           gearman_command_info(packet->command)->data)
188       {
189         uint8_t* ptr= (uint8_t *)memchr(((uint8_t *)data) +used_size, 0,
190                                data_size -used_size);
191         if (ptr == NULL)
192         {
193           gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
194                              "Possible protocol error for %s, received only %u args",
195                              gearman_command_info(packet->command)->name, packet->argc);
196           ret_ptr= GEARMAN_IO_WAIT;
197           return used_size;
198         }
199 
200         size_t arg_size= size_t(ptr - (((uint8_t *)data) + used_size)) +1;
201         if (gearmand_failed((ret_ptr= gearmand_packet_create(packet, ((uint8_t *)data) + used_size, arg_size))))
202         {
203           return used_size;
204         }
205 
206         packet->data_size-= arg_size;
207         used_size+= arg_size;
208       }
209       else
210       {
211         if ((data_size - used_size) < packet->data_size)
212         {
213           ret_ptr= GEARMAN_IO_WAIT;
214           return used_size;
215         }
216 
217         ret_ptr= gearmand_packet_create(packet, ((uint8_t *)data) + used_size, packet->data_size);
218         if (gearmand_failed(ret_ptr))
219         {
220           return used_size;
221         }
222 
223         used_size+= packet->data_size;
224         packet->data_size= 0;
225       }
226     }
227 
228     if (packet->command == GEARMAN_COMMAND_ECHO_RES)
229     {
230       gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
231                          "GEAR length: %" PRIu64 " gearmand_command_t: %s echo: %.*s",
232                          uint64_t(packet->data_size),
233                          gearman_strcommand(packet->command),
234                          int(packet->data_size),
235                          packet->data);
236     }
237     else if (packet->command == GEARMAN_COMMAND_TEXT)
238     {
239       gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
240                          "GEAR length: %" PRIu64 " gearmand_command_t: %s text: %.*s",
241                          uint64_t(packet->data_size),
242                          gearman_strcommand(packet->command),
243                          int(packet->data_size),
244                          packet->data);
245     }
246     else
247     {
248       gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
249                          "GEAR length: %" PRIu64 " gearmand_command_t: %s",
250                          uint64_t(packet->data_size),
251                          gearman_strcommand(packet->command));
252     }
253 
254     ret_ptr= GEARMAN_SUCCESS;
255     return used_size;
256   }
257 
pack(const gearmand_packet_st * packet,gearman_server_con_st *,void * data,const size_t data_size,gearmand_error_t & ret_ptr)258   size_t pack(const gearmand_packet_st *packet,
259               gearman_server_con_st*,
260               void *data, const size_t data_size,
261               gearmand_error_t& ret_ptr)
262   {
263     if (packet->command == GEARMAN_COMMAND_ECHO_RES)
264     {
265       gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
266                          "GEAR length: %" PRIu64 " gearmand_command_t: %s echo: %.*",
267                          uint64_t(packet->data_size),
268                          gearman_strcommand(packet->command),
269                          int(packet->data_size),
270                          packet->data);
271     }
272     else
273     {
274       gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
275                          "GEAR length: %" PRIu64 " gearmand_command_t: %s",
276                          uint64_t(packet->data_size),
277                          gearman_strcommand(packet->command));
278     }
279 
280     if (packet->args_size == 0)
281     {
282       ret_ptr= GEARMAN_SUCCESS;
283       return 0;
284     }
285 
286     if (packet->args_size > data_size)
287     {
288       ret_ptr= GEARMAN_FLUSH_DATA;
289       return 0;
290     }
291 
292     memcpy(data, packet->args, packet->args_size);
293     ret_ptr= GEARMAN_SUCCESS;
294 
295     return packet->args_size;
296   }
297 
298 private:
299 };
300 
301 static Geartext gear_context;
302 
_gear_con_add(gearman_server_con_st * connection)303 static gearmand_error_t _gear_con_add(gearman_server_con_st *connection)
304 {
305   gearmand_info("Gear connection made");
306 
307   connection->set_protocol(&gear_context);
308 
309   return GEARMAN_SUCCESS;
310 }
311 
312 namespace gearmand {
313 namespace protocol {
314 
Gear()315 Gear::Gear() :
316   Plugin("Gear")
317 {
318   command_line_options().add_options()
319     ("port,p", boost::program_options::value(&_port)->default_value(GEARMAN_DEFAULT_TCP_PORT_STRING),
320      "Port the server should listen on.");
321 }
322 
~Gear()323 Gear::~Gear()
324 {
325 }
326 
start(gearmand_st * gearmand)327 gearmand_error_t Gear::start(gearmand_st *gearmand)
328 {
329   gearmand_info("Initializing Gear");
330 
331   gearmand_error_t rc;
332   if (_port.empty())
333   {
334     struct servent *gearman_servent= getservbyname(GEARMAN_DEFAULT_TCP_SERVICE, NULL);
335 
336     if (gearman_servent and gearman_servent->s_name)
337     {
338       rc= gearmand_port_add(gearmand, gearman_servent->s_name, _gear_con_add);
339     }
340     else
341     {
342       rc= gearmand_port_add(gearmand, GEARMAN_DEFAULT_TCP_PORT_STRING, _gear_con_add);
343     }
344   }
345   else
346   {
347     rc= gearmand_port_add(gearmand, _port.c_str(), _gear_con_add);
348   }
349 
350   return rc;
351 }
352 
353 } // namespace protocol
354 } // namespace gearmand
355