1 /* Copyright (c) 2006, 2017, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    Without limiting anything contained in the foregoing, this file,
15    which is part of C Driver for MySQL (Connector/C), is also subject to the
16    Universal FOSS Exception, version 1.0, a copy of which can be found at
17    http://oss.oracle.com/licenses/universal-foss-exception.
18 
19    This program is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22    GNU General Public License, version 2.0, for more details.
23 
24    You should have received a copy of the GNU General Public License
25    along with this program; if not, write to the Free Software
26    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
27 
28 /*
29   extensible hash
30 
31   TODO
32      try to get rid of dummy nodes ?
33      for non-unique hash, count only _distinct_ values
34      (but how to do it in lf_hash_delete ?)
35 */
36 #include <my_global.h>
37 #include <m_string.h>
38 #include <my_sys.h>
39 #include <my_bit.h>
40 #include <lf.h>
41 
42 LF_REQUIRE_PINS(3)
43 
/* An element of the list */
typedef struct {
  intptr volatile link; /* a pointer to the next element in a list and a flag */
  uint32 hashnr;        /* reversed hash number, for sorting                 */
  const uchar *key;     /* points into the element data that follows         */
  size_t keylen;
  /*
    data is stored here, directly after the keylen.
    thus the pointer to data is (void*)(slist_element_ptr+1)
  */
} LF_SLIST;
55 
/* per-element memory overhead: the LF_SLIST header allocated before the data */
const int LF_HASH_OVERHEAD= sizeof(LF_SLIST);
57 
/*
  a structure to pass the context (pointers to the three successive elements
  in a list) from my_lfind to linsert/ldelete
*/
typedef struct {
  intptr volatile *prev;  /* address of the link field pointing at 'curr' */
  LF_SLIST *curr, *next;
} CURSOR;
66 
/*
  the last bit in LF_SLIST::link is a "deleted" flag.
  the helper macros below convert it to a pure pointer or a pure flag
*/
/* strip the deleted flag, yielding the real next-element pointer */
#define PTR(V)      (LF_SLIST *)((V) & (~(intptr)1))
/* non-zero if the link value has the deleted flag set */
#define DELETED(V)  ((V) & 1)
73 
74 /*
75   DESCRIPTION
76     Search for hashnr/key/keylen in the list starting from 'head' and
77     position the cursor. The list is ORDER BY hashnr, key
78 
79   RETURN
80     0 - not found
81     1 - found
82 
83   NOTE
84     cursor is positioned in either case
85     pins[0..2] are used, they are NOT removed on return
86 */
static int my_lfind(LF_SLIST * volatile *head, CHARSET_INFO *cs, uint32 hashnr,
                 const uchar *key, uint keylen, CURSOR *cursor, LF_PINS *pins)
{
  uint32       cur_hashnr;
  const uchar  *cur_key;
  uint         cur_keylen;
  intptr       link;

retry:
  cursor->prev= (intptr *)head;
  do { /* PTR() isn't necessary below, head is a dummy node */
    cursor->curr= (LF_SLIST *)(*cursor->prev);
    _lf_pin(pins, 1, cursor->curr);
    /* re-read until the pinned value is still what *prev points to */
  } while (my_atomic_loadptr((void**)cursor->prev) != cursor->curr &&
                              LF_BACKOFF);
  for (;;)
  {
    if (unlikely(!cursor->curr))
      return 0; /* end of the list */
    do {
      /* QQ: XXX or goto retry ? */
      link= cursor->curr->link;
      cursor->next= PTR(link);
      _lf_pin(pins, 0, cursor->next);
    } while (link != cursor->curr->link && LF_BACKOFF);
    /* copy out the key fields while 'curr' is still known to be reachable */
    cur_hashnr= cursor->curr->hashnr;
    cur_key= cursor->curr->key;
    cur_keylen= cursor->curr->keylen;
    if (my_atomic_loadptr((void**)cursor->prev) != cursor->curr)
    {
      /* the list changed under us - restart the search from 'head' */
      (void)LF_BACKOFF;
      goto retry;
    }
    if (!DELETED(link))
    {
      if (cur_hashnr >= hashnr)
      {
        int r= 1;
        if (cur_hashnr > hashnr ||
            (r= my_strnncoll(cs, (uchar*) cur_key, cur_keylen, (uchar*) key,
                             keylen)) >= 0)
          return !r; /* 1 if the keys collate equal, 0 if we passed the spot */
      }
      cursor->prev= &(cursor->curr->link);
      _lf_pin(pins, 2, cursor->curr);
    }
    else
    {
      /*
        we found a deleted node - be nice, help the other thread
        and remove this deleted node
      */
      if (my_atomic_casptr((void **)cursor->prev,
                           (void **)&cursor->curr, cursor->next))
        _lf_alloc_free(pins, cursor->curr);
      else
      {
        (void)LF_BACKOFF;
        goto retry;
      }
    }
    cursor->curr= cursor->next;
    _lf_pin(pins, 1, cursor->curr);
  }
}
152 
153 /*
154   DESCRIPTION
155     insert a 'node' in the list that starts from 'head' in the correct
156     position (as found by my_lfind)
157 
158   RETURN
159     0     - inserted
160     not 0 - a pointer to a duplicate (not pinned and thus unusable)
161 
162   NOTE
163     it uses pins[0..2], on return all pins are removed.
164     if there're nodes with the same key value, a new node is added before them.
165 */
static LF_SLIST *linsert(LF_SLIST * volatile *head, CHARSET_INFO *cs,
                         LF_SLIST *node, LF_PINS *pins, uint flags)
{
  CURSOR         cursor;
  int            res;

  for (;;)
  {
    if (my_lfind(head, cs, node->hashnr, node->key, node->keylen,
              &cursor, pins) &&
        (flags & LF_HASH_UNIQUE))
    {
      res= 0; /* duplicate found */
      break;
    }
    else
    {
      /* insertion point found: link the new node in before cursor.curr */
      node->link= (intptr)cursor.curr;
      DBUG_ASSERT(node->link != (intptr)node); /* no circular references */
      DBUG_ASSERT(cursor.prev != &node->link); /* no circular references */
      if (my_atomic_casptr((void **)cursor.prev, (void **)&cursor.curr, node))
      {
        res= 1; /* inserted ok */
        break;
      }
      /* CAS failed - the list changed under us, search again */
    }
  }
  _lf_unpin(pins, 0);
  _lf_unpin(pins, 1);
  _lf_unpin(pins, 2);
  /*
    Note that cursor.curr is not pinned here and the pointer is unreliable,
    the object may disappear anytime. But if it points to a dummy node, the
    pointer is safe, because dummy nodes are never freed - initialize_bucket()
    uses this fact.
  */
  return res ? 0 : cursor.curr;
}
204 
205 /*
206   DESCRIPTION
    deletes a node as identified by hashnr/key/keylen from the list
208     that starts from 'head'
209 
210   RETURN
211     0 - ok
212     1 - not found
213 
214   NOTE
215     it uses pins[0..2], on return all pins are removed.
216 */
static int ldelete(LF_SLIST * volatile *head, CHARSET_INFO *cs, uint32 hashnr,
                   const uchar *key, uint keylen, LF_PINS *pins)
{
  CURSOR cursor;
  int res;

  for (;;)
  {
    if (!my_lfind(head, cs, hashnr, key, keylen, &cursor, pins))
    {
      res= 1; /* not found */
      break;
    }
    else
    {
      /* mark the node deleted (set the low bit of its link atomically) */
      if (my_atomic_casptr((void **)&(cursor.curr->link),
                           (void **)&cursor.next,
                           (void *)(((intptr)cursor.next) | 1)))
      {
        /* and remove it from the list */
        if (my_atomic_casptr((void **)cursor.prev,
                             (void **)&cursor.curr, cursor.next))
          _lf_alloc_free(pins, cursor.curr);
        else
        {
          /*
            somebody already "helped" us and removed the node ?
            Let's check if we need to help that someone too!
            (to ensure the number of "set DELETED flag" actions
            is equal to the number of "remove from the list" actions)
          */
          my_lfind(head, cs, hashnr, key, keylen, &cursor, pins);
        }
        res= 0;
        break;
      }
      /* marking CAS failed - the node's link changed, retry the search */
    }
  }
  _lf_unpin(pins, 0);
  _lf_unpin(pins, 1);
  _lf_unpin(pins, 2);
  return res;
}
261 
262 /*
263   DESCRIPTION
    searches for a node as identified by hashnr/key/keylen in the list
265     that starts from 'head'
266 
267   RETURN
268     0 - not found
269     node - found
270 
271   NOTE
272     it uses pins[0..2], on return the pin[2] keeps the node found
273     all other pins are removed.
274 */
static LF_SLIST *my_lsearch(LF_SLIST * volatile *head, CHARSET_INFO *cs,
                         uint32 hashnr, const uchar *key, uint keylen,
                         LF_PINS *pins)
{
  CURSOR cursor;
  LF_SLIST *result= 0;

  /* locate the element; pin it with pin[2] before dropping the search pins */
  if (my_lfind(head, cs, hashnr, key, keylen, &cursor, pins))
  {
    _lf_pin(pins, 2, cursor.curr);
    result= cursor.curr;
  }
  _lf_unpin(pins, 0);
  _lf_unpin(pins, 1);
  return result;
}
287 
/*
  Return a pointer to the key of 'record' and store its length in *length.
  Uses the user-supplied get_key callback when one was given to lf_hash_init,
  otherwise the fixed key_offset/key_length layout.
*/
static inline const uchar* hash_key(const LF_HASH *hash,
                                    const uchar *record, size_t *length)
{
  if (hash->get_key == NULL)
  {
    *length= hash->key_length;
    return record + hash->key_offset;
  }
  return (*hash->get_key)(record, length, 0);
}
296 
297 /*
298   Compute the hash key value from the raw key.
299 
300   @note, that the hash value is limited to 2^31, because we need one
301   bit to distinguish between normal and dummy nodes.
302 */
calc_hash(LF_HASH * hash,const uchar * key,uint keylen)303 static inline uint calc_hash(LF_HASH *hash, const uchar *key, uint keylen)
304 {
305   ulong nr1= 1, nr2= 4;
306   hash->charset->coll->hash_sort(hash->charset, (uchar*) key, keylen,
307                                  &nr1, &nr2);
308   return nr1 & INT_MAX32;
309 }
310 
#define MAX_LOAD 1.0    /* average number of elements in a bucket */

