/* Copyright (c) 2006, 2020, Oracle and/or its affiliates.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation.  The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   Without limiting anything contained in the foregoing, this file,
   which is part of C Driver for MySQL (Connector/C), is also subject to the
   Universal FOSS Exception, version 1.0, a copy of which can be found at
   http://oss.oracle.com/licenses/universal-foss-exception.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */

/**
  @file mysys/lf_hash.cc
  extensible hash

  @todo
     try to get rid of dummy nodes ?
     for non-unique hash, count only _distinct_ values
     (but how to do it in lf_hash_delete ?)
*/
#include <stddef.h>
#include <string.h>
#include <sys/types.h>
#include <atomic>

#include "lf.h"
#include "m_ctype.h"
#include "my_atomic.h"
#include "my_bit.h"
#include "my_compiler.h"
#include "my_dbug.h"
#include "my_inttypes.h"
#include "my_sys.h"
#include "mysql/service_mysql_alloc.h"
#include "mysys/mysys_priv.h"
#include "template_utils.h"

LF_REQUIRE_PINS(3)

/* An element of the list */
struct LF_SLIST {
  std::atomic<LF_SLIST *>
      link;      /* a pointer to the next element in a list and a flag */
  uint32 hashnr; /* reversed hash number, for sorting                 */
  const uchar *key;
  size_t keylen;
  /*
    data is stored here, directly after the keylen.
    thus the pointer to data is (void*)(slist_element_ptr+1)
  */
};
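
/*
  A minimal sketch of how the inline element data is addressed (illustration
  only; "node" is a hypothetical LF_SLIST pointer, not part of this file):

    LF_SLIST *node = ...;             // element header allocated by lf_alloc
    void *data = (void *)(node + 1);  // user data starts right after the header
    const uchar *key = node->key;     // key points inside that data block
*/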

const int LF_HASH_OVERHEAD = sizeof(LF_SLIST);

/*
  a structure to pass the context (pointers to the three successive elements
  in a list) from my_lfind to linsert/ldelete
*/
typedef struct {
  std::atomic<LF_SLIST *> *prev;
  LF_SLIST *curr, *next;
} CURSOR;

/*
  the last bit in LF_SLIST::link is a "deleted" flag.
  the helper functions below convert it to a pure pointer or a pure flag
*/
template <class T>
static inline T *PTR(T *ptr) {
  intptr_t i = reinterpret_cast<intptr_t>(ptr);
  i &= (intptr_t)~1;
  return reinterpret_cast<T *>(i);
}

template <class T>
static inline bool DELETED(T *ptr) {
  const intptr_t i = reinterpret_cast<intptr_t>(ptr);
  return i & 1;
}

template <class T>
static inline T *SET_DELETED(T *ptr) {
  intptr_t i = reinterpret_cast<intptr_t>(ptr);
  i |= 1;
  return reinterpret_cast<T *>(i);
}
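
/*
  A minimal sketch of how these helpers are meant to be combined (illustration
  only; "link" is a hypothetical std::atomic<LF_SLIST *> field):

    LF_SLIST *raw = link.load();
    LF_SLIST *next = PTR(raw);              // successor with the flag stripped
    bool logically_deleted = DELETED(raw);  // low bit set?
    // marking a node deleted = CAS-ing its link from next to SET_DELETED(next)
*/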

/*
  DESCRIPTION
    Search for hashnr/key/keylen in the list starting from 'head' and
    position the cursor. The list is ORDER BY hashnr, key

  RETURN
    0 - not found
    1 - found

  NOTE
    cursor is positioned in either case
    pins[0..2] are used, they are NOT removed on return
*/
static int my_lfind(std::atomic<LF_SLIST *> *head, CHARSET_INFO *cs,
                    uint32 hashnr, const uchar *key, size_t keylen,
                    CURSOR *cursor, LF_PINS *pins) {
  uint32 cur_hashnr;
  const uchar *cur_key;
  size_t cur_keylen;
  LF_SLIST *link;

retry:
  cursor->prev = head;
  do /* PTR() isn't necessary below, head is a dummy node */
  {
    cursor->curr = (LF_SLIST *)(*cursor->prev);
    lf_pin(pins, 1, cursor->curr);
  } while (*cursor->prev != cursor->curr && LF_BACKOFF);
  for (;;) {
    if (unlikely(!cursor->curr)) {
      return 0; /* end of the list */
    }
    do {
      /* QQ: XXX or goto retry ? */
      link = cursor->curr->link.load();
      cursor->next = PTR(link);
      lf_pin(pins, 0, cursor->next);
    } while (link != cursor->curr->link && LF_BACKOFF);
    cur_hashnr = cursor->curr->hashnr;
    cur_key = cursor->curr->key;
    cur_keylen = cursor->curr->keylen;
    if (*cursor->prev != cursor->curr) {
      (void)LF_BACKOFF;
      goto retry;
    }
    if (!DELETED(link)) {
      if (cur_hashnr >= hashnr) {
        int r = 1;
        if (cur_hashnr > hashnr ||
            (r = my_strnncoll(cs, cur_key, cur_keylen, key, keylen)) >= 0) {
          return !r;
        }
      }
      cursor->prev = &(cursor->curr->link);
      lf_pin(pins, 2, cursor->curr);
    } else {
      /*
        we found a deleted node - be nice, help the other thread
        and remove this deleted node
      */
      if (atomic_compare_exchange_strong(cursor->prev, &cursor->curr,
                                         cursor->next)) {
        lf_pinbox_free(pins, cursor->curr);
      } else {
        (void)LF_BACKOFF;
        goto retry;
      }
    }
    cursor->curr = cursor->next;
    lf_pin(pins, 1, cursor->curr);
  }
}

/**
  Search for list element satisfying condition specified by match
  function and position cursor on it.

  @param head          Head of the list to search in.
  @param first_hashnr  Hash value to start search from.
  @param last_hashnr   Hash value to stop search after.
  @param match         Match function.
  @param cursor        Cursor to be positioned.
  @param pins          LF_PINS for the calling thread to be used during
                       the search and for pinning its result.

  @retval 0 - not found
  @retval 1 - found
*/

static int my_lfind_match(std::atomic<LF_SLIST *> *head, uint32 first_hashnr,
                          uint32 last_hashnr, lf_hash_match_func *match,
                          CURSOR *cursor, LF_PINS *pins) {
  uint32 cur_hashnr;
  LF_SLIST *link;

retry:
  cursor->prev = head;
  do /* PTR() isn't necessary below, head is a dummy node */
  {
    cursor->curr = (LF_SLIST *)(*cursor->prev);
    lf_pin(pins, 1, cursor->curr);
  } while (*cursor->prev != cursor->curr && LF_BACKOFF);
  for (;;) {
    if (unlikely(!cursor->curr)) {
      return 0; /* end of the list */
    }
    do {
      /* QQ: XXX or goto retry ? */
      link = cursor->curr->link.load();
      cursor->next = PTR(link);
      lf_pin(pins, 0, cursor->next);
    } while (link != cursor->curr->link && LF_BACKOFF);
    cur_hashnr = cursor->curr->hashnr;
    if (*cursor->prev != cursor->curr) {
      (void)LF_BACKOFF;
      goto retry;
    }
    if (!DELETED(link)) {
      if (cur_hashnr >= first_hashnr) {
        if (cur_hashnr > last_hashnr) {
          return 0;
        }

        if (cur_hashnr & 1) {
          /* Normal node. Check if element matches condition. */
          if ((*match)((uchar *)(cursor->curr + 1))) {
            return 1;
          }
        } else {
          /*
            Dummy node. Nothing to check here.

            Still thanks to the fact that dummy nodes are never deleted we
            can save it as a safe place to restart iteration if ever needed.
          */
          head = &cursor->curr->link;
        }
      }

      cursor->prev = &(cursor->curr->link);
      lf_pin(pins, 2, cursor->curr);
    } else {
      /*
        we found a deleted node - be nice, help the other thread
        and remove this deleted node
      */
      if (atomic_compare_exchange_strong(cursor->prev, &cursor->curr,
                                         cursor->next)) {
        lf_pinbox_free(pins, cursor->curr);
      } else {
        (void)LF_BACKOFF;
        goto retry;
      }
    }
    cursor->curr = cursor->next;
    lf_pin(pins, 1, cursor->curr);
  }
}

/*
  DESCRIPTION
    insert a 'node' in the list that starts from 'head' in the correct
    position (as found by my_lfind)

  RETURN
    0     - inserted
    not 0 - a pointer to a duplicate (not pinned and thus unusable)

  NOTE
    it uses pins[0..2], on return all pins are removed.
    if there're nodes with the same key value, a new node is added before them.
*/
static LF_SLIST *linsert(std::atomic<LF_SLIST *> *head, CHARSET_INFO *cs,
                         LF_SLIST *node, LF_PINS *pins, uint flags) {
  CURSOR cursor;
  int res;

  for (;;) {
    if (my_lfind(head, cs, node->hashnr, node->key, node->keylen, &cursor,
                 pins) &&
        (flags & LF_HASH_UNIQUE)) {
      res = 0; /* duplicate found */
      break;
    } else {
      node->link = cursor.curr;
      DBUG_ASSERT(node->link != node);         /* no circular references */
      DBUG_ASSERT(cursor.prev != &node->link); /* no circular references */
      if (atomic_compare_exchange_strong(cursor.prev, &cursor.curr, node)) {
        res = 1; /* inserted ok */
        break;
      }
    }
  }
  lf_unpin(pins, 0);
  lf_unpin(pins, 1);
  lf_unpin(pins, 2);
  /*
    Note that cursor.curr is not pinned here and the pointer is unreliable,
    the object may disappear anytime. But if it points to a dummy node, the
    pointer is safe, because dummy nodes are never freed - initialize_bucket()
    uses this fact.
  */
  return res ? nullptr : cursor.curr;
}

/*
  DESCRIPTION
    deletes a node as identified by hashnr/key/keylen from the list
    that starts from 'head'

  RETURN
    0 - ok
    1 - not found

  NOTE
    it uses pins[0..2], on return all pins are removed.
*/
static int ldelete(std::atomic<LF_SLIST *> *head, CHARSET_INFO *cs,
                   uint32 hashnr, const uchar *key, uint keylen,
                   LF_PINS *pins) {
  CURSOR cursor;
  int res;

  for (;;) {
    if (!my_lfind(head, cs, hashnr, key, keylen, &cursor, pins)) {
      res = 1; /* not found */
      break;
    } else {
      /* mark the node deleted */
      if (atomic_compare_exchange_strong(&cursor.curr->link, &cursor.next,
                                         SET_DELETED(cursor.next))) {
        /* and remove it from the list */
        if (atomic_compare_exchange_strong(cursor.prev, &cursor.curr,
                                           cursor.next)) {
          lf_pinbox_free(pins, cursor.curr);
        } else {
          /*
            somebody already "helped" us and removed the node ?
            Let's check if we need to help that someone too!
            (to ensure the number of "set DELETED flag" actions
            is equal to the number of "remove from the list" actions)
          */
          my_lfind(head, cs, hashnr, key, keylen, &cursor, pins);
        }
        res = 0;
        break;
      }
    }
  }
  lf_unpin(pins, 0);
  lf_unpin(pins, 1);
  lf_unpin(pins, 2);
  return res;
}

/*
  DESCRIPTION
    searches for a node as identified by hashnr/key/keylen in the list
    that starts from 'head'

  RETURN
    0 - not found
    node - found

  NOTE
    it uses pins[0..2]; on return pins[2] keeps the node found,
    all other pins are removed.
*/
static LF_SLIST *my_lsearch(std::atomic<LF_SLIST *> *head, CHARSET_INFO *cs,
                            uint32 hashnr, const uchar *key, uint keylen,
                            LF_PINS *pins) {
  CURSOR cursor;
  int res = my_lfind(head, cs, hashnr, key, keylen, &cursor, pins);
  if (res) {
    lf_pin(pins, 2, cursor.curr);
  }
  lf_unpin(pins, 0);
  lf_unpin(pins, 1);
  return res ? cursor.curr : nullptr;
}

static inline const uchar *hash_key(const LF_HASH *hash, const uchar *record,
                                    size_t *length) {
  if (hash->get_key) {
    return (*hash->get_key)(record, length);
  }
  *length = hash->key_length;
  return record + hash->key_offset;
}

/*
  Compute the hash key value from the raw key.

  @note The hash value is limited to 2^31, because we need one
  bit to distinguish between normal and dummy nodes.
*/
static inline uint calc_hash(LF_HASH *hash, const uchar *key, size_t keylen) {
  return (hash->hash_function(hash, key, keylen)) & INT_MAX32;
}
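
/*
  A minimal sketch of how this hash value becomes a split-order sort key
  (illustration only; mirrors what lf_hash_insert() and initialize_bucket()
  do further down):

    uint hashnr = calc_hash(hash, key, keylen);    // 31 significant bits
    uint32 normal = my_reverse_bits(hashnr) | 1;   // normal node: low bit set
    uint32 dummy = my_reverse_bits(bucket) | 0;    // dummy (bucket) node: low bit clear

  Sorting by the bit-reversed value keeps every bucket's dummy node ordered
  right before the normal nodes that hash into that bucket.
*/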

#define MAX_LOAD 1.0 /* average number of elements in a bucket */

static int initialize_bucket(LF_HASH *, std::atomic<LF_SLIST *> *, uint,
                             LF_PINS *);

/**
  Adaptor function which allows using a hash function from a character
  set with LF_HASH.
*/
static uint cset_hash_sort_adapter(const LF_HASH *hash, const uchar *key,
                                   size_t length) {
  uint64 nr1 = 1, nr2 = 4;
  hash->charset->coll->hash_sort(hash->charset, key, length, &nr1, &nr2);
  return (uint)nr1;
}

/*
  Initializes lf_hash; the arguments are compatible with hash_init.

  @note element_size sets both the size of the memory block allocated by
  lf_alloc and the size of the block memcpy'ed in lf_hash_insert. Typically
  they are the same. But LF_HASH::element_size can be decreased after
  lf_hash_init, and then lf_alloc will allocate a larger block than
  lf_hash_insert will copy over. This is desirable if part of the element
  is expensive to initialize - for example if there is a mutex or
  DYNAMIC_ARRAY. In this case they should be initialized in the
  LF_ALLOCATOR::constructor, and lf_hash_insert should not overwrite them.
  See wt_init() for an example.
  As an alternative to the above trick with decreasing
  LF_HASH::element_size, one can provide an "initialize" hook that finishes
  initialization of the object provided by LF_ALLOCATOR and sets the element
  key from the object passed as parameter to lf_hash_insert, instead of doing
  a simple memcpy.
*/
void lf_hash_init2(LF_HASH *hash, uint element_size, uint flags,
                   uint key_offset, uint key_length,
                   hash_get_key_function get_key, CHARSET_INFO *charset,
                   lf_hash_func *hash_function, lf_allocator_func *ctor,
                   lf_allocator_func *dtor, lf_hash_init_func *init) {
  lf_alloc_init2(&hash->alloc, sizeof(LF_SLIST) + element_size,
                 offsetof(LF_SLIST, key), ctor, dtor);
  lf_dynarray_init(&hash->array, sizeof(LF_SLIST *));
  hash->size = 1;
  hash->count = 0;
  hash->element_size = element_size;
  hash->flags = flags;
  hash->charset = charset ? charset : &my_charset_bin;
  hash->key_offset = key_offset;
  hash->key_length = key_length;
  hash->get_key = get_key;
  hash->hash_function = hash_function ? hash_function : cset_hash_sort_adapter;
  hash->initialize = init;
  DBUG_ASSERT(get_key ? !key_offset && !key_length : key_length);
}
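
/*
  A minimal usage sketch (illustration only; "my_elem" and its "id" key are
  hypothetical, and all optional hooks are left as defaults):

    struct my_elem {
      uint32 id;
      uint32 payload;
    };

    LF_HASH hash;
    lf_hash_init2(&hash, sizeof(my_elem), LF_HASH_UNIQUE,
                  offsetof(my_elem, id), sizeof(uint32),
                  nullptr,          // get_key: key lives at a fixed offset
                  &my_charset_bin,  // byte-wise key comparison
                  nullptr, nullptr, nullptr, nullptr);
    ...
    lf_hash_destroy(&hash);
*/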

void lf_hash_destroy(LF_HASH *hash) {
  LF_SLIST *el, **head = (LF_SLIST **)lf_dynarray_value(&hash->array, 0);

  if (unlikely(!head)) {
    lf_alloc_destroy(&hash->alloc);
    return;
  }
  el = *head;

  while (el) {
    LF_SLIST *next = el->link;
    if (el->hashnr & 1) {
      lf_alloc_direct_free(&hash->alloc, el); /* normal node */
    } else {
      my_free(el); /* dummy node */
    }
    el = (LF_SLIST *)next;
  }
  lf_alloc_destroy(&hash->alloc);
  lf_dynarray_destroy(&hash->array);
}

/*
  DESCRIPTION
    inserts a new element into the hash. the hash will keep a _copy_ of
    the data, not a pointer to it.

  RETURN
    0 - inserted
    1 - didn't (unique key conflict)
   -1 - out of memory

  NOTE
    see linsert() for pin usage notes
*/
int lf_hash_insert(LF_HASH *hash, LF_PINS *pins, const void *data) {
  int csize, bucket, hashnr;
  LF_SLIST *node;
  std::atomic<LF_SLIST *> *el;

  node = (LF_SLIST *)lf_alloc_new(pins);
  if (unlikely(!node)) {
    return -1;
  }
  uchar *extra_data =
      (uchar *)(node + 1);  // Stored immediately after the node.
  if (hash->initialize) {
    (*hash->initialize)(extra_data, (const uchar *)data);
  } else {
    memcpy(extra_data, data, hash->element_size);
  }
  node->key = hash_key(hash, (uchar *)(node + 1), &node->keylen);
  hashnr = calc_hash(hash, node->key, node->keylen);
  bucket = hashnr % hash->size;
  el = static_cast<std::atomic<LF_SLIST *> *>(
      lf_dynarray_lvalue(&hash->array, bucket));
  if (unlikely(!el)) {
    lf_pinbox_free(pins, node);
    return -1;
  }
  if (el->load() == nullptr &&
      unlikely(initialize_bucket(hash, el, bucket, pins))) {
    lf_pinbox_free(pins, node);
    return -1;
  }
  node->hashnr = my_reverse_bits(hashnr) | 1; /* normal node */
  if (linsert(el, hash->charset, node, pins, hash->flags)) {
    lf_pinbox_free(pins, node);
    return 1;
  }
  csize = hash->size;
  if ((hash->count.fetch_add(1) + 1.0) / csize > MAX_LOAD) {
    atomic_compare_exchange_strong(&hash->size, &csize, csize * 2);
  }
  return 0;
}
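
/*
  A minimal usage sketch (illustration only; assumes the hypothetical
  "my_elem"/LF_HASH setup from the init example above):

    LF_PINS *pins = lf_hash_get_pins(&hash);
    my_elem elem = {42, 0};
    int rc = lf_hash_insert(&hash, pins, &elem);  // the element is copied
    // rc == 0: inserted, rc == 1: duplicate key, rc == -1: out of memory
    lf_hash_put_pins(pins);
*/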

/*
  DESCRIPTION
    deletes an element with the given key from the hash (if a hash is
    not unique and there're many elements with this key - the "first"
    matching element is deleted)
  RETURN
    0 - deleted
    1 - didn't (not found)
   -1 - out of memory
  NOTE
    see ldelete() for pin usage notes
*/
int lf_hash_delete(LF_HASH *hash, LF_PINS *pins, const void *key,
                   uint keylen) {
  std::atomic<LF_SLIST *> *el;
  uint bucket,
      hashnr = calc_hash(hash, pointer_cast<const uchar *>(key), keylen);

  bucket = hashnr % hash->size;
  el = static_cast<std::atomic<LF_SLIST *> *>(
      lf_dynarray_lvalue(&hash->array, bucket));
  if (unlikely(!el)) {
    return -1;
  }
  /*
    note that we still need to initialize_bucket here,
    we cannot return "node not found", because an old bucket of that
    node may've been split and the node was assigned to a new bucket
    that was never accessed before and thus is not initialized.
  */
  if (el->load() == nullptr &&
      unlikely(initialize_bucket(hash, el, bucket, pins))) {
    return -1;
  }
  if (ldelete(el, hash->charset, my_reverse_bits(hashnr) | 1,
              pointer_cast<const uchar *>(key), keylen, pins)) {
    return 1;
  }
  --hash->count;
  return 0;
}
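
/*
  A minimal usage sketch (illustration only; continues the hypothetical
  "my_elem" example, whose key is a uint32 id):

    uint32 id = 42;
    int rc = lf_hash_delete(&hash, pins, &id, sizeof(id));
    // rc == 0: deleted, rc == 1: key not found, rc == -1: out of memory
*/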

/**
  Find hash element corresponding to the key.

  @param hash    The hash to search element in.
  @param pins    Pins for the calling thread which were earlier
                 obtained from this hash using lf_hash_get_pins().
  @param key     Key
  @param keylen  Key length

  @retval A pointer to an element with the given key (if a hash is not unique
          and there're many elements with this key - the "first" matching
          element).
  @retval NULL         - if nothing is found
  @retval MY_LF_ERRPTR - if OOM

  @note Uses pins[0..2]. On return pins[0..1] are removed and pins[2]
        is used to pin the object found. pins[2] is not removed even when
        the object is not found or an error occurs, but its value is
        undefined in that case.
        So calling lf_hash_unpin() is mandatory after call to this function
        in case of both success and failure.
        @sa my_lsearch().
*/

void *lf_hash_search(LF_HASH *hash, LF_PINS *pins, const void *key,
                     uint keylen) {
  std::atomic<LF_SLIST *> *el;
  LF_SLIST *found;
  uint bucket,
      hashnr = calc_hash(hash, pointer_cast<const uchar *>(key), keylen);

  bucket = hashnr % hash->size;
  el = static_cast<std::atomic<LF_SLIST *> *>(
      lf_dynarray_lvalue(&hash->array, bucket));
  if (unlikely(!el)) {
    return MY_LF_ERRPTR;
  }
  if (el->load() == nullptr &&
      unlikely(initialize_bucket(hash, el, bucket, pins))) {
    return MY_LF_ERRPTR;
  }
  found = my_lsearch(el, hash->charset, my_reverse_bits(hashnr) | 1,
                     pointer_cast<const uchar *>(key), keylen, pins);
  return found ? found + 1 : nullptr;
}
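
/*
  A minimal usage sketch (illustration only; continues the hypothetical
  "my_elem" example):

    uint32 id = 42;
    void *res = lf_hash_search(&hash, pins, &id, sizeof(id));
    if (res && res != MY_LF_ERRPTR) {
      my_elem *found = (my_elem *)res;  // stays valid while pinned by pins[2]
      ...
    }
    lf_unpin(pins, 2);  // mandatory on both success and failure, see note above
*/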

/**
  Find random hash element which satisfies condition specified by
  match function.

  @param hash      Hash to search element in.
  @param pins      Pins for calling thread to be used during search
                   and for pinning its result.
  @param match     Pointer to match function. This function takes
                   pointer to object stored in hash as parameter
                   and returns 0 if object doesn't satisfy its
                   condition (and non-0 value if it does).
  @param rand_val  Random value to be used for selecting hash
                   bucket from which search in sort-ordered
                   list needs to be started.

  @retval A pointer to a random element matching condition.
  @retval NULL         - if nothing is found
  @retval MY_LF_ERRPTR - OOM.

  @note This function follows the same pinning protocol as lf_hash_search(),
        i.e. uses pins[0..2]. On return pins[0..1] are removed and pins[2]
        is used to pin the object found. pins[2] is not removed even when
        the object is not found or an error occurs, but its value is
        undefined in that case.
        So calling lf_hash_unpin() is mandatory after call to this function
        in case of both success and failure.
*/

void *lf_hash_random_match(LF_HASH *hash, LF_PINS *pins,
                           lf_hash_match_func *match, uint rand_val) {
  /* Convert random value to valid hash value. */
  uint hashnr = (rand_val & INT_MAX32);
  uint bucket;
  uint32 rev_hashnr;
  std::atomic<LF_SLIST *> *el;
  CURSOR cursor;
  int res;

  bucket = hashnr % hash->size;
  rev_hashnr = my_reverse_bits(hashnr);

  el = static_cast<std::atomic<LF_SLIST *> *>(
      lf_dynarray_lvalue(&hash->array, bucket));
  if (unlikely(!el)) {
    return MY_LF_ERRPTR;
  }
  /*
    Bucket might be totally empty if it has not been accessed since last
    time LF_HASH::size has been increased. In this case we initialize it
    by inserting dummy node for this bucket to the correct position in
    split-ordered list. This should help future lf_hash_* calls trying to
    access the same bucket.
  */
  if (el->load() == nullptr &&
      unlikely(initialize_bucket(hash, el, bucket, pins))) {
    return MY_LF_ERRPTR;
  }

  /*
    To avoid bias towards the first matching element in the bucket, we start
    looking for elements with reversed hash value greater than or equal to
    the reversed value of our random hash.
  */
  res = my_lfind_match(el, rev_hashnr | 1, UINT_MAX32, match, &cursor, pins);

  if (!res && hashnr != 0) {
    /*
      We have not found a matching element - probably we were too close to
      the tail of our split-ordered list. To avoid bias against elements
      at the head of the list we restart our search from its head. Unless
      we were already searching from it.

      To avoid going through elements at which we have already looked
      twice we stop once we reach element from which we have begun our
      first search.
    */
    el = static_cast<std::atomic<LF_SLIST *> *>(
        lf_dynarray_lvalue(&hash->array, 0));
    if (unlikely(!el)) {
      return MY_LF_ERRPTR;
    }
    res = my_lfind_match(el, 1, rev_hashnr, match, &cursor, pins);
  }

  if (res) {
    lf_pin(pins, 2, cursor.curr);
  }
  lf_unpin(pins, 0);
  lf_unpin(pins, 1);

  return res ? cursor.curr + 1 : nullptr;
}
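
/*
  A minimal usage sketch (illustration only; "elem_is_ready" is a hypothetical
  predicate matching the lf_hash_match_func signature and operating on the
  element bytes stored in the hash):

    static int elem_is_ready(const uchar *el) {
      return ((const my_elem *)el)->payload != 0;
    }

    void *res = lf_hash_random_match(&hash, pins, &elem_is_ready, (uint)rand());
    if (res && res != MY_LF_ERRPTR) {
      ...  // element stays valid while pinned by pins[2]
    }
    lf_unpin(pins, 2);  // mandatory on both success and failure
*/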

static const uchar *dummy_key = pointer_cast<const uchar *>("");

/*
  RETURN
    0 - ok
   -1 - out of memory
*/
static int initialize_bucket(LF_HASH *hash, std::atomic<LF_SLIST *> *node,
                             uint bucket, LF_PINS *pins) {
  uint parent = my_clear_highest_bit(bucket);
  LF_SLIST *dummy =
      (LF_SLIST *)my_malloc(key_memory_lf_slist, sizeof(LF_SLIST), MYF(MY_WME));
  if (unlikely(!dummy)) {
    return -1;
  }
  LF_SLIST *tmp = nullptr, *cur;
  std::atomic<LF_SLIST *> *el = static_cast<std::atomic<LF_SLIST *> *>(
      lf_dynarray_lvalue(&hash->array, parent));
  if (unlikely(!el)) {
    my_free(dummy);
    return -1;
  }
  if (el->load() == nullptr && bucket &&
      unlikely(initialize_bucket(hash, el, parent, pins))) {
    my_free(dummy);
    return -1;
  }
  dummy->hashnr = my_reverse_bits(bucket) | 0; /* dummy node */
  dummy->key = dummy_key;
  dummy->keylen = 0;
  if ((cur = linsert(el, hash->charset, dummy, pins, LF_HASH_UNIQUE))) {
    my_free(dummy);
    dummy = cur;
  }
  atomic_compare_exchange_strong(node, &tmp, dummy);
  /*
    note that if the CAS above failed (after linsert() succeeded),
    it would mean that some other thread has executed linsert() for
    the same dummy node, its linsert() failed, it picked up our
    dummy node (in "dummy= cur") and executed the same CAS as above.
    Which means that even if CAS above failed we don't need to retry,
    and we should not free(dummy) - there's no memory leak here
  */
  return 0;
}
760