1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Gearmand client and server library.
4 *
5 * Copyright (C) 2012 Data Differential, http://datadifferential.com/
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are
10 * met:
11 *
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * * Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following disclaimer
17 * in the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * * The names of its contributors may not be used to endorse or
21 * promote products derived from this software without specific prior
22 * written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 *
36 */
37
38
39 /**
40 * @file
41 * @brief Gear Protocol Definitions
42 */
43
44 #include "gear_config.h"
45
46 #include <libgearman-server/common.h>
47 #include <libgearman/strcommand.h>
48 #include <libgearman-server/packet.h>
49
50 #include <cstdio>
51 #include <cstdlib>
52
53 #include <libgearman-server/plugins/protocol/gear/protocol.h>
54 #include "libgearman/command.h"
55
gearmand_packet_unpack_header(gearmand_packet_st * packet)56 static gearmand_error_t gearmand_packet_unpack_header(gearmand_packet_st *packet)
57 {
58 uint32_t tmp;
59
60 if (memcmp(packet->args, "\0REQ", 4) == 0)
61 {
62 packet->magic= GEARMAN_MAGIC_REQUEST;
63 }
64 else if (memcmp(packet->args, "\0RES", 4) == 0)
65 {
66 packet->magic= GEARMAN_MAGIC_RESPONSE;
67 }
68 else
69 {
70 gearmand_warning("invalid magic value");
71 return GEARMAN_INVALID_MAGIC;
72 }
73
74 memcpy(&tmp, packet->args + 4, 4);
75 packet->command= static_cast<gearman_command_t>(ntohl(tmp));
76
77 if (packet->command == GEARMAN_COMMAND_TEXT ||
78 packet->command >= GEARMAN_COMMAND_MAX)
79 {
80 gearmand_error("invalid command value");
81 return GEARMAN_INVALID_COMMAND;
82 }
83
84 memcpy(&tmp, packet->args + 8, 4);
85 packet->data_size= ntohl(tmp);
86
87 return GEARMAN_SUCCESS;
88 }
89
90 class Geartext : public gearmand::protocol::Context {
91
92 public:
~Geartext()93 ~Geartext()
94 { }
95
is_owner()96 bool is_owner()
97 {
98 return false;
99 }
100
notify(gearman_server_con_st *)101 void notify(gearman_server_con_st *)
102 {
103 gearmand_info("Gear connection disconnected");
104 }
105
unpack(gearmand_packet_st * packet,gearman_server_con_st *,const void * data,const size_t data_size,gearmand_error_t & ret_ptr)106 size_t unpack(gearmand_packet_st *packet,
107 gearman_server_con_st *,
108 const void *data, const size_t data_size,
109 gearmand_error_t& ret_ptr)
110 {
111 size_t used_size;
112 gearmand_info("Gear unpack");
113
114 if (packet->args_size == 0)
115 {
116 if (data_size > 0 && ((uint8_t *)data)[0] != 0)
117 {
118 /* Try to parse a text-based command. */
119 uint8_t* ptr= (uint8_t *)memchr(data, '\n', data_size);
120 if (ptr == NULL)
121 {
122 ret_ptr= GEARMAN_IO_WAIT;
123 return 0;
124 }
125
126 packet->magic= GEARMAN_MAGIC_TEXT;
127 packet->command= GEARMAN_COMMAND_TEXT;
128
129 used_size= size_t(ptr - ((uint8_t *)data)) +1;
130 *ptr= 0;
131 if (used_size > 1 && *(ptr - 1) == '\r')
132 {
133 *(ptr - 1)= 0;
134 }
135
136 size_t arg_size;
137 for (arg_size= used_size, ptr= (uint8_t *)data; ptr != NULL; data= ptr)
138 {
139 ptr= (uint8_t *)memchr(data, ' ', arg_size);
140 if (ptr != NULL)
141 {
142 *ptr= 0;
143 ptr++;
144 while (*ptr == ' ')
145 {
146 ptr++;
147 }
148
149 arg_size-= size_t(ptr - ((uint8_t *)data));
150 }
151
152 ret_ptr= gearmand_packet_create(packet, data, ptr == NULL ? arg_size :
153 size_t(ptr - ((uint8_t *)data)));
154 if (ret_ptr != GEARMAN_SUCCESS)
155 {
156 return used_size;
157 }
158 }
159
160 return used_size;
161 }
162 else if (data_size < GEARMAN_PACKET_HEADER_SIZE)
163 {
164 ret_ptr= GEARMAN_IO_WAIT;
165 return 0;
166 }
167
168 packet->args= packet->args_buffer;
169 packet->args_size= GEARMAN_PACKET_HEADER_SIZE;
170 memcpy(packet->args, data, GEARMAN_PACKET_HEADER_SIZE);
171
172 if (gearmand_failed(ret_ptr= gearmand_packet_unpack_header(packet)))
173 {
174 return 0;
175 }
176
177 used_size= GEARMAN_PACKET_HEADER_SIZE;
178 }
179 else
180 {
181 used_size= 0;
182 }
183
184 while (packet->argc != gearman_command_info(packet->command)->argc)
185 {
186 if (packet->argc != (gearman_command_info(packet->command)->argc - 1) or
187 gearman_command_info(packet->command)->data)
188 {
189 uint8_t* ptr= (uint8_t *)memchr(((uint8_t *)data) +used_size, 0,
190 data_size -used_size);
191 if (ptr == NULL)
192 {
193 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
194 "Possible protocol error for %s, received only %u args",
195 gearman_command_info(packet->command)->name, packet->argc);
196 ret_ptr= GEARMAN_IO_WAIT;
197 return used_size;
198 }
199
200 size_t arg_size= size_t(ptr - (((uint8_t *)data) + used_size)) +1;
201 if (gearmand_failed((ret_ptr= gearmand_packet_create(packet, ((uint8_t *)data) + used_size, arg_size))))
202 {
203 return used_size;
204 }
205
206 packet->data_size-= arg_size;
207 used_size+= arg_size;
208 }
209 else
210 {
211 if ((data_size - used_size) < packet->data_size)
212 {
213 ret_ptr= GEARMAN_IO_WAIT;
214 return used_size;
215 }
216
217 ret_ptr= gearmand_packet_create(packet, ((uint8_t *)data) + used_size, packet->data_size);
218 if (gearmand_failed(ret_ptr))
219 {
220 return used_size;
221 }
222
223 used_size+= packet->data_size;
224 packet->data_size= 0;
225 }
226 }
227
228 if (packet->command == GEARMAN_COMMAND_ECHO_RES)
229 {
230 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
231 "GEAR length: %" PRIu64 " gearmand_command_t: %s echo: %.*s",
232 uint64_t(packet->data_size),
233 gearman_strcommand(packet->command),
234 int(packet->data_size),
235 packet->data);
236 }
237 else if (packet->command == GEARMAN_COMMAND_TEXT)
238 {
239 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
240 "GEAR length: %" PRIu64 " gearmand_command_t: %s text: %.*s",
241 uint64_t(packet->data_size),
242 gearman_strcommand(packet->command),
243 int(packet->data_size),
244 packet->data);
245 }
246 else
247 {
248 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
249 "GEAR length: %" PRIu64 " gearmand_command_t: %s",
250 uint64_t(packet->data_size),
251 gearman_strcommand(packet->command));
252 }
253
254 ret_ptr= GEARMAN_SUCCESS;
255 return used_size;
256 }
257
pack(const gearmand_packet_st * packet,gearman_server_con_st *,void * data,const size_t data_size,gearmand_error_t & ret_ptr)258 size_t pack(const gearmand_packet_st *packet,
259 gearman_server_con_st*,
260 void *data, const size_t data_size,
261 gearmand_error_t& ret_ptr)
262 {
263 if (packet->command == GEARMAN_COMMAND_ECHO_RES)
264 {
265 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
266 "GEAR length: %" PRIu64 " gearmand_command_t: %s echo: %.*",
267 uint64_t(packet->data_size),
268 gearman_strcommand(packet->command),
269 int(packet->data_size),
270 packet->data);
271 }
272 else
273 {
274 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
275 "GEAR length: %" PRIu64 " gearmand_command_t: %s",
276 uint64_t(packet->data_size),
277 gearman_strcommand(packet->command));
278 }
279
280 if (packet->args_size == 0)
281 {
282 ret_ptr= GEARMAN_SUCCESS;
283 return 0;
284 }
285
286 if (packet->args_size > data_size)
287 {
288 ret_ptr= GEARMAN_FLUSH_DATA;
289 return 0;
290 }
291
292 memcpy(data, packet->args, packet->args_size);
293 ret_ptr= GEARMAN_SUCCESS;
294
295 return packet->args_size;
296 }
297
298 private:
299 };
300
301 static Geartext gear_context;
302
_gear_con_add(gearman_server_con_st * connection)303 static gearmand_error_t _gear_con_add(gearman_server_con_st *connection)
304 {
305 gearmand_info("Gear connection made");
306
307 connection->set_protocol(&gear_context);
308
309 return GEARMAN_SUCCESS;
310 }
311
312 namespace gearmand {
313 namespace protocol {
314
Gear()315 Gear::Gear() :
316 Plugin("Gear")
317 {
318 command_line_options().add_options()
319 ("port,p", boost::program_options::value(&_port)->default_value(GEARMAN_DEFAULT_TCP_PORT_STRING),
320 "Port the server should listen on.");
321 }
322
~Gear()323 Gear::~Gear()
324 {
325 }
326
start(gearmand_st * gearmand)327 gearmand_error_t Gear::start(gearmand_st *gearmand)
328 {
329 gearmand_info("Initializing Gear");
330
331 gearmand_error_t rc;
332 if (_port.empty())
333 {
334 struct servent *gearman_servent= getservbyname(GEARMAN_DEFAULT_TCP_SERVICE, NULL);
335
336 if (gearman_servent and gearman_servent->s_name)
337 {
338 rc= gearmand_port_add(gearmand, gearman_servent->s_name, _gear_con_add);
339 }
340 else
341 {
342 rc= gearmand_port_add(gearmand, GEARMAN_DEFAULT_TCP_PORT_STRING, _gear_con_add);
343 }
344 }
345 else
346 {
347 rc= gearmand_port_add(gearmand, _port.c_str(), _gear_con_add);
348 }
349
350 return rc;
351 }
352
353 } // namespace protocol
354 } // namespace gearmand
355