/* creates the dummy node for a not-yet-initialized bucket (defined below) */
static int initialize_bucket(LF_HASH *, LF_SLIST * volatile*, uint, LF_PINS *);
314 
315 /*
316   Initializes lf_hash, the arguments are compatible with hash_init
317 
318   @note element_size sets both the size of allocated memory block for
319   lf_alloc and a size of memcpy'ed block size in lf_hash_insert. Typically
320   they are the same, indeed. But LF_HASH::element_size can be decreased
321   after lf_hash_init, and then lf_alloc will allocate larger block that
  lf_hash_insert will copy over. It is desirable if part of the element
  is expensive to initialize - for example if there is a mutex or
  DYNAMIC_ARRAY. In this case they should be initialized in the
  LF_ALLOCATOR::constructor, and lf_hash_insert should not overwrite them.
326   See wt_init() for example.
327 */
void lf_hash_init(LF_HASH *hash, uint element_size, uint flags,
                  uint key_offset, uint key_length, my_hash_get_key get_key,
                  CHARSET_INFO *charset)
{
  /* each allocator block holds the LF_SLIST header plus the element data */
  lf_alloc_init(&hash->alloc, sizeof(LF_SLIST)+element_size,
                offsetof(LF_SLIST, key));
  lf_dynarray_init(&hash->array, sizeof(LF_SLIST *));
  hash->size= 1;       /* number of buckets; doubled when load > MAX_LOAD */
  hash->count= 0;      /* number of elements */
  hash->element_size= element_size;
  hash->flags= flags;
  hash->charset= charset ? charset : &my_charset_bin;
  hash->key_offset= key_offset;
  hash->key_length= key_length;
  hash->get_key= get_key;
  /* either a key-extraction callback OR an explicit offset/length, not both */
  DBUG_ASSERT(get_key ? !key_offset && !key_length : key_length);
}
345 
/*
  Free every node of the hash. Must only be called when no other thread
  uses the hash. Normal nodes (odd hashnr) belong to the lf_alloc pool,
  dummy bucket nodes (even hashnr) were my_malloc'ed by initialize_bucket.
*/
void lf_hash_destroy(LF_HASH *hash)
{
  LF_SLIST *node, **head= (LF_SLIST **)_lf_dynarray_value(&hash->array, 0);

  if (unlikely(!head))
    return;

  for (node= *head; node; )
  {
    LF_SLIST *to_free= node;
    node= (LF_SLIST *)node->link;            /* read link before freeing */
    if (to_free->hashnr & 1)
      lf_alloc_direct_free(&hash->alloc, to_free); /* normal node */
    else
      my_free(to_free);                            /* dummy node */
  }
  lf_alloc_destroy(&hash->alloc);
  lf_dynarray_destroy(&hash->array);
}
366 
367 /*
368   DESCRIPTION
369     inserts a new element to a hash. it will have a _copy_ of
370     data, not a pointer to it.
371 
372   RETURN
373     0 - inserted
374     1 - didn't (unique key conflict)
375    -1 - out of memory
376 
377   NOTE
378     see linsert() for pin usage notes
379 */
int lf_hash_insert(LF_HASH *hash, LF_PINS *pins, const void *data)
{
  int csize, bucket, hashnr;
  LF_SLIST *node, * volatile *el;

  lf_rwlock_by_pins(pins);
  node= (LF_SLIST *)_lf_alloc_new(pins);
  if (unlikely(!node))
  {
    /* fix: release the pinbox lock, it was held on this error path */
    lf_rwunlock_by_pins(pins);
    return -1;
  }
  /* the element data lives directly after the LF_SLIST header */
  memcpy(node+1, data, hash->element_size);
  node->key= hash_key(hash, (uchar *)(node+1), &node->keylen);
  hashnr= calc_hash(hash, node->key, node->keylen);
  bucket= hashnr % hash->size;
  el= _lf_dynarray_lvalue(&hash->array, bucket);
  if (unlikely(!el))
  {
    /* fix: return the allocated node and release the lock before failing */
    _lf_alloc_free(pins, node);
    lf_rwunlock_by_pins(pins);
    return -1;
  }
  if (*el == NULL && unlikely(initialize_bucket(hash, el, bucket, pins)))
  {
    _lf_alloc_free(pins, node);
    lf_rwunlock_by_pins(pins);
    return -1;
  }
  node->hashnr= my_reverse_bits(hashnr) | 1; /* normal node */
  if (linsert(el, hash->charset, node, pins, hash->flags))
  {
    /* duplicate under LF_HASH_UNIQUE: give the node back */
    _lf_alloc_free(pins, node);
    lf_rwunlock_by_pins(pins);
    return 1;
  }
  /* grow the bucket count when the average load exceeds MAX_LOAD */
  csize= hash->size;
  if ((my_atomic_add32(&hash->count, 1)+1.0) / csize > MAX_LOAD)
    my_atomic_cas32(&hash->size, &csize, csize*2);
  lf_rwunlock_by_pins(pins);
  return 0;
}
411 
412 /*
413   DESCRIPTION
414     deletes an element with the given key from the hash (if a hash is
415     not unique and there're many elements with this key - the "first"
416     matching element is deleted)
417   RETURN
418     0 - deleted
419     1 - didn't (not found)
420    -1 - out of memory
421   NOTE
422     see ldelete() for pin usage notes
423 */
int lf_hash_delete(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen)
{
  LF_SLIST * volatile *el;
  uint bucket, hashnr= calc_hash(hash, (uchar *)key, keylen);

  bucket= hashnr % hash->size;
  lf_rwlock_by_pins(pins);
  el= _lf_dynarray_lvalue(&hash->array, bucket);
  if (unlikely(!el))
  {
    /* fix: release the pinbox lock, it was held on this error path */
    lf_rwunlock_by_pins(pins);
    return -1;
  }
  /*
    note that we still need to initialize_bucket here,
    we cannot return "node not found", because an old bucket of that
    node may've been split and the node was assigned to a new bucket
    that was never accessed before and thus is not initialized.
  */
  if (*el == NULL && unlikely(initialize_bucket(hash, el, bucket, pins)))
  {
    lf_rwunlock_by_pins(pins);
    return -1;
  }
  if (ldelete(el, hash->charset, my_reverse_bits(hashnr) | 1,
              (uchar *)key, keylen, pins))
  {
    lf_rwunlock_by_pins(pins);
    return 1;
  }
  my_atomic_add32(&hash->count, -1);
  lf_rwunlock_by_pins(pins);
  return 0;
}
452 
453 /*
454   RETURN
455     a pointer to an element with the given key (if a hash is not unique and
456     there're many elements with this key - the "first" matching element)
457     NULL         if nothing is found
458     MY_ERRPTR    if OOM
459 
460   NOTE
461     see my_lsearch() for pin usage notes
462 */
void *lf_hash_search(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen)
{
  LF_SLIST * volatile *el, *found;
  uint bucket, hashnr= calc_hash(hash, (uchar *)key, keylen);

  bucket= hashnr % hash->size;
  lf_rwlock_by_pins(pins);
  el= _lf_dynarray_lvalue(&hash->array, bucket);
  if (unlikely(!el))
  {
    /* fix: release the pinbox lock, it was held on this error path */
    lf_rwunlock_by_pins(pins);
    return MY_ERRPTR;
  }
  if (*el == NULL && unlikely(initialize_bucket(hash, el, bucket, pins)))
  {
    lf_rwunlock_by_pins(pins);
    return MY_ERRPTR;
  }
  found= my_lsearch(el, hash->charset, my_reverse_bits(hashnr) | 1,
                 (uchar *)key, keylen, pins);
  lf_rwunlock_by_pins(pins);
  /* found+1 skips the LF_SLIST header and points at the element data */
  return found ? found+1 : 0;
}
480 
/* shared zero-length key for all dummy (bucket sentinel) nodes */
static const uchar *dummy_key= (uchar*)"";
482 
483 /*
484   RETURN
485     0 - ok
486    -1 - out of memory
487 */
static int initialize_bucket(LF_HASH *hash, LF_SLIST * volatile *node,
                              uint bucket, LF_PINS *pins)
{
  uint parent= my_clear_highest_bit(bucket);
  LF_SLIST *dummy= (LF_SLIST *)my_malloc(sizeof(LF_SLIST), MYF(MY_WME));
  LF_SLIST **tmp= 0, *cur;
  LF_SLIST * volatile *el= _lf_dynarray_lvalue(&hash->array, parent);
  if (unlikely(!el || !dummy))
  {
    /* fix: don't leak 'dummy' when only 'el' failed (my_free(NULL) is ok) */
    my_free(dummy);
    return -1;
  }
  /* buckets are initialized on demand, recursively up the parent chain */
  if (*el == NULL && bucket &&
      unlikely(initialize_bucket(hash, el, parent, pins)))
  {
    /* fix: 'dummy' was leaked on this error path */
    my_free(dummy);
    return -1;
  }
  dummy->hashnr= my_reverse_bits(bucket) | 0; /* dummy node */
  dummy->key= dummy_key;
  dummy->keylen= 0;
  if ((cur= linsert(el, hash->charset, dummy, pins, LF_HASH_UNIQUE)))
  {
    /* somebody else installed a dummy for this bucket first - use theirs */
    my_free(dummy);
    dummy= cur;
  }
  my_atomic_casptr((void **)node, (void **)&tmp, dummy);
  /*
    note that if the CAS above failed (after linsert() succeeded),
    it would mean that some other thread has executed linsert() for
    the same dummy node, its linsert() failed, it picked up our
    dummy node (in "dummy= cur") and executed the same CAS as above.
    Which means that even if CAS above failed we don't need to retry,
    and we should not free(dummy) - there's no memory leak here
  */
  return 0;
}
519 
520 /**
521   Search for list element satisfying condition specified by match
522   function and position cursor on it.
523 
524   @param head          Head of the list to search in.
525   @param first_hashnr  Hash value to start search from.
526   @param last_hashnr   Hash value to stop search after.
527   @param match         Match function.
528   @param cursor        Cursor to be position.
529   @param pins          LF_PINS for the calling thread to be used during
530                        search for pinning result.
531 
532   @retval 0 - not found
533   @retval 1 - found
534 */
535 
static int lfind_match(LF_SLIST * volatile *head,
                       uint32 first_hashnr, uint32 last_hashnr,
                       lf_hash_match_func *match,
                       CURSOR *cursor, LF_PINS *pins)
{
  uint32       cur_hashnr;
  intptr       link;

retry:
  cursor->prev= (intptr *)head;
  do { /* PTR() isn't necessary below, head is a dummy node */
    cursor->curr= (LF_SLIST *)(*cursor->prev);
    lf_pin(pins, 1, cursor->curr);
    /* re-read until the pinned value is still what *prev points to */
  } while (my_atomic_loadptr((void**)cursor->prev) != cursor->curr &&
                              LF_BACKOFF);
  for (;;)
  {
    if (unlikely(!cursor->curr))
      return 0; /* end of the list */
    do {
      /* QQ: XXX or goto retry ? */
      link= cursor->curr->link;
      cursor->next= PTR(link);
      lf_pin(pins, 0, cursor->next);
    } while (link != cursor->curr->link && LF_BACKOFF);
    cur_hashnr= cursor->curr->hashnr;
    if (my_atomic_loadptr((void**)cursor->prev) != cursor->curr)
    {
      /* the list changed under us - restart from the last safe place */
      (void)LF_BACKOFF;
      goto retry;
    }
    if (!DELETED(link))
    {
      if (cur_hashnr >= first_hashnr)
      {
        if (cur_hashnr > last_hashnr)
          return 0; /* went past the requested hash range */

        if (cur_hashnr & 1)
        {
          /* Normal node. Check if element matches condition. */
          if ((*match)((uchar *)(cursor->curr + 1)))
            return 1;
        }
        else
        {
          /*
            Dummy node. Nothing to check here.

            Still thanks to the fact that dummy nodes are never deleted we
            can save it as a safe place to restart iteration if ever needed.
          */
          head= (LF_SLIST * volatile *)&(cursor->curr->link);
        }
      }

      cursor->prev= &(cursor->curr->link);
      lf_pin(pins, 2, cursor->curr);
    }
    else
    {
      /*
        we found a deleted node - be nice, help the other thread
        and remove this deleted node
      */
      if (my_atomic_casptr((void **)cursor->prev,
                           (void **)&cursor->curr, cursor->next))
        lf_pinbox_free(pins, cursor->curr);
      else
      {
        (void)LF_BACKOFF;
        goto retry;
      }
    }
    cursor->curr= cursor->next;
    lf_pin(pins, 1, cursor->curr);
  }
}
614 
615 /**
616   Find random hash element which satisfies condition specified by
617   match function.
618 
619   @param hash      Hash to search element in.
620   @param pin       Pins for calling thread to be used during search
621                    and for pinning its result.
622   @param match     Pointer to match function. This function takes
623                    pointer to object stored in hash as parameter
624                    and returns 0 if object doesn't satisfy its
625                    condition (and non-0 value if it does).
626   @param rand_val  Random value to be used for selecting hash
627                    bucket from which search in sort-ordered
628                    list needs to be started.
629 
630   @retval A pointer to a random element matching condition.
631   @retval NULL      - if nothing is found
632   @retval MY_ERRPTR - OOM.
633 
634   @note This function follows the same pinning protocol as lf_hash_search(),
635         i.e. uses pins[0..2]. On return pins[0..1] are removed and pins[2]
636         is used to pin object found. It is also not removed in case when
637         object is not found/error occurs but its value is undefined in
638         this case.
639         So calling lf_hash_unpin() is mandatory after call to this function
640         in case of both success and failure.
641 */
642 
void *lf_hash_random_match(LF_HASH *hash, LF_PINS *pins,
                           lf_hash_match_func *match,
                           uint rand_val)
{
  /* Convert random value to valid hash value. */
  uint hashnr= (rand_val & INT_MAX32);
  uint bucket;
  uint32 rev_hashnr;
  LF_SLIST * volatile *el;
  CURSOR cursor;
  int res;

  bucket= hashnr % hash->size;
  rev_hashnr= my_reverse_bits(hashnr);

  el= lf_dynarray_lvalue(&hash->array, bucket);
  if (unlikely(!el))
    return MY_ERRPTR;
  /*
    Bucket might be totally empty if it has not been accessed since last
    time LF_HASH::size has been increased. In this case we initialize it
    by inserting dummy node for this bucket to the correct position in
    split-ordered list. This should help future lf_hash_* calls trying to
    access the same bucket.
  */
  if (*el == NULL && unlikely(initialize_bucket(hash, el, bucket, pins)))
    return MY_ERRPTR;

  /*
    To avoid bias towards the first matching element in the bucket, we start
    looking for elements with inversed hash value greater or equal than
    inversed value of our random hash.
  */
  res= lfind_match(el, rev_hashnr | 1, UINT_MAX32, match, &cursor, pins);

  if (! res && hashnr != 0)
  {
    /*
      We have not found matching element - probably we were too close to
      the tail of our split-ordered list. To avoid bias against elements
      at the head of the list we restart our search from its head. Unless
      we were already searching from it.

      To avoid going through elements at which we have already looked
      twice we stop once we reach element from which we have begun our
      first search.
    */
    el= lf_dynarray_lvalue(&hash->array, 0);
    if (unlikely(!el))
      return MY_ERRPTR;
    res= lfind_match(el, 1, rev_hashnr, match, &cursor, pins);
  }

  /* keep the found element pinned with pin[2], as documented above */
  if (res)
    lf_pin(pins, 2, cursor.curr);
  lf_unpin(pins, 0);
  lf_unpin(pins, 1);

  /* +1 skips the LF_SLIST header and points at the element data */
  return res ? cursor.curr + 1 : 0;
}
703 
704