1 /* Copyright (c) 2006, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    Without limiting anything contained in the foregoing, this file,
15    which is part of C Driver for MySQL (Connector/C), is also subject to the
16    Universal FOSS Exception, version 1.0, a copy of which can be found at
17    http://oss.oracle.com/licenses/universal-foss-exception.
18 
19    This program is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22    GNU General Public License, version 2.0, for more details.
23 
24    You should have received a copy of the GNU General Public License
25    along with this program; if not, write to the Free Software
26    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
27 
28 /*
29   extensible hash
30 
31   TODO
32      try to get rid of dummy nodes ?
33      for non-unique hash, count only _distinct_ values
34      (but how to do it in lf_hash_delete ?)
35 */
36 #include <my_global.h>
37 #include <m_string.h>
38 #include <my_sys.h>
39 #include <mysys_priv.h>
40 #include <my_bit.h>
41 #include <lf.h>
42 
43 LF_REQUIRE_PINS(3)
44 
/* An element of the list */
typedef struct {
  intptr volatile link; /* a pointer to the next element in a list and a flag */
  uint32 hashnr;        /* reversed hash number, for sorting                 */
  const uchar *key;     /* points into the data that follows this header     */
  size_t keylen;
  /*
    data is stored here, directly after the keylen.
    thus the pointer to data is (void*)(slist_element_ptr+1)
  */
} LF_SLIST;

/* per-element overhead that LF_HASH adds on top of the user's element_size */
const int LF_HASH_OVERHEAD= sizeof(LF_SLIST);
58 
/*
  a structure to pass the context (pointers to the three successive elements
  in a list) from my_lfind to linsert/ldelete
*/
typedef struct {
  intptr volatile *prev;  /* address of the link that points to 'curr' */
  LF_SLIST *curr, *next;  /* current element and its successor */
} CURSOR;
67 
/*
  the last bit in LF_SLIST::link is a "deleted" flag.
  the helper macros below convert it to a pure pointer or a pure flag
*/
#define PTR(V)      (LF_SLIST *)((V) & (~(intptr)1))  /* strip the flag bit  */
#define DELETED(V)  ((V) & 1)                         /* extract the flag bit */
74 
/*
  DESCRIPTION
    Search for hashnr/key/keylen in the list starting from 'head' and
    position the cursor. The list is ORDER BY hashnr, key

  RETURN
    0 - not found
    1 - found

  NOTE
    cursor is positioned in either case
    pins[0..2] are used, they are NOT removed on return
*/
static int my_lfind(LF_SLIST * volatile *head, CHARSET_INFO *cs, uint32 hashnr,
                    const uchar *key, size_t keylen, CURSOR *cursor, LF_PINS *pins)
{
  uint32       cur_hashnr;
  const uchar  *cur_key;
  size_t       cur_keylen;
  intptr       link;

retry:
  cursor->prev= (intptr *)head;
  do { /* PTR() isn't necessary below, head is a dummy node */
    cursor->curr= (LF_SLIST *)(*cursor->prev);
    lf_pin(pins, 1, cursor->curr);
    /* re-read until the pinned value is still the one in the list */
  } while (my_atomic_loadptr((void**)cursor->prev) != cursor->curr &&
                              LF_BACKOFF);
  for (;;)
  {
    if (unlikely(!cursor->curr))
      return 0; /* end of the list */
    do {
      /* QQ: XXX or goto retry ? */
      link= cursor->curr->link;
      cursor->next= PTR(link);
      lf_pin(pins, 0, cursor->next);
      /* loop until 'next' is pinned while link is stable */
    } while (link != cursor->curr->link && LF_BACKOFF);
    cur_hashnr= cursor->curr->hashnr;
    cur_key= cursor->curr->key;
    cur_keylen= cursor->curr->keylen;
    /* if 'curr' was unlinked behind our back, the values read above
       may be stale - restart the search from the head */
    if (my_atomic_loadptr((void**)cursor->prev) != cursor->curr)
    {
      (void)LF_BACKOFF;
      goto retry;
    }
    if (!DELETED(link))
    {
      if (cur_hashnr >= hashnr)
      {
        int r= 1;
        /* the list is sorted: we either found the key here or passed
           the position where it would have been */
        if (cur_hashnr > hashnr ||
            (r= my_strnncoll(cs, (uchar*) cur_key, cur_keylen, (uchar*) key,
                             keylen)) >= 0)
          return !r;
      }
      cursor->prev= &(cursor->curr->link);
      lf_pin(pins, 2, cursor->curr);
    }
    else
    {
      /*
        we found a deleted node - be nice, help the other thread
        and remove this deleted node
      */
      if (my_atomic_casptr((void **)cursor->prev,
                           (void **)&cursor->curr, cursor->next))
        lf_pinbox_free(pins, cursor->curr);
      else
      {
        (void)LF_BACKOFF;
        goto retry;
      }
    }
    cursor->curr= cursor->next;
    lf_pin(pins, 1, cursor->curr);
  }
}
153 
154 
/**
  Search for list element satisfying condition specified by match
  function and position cursor on it.

  @param head          Head of the list to search in.
  @param first_hashnr  Hash value to start search from.
  @param last_hashnr   Hash value to stop search after.
  @param match         Match function.
  @param cursor        Cursor to be positioned.
  @param pins          LF_PINS for the calling thread to be used during
                       search for pinning result.

  @retval 0 - not found
  @retval 1 - found
*/

