1 /*  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2  *
3  *  Libmemcached library
4  *
5  *  Copyright (C) 2011-2013 Data Differential, http://datadifferential.com/
6  *  Copyright (C) 2006-2009 Brian Aker All rights reserved.
7  *
8  *  Redistribution and use in source and binary forms, with or without
9  *  modification, are permitted provided that the following conditions are
10  *  met:
11  *
12  *      * Redistributions of source code must retain the above copyright
13  *  notice, this list of conditions and the following disclaimer.
14  *
15  *      * Redistributions in binary form must reproduce the above
16  *  copyright notice, this list of conditions and the following disclaimer
17  *  in the documentation and/or other materials provided with the
18  *  distribution.
19  *
20  *      * The names of its contributors may not be used to endorse or
21  *  promote products derived from this software without specific prior
22  *  written permission.
23  *
24  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27  *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28  *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29  *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31  *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32  *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33  *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34  *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35  *
36  */
37 
38 #include <libmemcached/common.h>
39 
40 /*
41   What happens if no servers exist?
42 */
memcached_get(memcached_st * ptr,const char * key,size_t key_length,size_t * value_length,uint32_t * flags,memcached_return_t * error)43 char *memcached_get(memcached_st *ptr, const char *key,
44                     size_t key_length,
45                     size_t *value_length,
46                     uint32_t *flags,
47                     memcached_return_t *error)
48 {
49   return memcached_get_by_key(ptr, NULL, 0, key, key_length, value_length,
50                               flags, error);
51 }
52 
53 static memcached_return_t __mget_by_key_real(memcached_st *ptr,
54                                              const char *group_key,
55                                              size_t group_key_length,
56                                              const char * const *keys,
57                                              const size_t *key_length,
58                                              size_t number_of_keys,
59                                              const bool mget_mode);
memcached_get_by_key(memcached_st * shell,const char * group_key,size_t group_key_length,const char * key,size_t key_length,size_t * value_length,uint32_t * flags,memcached_return_t * error)60 char *memcached_get_by_key(memcached_st *shell,
61                            const char *group_key,
62                            size_t group_key_length,
63                            const char *key, size_t key_length,
64                            size_t *value_length,
65                            uint32_t *flags,
66                            memcached_return_t *error)
67 {
68   Memcached* ptr= memcached2Memcached(shell);
69   memcached_return_t unused;
70   if (error == NULL)
71   {
72     error= &unused;
73   }
74 
75   uint64_t query_id= 0;
76   if (ptr)
77   {
78     query_id= ptr->query_id;
79   }
80 
81   /* Request the key */
82   *error= __mget_by_key_real(ptr, group_key, group_key_length,
83                              (const char * const *)&key, &key_length,
84                              1, false);
85   if (ptr)
86   {
87     assert_msg(ptr->query_id == query_id +1, "Programmer error, the query_id was not incremented.");
88   }
89 
90   if (memcached_failed(*error))
91   {
92     if (ptr)
93     {
94       if (memcached_has_current_error(*ptr)) // Find the most accurate error
95       {
96         *error= memcached_last_error(ptr);
97       }
98     }
99 
100     if (value_length)
101     {
102       *value_length= 0;
103     }
104 
105     return NULL;
106   }
107 
108   char *value= memcached_fetch(ptr, NULL, NULL,
109                                value_length, flags, error);
110   assert_msg(ptr->query_id == query_id +1, "Programmer error, the query_id was not incremented.");
111 
112   /* This is for historical reasons */
113   if (*error == MEMCACHED_END)
114   {
115     *error= MEMCACHED_NOTFOUND;
116   }
117   if (value == NULL)
118   {
119     if (ptr->get_key_failure and *error == MEMCACHED_NOTFOUND)
120     {
121       memcached_result_st key_failure_result;
122       memcached_result_st* result_ptr= memcached_result_create(ptr, &key_failure_result);
123       memcached_return_t rc= ptr->get_key_failure(ptr, key, key_length, result_ptr);
124 
125       /* On all failure drop to returning NULL */
126       if (rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED)
127       {
128         if (rc == MEMCACHED_BUFFERED)
129         {
130           uint64_t latch; /* We use latch to track the state of the original socket */
131           latch= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS);
132           if (latch == 0)
133           {
134             memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 1);
135           }
136 
137           rc= memcached_set(ptr, key, key_length,
138                             (memcached_result_value(result_ptr)),
139                             (memcached_result_length(result_ptr)),
140                             0,
141                             (memcached_result_flags(result_ptr)));
142 
143           if (rc == MEMCACHED_BUFFERED and latch == 0)
144           {
145             memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 0);
146           }
147         }
148         else
149         {
150           rc= memcached_set(ptr, key, key_length,
151                             (memcached_result_value(result_ptr)),
152                             (memcached_result_length(result_ptr)),
153                             0,
154                             (memcached_result_flags(result_ptr)));
155         }
156 
157         if (rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED)
158         {
159           *error= rc;
160           *value_length= memcached_result_length(result_ptr);
161           *flags= memcached_result_flags(result_ptr);
162           char *result_value=  memcached_string_take_value(&result_ptr->value);
163           memcached_result_free(result_ptr);
164 
165           return result_value;
166         }
167       }
168 
169       memcached_result_free(result_ptr);
170     }
171     assert_msg(ptr->query_id == query_id +1, "Programmer error, the query_id was not incremented.");
172 
173     return NULL;
174   }
175 
176   return value;
177 }
178 
memcached_mget(memcached_st * ptr,const char * const * keys,const size_t * key_length,size_t number_of_keys)179 memcached_return_t memcached_mget(memcached_st *ptr,
180                                   const char * const *keys,
181                                   const size_t *key_length,
182                                   size_t number_of_keys)
183 {
184   return memcached_mget_by_key(ptr, NULL, 0, keys, key_length, number_of_keys);
185 }
186 
187 static memcached_return_t binary_mget_by_key(memcached_st *ptr,
188                                              const uint32_t master_server_key,
189                                              const bool is_group_key_set,
190                                              const char * const *keys,
191                                              const size_t *key_length,
192                                              const size_t number_of_keys,
193                                              const bool mget_mode);
194 
__mget_by_key_real(memcached_st * ptr,const char * group_key,const size_t group_key_length,const char * const * keys,const size_t * key_length,size_t number_of_keys,const bool mget_mode)195 static memcached_return_t __mget_by_key_real(memcached_st *ptr,
196                                              const char *group_key,
197                                              const size_t group_key_length,
198                                              const char * const *keys,
199                                              const size_t *key_length,
200                                              size_t number_of_keys,
201                                              const bool mget_mode)
202 {
203   bool failures_occured_in_sending= false;
204   const char *get_command= "get";
205   uint8_t get_command_length= 3;
206   unsigned int master_server_key= (unsigned int)-1; /* 0 is a valid server id! */
207 
208   memcached_return_t rc;
209   if (memcached_failed(rc= initialize_query(ptr, true)))
210   {
211     return rc;
212   }
213 
214   if (memcached_is_udp(ptr))
215   {
216     return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT);
217   }
218 
219   LIBMEMCACHED_MEMCACHED_MGET_START();
220 
221   if (number_of_keys == 0)
222   {
223     return memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT, memcached_literal_param("Numbers of keys provided was zero"));
224   }
225 
226   if (memcached_failed((rc= memcached_key_test(*ptr, keys, key_length, number_of_keys))))
227   {
228     assert(memcached_last_error(ptr) == rc);
229 
230     return rc;
231   }
232 
233   bool is_group_key_set= false;
234   if (group_key and group_key_length)
235   {
236     master_server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length);
237     is_group_key_set= true;
238   }
239 
240   /*
241     Here is where we pay for the non-block API. We need to remove any data sitting
242     in the queue before we start our get.
243 
244     It might be optimum to bounce the connection if count > some number.
245   */
246   for (uint32_t x= 0; x < memcached_server_count(ptr); x++)
247   {
248     memcached_instance_st* instance= memcached_instance_fetch(ptr, x);
249 
250     if (instance->response_count())
251     {
252       char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
253 
254       if (ptr->flags.no_block)
255       {
256         memcached_io_write(instance);
257       }
258 
259       while(instance->response_count())
260       {
261         (void)memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->result);
262       }
263     }
264   }
265 
266   if (memcached_is_binary(ptr))
267   {
268     return binary_mget_by_key(ptr, master_server_key, is_group_key_set, keys,
269                               key_length, number_of_keys, mget_mode);
270   }
271 
272   if (ptr->flags.support_cas)
273   {
274     get_command= "gets";
275     get_command_length= 4;
276   }
277 
278   /*
279     If a server fails we warn about errors and start all over with sending keys
280     to the server.
281   */
282   WATCHPOINT_ASSERT(rc == MEMCACHED_SUCCESS);
283   size_t hosts_connected= 0;
284   for (uint32_t x= 0; x < number_of_keys; x++)
285   {
286     uint32_t server_key;
287 
288     if (is_group_key_set)
289     {
290       server_key= master_server_key;
291     }
292     else
293     {
294       server_key= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
295     }
296 
297     memcached_instance_st* instance= memcached_instance_fetch(ptr, server_key);
298 
299     libmemcached_io_vector_st vector[]=
300     {
301       { get_command, get_command_length },
302       { memcached_literal_param(" ") },
303       { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) },
304       { keys[x], key_length[x] }
305     };
306 
307 
308     if (instance->response_count() == 0)
309     {
310       rc= memcached_connect(instance);
311 
312       if (memcached_failed(rc))
313       {
314         memcached_set_error(*instance, rc, MEMCACHED_AT);
315         continue;
316       }
317       hosts_connected++;
318 
319       if ((memcached_io_writev(instance, vector, 1, false)) == false)
320       {
321         failures_occured_in_sending= true;
322         continue;
323       }
324       WATCHPOINT_ASSERT(instance->cursor_active_ == 0);
325       memcached_instance_response_increment(instance);
326       WATCHPOINT_ASSERT(instance->cursor_active_ == 1);
327     }
328 
329     {
330       if ((memcached_io_writev(instance, (vector + 1), 3, false)) == false)
331       {
332         memcached_instance_response_reset(instance);
333         failures_occured_in_sending= true;
334         continue;
335       }
336     }
337   }
338 
339   if (hosts_connected == 0)
340   {
341     LIBMEMCACHED_MEMCACHED_MGET_END();
342 
343     if (memcached_failed(rc))
344     {
345       return rc;
346     }
347 
348     return memcached_set_error(*ptr, MEMCACHED_NO_SERVERS, MEMCACHED_AT);
349   }
350 
351 
352   /*
353     Should we muddle on if some servers are dead?
354   */
355   bool success_happened= false;
356   for (uint32_t x= 0; x < memcached_server_count(ptr); x++)
357   {
358     memcached_instance_st* instance= memcached_instance_fetch(ptr, x);
359 
360     if (instance->response_count())
361     {
362       /* We need to do something about non-connnected hosts in the future */
363       if ((memcached_io_write(instance, "\r\n", 2, true)) == -1)
364       {
365         failures_occured_in_sending= true;
366       }
367       else
368       {
369         success_happened= true;
370       }
371     }
372   }
373 
374   LIBMEMCACHED_MEMCACHED_MGET_END();
375 
376   if (failures_occured_in_sending and success_happened)
377   {
378     return MEMCACHED_SOME_ERRORS;
379   }
380 
381   if (success_happened)
382   {
383     return MEMCACHED_SUCCESS;
384   }
385 
386   return MEMCACHED_FAILURE; // Complete failure occurred
387 }
388 
memcached_mget_by_key(memcached_st * shell,const char * group_key,size_t group_key_length,const char * const * keys,const size_t * key_length,size_t number_of_keys)389 memcached_return_t memcached_mget_by_key(memcached_st *shell,
390                                          const char *group_key,
391                                          size_t group_key_length,
392                                          const char * const *keys,
393                                          const size_t *key_length,
394                                          size_t number_of_keys)
395 {
396   Memcached* ptr= memcached2Memcached(shell);
397   return __mget_by_key_real(ptr, group_key, group_key_length, keys, key_length, number_of_keys, true);
398 }
399 
memcached_mget_execute(memcached_st * ptr,const char * const * keys,const size_t * key_length,size_t number_of_keys,memcached_execute_fn * callback,void * context,unsigned int number_of_callbacks)400 memcached_return_t memcached_mget_execute(memcached_st *ptr,
401                                           const char * const *keys,
402                                           const size_t *key_length,
403                                           size_t number_of_keys,
404                                           memcached_execute_fn *callback,
405                                           void *context,
406                                           unsigned int number_of_callbacks)
407 {
408   return memcached_mget_execute_by_key(ptr, NULL, 0, keys, key_length,
409                                        number_of_keys, callback,
410                                        context, number_of_callbacks);
411 }
412 
memcached_mget_execute_by_key(memcached_st * shell,const char * group_key,size_t group_key_length,const char * const * keys,const size_t * key_length,size_t number_of_keys,memcached_execute_fn * callback,void * context,unsigned int number_of_callbacks)413 memcached_return_t memcached_mget_execute_by_key(memcached_st *shell,
414                                                  const char *group_key,
415                                                  size_t group_key_length,
416                                                  const char * const *keys,
417                                                  const size_t *key_length,
418                                                  size_t number_of_keys,
419                                                  memcached_execute_fn *callback,
420                                                  void *context,
421                                                  unsigned int number_of_callbacks)
422 {
423   Memcached* ptr= memcached2Memcached(shell);
424   memcached_return_t rc;
425   if (memcached_failed(rc= initialize_query(ptr, false)))
426   {
427     return rc;
428   }
429 
430   if (memcached_is_udp(ptr))
431   {
432     return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT);
433   }
434 
435   if (memcached_is_binary(ptr) == false)
436   {
437     return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT,
438                                memcached_literal_param("ASCII protocol is not supported for memcached_mget_execute_by_key()"));
439   }
440 
441   memcached_callback_st *original_callbacks= ptr->callbacks;
442   memcached_callback_st cb= {
443     callback,
444     context,
445     number_of_callbacks
446   };
447 
448   ptr->callbacks= &cb;
449   rc= memcached_mget_by_key(ptr, group_key, group_key_length, keys,
450                             key_length, number_of_keys);
451   ptr->callbacks= original_callbacks;
452 
453   return rc;
454 }
455 
simple_binary_mget(memcached_st * ptr,const uint32_t master_server_key,bool is_group_key_set,const char * const * keys,const size_t * key_length,const size_t number_of_keys,const bool mget_mode)456 static memcached_return_t simple_binary_mget(memcached_st *ptr,
457                                              const uint32_t master_server_key,
458                                              bool is_group_key_set,
459                                              const char * const *keys,
460                                              const size_t *key_length,
461                                              const size_t number_of_keys, const bool mget_mode)
462 {
463   memcached_return_t rc= MEMCACHED_NOTFOUND;
464 
465   bool flush= (number_of_keys == 1);
466 
467   if (memcached_failed(rc= memcached_key_test(*ptr, keys, key_length, number_of_keys)))
468   {
469     return rc;
470   }
471 
472   /*
473     If a server fails we warn about errors and start all over with sending keys
474     to the server.
475   */
476   for (uint32_t x= 0; x < number_of_keys; ++x)
477   {
478     uint32_t server_key;
479 
480     if (is_group_key_set)
481     {
482       server_key= master_server_key;
483     }
484     else
485     {
486       server_key= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
487     }
488 
489     memcached_instance_st* instance= memcached_instance_fetch(ptr, server_key);
490 
491     if (instance->response_count() == 0)
492     {
493       rc= memcached_connect(instance);
494       if (memcached_failed(rc))
495       {
496         continue;
497       }
498     }
499 
500     protocol_binary_request_getk request= { }; //= {.bytes= {0}};
501     initialize_binary_request(instance, request.message.header);
502     if (mget_mode)
503     {
504       request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETKQ;
505     }
506     else
507     {
508       request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK;
509     }
510 
511 #if 0
512     {
513       memcached_return_t vk= memcached_validate_key_length(key_length[x], ptr->flags.binary_protocol);
514       if (memcached_failed(rc= memcached_key_test(*memc, (const char **)&key, &key_length, 1)))
515       {
516         memcached_set_error(ptr, vk, MEMCACHED_AT, memcached_literal_param("Key was too long."));
517 
518         if (x > 0)
519         {
520           memcached_io_reset(instance);
521         }
522 
523         return vk;
524       }
525     }
526 #endif
527 
528     request.message.header.request.keylen= htons((uint16_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
529     request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
530     request.message.header.request.bodylen= htonl((uint32_t)( key_length[x] + memcached_array_size(ptr->_namespace)));
531 
532     libmemcached_io_vector_st vector[]=
533     {
534       { request.bytes, sizeof(request.bytes) },
535       { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) },
536       { keys[x], key_length[x] }
537     };
538 
539     if (memcached_io_writev(instance, vector, 3, flush) == false)
540     {
541       memcached_server_response_reset(instance);
542       rc= MEMCACHED_SOME_ERRORS;
543       continue;
544     }
545 
546     /* We just want one pending response per server */
547     memcached_server_response_reset(instance);
548     memcached_server_response_increment(instance);
549     if ((x > 0 and x == ptr->io_key_prefetch) and memcached_flush_buffers(ptr) != MEMCACHED_SUCCESS)
550     {
551       rc= MEMCACHED_SOME_ERRORS;
552     }
553   }
554 
555   if (mget_mode)
556   {
557     /*
558       Send a noop command to flush the buffers
559     */
560     protocol_binary_request_noop request= {}; //= {.bytes= {0}};
561     request.message.header.request.opcode= PROTOCOL_BINARY_CMD_NOOP;
562     request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
563 
564     for (uint32_t x= 0; x < memcached_server_count(ptr); ++x)
565     {
566       memcached_instance_st* instance= memcached_instance_fetch(ptr, x);
567 
568       if (instance->response_count())
569       {
570         initialize_binary_request(instance, request.message.header);
571         if ((memcached_io_write(instance) == false) or
572             (memcached_io_write(instance, request.bytes, sizeof(request.bytes), true) == -1))
573         {
574           memcached_instance_response_reset(instance);
575           memcached_io_reset(instance);
576           rc= MEMCACHED_SOME_ERRORS;
577         }
578       }
579     }
580   }
581 
582   return rc;
583 }
584 
replication_binary_mget(memcached_st * ptr,uint32_t * hash,bool * dead_servers,const char * const * keys,const size_t * key_length,const size_t number_of_keys)585 static memcached_return_t replication_binary_mget(memcached_st *ptr,
586                                                   uint32_t* hash,
587                                                   bool* dead_servers,
588                                                   const char *const *keys,
589                                                   const size_t *key_length,
590                                                   const size_t number_of_keys)
591 {
592   memcached_return_t rc= MEMCACHED_NOTFOUND;
593   uint32_t start= 0;
594   uint64_t randomize_read= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_RANDOMIZE_REPLICA_READ);
595 
596   if (randomize_read)
597   {
598     start= (uint32_t)random() % (uint32_t)(ptr->number_of_replicas + 1);
599   }
600 
601   /* Loop for each replica */
602   for (uint32_t replica= 0; replica <= ptr->number_of_replicas; ++replica)
603   {
604     bool success= true;
605 
606     for (uint32_t x= 0; x < number_of_keys; ++x)
607     {
608       if (hash[x] == memcached_server_count(ptr))
609       {
610         continue; /* Already successfully sent */
611       }
612 
613       uint32_t server= hash[x] +replica;
614 
615       /* In case of randomized reads */
616       if (randomize_read and ((server + start) <= (hash[x] + ptr->number_of_replicas)))
617       {
618         server+= start;
619       }
620 
621       while (server >= memcached_server_count(ptr))
622       {
623         server -= memcached_server_count(ptr);
624       }
625 
626       if (dead_servers[server])
627       {
628         continue;
629       }
630 
631       memcached_instance_st* instance= memcached_instance_fetch(ptr, server);
632 
633       if (instance->response_count() == 0)
634       {
635         rc= memcached_connect(instance);
636 
637         if (memcached_failed(rc))
638         {
639           memcached_io_reset(instance);
640           dead_servers[server]= true;
641           success= false;
642           continue;
643         }
644       }
645 
646       protocol_binary_request_getk request= {};
647       initialize_binary_request(instance, request.message.header);
648       request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK;
649       request.message.header.request.keylen= htons((uint16_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
650       request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
651       request.message.header.request.bodylen= htonl((uint32_t)(key_length[x] + memcached_array_size(ptr->_namespace)));
652 
653       /*
654        * We need to disable buffering to actually know that the request was
655        * successfully sent to the server (so that we should expect a result
656        * back). It would be nice to do this in buffered mode, but then it
657        * would be complex to handle all error situations if we got to send
658        * some of the messages, and then we failed on writing out some others
659        * and we used the callback interface from memcached_mget_execute so
660        * that we might have processed some of the responses etc. For now,
661        * just make sure we work _correctly_
662      */
663       libmemcached_io_vector_st vector[]=
664       {
665         { request.bytes, sizeof(request.bytes) },
666         { memcached_array_string(ptr->_namespace), memcached_array_size(ptr->_namespace) },
667         { keys[x], key_length[x] }
668       };
669 
670       if (memcached_io_writev(instance, vector, 3, true) == false)
671       {
672         memcached_io_reset(instance);
673         dead_servers[server]= true;
674         success= false;
675         continue;
676       }
677 
678       memcached_server_response_increment(instance);
679       hash[x]= memcached_server_count(ptr);
680     }
681 
682     if (success)
683     {
684       break;
685     }
686   }
687 
688   return rc;
689 }
690 
binary_mget_by_key(memcached_st * ptr,const uint32_t master_server_key,bool is_group_key_set,const char * const * keys,const size_t * key_length,const size_t number_of_keys,const bool mget_mode)691 static memcached_return_t binary_mget_by_key(memcached_st *ptr,
692                                              const uint32_t master_server_key,
693                                              bool is_group_key_set,
694                                              const char * const *keys,
695                                              const size_t *key_length,
696                                              const size_t number_of_keys,
697                                              const bool mget_mode)
698 {
699   if (ptr->number_of_replicas == 0)
700   {
701     return simple_binary_mget(ptr, master_server_key, is_group_key_set,
702                               keys, key_length, number_of_keys, mget_mode);
703   }
704 
705   uint32_t* hash= libmemcached_xvalloc(ptr, number_of_keys, uint32_t);
706   bool* dead_servers= libmemcached_xcalloc(ptr, memcached_server_count(ptr), bool);
707 
708   if (hash == NULL or dead_servers == NULL)
709   {
710     libmemcached_free(ptr, hash);
711     libmemcached_free(ptr, dead_servers);
712     return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
713   }
714 
715   if (is_group_key_set)
716   {
717     for (size_t x= 0; x < number_of_keys; x++)
718     {
719       hash[x]= master_server_key;
720     }
721   }
722   else
723   {
724     for (size_t x= 0; x < number_of_keys; x++)
725     {
726       hash[x]= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]);
727     }
728   }
729 
730   memcached_return_t rc= replication_binary_mget(ptr, hash, dead_servers, keys,
731                                                  key_length, number_of_keys);
732 
733   WATCHPOINT_IFERROR(rc);
734   libmemcached_free(ptr, hash);
735   libmemcached_free(ptr, dead_servers);
736 
737   return MEMCACHED_SUCCESS;
738 }
739