1 /* Copyright (C) 2006 MySQL AB & Alexey Botchkov & MySQL Finland AB
2    & TCX DataKonsult AB
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
16 
17 #include "maria_def.h"
18 #include "trnman.h"
19 #include "ma_key_recover.h"
20 
21 #ifdef HAVE_RTREE_KEYS
22 
23 #include "ma_rt_index.h"
24 #include "ma_rt_key.h"
25 #include "ma_rt_mbr.h"
26 
27 typedef struct
28 {
29   double square;
30   int n_node;
31   const uchar *key;
32   double *coords;
33 } SplitStruct;
34 
reserve_coords(double ** d_buffer,int n_dim)35 inline static double *reserve_coords(double **d_buffer, int n_dim)
36 {
37   double *coords= *d_buffer;
38   (*d_buffer)+= n_dim * 2;
39   return coords;
40 }
41 
mbr_join(double * a,const double * b,int n_dim)42 static void mbr_join(double *a, const double *b, int n_dim)
43 {
44   double *end= a + n_dim * 2;
45   do
46   {
47     if (a[0] > b[0])
48       a[0]= b[0];
49 
50     if (a[1] < b[1])
51       a[1]= b[1];
52 
53     a+= 2;
54     b+= 2;
55   } while (a != end);
56 }
57 
58 /*
59 Counts the square of mbr which is a join of a and b
60 */
mbr_join_square(const double * a,const double * b,int n_dim)61 static double mbr_join_square(const double *a, const double *b, int n_dim)
62 {
63   const double *end= a + n_dim * 2;
64   double square= 1.0;
65   do
66   {
67     square *=
68       ((a[1] < b[1]) ? b[1] : a[1]) - ((a[0] > b[0]) ? b[0] : a[0]);
69 
70     a+= 2;
71     b+= 2;
72   } while (a != end);
73 
74   return square;
75 }
76 
count_square(const double * a,int n_dim)77 static double count_square(const double *a, int n_dim)
78 {
79   const double *end= a + n_dim * 2;
80   double square= 1.0;
81   do
82   {
83     square *= a[1] - a[0];
84     a+= 2;
85   } while (a != end);
86   return square;
87 }
88 
copy_coords(double * dst,const double * src,int n_dim)89 inline static void copy_coords(double *dst, const double *src, int n_dim)
90 {
91   memcpy(dst, src, sizeof(double) * (n_dim * 2));
92 }
93 
94 /**
95   Select two nodes to collect group upon.
96 
97   Note that such function uses 'double' arithmetic so may behave differently
98   on different platforms/builds. There are others in this file.
99 */
pick_seeds(SplitStruct * node,int n_entries,SplitStruct ** seed_a,SplitStruct ** seed_b,int n_dim)100 static void pick_seeds(SplitStruct *node, int n_entries,
101      SplitStruct **seed_a, SplitStruct **seed_b, int n_dim)
102 {
103   SplitStruct *cur1;
104   SplitStruct *lim1= node + (n_entries - 1);
105   SplitStruct *cur2;
106   SplitStruct *lim2= node + n_entries;
107 
108   double max_d= -DBL_MAX;
109   double d;
110 
111   for (cur1= node; cur1 < lim1; cur1++)
112   {
113     for (cur2=cur1 + 1; cur2 < lim2; cur2++)
114     {
115 
116       d= mbr_join_square(cur1->coords, cur2->coords, n_dim) - cur1->square -
117           cur2->square;
118       if (d > max_d)
119       {
120         max_d= d;
121         *seed_a= cur1;
122         *seed_b= cur2;
123       }
124     }
125   }
126 }
127 
128 /*
129 Select next node and group where to add
130 */
pick_next(SplitStruct * node,int n_entries,double * g1,double * g2,SplitStruct ** choice,int * n_group,int n_dim)131 static void pick_next(SplitStruct *node, int n_entries, double *g1, double *g2,
132     SplitStruct **choice, int *n_group, int n_dim)
133 {
134   SplitStruct *cur= node;
135   SplitStruct *end= node + n_entries;
136 
137   double max_diff= -DBL_MAX;
138 
139   for (; cur < end; cur++)
140   {
141     double diff;
142     double abs_diff;
143 
144     if (cur->n_node)
145     {
146       continue;
147     }
148 
149     diff= mbr_join_square(g1, cur->coords, n_dim) -
150       mbr_join_square(g2, cur->coords, n_dim);
151 
152     abs_diff= fabs(diff);
153     if (abs_diff  > max_diff)
154     {
155       max_diff= abs_diff;
156       *n_group= 1 + (diff > 0);
157       *choice= cur;
158     }
159   }
160 }
161 
162 /*
163 Mark not-in-group entries as n_group
164 */
mark_all_entries(SplitStruct * node,int n_entries,int n_group)165 static void mark_all_entries(SplitStruct *node, int n_entries, int n_group)
166 {
167   SplitStruct *cur= node;
168   SplitStruct *end= node + n_entries;
169 
170   for (; cur < end; cur++)
171   {
172     if (cur->n_node)
173     {
174       continue;
175     }
176     cur->n_node= n_group;
177   }
178 }
179 
split_maria_rtree_node(SplitStruct * node,int n_entries,int all_size,int key_size,int min_size,int size1,int size2,double ** d_buffer,int n_dim)180 static int split_maria_rtree_node(SplitStruct *node, int n_entries,
181                                   int all_size, /* Total key's size */
182                                   int key_size,
183                                   int min_size, /* Minimal group size */
184                                   int size1, int size2 /* initial group sizes */,
185                                   double **d_buffer, int n_dim)
186 {
187   SplitStruct *cur;
188   SplitStruct *UNINIT_VAR(a);
189   SplitStruct *UNINIT_VAR(b);
190   double *g1= reserve_coords(d_buffer, n_dim);
191   double *g2= reserve_coords(d_buffer, n_dim);
192   SplitStruct *UNINIT_VAR(next);
193   int UNINIT_VAR(next_node);
194   int i;
195   SplitStruct *end= node + n_entries;
196 
197   if (all_size < min_size * 2)
198   {
199     return 1;
200   }
201 
202   cur= node;
203   for (; cur < end; cur++)
204   {
205     cur->square= count_square(cur->coords, n_dim);
206     cur->n_node= 0;
207   }
208 
209   pick_seeds(node, n_entries, &a, &b, n_dim);
210   a->n_node= 1;
211   b->n_node= 2;
212 
213 
214   copy_coords(g1, a->coords, n_dim);
215   size1+= key_size;
216   copy_coords(g2, b->coords, n_dim);
217   size2+= key_size;
218 
219 
220   for (i=n_entries - 2; i>0; --i)
221   {
222     if (all_size - (size2 + key_size) < min_size) /* Can't write into group 2 */
223     {
224       mark_all_entries(node, n_entries, 1);
225       break;
226     }
227 
228     if (all_size - (size1 + key_size) < min_size) /* Can't write into group 1 */
229     {
230       mark_all_entries(node, n_entries, 2);
231       break;
232     }
233 
234     pick_next(node, n_entries, g1, g2, &next, &next_node, n_dim);
235     if (next_node == 1)
236     {
237       size1+= key_size;
238       mbr_join(g1, next->coords, n_dim);
239     }
240     else
241     {
242       size2+= key_size;
243       mbr_join(g2, next->coords, n_dim);
244     }
245     next->n_node= next_node;
246   }
247 
248   return 0;
249 }
250 
251 
252 /**
253   Logs key reorganization done in a split page (new page is logged elsewhere).
254 
255   The effect of a split on the split page is three changes:
256   - some piece of the page move to different places inside this page (we are
257   not interested here in the pieces which move to the new page)
258   - the key is inserted into the page or not (could be in the new page)
259   - page is shrunk
260   All this is uniquely determined by a few parameters:
261   - the key (starting at 'key-nod_flag', for 'full_length' bytes
262   (maria_rtree_split_page() seems to depend on its parameters key&key_length
263   but in fact it reads more (to the left: nod_flag, and to the right:
264   full_length)
265   - the binary content of the page
266   - some variables in the share
267   - double arithmetic, which is unpredictable from machine to machine and
268   from build to build (see pick_seeds() above: it has a comparison between
269   double-s 'if (d > max_d)' so the comparison can go differently from machine
270   to machine or build to build, it has happened in real life).
271   If one day we use precision-math instead of double-math, in GIS, then the
272   last parameter would become constant accross machines and builds and we
273   could some cheap logging: just log the few parameters above.
274   Until then, we log the list of memcpy() operations (fortunately, we often do
275   not have to log the source bytes, as they can be found in the page before
276   applying the REDO; the only source bytes to log are the key), the key if it
277   was inserted into this page, and the shrinking.
278 
279   @param  info             table
280   @param  page             page's offset in the file
281   @param  buff             content of the page (post-split)
282   @param  key_with_nod_flag pointer to key-nod_flag
283   @param  full_length      length of (key + (nod_flag (if node) or rowid (if
284                            leaf)))
285   @param  log_internal_copy encoded list of mempcy() operations done on
286                            split page, having their source in the page
287   @param  log_internal_copy_length length of above list, in bytes
288   @param  log_key_copy     operation describing the key's copy, or NULL if the
289                            inserted key was not put into the page (was put in
290                            new page, so does not have to be logged here)
291   @param  length_diff      by how much the page has shrunk during split
292 */
293 
_ma_log_rt_split(MARIA_PAGE * page,const uchar * key_with_nod_flag,uint full_length,const uchar * log_internal_copy,uint log_internal_copy_length,const uchar * log_key_copy,uint length_diff)294 static my_bool _ma_log_rt_split(MARIA_PAGE *page,
295                                 const uchar *key_with_nod_flag,
296                                 uint full_length,
297                                 const uchar *log_internal_copy,
298                                 uint log_internal_copy_length,
299                                 const uchar *log_key_copy,
300                                 uint length_diff)
301 {
302   MARIA_HA    *info=  page->info;
303   MARIA_SHARE *share= info->s;
304   LSN lsn;
305   uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 1 + 2 + 1 + 2 + 2 + 7],
306     *log_pos;
307   LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 6];
308   uint translog_parts, extra_length= 0;
309   my_off_t page_pos;
310   DBUG_ENTER("_ma_log_rt_split");
311   DBUG_PRINT("enter", ("page: %p", page));
312 
313   DBUG_ASSERT(share->now_transactional);
314   page_pos= page->pos / share->block_size;
315   page_store(log_data + FILEID_STORE_SIZE, page_pos);
316   log_pos= log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE;
317   log_pos[0]= KEY_OP_DEL_SUFFIX;
318   log_pos++;
319   DBUG_ASSERT((int)length_diff > 0);
320   int2store(log_pos, length_diff);
321   log_pos+= 2;
322   log_pos[0]= KEY_OP_MULTI_COPY;
323   log_pos++;
324   int2store(log_pos, full_length);
325   log_pos+= 2;
326   int2store(log_pos, log_internal_copy_length);
327   log_pos+= 2;
328   log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
329   log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data) - 7;
330   log_array[TRANSLOG_INTERNAL_PARTS + 1].str=    log_internal_copy;
331   log_array[TRANSLOG_INTERNAL_PARTS + 1].length= log_internal_copy_length;
332   translog_parts= 2;
333   if (log_key_copy != NULL) /* need to store key into record */
334   {
335     log_array[TRANSLOG_INTERNAL_PARTS + 2].str=    log_key_copy;
336     log_array[TRANSLOG_INTERNAL_PARTS + 2].length= 1 + 2 + 1 + 2;
337     log_array[TRANSLOG_INTERNAL_PARTS + 3].str=    key_with_nod_flag;
338     log_array[TRANSLOG_INTERNAL_PARTS + 3].length= full_length;
339     extra_length= 1 + 2 + 1 + 2 + full_length;
340     translog_parts+= 2;
341   }
342 
343   _ma_log_key_changes(page,
344                       log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
345                       log_pos, &extra_length, &translog_parts);
346   /* Remember new page length for future log entires for same page */
347   page->org_size= page->size;
348 
349   if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
350                             info->trn, info,
351                             (translog_size_t) ((log_pos - log_data) +
352                                                log_internal_copy_length +
353                                                extra_length),
354                             TRANSLOG_INTERNAL_PARTS + translog_parts,
355                             log_array, log_data, NULL))
356     DBUG_RETURN(1);
357   DBUG_RETURN(0);
358 }
359 
360 /**
361    0 ok; the created page is put into page cache; the shortened one is not (up
362    to the caller to do it)
363    1 or -1: error.
364    If new_page_offs==NULL, won't create new page (for redo phase).
365 */
366 
maria_rtree_split_page(const MARIA_KEY * key,MARIA_PAGE * page,my_off_t * new_page_offs)367 int maria_rtree_split_page(const MARIA_KEY *key, MARIA_PAGE *page,
368                            my_off_t *new_page_offs)
369 {
370   MARIA_HA   *info= page->info;
371   MARIA_SHARE *share= info->s;
372   const my_bool transactional= share->now_transactional;
373   int n1, n2; /* Number of items in groups */
374   SplitStruct *task;
375   SplitStruct *cur;
376   SplitStruct *stop;
377   double *coord_buf;
378   double *next_coord;
379   int n_dim;
380   uchar *source_cur, *cur1, *cur2;
381   uchar *new_page_buff, *log_internal_copy, *log_internal_copy_ptr,
382     *log_key_copy= NULL;
383   int err_code= 0;
384   uint new_page_length;
385   uint nod_flag= page->node;
386   uint org_length= page->size;
387   uint full_length= key->data_length + (nod_flag ? nod_flag :
388                                         key->ref_length);
389   uint key_data_length= key->data_length;
390   int max_keys= ((org_length - share->keypage_header) / (full_length));
391   MARIA_PINNED_PAGE tmp_page_link, *page_link= &tmp_page_link;
392   MARIA_KEYDEF *keyinfo= key->keyinfo;
393   DBUG_ENTER("maria_rtree_split_page");
394   DBUG_PRINT("rtree", ("splitting block"));
395 
396   n_dim= keyinfo->keysegs / 2;
397 
398   if (!(coord_buf= (double*) my_alloca(n_dim * 2 * sizeof(double) *
399                                        (max_keys + 1 + 4) +
400                                        sizeof(SplitStruct) * (max_keys + 1))))
401     DBUG_RETURN(-1); /* purecov: inspected */
402 
403   task= (SplitStruct *)(coord_buf + n_dim * 2 * (max_keys + 1 + 4));
404 
405   next_coord= coord_buf;
406 
407   stop= task + max_keys;
408   source_cur= rt_PAGE_FIRST_KEY(share, page->buff, nod_flag);
409 
410   for (cur= task;
411        cur < stop;
412        cur++, source_cur= rt_PAGE_NEXT_KEY(share, source_cur, key_data_length,
413                                            nod_flag))
414   {
415     cur->coords= reserve_coords(&next_coord, n_dim);
416     cur->key= source_cur;
417     maria_rtree_d_mbr(keyinfo->seg, source_cur, key_data_length, cur->coords);
418   }
419 
420   cur->coords= reserve_coords(&next_coord, n_dim);
421   maria_rtree_d_mbr(keyinfo->seg, key->data, key_data_length, cur->coords);
422   cur->key= key->data;
423 
424 
425   if (split_maria_rtree_node(task, max_keys + 1,
426                              page->size + full_length + 2,
427                              full_length,
428        rt_PAGE_MIN_SIZE(keyinfo->block_length),
429        2, 2, &next_coord, n_dim))
430   {
431     err_code= 1;
432     goto split_err;
433   }
434 
435   /* Allocate buffer for new page and piece of log record */
436   if (!(new_page_buff= (uchar*) my_alloca((uint)keyinfo->block_length +
437                                           (transactional ?
438                                            (max_keys * (2 + 2) +
439                                             1 + 2 + 1 + 2) : 0))))
440   {
441     err_code= -1;
442     goto split_err;
443   }
444   log_internal_copy= log_internal_copy_ptr= new_page_buff +
445     keyinfo->block_length;
446   bzero(new_page_buff, share->block_size);
447 
448   stop= task + (max_keys + 1);
449   cur1= rt_PAGE_FIRST_KEY(share, page->buff, nod_flag);
450   cur2= rt_PAGE_FIRST_KEY(share, new_page_buff, nod_flag);
451 
452   n1= n2= 0;
453   for (cur= task; cur < stop; cur++)
454   {
455     uchar *to;
456     const uchar *cur_key= cur->key;
457     my_bool log_this_change;
458     DBUG_ASSERT(log_key_copy == NULL);
459     if (cur->n_node == 1)
460     {
461       to= cur1;
462       cur1= rt_PAGE_NEXT_KEY(share, cur1, key_data_length, nod_flag);
463       n1++;
464       log_this_change= transactional;
465     }
466     else
467     {
468       to= cur2;
469       cur2= rt_PAGE_NEXT_KEY(share, cur2, key_data_length, nod_flag);
470       n2++;
471       log_this_change= FALSE;
472     }
473     if (to != cur_key)
474     {
475       uchar *to_with_nod_flag= to - nod_flag;
476       const uchar *cur_key_with_nod_flag= cur_key - nod_flag;
477       memcpy(to_with_nod_flag, cur_key_with_nod_flag, full_length);
478       if (log_this_change)
479       {
480         size_t to_with_nod_flag_offs= to_with_nod_flag - page->buff;
481         if (likely(cur_key != key->data))
482         {
483           /* this memcpy() is internal to the page (source in the page) */
484           size_t cur_key_with_nod_flag_offs= cur_key_with_nod_flag - page->buff;
485           int2store(log_internal_copy_ptr, to_with_nod_flag_offs);
486           log_internal_copy_ptr+= 2;
487           int2store(log_internal_copy_ptr, cur_key_with_nod_flag_offs);
488           log_internal_copy_ptr+= 2;
489         }
490         else
491         {
492           /* last iteration, and this involves *key: source is external */
493           log_key_copy= log_internal_copy_ptr;
494           log_key_copy[0]= KEY_OP_OFFSET;
495           int2store(log_key_copy + 1, to_with_nod_flag_offs);
496           log_key_copy[3]= KEY_OP_CHANGE;
497           int2store(log_key_copy + 4, full_length);
498           /* _ma_log_rt_split() will store *key, right after */
499         }
500       }
501     }
502   }
503   { /* verify that above loop didn't touch header bytes */
504     uint i;
505     for (i= 0; i < share->keypage_header; i++)
506       DBUG_ASSERT(new_page_buff[i]==0);
507   }
508 
509   if (nod_flag)
510     _ma_store_keypage_flag(share, new_page_buff, KEYPAGE_FLAG_ISNOD);
511   _ma_store_keynr(share, new_page_buff, keyinfo->key_nr);
512   new_page_length= share->keypage_header + n2 * full_length;
513   _ma_store_page_used(share, new_page_buff, new_page_length);
514   page->size= share->keypage_header + n1 * full_length;
515   page_store_size(share, page);
516 
517   if ((*new_page_offs= _ma_new(info, DFLT_INIT_HITS, &page_link)) ==
518       HA_OFFSET_ERROR)
519     err_code= -1;
520   else
521   {
522     MARIA_PAGE new_page;
523     _ma_page_setup(&new_page, info, keyinfo, *new_page_offs, new_page_buff);
524 
525     if (transactional &&
526         ( /* log change to split page */
527          _ma_log_rt_split(page, key->data - nod_flag,
528                           full_length, log_internal_copy,
529                           (uint)(log_internal_copy_ptr - log_internal_copy),
530                           log_key_copy, (uint)(org_length - page->size)) ||
531          /* and to new page */
532          _ma_log_new(&new_page, 0)))
533       err_code= -1;
534 
535     if (_ma_write_keypage(&new_page, page_link->write_lock,
536                           DFLT_INIT_HITS))
537       err_code= -1;
538   }
539   DBUG_PRINT("rtree", ("split new block: %lu", (ulong) *new_page_offs));
540 
541   my_afree(new_page_buff);
542 split_err:
543   my_afree(coord_buf);
544   DBUG_RETURN(err_code);
545 }
546 
547 #endif /*HAVE_RTREE_KEYS*/
548