static int my_lfind_match(LF_SLIST * volatile *head,
                          uint32 first_hashnr, uint32 last_hashnr,
                          lf_hash_match_func *match,
                          CURSOR *cursor, LF_PINS *pins)
{
  uint32       cur_hashnr;
  intptr       link;

retry:
  cursor->prev= (intptr *)head;
  do { /* PTR() isn't necessary below, head is a dummy node */
    cursor->curr= (LF_SLIST *)(*cursor->prev);
    lf_pin(pins, 1, cursor->curr);
    /* re-read until the pinned value is still the one in the list */
  } while (my_atomic_loadptr((void**)cursor->prev) != cursor->curr &&
                              LF_BACKOFF);
  for (;;)
  {
    if (unlikely(!cursor->curr))
      return 0; /* end of the list */
    do {
      /* QQ: XXX or goto retry ? */
      link= cursor->curr->link;
      cursor->next= PTR(link);
      lf_pin(pins, 0, cursor->next);
      /* loop until 'next' is pinned while link is stable */
    } while (link != cursor->curr->link && LF_BACKOFF);
    cur_hashnr= cursor->curr->hashnr;
    /* if 'curr' was unlinked behind our back, restart from the head */
    if (my_atomic_loadptr((void**)cursor->prev) != cursor->curr)
    {
      (void)LF_BACKOFF;
      goto retry;
    }
    if (!DELETED(link))
    {
      if (cur_hashnr >= first_hashnr)
      {
        if (cur_hashnr > last_hashnr)
          return 0; /* past the requested hash range - nothing matched */

        if (cur_hashnr & 1)
        {
          /* Normal node. Check if element matches condition. */
          if ((*match)((uchar *)(cursor->curr + 1)))
            return 1;
        }
        else
        {
          /*
            Dummy node. Nothing to check here.

            Still thanks to the fact that dummy nodes are never deleted we
            can save it as a safe place to restart iteration if ever needed.
          */
          head= (LF_SLIST * volatile *)&(cursor->curr->link);
        }
      }

      cursor->prev= &(cursor->curr->link);
      lf_pin(pins, 2, cursor->curr);
    }
    else
    {
      /*
        we found a deleted node - be nice, help the other thread
        and remove this deleted node
      */
      if (my_atomic_casptr((void **)cursor->prev,
                           (void **)&cursor->curr, cursor->next))
        lf_pinbox_free(pins, cursor->curr);
      else
      {
        (void)LF_BACKOFF;
        goto retry;
      }
    }
    cursor->curr= cursor->next;
    lf_pin(pins, 1, cursor->curr);
  }
}
249 
250 
/*
  DESCRIPTION
    insert a 'node' in the list that starts from 'head' in the correct
    position (as found by my_lfind)

  RETURN
    0     - inserted
    not 0 - a pointer to a duplicate (not pinned and thus unusable)

  NOTE
    it uses pins[0..2], on return all pins are removed.
    if there're nodes with the same key value, a new node is added before them.
*/
static LF_SLIST *linsert(LF_SLIST * volatile *head, CHARSET_INFO *cs,
                         LF_SLIST *node, LF_PINS *pins, uint flags)
{
  CURSOR         cursor;
  int            res;

  for (;;)
  {
    if (my_lfind(head, cs, node->hashnr, node->key, node->keylen,
              &cursor, pins) &&
        (flags & LF_HASH_UNIQUE))
    {
      res= 0; /* duplicate found */
      break;
    }
    else
    {
      /* try to splice 'node' in between cursor.prev and cursor.curr */
      node->link= (intptr)cursor.curr;
      assert(node->link != (intptr)node); /* no circular references */
      assert(cursor.prev != &node->link); /* no circular references */
      if (my_atomic_casptr((void **)cursor.prev, (void **)&cursor.curr, node))
      {
        res= 1; /* inserted ok */
        break;
      }
      /* CAS failed - the list changed under us, search again */
    }
  }
  lf_unpin(pins, 0);
  lf_unpin(pins, 1);
  lf_unpin(pins, 2);
  /*
    Note that cursor.curr is not pinned here and the pointer is unreliable,
    the object may disappear anytime. But if it points to a dummy node, the
    pointer is safe, because dummy nodes are never freed - initialize_bucket()
    uses this fact.
  */
  return res ? 0 : cursor.curr;
}
302 
/*
  DESCRIPTION
    deletes a node as identified by hashnr/key/keylen from the list
    that starts from 'head'

  RETURN
    0 - ok
    1 - not found

  NOTE
    it uses pins[0..2], on return all pins are removed.
*/
static int ldelete(LF_SLIST * volatile *head, CHARSET_INFO *cs, uint32 hashnr,
                   const uchar *key, uint keylen, LF_PINS *pins)
{
  CURSOR cursor;
  int res;

  for (;;)
  {
    if (!my_lfind(head, cs, hashnr, key, keylen, &cursor, pins))
    {
      res= 1; /* not found */
      break;
    }
    else
    {
      /* mark the node deleted (set the flag bit in its link) */
      if (my_atomic_casptr((void **)&(cursor.curr->link),
                           (void **)&cursor.next,
                           (void *)(((intptr)cursor.next) | 1)))
      {
        /* and remove it from the list */
        if (my_atomic_casptr((void **)cursor.prev,
                             (void **)&cursor.curr, cursor.next))
          lf_pinbox_free(pins, cursor.curr);
        else
        {
          /*
            somebody already "helped" us and removed the node ?
            Let's check if we need to help that someone too!
            (to ensure the number of "set DELETED flag" actions
            is equal to the number of "remove from the list" actions)
          */
          my_lfind(head, cs, hashnr, key, keylen, &cursor, pins);
        }
        res= 0;
        break;
      }
      /* the mark CAS failed: another thread touched the node - retry */
    }
  }
  lf_unpin(pins, 0);
  lf_unpin(pins, 1);
  lf_unpin(pins, 2);
  return res;
}
359 
/*
  DESCRIPTION
    searches for a node as identified by hashnr/key/keylen in the list
    that starts from 'head'

  RETURN
    0 - not found
    node - found

  NOTE
    it uses pins[0..2], on return the pin[2] keeps the node found
    all other pins are removed.
*/
static LF_SLIST *my_lsearch(LF_SLIST * volatile *head, CHARSET_INFO *cs,
                            uint32 hashnr, const uchar *key, uint keylen,
                            LF_PINS *pins)
{
  CURSOR cursor;
  int res= my_lfind(head, cs, hashnr, key, keylen, &cursor, pins);
  if (res)
    lf_pin(pins, 2, cursor.curr); /* keep the found node pinned for the caller */
  lf_unpin(pins, 0);
  lf_unpin(pins, 1);
  return res ? cursor.curr : 0;
}
385 
/*
  Return a pointer to the key of 'record' and store the key length
  in *length. If the hash has a get_key callback, it decides; otherwise
  the key is a fixed-offset, fixed-length part of the record itself.
*/
static inline const uchar* hash_key(const LF_HASH *hash,
                                    const uchar *record, size_t *length)
{
  if (!hash->get_key)
  {
    *length= hash->key_length;
    return record + hash->key_offset;
  }
  return (*hash->get_key)(record, length, 0);
}
394 
395 /*
396   Compute the hash key value from the raw key.
397 
398   @note, that the hash value is limited to 2^31, because we need one
399   bit to distinguish between normal and dummy nodes.
400 */
calc_hash(LF_HASH * hash,const uchar * key,size_t keylen)401 static inline uint calc_hash(LF_HASH *hash, const uchar *key, size_t keylen)
402 {
403   return (hash->hash_function(hash, key, keylen)) & INT_MAX32;
404 }
405 
#define MAX_LOAD 1.0    /* average number of elements in a bucket */

