1 /*  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2  *
3  *  Libmemcached library
4  *
5  *  Copyright (C) 2011 Data Differential, http://datadifferential.com/
6  *  Copyright (C) 2010 Brian Aker All rights reserved.
7  *
8  *  Redistribution and use in source and binary forms, with or without
9  *  modification, are permitted provided that the following conditions are
10  *  met:
11  *
12  *      * Redistributions of source code must retain the above copyright
13  *  notice, this list of conditions and the following disclaimer.
14  *
15  *      * Redistributions in binary form must reproduce the above
16  *  copyright notice, this list of conditions and the following disclaimer
17  *  in the documentation and/or other materials provided with the
18  *  distribution.
19  *
20  *      * The names of its contributors may not be used to endorse or
21  *  promote products derived from this software without specific prior
22  *  written permission.
23  *
24  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27  *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28  *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29  *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31  *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32  *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33  *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34  *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35  *
36  */
37 
38 
39 #include <libmemcachedutil/common.h>
40 
41 #include <cassert>
42 #include <cerrno>
43 #include <pthread.h>
44 #include <memory>
45 
46 struct memcached_pool_st
47 {
48   pthread_mutex_t mutex;
49   pthread_cond_t cond;
50   memcached_st *master;
51   memcached_st **server_pool;
52   int firstfree;
53   const uint32_t size;
54   uint32_t current_size;
55   bool _owns_master;
56   struct timespec _timeout;
57 
memcached_pool_stmemcached_pool_st58   memcached_pool_st(memcached_st *master_arg, size_t max_arg) :
59     master(master_arg),
60     server_pool(NULL),
61     firstfree(-1),
62     size(uint32_t(max_arg)),
63     current_size(0),
64     _owns_master(false)
65   {
66     pthread_mutex_init(&mutex, NULL);
67     pthread_cond_init(&cond, NULL);
68     _timeout.tv_sec= 5;
69     _timeout.tv_nsec= 0;
70   }
71 
timeoutmemcached_pool_st72   const struct timespec& timeout() const
73   {
74     return _timeout;
75   }
76 
77   bool release(memcached_st*, memcached_return_t& rc);
78 
79   memcached_st *fetch(memcached_return_t& rc);
80   memcached_st *fetch(const struct timespec&, memcached_return_t& rc);
81 
82   bool init(uint32_t initial);
83 
~memcached_pool_stmemcached_pool_st84   ~memcached_pool_st()
85   {
86     for (int x= 0; x <= firstfree; ++x)
87     {
88       memcached_free(server_pool[x]);
89       server_pool[x]= NULL;
90     }
91 
92     int error;
93     if ((error= pthread_mutex_destroy(&mutex)) != 0)
94     {
95       assert_vmsg(error != 0, "pthread_mutex_destroy() %s(%d)", strerror(error), error);
96     }
97 
98     if ((error= pthread_cond_destroy(&cond)) != 0)
99     {
100       assert_vmsg(error != 0, "pthread_cond_destroy() %s", strerror(error));
101     }
102 
103     delete [] server_pool;
104     if (_owns_master)
105     {
106       memcached_free(master);
107     }
108   }
109 
increment_versionmemcached_pool_st110   void increment_version()
111   {
112     ++master->configure.version;
113   }
114 
compare_versionmemcached_pool_st115   bool compare_version(const memcached_st *arg) const
116   {
117     return (arg->configure.version == version());
118   }
119 
versionmemcached_pool_st120   int32_t version() const
121   {
122     return master->configure.version;
123   }
124 };
125 
126 
127 /**
128  * Grow the connection pool by creating a connection structure and clone the
129  * original memcached handle.
130  */
grow_pool(memcached_pool_st * pool)131 static bool grow_pool(memcached_pool_st* pool)
132 {
133   assert(pool);
134 
135   memcached_st *obj;
136   if (not (obj= memcached_clone(NULL, pool->master)))
137   {
138     return false;
139   }
140 
141   pool->server_pool[++pool->firstfree]= obj;
142   pool->current_size++;
143   obj->configure.version= pool->version();
144 
145   return true;
146 }
147 
init(uint32_t initial)148 bool memcached_pool_st::init(uint32_t initial)
149 {
150   server_pool= new (std::nothrow) memcached_st *[size];
151   if (server_pool == NULL)
152   {
153     return false;
154   }
155 
156   /*
157     Try to create the initial size of the pool. An allocation failure at
158     this time is not fatal..
159   */
160   for (unsigned int x= 0; x < initial; ++x)
161   {
162     if (grow_pool(this) == false)
163     {
164       break;
165     }
166   }
167 
168   return true;
169 }
170 
171 
_pool_create(memcached_st * master,uint32_t initial,uint32_t max)172 static inline memcached_pool_st *_pool_create(memcached_st* master, uint32_t initial, uint32_t max)
173 {
174   if (initial == 0 or max == 0 or (initial > max))
175   {
176     return NULL;
177   }
178 
179   memcached_pool_st *object= new (std::nothrow) memcached_pool_st(master, max);
180   if (object == NULL)
181   {
182     return NULL;
183   }
184 
185   /*
186     Try to create the initial size of the pool. An allocation failure at
187     this time is not fatal..
188   */
189   if (not object->init(initial))
190   {
191     delete object;
192     return NULL;
193   }
194 
195   return object;
196 }
197 
memcached_pool_create(memcached_st * master,uint32_t initial,uint32_t max)198 memcached_pool_st *memcached_pool_create(memcached_st* master, uint32_t initial, uint32_t max)
199 {
200   return _pool_create(master, initial, max);
201 }
202 
memcached_pool(const char * option_string,size_t option_string_length)203 memcached_pool_st * memcached_pool(const char *option_string, size_t option_string_length)
204 {
205   memcached_st *memc= memcached(option_string, option_string_length);
206 
207   if (memc == NULL)
208   {
209     return NULL;
210   }
211 
212   memcached_pool_st *self= memcached_pool_create(memc, memc->configure.initial_pool_size, memc->configure.max_pool_size);
213   if (self == NULL)
214   {
215     memcached_free(memc);
216     return NULL;
217   }
218 
219   self->_owns_master= true;
220 
221   return self;
222 }
223 
memcached_pool_destroy(memcached_pool_st * pool)224 memcached_st*  memcached_pool_destroy(memcached_pool_st* pool)
225 {
226   if (pool == NULL)
227   {
228     return NULL;
229   }
230 
231   // Legacy that we return the original structure
232   memcached_st *ret= NULL;
233   if (pool->_owns_master)
234   { }
235   else
236   {
237     ret= pool->master;
238   }
239 
240   delete pool;
241 
242   return ret;
243 }
244 
fetch(memcached_return_t & rc)245 memcached_st* memcached_pool_st::fetch(memcached_return_t& rc)
246 {
247   static struct timespec relative_time= { 0, 0 };
248   return fetch(relative_time, rc);
249 }
250 
fetch(const struct timespec & relative_time,memcached_return_t & rc)251 memcached_st* memcached_pool_st::fetch(const struct timespec& relative_time, memcached_return_t& rc)
252 {
253   rc= MEMCACHED_SUCCESS;
254 
255   int error;
256   if ((error= pthread_mutex_lock(&mutex)) != 0)
257   {
258     rc= MEMCACHED_IN_PROGRESS;
259     return NULL;
260   }
261 
262   memcached_st *ret= NULL;
263   do
264   {
265     if (firstfree > -1)
266     {
267       ret= server_pool[firstfree--];
268     }
269     else if (current_size == size)
270     {
271       if (relative_time.tv_sec == 0 and relative_time.tv_nsec == 0)
272       {
273         error= pthread_mutex_unlock(&mutex);
274         rc= MEMCACHED_NOTFOUND;
275 
276         return NULL;
277       }
278 
279       struct timespec time_to_wait= {0, 0};
280       time_to_wait.tv_sec= time(NULL) +relative_time.tv_sec;
281       time_to_wait.tv_nsec= relative_time.tv_nsec;
282 
283       int thread_ret;
284       if ((thread_ret= pthread_cond_timedwait(&cond, &mutex, &time_to_wait)) != 0)
285       {
286         int unlock_error;
287         if ((unlock_error= pthread_mutex_unlock(&mutex)) != 0)
288         {
289           assert_vmsg(error != 0, "pthread_mutex_unlock() %s", strerror(error));
290         }
291 
292         if (thread_ret == ETIMEDOUT)
293         {
294           rc= MEMCACHED_TIMEOUT;
295         }
296         else
297         {
298           errno= thread_ret;
299           rc= MEMCACHED_ERRNO;
300         }
301 
302         return NULL;
303       }
304     }
305     else if (grow_pool(this) == false)
306     {
307       int unlock_error;
308       if ((unlock_error= pthread_mutex_unlock(&mutex)) != 0)
309       {
310         assert_vmsg(error != 0, "pthread_mutex_unlock() %s", strerror(error));
311       }
312 
313       return NULL;
314     }
315   } while (ret == NULL);
316 
317   if ((error= pthread_mutex_unlock(&mutex)) != 0)
318   {
319     assert_vmsg(error != 0, "pthread_mutex_unlock() %s", strerror(error));
320   }
321 
322   return ret;
323 }
324 
release(memcached_st * released,memcached_return_t & rc)325 bool memcached_pool_st::release(memcached_st *released, memcached_return_t& rc)
326 {
327   rc= MEMCACHED_SUCCESS;
328   if (released == NULL)
329   {
330     rc= MEMCACHED_INVALID_ARGUMENTS;
331     return false;
332   }
333 
334   int error;
335   if ((error= pthread_mutex_lock(&mutex)))
336   {
337     rc= MEMCACHED_IN_PROGRESS;
338     return false;
339   }
340 
341   /*
342     Someone updated the behavior on the object, so we clone a new memcached_st with the new settings. If we fail to clone, we keep the old one around.
343   */
344   if (compare_version(released) == false)
345   {
346     memcached_st *memc;
347     if ((memc= memcached_clone(NULL, master)))
348     {
349       memcached_free(released);
350       released= memc;
351     }
352   }
353 
354   server_pool[++firstfree]= released;
355 
356   if (firstfree == 0 and current_size == size)
357   {
358     /* we might have people waiting for a connection.. wake them up :-) */
359     if ((error= pthread_cond_broadcast(&cond)) != 0)
360     {
361       assert_vmsg(error != 0, "pthread_cond_broadcast() %s", strerror(error));
362     }
363   }
364 
365   if ((error= pthread_mutex_unlock(&mutex)) != 0)
366   {
367   }
368 
369   return true;
370 }
371 
memcached_pool_fetch(memcached_pool_st * pool,struct timespec * relative_time,memcached_return_t * rc)372 memcached_st* memcached_pool_fetch(memcached_pool_st* pool, struct timespec* relative_time, memcached_return_t* rc)
373 {
374   if (pool == NULL)
375   {
376     return NULL;
377   }
378 
379   memcached_return_t unused;
380   if (rc == NULL)
381   {
382     rc= &unused;
383   }
384 
385   if (relative_time == NULL)
386   {
387     return pool->fetch(*rc);
388   }
389 
390   return pool->fetch(*relative_time, *rc);
391 }
392 
memcached_pool_pop(memcached_pool_st * pool,bool block,memcached_return_t * rc)393 memcached_st* memcached_pool_pop(memcached_pool_st* pool,
394                                  bool block,
395                                  memcached_return_t *rc)
396 {
397   if (pool == NULL)
398   {
399     return NULL;
400   }
401 
402   memcached_return_t unused;
403   if (rc == NULL)
404   {
405     rc= &unused;
406   }
407 
408   memcached_st *memc;
409   if (block)
410   {
411     memc= pool->fetch(pool->timeout(), *rc);
412   }
413   else
414   {
415     memc= pool->fetch(*rc);
416   }
417 
418   return memc;
419 }
420 
memcached_pool_release(memcached_pool_st * pool,memcached_st * released)421 memcached_return_t memcached_pool_release(memcached_pool_st* pool, memcached_st *released)
422 {
423   if (pool == NULL)
424   {
425     return MEMCACHED_INVALID_ARGUMENTS;
426   }
427 
428   memcached_return_t rc;
429 
430   (void) pool->release(released, rc);
431 
432   return rc;
433 }
434 
memcached_pool_push(memcached_pool_st * pool,memcached_st * released)435 memcached_return_t memcached_pool_push(memcached_pool_st* pool, memcached_st *released)
436 {
437   return memcached_pool_release(pool, released);
438 }
439 
440 
memcached_pool_behavior_set(memcached_pool_st * pool,memcached_behavior_t flag,uint64_t data)441 memcached_return_t memcached_pool_behavior_set(memcached_pool_st *pool,
442                                                memcached_behavior_t flag,
443                                                uint64_t data)
444 {
445   if (pool == NULL)
446   {
447     return MEMCACHED_INVALID_ARGUMENTS;
448   }
449 
450   int error;
451   if ((error= pthread_mutex_lock(&pool->mutex)))
452   {
453     return MEMCACHED_IN_PROGRESS;
454   }
455 
456   /* update the master */
457   memcached_return_t rc= memcached_behavior_set(pool->master, flag, data);
458   if (memcached_failed(rc))
459   {
460     if ((error= pthread_mutex_unlock(&pool->mutex)) != 0)
461     {
462       assert_vmsg(error != 0, "pthread_mutex_unlock() %s", strerror(error));
463     }
464     return rc;
465   }
466 
467   pool->increment_version();
468   /* update the clones */
469   for (int xx= 0; xx <= pool->firstfree; ++xx)
470   {
471     if (memcached_success(memcached_behavior_set(pool->server_pool[xx], flag, data)))
472     {
473       pool->server_pool[xx]->configure.version= pool->version();
474     }
475     else
476     {
477       memcached_st *memc;
478       if ((memc= memcached_clone(NULL, pool->master)))
479       {
480         memcached_free(pool->server_pool[xx]);
481         pool->server_pool[xx]= memc;
482         /* I'm not sure what to do in this case.. this would happen
483           if we fail to push the server list inside the client..
484           I should add a testcase for this, but I believe the following
485           would work, except that you would add a hole in the pool list..
486           in theory you could end up with an empty pool....
487         */
488       }
489     }
490   }
491 
492   if ((error= pthread_mutex_unlock(&pool->mutex)) != 0)
493   {
494     assert_vmsg(error != 0, "pthread_mutex_unlock() %s", strerror(error));
495   }
496 
497   return rc;
498 }
499 
memcached_pool_behavior_get(memcached_pool_st * pool,memcached_behavior_t flag,uint64_t * value)500 memcached_return_t memcached_pool_behavior_get(memcached_pool_st *pool,
501                                                memcached_behavior_t flag,
502                                                uint64_t *value)
503 {
504   if (pool == NULL)
505   {
506     return MEMCACHED_INVALID_ARGUMENTS;
507   }
508 
509   int error;
510   if ((error= pthread_mutex_lock(&pool->mutex)))
511   {
512     return MEMCACHED_IN_PROGRESS;
513   }
514 
515   *value= memcached_behavior_get(pool->master, flag);
516 
517   if ((error= pthread_mutex_unlock(&pool->mutex)) != 0)
518   {
519     assert_vmsg(error != 0, "pthread_mutex_unlock() %s", strerror(error));
520   }
521 
522   return MEMCACHED_SUCCESS;
523 }
524