/* forward declaration; the definition is at the end of this file */
static int initialize_bucket(LF_HASH *, LF_SLIST * volatile*, uint, LF_PINS *);
409 
410 
411 /**
412   Adaptor function which allows to use hash function from character
413   set with LF_HASH.
414 */
cset_hash_sort_adapter(const LF_HASH * hash,const uchar * key,size_t length)415 static uint cset_hash_sort_adapter(const LF_HASH *hash, const uchar *key,
416                                    size_t length)
417 {
418   ulong nr1=1, nr2=4;
419   hash->charset->coll->hash_sort(hash->charset, key, length, &nr1, &nr2);
420   return (uint)nr1;
421 }
422 
423 
424 /*
425   Initializes lf_hash, the arguments are compatible with hash_init
426 
427   @note element_size sets both the size of allocated memory block for
428   lf_alloc and a size of memcpy'ed block size in lf_hash_insert. Typically
429   they are the same, indeed. But LF_HASH::element_size can be decreased
430   after lf_hash_init, and then lf_alloc will allocate larger block that
431   lf_hash_insert will copy over. It is desireable if part of the element
432   is expensive to initialize - for example if there is a mutex or
433   DYNAMIC_ARRAY. In this case they should be initialize in the
434   LF_ALLOCATOR::constructor, and lf_hash_insert should not overwrite them.
435   See wt_init() for example.
436   As an alternative to using the above trick with decreasing
437   LF_HASH::element_size one can provide an "initialize" hook that will finish
438   initialization of object provided by LF_ALLOCATOR and set element key from
439   object passed as parameter to lf_hash_insert instead of doing simple memcpy.
440 */
lf_hash_init2(LF_HASH * hash,uint element_size,uint flags,uint key_offset,uint key_length,my_hash_get_key get_key,CHARSET_INFO * charset,lf_hash_func * hash_function,lf_allocator_func * ctor,lf_allocator_func * dtor,lf_hash_init_func * init)441 void lf_hash_init2(LF_HASH *hash, uint element_size, uint flags,
442                    uint key_offset, uint key_length, my_hash_get_key get_key,
443                    CHARSET_INFO *charset, lf_hash_func *hash_function,
444                    lf_allocator_func *ctor, lf_allocator_func *dtor,
445                    lf_hash_init_func *init)
446 {
447   lf_alloc_init2(&hash->alloc, sizeof(LF_SLIST)+element_size,
448                  offsetof(LF_SLIST, key), ctor, dtor);
449   lf_dynarray_init(&hash->array, sizeof(LF_SLIST *));
450   hash->size= 1;
451   hash->count= 0;
452   hash->element_size= element_size;
453   hash->flags= flags;
454   hash->charset= charset ? charset : &my_charset_bin;
455   hash->key_offset= key_offset;
456   hash->key_length= key_length;
457   hash->get_key= get_key;
458   hash->hash_function= hash_function ? hash_function : cset_hash_sort_adapter;
459   hash->initialize= init;
460   assert(get_key ? !key_offset && !key_length : key_length);
461 }
462 
/*
  Free all elements of the hash and destroy its allocator and dynarray.
  Not thread-safe: must be called when no other thread uses the hash.
*/
void lf_hash_destroy(LF_HASH *hash)
{
  LF_SLIST *node;
  LF_SLIST **head= (LF_SLIST **)lf_dynarray_value(&hash->array, 0);

  if (unlikely(!head))
    return;

  /* walk the single split-ordered list and free every node */
  for (node= *head; node; )
  {
    LF_SLIST *next= (LF_SLIST *)node->link;
    if (node->hashnr & 1)
      lf_alloc_direct_free(&hash->alloc, node); /* normal node */
    else
      my_free(node);                            /* dummy node */
    node= next;
  }
  lf_alloc_destroy(&hash->alloc);
  lf_dynarray_destroy(&hash->array);
}
483 
484 /*
485   DESCRIPTION
486     inserts a new element to a hash. it will have a _copy_ of
487     data, not a pointer to it.
488 
489   RETURN
490     0 - inserted
491     1 - didn't (unique key conflict)
492    -1 - out of memory
493 
494   NOTE
495     see linsert() for pin usage notes
496 */
lf_hash_insert(LF_HASH * hash,LF_PINS * pins,const void * data)497 int lf_hash_insert(LF_HASH *hash, LF_PINS *pins, const void *data)
498 {
499   int csize, bucket, hashnr;
500   LF_SLIST *node, * volatile *el;
501 
502   node= (LF_SLIST *)lf_alloc_new(pins);
503   if (unlikely(!node))
504     return -1;
505   if (hash->initialize)
506     (*hash->initialize)((uchar*)(node + 1), (const uchar*)data);
507   else
508     memcpy(node+1, data, hash->element_size);
509   node->key= hash_key(hash, (uchar *)(node+1), &node->keylen);
510   hashnr= calc_hash(hash, node->key, node->keylen);
511   bucket= hashnr % hash->size;
512   el= lf_dynarray_lvalue(&hash->array, bucket);
513   if (unlikely(!el))
514     return -1;
515   if (*el == NULL && unlikely(initialize_bucket(hash, el, bucket, pins)))
516     return -1;
517   node->hashnr= my_reverse_bits(hashnr) | 1; /* normal node */
518   if (linsert(el, hash->charset, node, pins, hash->flags))
519   {
520     lf_pinbox_free(pins, node);
521     return 1;
522   }
523   csize= hash->size;
524   if ((my_atomic_add32(&hash->count, 1)+1.0) / csize > MAX_LOAD)
525     my_atomic_cas32(&hash->size, &csize, csize*2);
526   return 0;
527 }
528 
529 /*
530   DESCRIPTION
531     deletes an element with the given key from the hash (if a hash is
532     not unique and there're many elements with this key - the "first"
533     matching element is deleted)
534   RETURN
535     0 - deleted
536     1 - didn't (not found)
537    -1 - out of memory
538   NOTE
539     see ldelete() for pin usage notes
540 */
lf_hash_delete(LF_HASH * hash,LF_PINS * pins,const void * key,uint keylen)541 int lf_hash_delete(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen)
542 {
543   LF_SLIST * volatile *el;
544   uint bucket, hashnr= calc_hash(hash, (uchar *)key, keylen);
545 
546   bucket= hashnr % hash->size;
547   el= lf_dynarray_lvalue(&hash->array, bucket);
548   if (unlikely(!el))
549     return -1;
550   /*
551     note that we still need to initialize_bucket here,
552     we cannot return "node not found", because an old bucket of that
553     node may've been split and the node was assigned to a new bucket
554     that was never accessed before and thus is not initialized.
555   */
556   if (*el == NULL && unlikely(initialize_bucket(hash, el, bucket, pins)))
557     return -1;
558   if (ldelete(el, hash->charset, my_reverse_bits(hashnr) | 1,
559               (uchar *)key, keylen, pins))
560   {
561     return 1;
562   }
563   my_atomic_add32(&hash->count, -1);
564   return 0;
565 }
566 
567 
568 /**
569   Find hash element corresponding to the key.
570 
571   @param hash    The hash to search element in.
572   @param pins    Pins for the calling thread which were earlier
573                  obtained from this hash using lf_hash_get_pins().
574   @param key     Key
575   @param keylen  Key length
576 
577   @retval A pointer to an element with the given key (if a hash is not unique
578           and there're many elements with this key - the "first" matching
579           element).
580   @retval NULL      - if nothing is found
581   @retval MY_ERRPTR - if OOM
582 
583   @note Uses pins[0..2]. On return pins[0..1] are removed and pins[2]
584         is used to pin object found. It is also not removed in case when
585         object is not found/error occurs but pin value is undefined in
586         this case.
587         So calling lf_hash_unpin() is mandatory after call to this function
588         in case of both success and failure.
589         @sa my_lsearch().
590 */
591 
lf_hash_search(LF_HASH * hash,LF_PINS * pins,const void * key,uint keylen)592 void *lf_hash_search(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen)
593 {
594   LF_SLIST * volatile *el, *found;
595   uint bucket, hashnr= calc_hash(hash, (uchar *)key, keylen);
596 
597   bucket= hashnr % hash->size;
598   el= lf_dynarray_lvalue(&hash->array, bucket);
599   if (unlikely(!el))
600     return MY_ERRPTR;
601   if (*el == NULL && unlikely(initialize_bucket(hash, el, bucket, pins)))
602     return MY_ERRPTR;
603   found= my_lsearch(el, hash->charset, my_reverse_bits(hashnr) | 1,
604                     (uchar *)key, keylen, pins);
605   return found ? found+1 : 0;
606 }
607 
608 
/**
  Find random hash element which satisfies condition specified by
  match function.

  @param hash      Hash to search element in.
  @param pins      Pins for calling thread to be used during search
                   and for pinning its result.
  @param match     Pointer to match function. This function takes
                   pointer to object stored in hash as parameter
                   and returns 0 if object doesn't satisfy its
                   condition (and non-0 value if it does).
  @param rand_val  Random value to be used for selecting hash
                   bucket from which search in sort-ordered
                   list needs to be started.

  @retval A pointer to a random element matching condition.
  @retval NULL      - if nothing is found
  @retval MY_ERRPTR - OOM.

  @note This function follows the same pinning protocol as lf_hash_search(),
        i.e. uses pins[0..2]. On return pins[0..1] are removed and pins[2]
        is used to pin object found. It is also not removed in case when
        object is not found/error occurs but its value is undefined in
        this case.
        So calling lf_hash_unpin() is mandatory after call to this function
        in case of both success and failure.
*/

void *lf_hash_random_match(LF_HASH *hash, LF_PINS *pins,
                           lf_hash_match_func *match,
                           uint rand_val)
{
  /* Convert random value to valid hash value. */
  uint hashnr= (rand_val & INT_MAX32);
  uint bucket;
  uint32 rev_hashnr;
  LF_SLIST * volatile *el;
  CURSOR cursor;
  int res;

  bucket= hashnr % hash->size;
  rev_hashnr= my_reverse_bits(hashnr);

  el= lf_dynarray_lvalue(&hash->array, bucket);
  if (unlikely(!el))
    return MY_ERRPTR;
  /*
    Bucket might be totally empty if it has not been accessed since last
    time LF_HASH::size has been increased. In this case we initialize it
    by inserting dummy node for this bucket to the correct position in
    split-ordered list. This should help future lf_hash_* calls trying to
    access the same bucket.
  */
  if (*el == NULL && unlikely(initialize_bucket(hash, el, bucket, pins)))
    return MY_ERRPTR;

  /*
    To avoid bias towards the first matching element in the bucket, we start
    looking for elements with inversed hash value greater or equal than
    inversed value of our random hash.
  */
  res= my_lfind_match(el, rev_hashnr | 1, UINT_MAX32, match, &cursor, pins);

  if (! res && hashnr != 0)
  {
    /*
      We have not found matching element - probably we were too close to
      the tail of our split-ordered list. To avoid bias against elements
      at the head of the list we restart our search from its head. Unless
      we were already searching from it.

      To avoid going through elements at which we have already looked
      twice we stop once we reach element from which we have begun our
      first search.
    */
    el= lf_dynarray_lvalue(&hash->array, 0);
    if (unlikely(!el))
      return MY_ERRPTR;
    res= my_lfind_match(el, 1, rev_hashnr, match, &cursor, pins);
  }

  if (res)
    lf_pin(pins, 2, cursor.curr); /* keep the match pinned for the caller */
  lf_unpin(pins, 0);
  lf_unpin(pins, 1);

  /* user data starts right after the LF_SLIST header */
  return res ? cursor.curr + 1 : 0;
}
697 
698 static const uchar *dummy_key= (uchar*)"";
699 
700 /*
701   RETURN
702     0 - ok
703    -1 - out of memory
704 */
initialize_bucket(LF_HASH * hash,LF_SLIST * volatile * node,uint bucket,LF_PINS * pins)705 static int initialize_bucket(LF_HASH *hash, LF_SLIST * volatile *node,
706                               uint bucket, LF_PINS *pins)
707 {
708   uint parent= my_clear_highest_bit(bucket);
709   LF_SLIST *dummy= (LF_SLIST *)my_malloc(key_memory_lf_slist,
710                                          sizeof(LF_SLIST), MYF(MY_WME));
711   LF_SLIST **tmp= 0, *cur;
712   LF_SLIST * volatile *el= lf_dynarray_lvalue(&hash->array, parent);
713   if (unlikely(!el || !dummy))
714     return -1;
715   if (*el == NULL && bucket &&
716       unlikely(initialize_bucket(hash, el, parent, pins)))
717     return -1;
718   dummy->hashnr= my_reverse_bits(bucket) | 0; /* dummy node */
719   dummy->key= dummy_key;
720   dummy->keylen= 0;
721   if ((cur= linsert(el, hash->charset, dummy, pins, LF_HASH_UNIQUE)))
722   {
723     my_free(dummy);
724     dummy= cur;
725   }
726   my_atomic_casptr((void **)node, (void **)&tmp, dummy);
727   /*
728     note that if the CAS above failed (after linsert() succeeded),
729     it would mean that some other thread has executed linsert() for
730     the same dummy node, its linsert() failed, it picked up our
731     dummy node (in "dummy= cur") and executed the same CAS as above.
732     Which means that even if CAS above failed we don't need to retry,
733     and we should not free(dummy) - there's no memory leak here
734   */
735   return 0;
736 }
737