1 /*
2 Copyright (c) 2016, 2017 MariaDB
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
16
17 #include "mariadb.h"
18 #include "sql_parse.h"
19 #include "sql_select.h"
20 #include "sql_list.h"
21 #include "item_windowfunc.h"
22 #include "filesort.h"
23 #include "sql_base.h"
24 #include "sql_window.h"
25
26
27 bool
check_window_names(List_iterator_fast<Window_spec> & it)28 Window_spec::check_window_names(List_iterator_fast<Window_spec> &it)
29 {
30 if (window_names_are_checked)
31 return false;
32 const char *name= this->name();
33 const char *ref_name= window_reference();
34 it.rewind();
35 Window_spec *win_spec;
36 while((win_spec= it++) && win_spec != this)
37 {
38 const char *win_spec_name= win_spec->name();
39 if (!win_spec_name)
40 break;
41 if (name && my_strcasecmp(system_charset_info, name, win_spec_name) == 0)
42 {
43 my_error(ER_DUP_WINDOW_NAME, MYF(0), name);
44 return true;
45 }
46 if (ref_name &&
47 my_strcasecmp(system_charset_info, ref_name, win_spec_name) == 0)
48 {
49 if (partition_list->elements)
50 {
51 my_error(ER_PARTITION_LIST_IN_REFERENCING_WINDOW_SPEC, MYF(0),
52 ref_name);
53 return true;
54 }
55 if (win_spec->order_list->elements && order_list->elements)
56 {
57 my_error(ER_ORDER_LIST_IN_REFERENCING_WINDOW_SPEC, MYF(0), ref_name);
58 return true;
59 }
60 if (win_spec->window_frame)
61 {
62 my_error(ER_WINDOW_FRAME_IN_REFERENCED_WINDOW_SPEC, MYF(0), ref_name);
63 return true;
64 }
65 referenced_win_spec= win_spec;
66 if (partition_list->elements == 0)
67 partition_list= win_spec->partition_list;
68 if (order_list->elements == 0)
69 order_list= win_spec->order_list;
70 }
71 }
72 if (ref_name && !referenced_win_spec)
73 {
74 my_error(ER_WRONG_WINDOW_SPEC_NAME, MYF(0), ref_name);
75 return true;
76 }
77 window_names_are_checked= true;
78 return false;
79 }
80
81 void
print(String * str,enum_query_type query_type)82 Window_spec::print(String *str, enum_query_type query_type)
83 {
84 str->append('(');
85 print_partition(str, query_type);
86 print_order(str, query_type);
87
88 if (window_frame)
89 window_frame->print(str, query_type);
90 str->append(')');
91 }
92
93 void
print_partition(String * str,enum_query_type query_type)94 Window_spec::print_partition(String *str, enum_query_type query_type)
95 {
96 if (partition_list->first)
97 {
98 str->append(STRING_WITH_LEN(" partition by "));
99 st_select_lex::print_order(str, partition_list->first, query_type);
100 }
101 }
102
103 void
print_order(String * str,enum_query_type query_type)104 Window_spec::print_order(String *str, enum_query_type query_type)
105 {
106 if (order_list->first)
107 {
108 str->append(STRING_WITH_LEN(" order by "));
109 st_select_lex::print_order(str, order_list->first, query_type);
110 }
111 }
112
113 bool
check_frame_bounds()114 Window_frame::check_frame_bounds()
115 {
116 if ((top_bound->is_unbounded() &&
117 top_bound->precedence_type == Window_frame_bound::FOLLOWING) ||
118 (bottom_bound->is_unbounded() &&
119 bottom_bound->precedence_type == Window_frame_bound::PRECEDING) ||
120 (top_bound->precedence_type == Window_frame_bound::CURRENT &&
121 bottom_bound->precedence_type == Window_frame_bound::PRECEDING) ||
122 (bottom_bound->precedence_type == Window_frame_bound::CURRENT &&
123 top_bound->precedence_type == Window_frame_bound::FOLLOWING))
124 {
125 my_error(ER_BAD_COMBINATION_OF_WINDOW_FRAME_BOUND_SPECS, MYF(0));
126 return true;
127 }
128
129 return false;
130 }
131
132
133 void
print(String * str,enum_query_type query_type)134 Window_frame::print(String *str, enum_query_type query_type)
135 {
136 switch (units) {
137 case UNITS_ROWS:
138 str->append(STRING_WITH_LEN(" rows "));
139 break;
140 case UNITS_RANGE:
141 str->append(STRING_WITH_LEN(" range "));
142 break;
143 default:
144 DBUG_ASSERT(0);
145 }
146
147 str->append(STRING_WITH_LEN("between "));
148 top_bound->print(str, query_type);
149 str->append(STRING_WITH_LEN(" and "));
150 bottom_bound->print(str, query_type);
151
152 if (exclusion != EXCL_NONE)
153 {
154 str->append(STRING_WITH_LEN(" exclude "));
155 switch (exclusion) {
156 case EXCL_CURRENT_ROW:
157 str->append(STRING_WITH_LEN(" current row "));
158 break;
159 case EXCL_GROUP:
160 str->append(STRING_WITH_LEN(" group "));
161 break;
162 case EXCL_TIES:
163 str->append(STRING_WITH_LEN(" ties "));
164 break;
165 default:
166 DBUG_ASSERT(0);
167 ;
168 }
169 }
170 }
171
172
173 void
print(String * str,enum_query_type query_type)174 Window_frame_bound::print(String *str, enum_query_type query_type)
175 {
176 if (precedence_type == CURRENT)
177 {
178 str->append(STRING_WITH_LEN(" current row "));
179 return;
180 }
181 if (is_unbounded())
182 str->append(STRING_WITH_LEN(" unbounded "));
183 else
184 offset->print(str ,query_type);
185 switch (precedence_type) {
186 case PRECEDING:
187 str->append(STRING_WITH_LEN(" preceding "));
188 break;
189 case FOLLOWING:
190 str->append(STRING_WITH_LEN(" following "));
191 break;
192 default:
193 DBUG_ASSERT(0);
194 }
195 }
196
197 /*
198 Setup window functions in a select
199 */
200
201 int
setup_windows(THD * thd,Ref_ptr_array ref_pointer_array,TABLE_LIST * tables,List<Item> & fields,List<Item> & all_fields,List<Window_spec> & win_specs,List<Item_window_func> & win_funcs)202 setup_windows(THD *thd, Ref_ptr_array ref_pointer_array, TABLE_LIST *tables,
203 List<Item> &fields, List<Item> &all_fields,
204 List<Window_spec> &win_specs, List<Item_window_func> &win_funcs)
205 {
206 Window_spec *win_spec;
207 DBUG_ENTER("setup_windows");
208 List_iterator<Window_spec> it(win_specs);
209
210 /*
211 Move all unnamed specifications after the named ones.
212 We could have avoided it if we had built two separate lists for
213 named and unnamed specifications.
214 */
215 Query_arena *arena, backup;
216 arena= thd->activate_stmt_arena_if_needed(&backup);
217 uint i = 0;
218 uint elems= win_specs.elements;
219 while ((win_spec= it++) && i++ < elems)
220 {
221 if (win_spec->name() == NULL)
222 {
223 it.remove();
224 win_specs.push_back(win_spec);
225 }
226 }
227 if (arena)
228 thd->restore_active_arena(arena, &backup);
229
230 it.rewind();
231
232 List_iterator_fast<Window_spec> itp(win_specs);
233
234 while ((win_spec= it++))
235 {
236 bool hidden_group_fields;
237 if (win_spec->check_window_names(itp) ||
238 setup_group(thd, ref_pointer_array, tables, fields, all_fields,
239 win_spec->partition_list->first, &hidden_group_fields,
240 true) ||
241 setup_order(thd, ref_pointer_array, tables, fields, all_fields,
242 win_spec->order_list->first, true) ||
243 (win_spec->window_frame &&
244 win_spec->window_frame->check_frame_bounds()))
245 {
246 DBUG_RETURN(1);
247 }
248
249 if (win_spec->window_frame &&
250 win_spec->window_frame->exclusion != Window_frame::EXCL_NONE)
251 {
252 my_error(ER_FRAME_EXCLUSION_NOT_SUPPORTED, MYF(0));
253 DBUG_RETURN(1);
254 }
255 /*
256 For "win_func() OVER (ORDER BY order_list RANGE BETWEEN ...)",
257 - ORDER BY order_list must not be ommitted
258 - the list must have a single element.
259 */
260 if (win_spec->window_frame &&
261 win_spec->window_frame->units == Window_frame::UNITS_RANGE)
262 {
263 if (win_spec->order_list->elements != 1)
264 {
265 my_error(ER_RANGE_FRAME_NEEDS_SIMPLE_ORDERBY, MYF(0));
266 DBUG_RETURN(1);
267 }
268
269 /*
270 "The declared type of SK shall be numeric, datetime, or interval"
271 we don't support datetime or interval, yet.
272 */
273 Item_result rtype= win_spec->order_list->first->item[0]->result_type();
274 if (rtype != REAL_RESULT && rtype != INT_RESULT &&
275 rtype != DECIMAL_RESULT)
276 {
277 my_error(ER_WRONG_TYPE_FOR_RANGE_FRAME, MYF(0));
278 DBUG_RETURN(1);
279 }
280
281 /*
282 "The declared type of UVS shall be numeric if the declared type of SK
283 is numeric; otherwise, it shall be an interval type that may be added
284 to or subtracted from the declared type of SK"
285 */
286 Window_frame_bound *bounds[]= {win_spec->window_frame->top_bound,
287 win_spec->window_frame->bottom_bound,
288 NULL};
289 for (Window_frame_bound **pbound= &bounds[0]; *pbound; pbound++)
290 {
291 if (!(*pbound)->is_unbounded() &&
292 ((*pbound)->precedence_type == Window_frame_bound::FOLLOWING ||
293 (*pbound)->precedence_type == Window_frame_bound::PRECEDING))
294 {
295 Item_result rtype= (*pbound)->offset->result_type();
296 if (rtype != REAL_RESULT && rtype != INT_RESULT &&
297 rtype != DECIMAL_RESULT)
298 {
299 my_error(ER_WRONG_TYPE_FOR_RANGE_FRAME, MYF(0));
300 DBUG_RETURN(1);
301 }
302 }
303 }
304 }
305
306 /* "ROWS PRECEDING|FOLLOWING $n" must have a numeric $n */
307 if (win_spec->window_frame &&
308 win_spec->window_frame->units == Window_frame::UNITS_ROWS)
309 {
310 Window_frame_bound *bounds[]= {win_spec->window_frame->top_bound,
311 win_spec->window_frame->bottom_bound,
312 NULL};
313 for (Window_frame_bound **pbound= &bounds[0]; *pbound; pbound++)
314 {
315 if (!(*pbound)->is_unbounded() &&
316 ((*pbound)->precedence_type == Window_frame_bound::FOLLOWING ||
317 (*pbound)->precedence_type == Window_frame_bound::PRECEDING))
318 {
319 Item *offset= (*pbound)->offset;
320 if (offset->result_type() != INT_RESULT)
321 {
322 my_error(ER_WRONG_TYPE_FOR_ROWS_FRAME, MYF(0));
323 DBUG_RETURN(1);
324 }
325 }
326 }
327 }
328 }
329
330 List_iterator_fast<Item_window_func> li(win_funcs);
331 while (Item_window_func * win_func_item= li++)
332 {
333 if (win_func_item->check_result_type_of_order_item())
334 DBUG_RETURN(1);
335 }
336 DBUG_RETURN(0);
337 }
338
339
/**
  @brief
    Find fields common for all partition lists used in window functions

  @param thd  The thread handle

  @details
    This function looks for the field references in the partition lists
    of all window functions used in this select that are common to all
    the partition lists. The function returns an ORDER list containing
    all such references. The list is either built specially by the
    function or taken directly from the first window specification.

    Only partition elements whose real_item() has type Item::FIELD_ITEM
    are candidates; any other element is never considered common.
    Candidates are matched against the other partition lists with
    Item::eq(). A specially built list keeps the order and the directions
    the elements have in the partition list of the first window function.

  @retval
    pointer to the first element of the ORDER list containing the field
    references common to all partition lists
  @retval
    0 if no such reference is found, if some window function has no
    partition list, or if building the result list fails.
*/

ORDER *st_select_lex::find_common_window_func_partition_fields(THD *thd)
{
  ORDER *ord;
  Item *item;
  DBUG_ASSERT(window_funcs.elements);
  List_iterator_fast<Item_window_func> it(window_funcs);
  Item_window_func *first_wf= it++;
  if (!first_wf->window_spec->partition_list)
    return 0;
  /* Candidates: the field references of the first partition list. */
  List<Item> common_fields;
  uint first_partition_elements= 0;
  for (ord= first_wf->window_spec->partition_list->first; ord; ord= ord->next)
  {
    if ((*ord->item)->real_item()->type() == Item::FIELD_ITEM)
      common_fields.push_back(*ord->item, thd->mem_root);
    first_partition_elements++;
  }
  /*
    Shortcut: a single window specification whose partition list consists
    only of field references is itself the answer.
  */
  if (window_specs.elements == 1 &&
      common_fields.elements == first_partition_elements)
    return first_wf->window_spec->partition_list->first;
  List_iterator<Item> li(common_fields);
  Item_window_func *wf;
  /*
    Intersect the candidates with the partition list of every other window
    function; stop as soon as no candidate is left.
  */
  while (common_fields.elements && (wf= it++))
  {
    if (!wf->window_spec->partition_list)
      return 0;
    while ((item= li++))
    {
      for (ord= wf->window_spec->partition_list->first; ord; ord= ord->next)
      {
        if (item->eq(*ord->item, false))
          break;
      }
      /* Not present in this partition list: drop the candidate. */
      if (!ord)
        li.remove();
    }
    li.rewind();
  }
  if (!common_fields.elements)
    return 0;
  /* Every element of the first partition list survived: reuse that list. */
  if (common_fields.elements == first_partition_elements)
    return first_wf->window_spec->partition_list->first;
  /*
    Build a new list from the surviving candidates. common_fields holds the
    very item pointers of the first partition list, in the same order, and
    li is positioned at its start here (it was either never advanced or
    rewound above), so walking the first list in step with li picks the
    survivors up in order, together with their directions.
  */
  SQL_I_List<ORDER> res_list;
  for (ord= first_wf->window_spec->partition_list->first, item= li++;
       ord; ord= ord->next)
  {
    if (item != *ord->item)
      continue;
    if (add_to_list(thd, res_list, item, ord->direction))
      return 0;
    item= li++;
  }
  return res_list.first;
}
413
414
415 /////////////////////////////////////////////////////////////////////////////
416 // Sorting window functions to minimize the number of table scans
417 // performed during the computation of these functions
418 /////////////////////////////////////////////////////////////////////////////
419
/*
  Results of the comparison functions below. The _C ("compatible") values
  mean "not equal, but compatible": e.g. compare_order_lists() returns them
  when one list runs out while the other still has elements.
  order_window_funcs_by_window_specs() treats every value in the range
  CMP_LT_C..CMP_GT_C as compatible.
*/
#define CMP_LT        -2    // Less than
#define CMP_LT_C      -1    // Less than and compatible
#define CMP_EQ         0    // Equal to
#define CMP_GT_C       1    // Greater than and compatible
#define CMP_GT         2    // Greater than
425
426 static
compare_order_elements(ORDER * ord1,ORDER * ord2)427 int compare_order_elements(ORDER *ord1, ORDER *ord2)
428 {
429 if (*ord1->item == *ord2->item && ord1->direction == ord2->direction)
430 return CMP_EQ;
431 Item *item1= (*ord1->item)->real_item();
432 Item *item2= (*ord2->item)->real_item();
433 DBUG_ASSERT(item1->type() == Item::FIELD_ITEM &&
434 item2->type() == Item::FIELD_ITEM);
435 int cmp= ((Item_field *) item1)->field->field_index -
436 ((Item_field *) item2)->field->field_index;
437 if (cmp == 0)
438 {
439 if (ord1->direction == ord2->direction)
440 return CMP_EQ;
441 return ord1->direction > ord2->direction ? CMP_GT : CMP_LT;
442 }
443 else
444 return cmp > 0 ? CMP_GT : CMP_LT;
445 }
446
447 static
compare_order_lists(SQL_I_List<ORDER> * part_list1,SQL_I_List<ORDER> * part_list2)448 int compare_order_lists(SQL_I_List<ORDER> *part_list1,
449 SQL_I_List<ORDER> *part_list2)
450 {
451 if (part_list1 == part_list2)
452 return CMP_EQ;
453 ORDER *elem1= part_list1->first;
454 ORDER *elem2= part_list2->first;
455 for ( ; elem1 && elem2; elem1= elem1->next, elem2= elem2->next)
456 {
457 int cmp;
458 // remove all constants as we don't need them for comparision
459 while(elem1 && ((*elem1->item)->real_item())->const_item())
460 {
461 elem1= elem1->next;
462 continue;
463 }
464
465 while(elem2 && ((*elem2->item)->real_item())->const_item())
466 {
467 elem2= elem2->next;
468 continue;
469 }
470
471 if (!elem1 || !elem2)
472 break;
473
474 if ((cmp= compare_order_elements(elem1, elem2)))
475 return cmp;
476 }
477 if (elem1)
478 return CMP_GT_C;
479 if (elem2)
480 return CMP_LT_C;
481 return CMP_EQ;
482 }
483
484
/*
  Compare two window frame bounds that play the same role (both top bounds
  or both bottom bounds).

  @param win_frame_bound1  First bound
  @param win_frame_bound2  Second bound
  @param is_bottom_bound   true when comparing bottom bounds; the result of
                           a non-equal comparison is then negated
                           (CMP_GT <-> CMP_LT)

  @return CMP_EQ, CMP_LT or CMP_GT
*/
static
int compare_window_frame_bounds(Window_frame_bound *win_frame_bound1,
                                Window_frame_bound *win_frame_bound2,
                                bool is_bottom_bound)
{
  int res;
  /* Different precedence types: order by the enum value of the type. */
  if (win_frame_bound1->precedence_type != win_frame_bound2->precedence_type)
  {
    res= win_frame_bound1->precedence_type > win_frame_bound2->precedence_type ?
         CMP_GT : CMP_LT;
    if (is_bottom_bound)
      res= -res;
    return res;
  }

  if (win_frame_bound1->is_unbounded() && win_frame_bound2->is_unbounded())
    return CMP_EQ;

  if (!win_frame_bound1->is_unbounded() && !win_frame_bound2->is_unbounded())
  {
    if (win_frame_bound1->offset->eq(win_frame_bound2->offset, true))
      return CMP_EQ;
    else
    {
      /*
        Different offsets: order by the items' names. Equal names of
        non-eq() items give CMP_LT (before negation).
        NOTE(review): assumes offset->name.str is never NULL for frame
        offsets - confirm.
      */
      res= strcmp(win_frame_bound1->offset->name.str,
                  win_frame_bound2->offset->name.str);
      res= res > 0 ? CMP_GT : CMP_LT;
      if (is_bottom_bound)
        res= -res;
      return res;
    }
  }

  /*
    Here we have:
    win_frame_bound1->is_unbounded() != win_frame_bound2->is_unbounded()
    For top bounds the unbounded one compares as CMP_LT; for bottom bounds
    the unbounded one compares as CMP_GT.
  */
  return is_bottom_bound != win_frame_bound1->is_unbounded() ? CMP_LT : CMP_GT;
}
524
525
526 static
compare_window_frames(Window_frame * win_frame1,Window_frame * win_frame2)527 int compare_window_frames(Window_frame *win_frame1,
528 Window_frame *win_frame2)
529 {
530 int cmp;
531
532 if (win_frame1 == win_frame2)
533 return CMP_EQ;
534
535 if (!win_frame1)
536 return CMP_LT;
537
538 if (!win_frame2)
539 return CMP_GT;
540
541 if (win_frame1->units != win_frame2->units)
542 return win_frame1->units > win_frame2->units ? CMP_GT : CMP_LT;
543
544 cmp= compare_window_frame_bounds(win_frame1->top_bound,
545 win_frame2->top_bound,
546 false);
547 if (cmp)
548 return cmp;
549
550 cmp= compare_window_frame_bounds(win_frame1->bottom_bound,
551 win_frame2->bottom_bound,
552 true);
553 if (cmp)
554 return cmp;
555
556 if (win_frame1->exclusion != win_frame2->exclusion)
557 return win_frame1->exclusion > win_frame2->exclusion ? CMP_GT_C : CMP_LT_C;
558
559 return CMP_EQ;
560 }
561
562 static
compare_window_spec_joined_lists(Window_spec * win_spec1,Window_spec * win_spec2)563 int compare_window_spec_joined_lists(Window_spec *win_spec1,
564 Window_spec *win_spec2)
565 {
566 win_spec1->join_partition_and_order_lists();
567 win_spec2->join_partition_and_order_lists();
568 int cmp= compare_order_lists(win_spec1->partition_list,
569 win_spec2->partition_list);
570 win_spec1->disjoin_partition_and_order_lists();
571 win_spec2->disjoin_partition_and_order_lists();
572 return cmp;
573 }
574
575
576 static
compare_window_funcs_by_window_specs(Item_window_func * win_func1,Item_window_func * win_func2,void * arg)577 int compare_window_funcs_by_window_specs(Item_window_func *win_func1,
578 Item_window_func *win_func2,
579 void *arg)
580 {
581 int cmp;
582 Window_spec *win_spec1= win_func1->window_spec;
583 Window_spec *win_spec2= win_func2->window_spec;
584 if (win_spec1 == win_spec2)
585 return CMP_EQ;
586 cmp= compare_order_lists(win_spec1->partition_list,
587 win_spec2->partition_list);
588 if (cmp == CMP_EQ)
589 {
590 /*
591 Partition lists contain the same elements.
592 Let's use only one of the lists.
593 */
594 if (!win_spec1->name() && win_spec2->name())
595 {
596 win_spec1->save_partition_list= win_spec1->partition_list;
597 win_spec1->partition_list= win_spec2->partition_list;
598 }
599 else
600 {
601 win_spec2->save_partition_list= win_spec2->partition_list;
602 win_spec2->partition_list= win_spec1->partition_list;
603 }
604
605 cmp= compare_order_lists(win_spec1->order_list,
606 win_spec2->order_list);
607
608 if (cmp != CMP_EQ)
609 return cmp;
610
611 /*
612 Order lists contain the same elements.
613 Let's use only one of the lists.
614 */
615 if (!win_spec1->name() && win_spec2->name())
616 {
617 win_spec1->save_order_list= win_spec2->order_list;
618 win_spec1->order_list= win_spec2->order_list;
619 }
620 else
621 {
622 win_spec1->save_order_list= win_spec2->order_list;
623 win_spec2->order_list= win_spec1->order_list;
624 }
625
626 cmp= compare_window_frames(win_spec1->window_frame,
627 win_spec2->window_frame);
628
629 if (cmp != CMP_EQ)
630 return cmp;
631
632 /* Window frames are equal. Let's use only one of them. */
633 if (!win_spec1->name() && win_spec2->name())
634 win_spec1->window_frame= win_spec2->window_frame;
635 else
636 win_spec2->window_frame= win_spec1->window_frame;
637
638 return CMP_EQ;
639 }
640
641 if (cmp == CMP_GT || cmp == CMP_LT)
642 return cmp;
643
644 /* one of the partitions lists is the proper beginning of the another */
645 cmp= compare_window_spec_joined_lists(win_spec1, win_spec2);
646
647 if (CMP_LT_C <= cmp && cmp <= CMP_GT_C)
648 cmp= win_spec1->partition_list->elements <
649 win_spec2->partition_list->elements ? CMP_GT_C : CMP_LT_C;
650
651 return cmp;
652 }
653
654
/*
  Bits stored in Item_window_func::marker by
  order_window_funcs_by_window_specs(). In the sorted list of window
  functions they tell what changes with respect to the previous function:
  the required sort order, the partitioning, or the window frame.
*/
#define SORTORDER_CHANGE_FLAG 1
#define PARTITION_CHANGE_FLAG 2
#define FRAME_CHANGE_FLAG     4

/*
  Type of a comparison function for window functions; matches the
  signature of compare_window_funcs_by_window_specs().
*/
typedef int (*Item_window_func_cmp)(Item_window_func *f1,
                                    Item_window_func *f2,
                                    void *arg);
662 /*
663 @brief
664 Sort window functions so that those that can be computed together are
665 adjacent.
666
667 @detail
668 Sort window functions by their
669 - required sorting order,
670 - partition list,
671 - window frame compatibility.
672
673 The changes between the groups are marked by setting item_window_func->marker.
674 */
675
676 static
order_window_funcs_by_window_specs(List<Item_window_func> * win_func_list)677 void order_window_funcs_by_window_specs(List<Item_window_func> *win_func_list)
678 {
679 if (win_func_list->elements == 0)
680 return;
681
682 bubble_sort<Item_window_func>(win_func_list,
683 compare_window_funcs_by_window_specs,
684 NULL);
685
686 List_iterator_fast<Item_window_func> it(*win_func_list);
687 Item_window_func *prev= it++;
688 prev->marker= SORTORDER_CHANGE_FLAG |
689 PARTITION_CHANGE_FLAG |
690 FRAME_CHANGE_FLAG;
691 Item_window_func *curr;
692 while ((curr= it++))
693 {
694 Window_spec *win_spec_prev= prev->window_spec;
695 Window_spec *win_spec_curr= curr->window_spec;
696 curr->marker= 0;
697 if (!(win_spec_prev->partition_list == win_spec_curr->partition_list &&
698 win_spec_prev->order_list == win_spec_curr->order_list))
699 {
700 int cmp;
701 if (win_spec_prev->partition_list == win_spec_curr->partition_list)
702 cmp= compare_order_lists(win_spec_prev->order_list,
703 win_spec_curr->order_list);
704 else
705 cmp= compare_window_spec_joined_lists(win_spec_prev, win_spec_curr);
706 if (!(CMP_LT_C <= cmp && cmp <= CMP_GT_C))
707 {
708 curr->marker= SORTORDER_CHANGE_FLAG |
709 PARTITION_CHANGE_FLAG |
710 FRAME_CHANGE_FLAG;
711 }
712 else if (win_spec_prev->partition_list != win_spec_curr->partition_list)
713 {
714 curr->marker|= PARTITION_CHANGE_FLAG | FRAME_CHANGE_FLAG;
715 }
716 }
717 else if (win_spec_prev->window_frame != win_spec_curr->window_frame)
718 curr->marker|= FRAME_CHANGE_FLAG;
719
720 prev= curr;
721 }
722 }
723
724
725 /////////////////////////////////////////////////////////////////////////////
726
727
728 /////////////////////////////////////////////////////////////////////////////
729 // Window Frames support
730 /////////////////////////////////////////////////////////////////////////////
731
// note: make rr_from_pointers static again when it is no longer needed here
733 int rr_from_pointers(READ_RECORD *info);
734
735
736 /////////////////////////////////////////////////////////////////////////////
737
738
739 /*
740 A cursor over a sequence of rowids. One can
741 - Move to next rowid
742 - jump to given number in the sequence
743 - Know the number of the current rowid (i.e. how many rowids have been read)
744 */
745
746 class Rowid_seq_cursor
747 {
748 public:
Rowid_seq_cursor()749 Rowid_seq_cursor() : io_cache(NULL), ref_buffer(0) {}
~Rowid_seq_cursor()750 virtual ~Rowid_seq_cursor()
751 {
752 if (ref_buffer)
753 my_free(ref_buffer);
754 if (io_cache)
755 {
756 end_slave_io_cache(io_cache);
757 my_free(io_cache);
758 io_cache= NULL;
759 }
760 }
761
762 private:
763 /* Length of one rowid element */
764 size_t ref_length;
765
766 /* If io_cache=!NULL, use it */
767 IO_CACHE *io_cache;
768 uchar *ref_buffer; /* Buffer for the last returned rowid */
769 ha_rows rownum; /* Number of the rowid that is about to be returned */
770 ha_rows current_ref_buffer_rownum;
771 bool ref_buffer_valid;
772
773 /* The following are used when we are reading from an array of pointers */
774 uchar *cache_start;
775 uchar *cache_pos;
776 uchar *cache_end;
777 public:
778
init(READ_RECORD * info)779 void init(READ_RECORD *info)
780 {
781 ref_length= info->ref_length;
782 if (info->read_record_func == rr_from_pointers)
783 {
784 io_cache= NULL;
785 cache_start= info->cache_pos;
786 cache_pos= info->cache_pos;
787 cache_end= info->cache_end;
788 }
789 else
790 {
791 //DBUG_ASSERT(info->read_record == rr_from_tempfile);
792 rownum= 0;
793 io_cache= (IO_CACHE*)my_malloc(PSI_INSTRUMENT_ME, sizeof(IO_CACHE), MYF(0));
794 init_slave_io_cache(info->io_cache, io_cache);
795
796 ref_buffer= (uchar*)my_malloc(PSI_INSTRUMENT_ME, ref_length, MYF(0));
797 ref_buffer_valid= false;
798 }
799 }
800
next()801 virtual int next()
802 {
803 /* Allow multiple next() calls in EOF state. */
804 if (at_eof())
805 return -1;
806
807 if (io_cache)
808 {
809 rownum++;
810 }
811 else
812 {
813 cache_pos+= ref_length;
814 DBUG_ASSERT(cache_pos <= cache_end);
815 }
816 return 0;
817 }
818
prev()819 virtual int prev()
820 {
821 if (io_cache)
822 {
823 if (rownum == 0)
824 return -1;
825
826 rownum--;
827 return 0;
828 }
829 else
830 {
831 /* Allow multiple prev() calls when positioned at the start. */
832 if (cache_pos == cache_start)
833 return -1;
834 cache_pos-= ref_length;
835 DBUG_ASSERT(cache_pos >= cache_start);
836 return 0;
837 }
838 }
839
get_rownum() const840 ha_rows get_rownum() const
841 {
842 if (io_cache)
843 return rownum;
844 else
845 return (cache_pos - cache_start) / ref_length;
846 }
847
move_to(ha_rows row_number)848 void move_to(ha_rows row_number)
849 {
850 if (io_cache)
851 {
852 rownum= row_number;
853 }
854 else
855 {
856 cache_pos= MY_MIN(cache_end, cache_start + row_number * ref_length);
857 DBUG_ASSERT(cache_pos <= cache_end);
858 }
859 }
860
861 protected:
at_eof()862 bool at_eof()
863 {
864 if (io_cache)
865 {
866 return rownum * ref_length >= io_cache->end_of_file;
867 }
868 else
869 return (cache_pos == cache_end);
870 }
871
get_curr_rowid(uchar ** row_id)872 bool get_curr_rowid(uchar **row_id)
873 {
874 if (io_cache)
875 {
876 DBUG_ASSERT(!at_eof());
877 if (!ref_buffer_valid || current_ref_buffer_rownum != rownum)
878 {
879 seek_io_cache(io_cache, rownum * ref_length);
880 if (my_b_read(io_cache,ref_buffer,ref_length))
881 {
882 /* Error reading from file. */
883 return true;
884 }
885 ref_buffer_valid= true;
886 current_ref_buffer_rownum = rownum;
887 }
888 *row_id = ref_buffer;
889 return false;
890 }
891 else
892 {
893 *row_id= cache_pos;
894 return false;
895 }
896 }
897 };
898
899
900 /*
901 Cursor which reads from rowid sequence and also retrieves table rows.
902 */
903
904 class Table_read_cursor : public Rowid_seq_cursor
905 {
906 public:
~Table_read_cursor()907 virtual ~Table_read_cursor() {}
908
init(READ_RECORD * info)909 void init(READ_RECORD *info)
910 {
911 Rowid_seq_cursor::init(info);
912 table= info->table;
913 record= info->record();
914 }
915
fetch()916 virtual int fetch()
917 {
918 if (at_eof())
919 return -1;
920
921 uchar* curr_rowid;
922 if (get_curr_rowid(&curr_rowid))
923 return -1;
924 return table->file->ha_rnd_pos(record, curr_rowid);
925 }
926
927 private:
928 /* The table that is acccesed by this cursor. */
929 TABLE *table;
930 /* Buffer where to store the table's record data. */
931 uchar *record;
932
933 // TODO(spetrunia): should move_to() also read row here?
934 };
935
936
937 /*
938 A cursor which only moves within a partition. The scan stops at the partition
939 end, and it needs an explicit command to move to the next partition.
940
941 This cursor can not move backwards.
942 */
943
944 class Partition_read_cursor : public Table_read_cursor
945 {
946 public:
Partition_read_cursor(THD * thd,SQL_I_List<ORDER> * partition_list)947 Partition_read_cursor(THD *thd, SQL_I_List<ORDER> *partition_list) :
948 bound_tracker(thd, partition_list) {}
949
init(READ_RECORD * info)950 void init(READ_RECORD *info)
951 {
952 Table_read_cursor::init(info);
953 bound_tracker.init();
954 end_of_partition= false;
955 }
956
957 /*
958 Informs the cursor that we need to move into the next partition.
959 The next partition is provided in two ways:
960 - in table->record[0]..
961 - rownum parameter has the row number.
962 */
on_next_partition(ha_rows rownum)963 void on_next_partition(ha_rows rownum)
964 {
965 /* Remember the sort key value from the new partition */
966 move_to(rownum);
967 bound_tracker.check_if_next_group();
968 end_of_partition= false;
969
970 }
971
972 /*
973 This returns -1 when end of partition was reached.
974 */
next()975 int next()
976 {
977 int res;
978 if (end_of_partition)
979 return -1;
980
981 if ((res= Table_read_cursor::next()) ||
982 (res= fetch()))
983 {
984 /* TODO(cvicentiu) This does not consider table read failures.
985 Perhaps assuming end of table like this is fine in that case. */
986
987 /* This row is the final row in the table. To maintain semantics
988 that cursors always point to the last valid row, move back one step,
989 but mark end_of_partition as true. */
990 Table_read_cursor::prev();
991 end_of_partition= true;
992 return res;
993 }
994
995 if (bound_tracker.compare_with_cache())
996 {
997 /* This row is part of a new partition, don't move
998 forward any more untill we get informed of a new partition. */
999 Table_read_cursor::prev();
1000 end_of_partition= true;
1001 return -1;
1002 }
1003 return 0;
1004 }
1005
1006 private:
1007 Group_bound_tracker bound_tracker;
1008 bool end_of_partition;
1009 };
1010
1011
1012
1013 /////////////////////////////////////////////////////////////////////////////
1014
1015 /*
1016 Window frame bound cursor. Abstract interface.
1017
1018 @detail
1019 The cursor moves within the partition that the current row is in.
1020 It may be ahead or behind the current row.
1021
1022 The cursor also assumes that the current row moves forward through the
1023 partition and will move to the next adjacent partition after this one.
1024
1025 List of all cursor classes:
1026 Frame_cursor
1027 Frame_range_n_top
1028 Frame_range_n_bottom
1029
1030 Frame_range_current_row_top
1031 Frame_range_current_row_bottom
1032
1033 Frame_n_rows_preceding
1034 Frame_n_rows_following
1035
1036 Frame_rows_current_row_top = Frame_n_rows_preceding(0)
1037 Frame_rows_current_row_bottom
1038
1039 // These handle both RANGE and ROWS-type bounds
1040 Frame_unbounded_preceding
1041 Frame_unbounded_following
1042
1043 // This is not used as a frame bound, it counts rows in the partition:
1044 Frame_unbounded_following_set_count : public Frame_unbounded_following
1045
1046 @todo
1047 - if we want to allocate this on the MEM_ROOT we should make sure
1048 it is not re-allocated for every subquery execution.
1049 */
1050
1051 class Frame_cursor : public Sql_alloc
1052 {
1053 public:
Frame_cursor()1054 Frame_cursor() : sum_functions(), perform_no_action(false) {}
1055
init(READ_RECORD * info)1056 virtual void init(READ_RECORD *info) {};
1057
add_sum_func(Item_sum * item)1058 bool add_sum_func(Item_sum* item)
1059 {
1060 return sum_functions.push_back(item);
1061 }
1062 /*
1063 Current row has moved to the next partition and is positioned on the first
1064 row there. Position the frame bound accordingly.
1065
1066 @param first - TRUE means this is the first partition
1067 @param item - Put or remove rows from there.
1068
1069 @detail
1070 - if first==false, the caller guarantees that tbl->record[0] points at the
1071 first row in the new partition.
1072 - if first==true, we are just starting in the first partition and no such
1073 guarantee is provided.
1074
1075 - The callee may move tbl->file and tbl->record[0] to point to some other
1076 row.
1077 */
pre_next_partition(ha_rows rownum)1078 virtual void pre_next_partition(ha_rows rownum) {};
1079 virtual void next_partition(ha_rows rownum)=0;
1080
1081 /*
1082 The current row has moved one row forward.
1083 Move this frame bound accordingly, and update the value of aggregate
1084 function as necessary.
1085 */
pre_next_row()1086 virtual void pre_next_row() {};
1087 virtual void next_row()=0;
1088
is_outside_computation_bounds() const1089 virtual bool is_outside_computation_bounds() const { return false; };
1090
~Frame_cursor()1091 virtual ~Frame_cursor() {}
1092
1093 /*
1094 Regular frame cursors add or remove values from the sum functions they
1095 manage. By calling this method, they will only perform the required
1096 movement within the table, but no adding/removing will happen.
1097 */
set_no_action()1098 void set_no_action()
1099 {
1100 perform_no_action= true;
1101 }
1102
1103 /* Retrieves the row number that this cursor currently points at. */
1104 virtual ha_rows get_curr_rownum() const= 0;
1105
1106 protected:
add_value_to_items()1107 inline void add_value_to_items()
1108 {
1109 if (perform_no_action)
1110 return;
1111
1112 List_iterator_fast<Item_sum> it(sum_functions);
1113 Item_sum *item_sum;
1114 while ((item_sum= it++))
1115 {
1116 item_sum->add();
1117 }
1118 }
1119
remove_value_from_items()1120 inline void remove_value_from_items()
1121 {
1122 if (perform_no_action)
1123 return;
1124
1125 List_iterator_fast<Item_sum> it(sum_functions);
1126 Item_sum *item_sum;
1127 while ((item_sum= it++))
1128 {
1129 item_sum->remove();
1130 }
1131 }
1132
1133 /* Clear all sum functions handled by this cursor. */
clear_sum_functions()1134 void clear_sum_functions()
1135 {
1136 List_iterator_fast<Item_sum> iter_sum_func(sum_functions);
1137 Item_sum *sum_func;
1138 while ((sum_func= iter_sum_func++))
1139 {
1140 sum_func->clear();
1141 }
1142 }
1143
1144 /* Sum functions that this cursor handles. */
1145 List<Item_sum> sum_functions;
1146
1147 private:
1148 bool perform_no_action;
1149 };
1150
1151 /*
1152 A class that owns cursor objects associated with a specific window function.
1153 */
1154 class Cursor_manager
1155 {
1156 public:
add_cursor(Frame_cursor * cursor)1157 bool add_cursor(Frame_cursor *cursor)
1158 {
1159 return cursors.push_back(cursor);
1160 }
1161
initialize_cursors(READ_RECORD * info)1162 void initialize_cursors(READ_RECORD *info)
1163 {
1164 List_iterator_fast<Frame_cursor> iter(cursors);
1165 Frame_cursor *fc;
1166 while ((fc= iter++))
1167 fc->init(info);
1168 }
1169
notify_cursors_partition_changed(ha_rows rownum)1170 void notify_cursors_partition_changed(ha_rows rownum)
1171 {
1172 List_iterator_fast<Frame_cursor> iter(cursors);
1173 Frame_cursor *cursor;
1174 while ((cursor= iter++))
1175 cursor->pre_next_partition(rownum);
1176
1177 iter.rewind();
1178 while ((cursor= iter++))
1179 cursor->next_partition(rownum);
1180 }
1181
notify_cursors_next_row()1182 void notify_cursors_next_row()
1183 {
1184 List_iterator_fast<Frame_cursor> iter(cursors);
1185 Frame_cursor *cursor;
1186 while ((cursor= iter++))
1187 cursor->pre_next_row();
1188
1189 iter.rewind();
1190 while ((cursor= iter++))
1191 cursor->next_row();
1192 }
1193
~Cursor_manager()1194 ~Cursor_manager() { cursors.delete_elements(); }
1195
1196 private:
1197 /* List of the cursors that this manager owns. */
1198 List<Frame_cursor> cursors;
1199 };
1200
1201
1202
1203 //////////////////////////////////////////////////////////////////////////////
1204 // RANGE-type frames
1205 //////////////////////////////////////////////////////////////////////////////
1206
1207 /*
1208 Frame_range_n_top handles the top end of RANGE-type frame.
1209
1210 That is, it handles:
1211 RANGE BETWEEN n PRECEDING AND ...
1212 RANGE BETWEEN n FOLLOWING AND ...
1213
1214 Top of the frame doesn't need to check for partition end, since bottom will
1215 reach it before.
1216 */
1217
1218 class Frame_range_n_top : public Frame_cursor
1219 {
1220 Partition_read_cursor cursor;
1221
1222 Cached_item_item *range_expr;
1223
1224 Item *n_val;
1225 Item *item_add;
1226
1227 const bool is_preceding;
1228
1229 bool end_of_partition;
1230
1231 /*
1232 1 when order_list uses ASC ordering
1233 -1 when order_list uses DESC ordering
1234 */
1235 int order_direction;
1236 public:
Frame_range_n_top(THD * thd,SQL_I_List<ORDER> * partition_list,SQL_I_List<ORDER> * order_list,bool is_preceding_arg,Item * n_val_arg)1237 Frame_range_n_top(THD *thd,
1238 SQL_I_List<ORDER> *partition_list,
1239 SQL_I_List<ORDER> *order_list,
1240 bool is_preceding_arg, Item *n_val_arg) :
1241 cursor(thd, partition_list), n_val(n_val_arg), item_add(NULL),
1242 is_preceding(is_preceding_arg)
1243 {
1244 DBUG_ASSERT(order_list->elements == 1);
1245 Item *src_expr= order_list->first->item[0];
1246 if (order_list->first->direction == ORDER::ORDER_ASC)
1247 order_direction= 1;
1248 else
1249 order_direction= -1;
1250
1251 range_expr= (Cached_item_item*) new_Cached_item(thd, src_expr, FALSE);
1252
1253 bool use_minus= is_preceding;
1254 if (order_direction == -1)
1255 use_minus= !use_minus;
1256
1257 if (use_minus)
1258 item_add= new (thd->mem_root) Item_func_minus(thd, src_expr, n_val);
1259 else
1260 item_add= new (thd->mem_root) Item_func_plus(thd, src_expr, n_val);
1261
1262 item_add->fix_fields(thd, &item_add);
1263 }
1264
init(READ_RECORD * info)1265 void init(READ_RECORD *info)
1266 {
1267 cursor.init(info);
1268 }
1269
pre_next_partition(ha_rows rownum)1270 void pre_next_partition(ha_rows rownum)
1271 {
1272 // Save the value of FUNC(current_row)
1273 range_expr->fetch_value_from(item_add);
1274
1275 cursor.on_next_partition(rownum);
1276 end_of_partition= false;
1277 }
1278
next_partition(ha_rows rownum)1279 void next_partition(ha_rows rownum)
1280 {
1281 walk_till_non_peer();
1282 }
1283
pre_next_row()1284 void pre_next_row()
1285 {
1286 if (end_of_partition)
1287 return;
1288 range_expr->fetch_value_from(item_add);
1289 }
1290
next_row()1291 void next_row()
1292 {
1293 if (end_of_partition)
1294 return;
1295 /*
1296 Ok, our cursor is at the first row R where
1297 (prev_row + n) >= R
1298 We need to check about the current row.
1299 */
1300 walk_till_non_peer();
1301 }
1302
get_curr_rownum() const1303 ha_rows get_curr_rownum() const
1304 {
1305 return cursor.get_rownum();
1306 }
1307
is_outside_computation_bounds() const1308 bool is_outside_computation_bounds() const
1309 {
1310 if (end_of_partition)
1311 return true;
1312 return false;
1313 }
1314
1315 private:
walk_till_non_peer()1316 void walk_till_non_peer()
1317 {
1318 if (cursor.fetch()) // ERROR
1319 return;
1320 // Current row is not a peer.
1321 if (order_direction * range_expr->cmp_read_only() <= 0)
1322 return;
1323 remove_value_from_items();
1324
1325 int res;
1326 while (!(res= cursor.next()))
1327 {
1328 /* Note, no need to fetch the value explicitly here. The partition
1329 read cursor will fetch it to check if the partition has changed.
1330 TODO(cvicentiu) make this piece of information not necessary by
1331 reimplementing Partition_read_cursor.
1332 */
1333 if (order_direction * range_expr->cmp_read_only() <= 0)
1334 break;
1335 remove_value_from_items();
1336 }
1337 if (res)
1338 end_of_partition= true;
1339 }
1340
1341 };
1342
1343
1344 /*
1345 Frame_range_n_bottom handles bottom end of RANGE-type frame.
1346
1347 That is, it handles frame bounds in form:
1348 RANGE BETWEEN ... AND n PRECEDING
1349 RANGE BETWEEN ... AND n FOLLOWING
1350
1351 Bottom end moves first so it needs to check for partition end
1352 (todo: unless it's PRECEDING and in that case it doesnt)
1353 (todo: factor out common parts with Frame_range_n_top into
1354 a common ancestor)
1355 */
1356
1357 class Frame_range_n_bottom: public Frame_cursor
1358 {
1359 Partition_read_cursor cursor;
1360
1361 Cached_item_item *range_expr;
1362
1363 Item *n_val;
1364 Item *item_add;
1365
1366 const bool is_preceding;
1367
1368 bool end_of_partition;
1369
1370 /*
1371 1 when order_list uses ASC ordering
1372 -1 when order_list uses DESC ordering
1373 */
1374 int order_direction;
1375 public:
Frame_range_n_bottom(THD * thd,SQL_I_List<ORDER> * partition_list,SQL_I_List<ORDER> * order_list,bool is_preceding_arg,Item * n_val_arg)1376 Frame_range_n_bottom(THD *thd,
1377 SQL_I_List<ORDER> *partition_list,
1378 SQL_I_List<ORDER> *order_list,
1379 bool is_preceding_arg, Item *n_val_arg) :
1380 cursor(thd, partition_list), n_val(n_val_arg), item_add(NULL),
1381 is_preceding(is_preceding_arg), added_values(false)
1382 {
1383 DBUG_ASSERT(order_list->elements == 1);
1384 Item *src_expr= order_list->first->item[0];
1385
1386 if (order_list->first->direction == ORDER::ORDER_ASC)
1387 order_direction= 1;
1388 else
1389 order_direction= -1;
1390
1391 range_expr= (Cached_item_item*) new_Cached_item(thd, src_expr, FALSE);
1392
1393 bool use_minus= is_preceding;
1394 if (order_direction == -1)
1395 use_minus= !use_minus;
1396
1397 if (use_minus)
1398 item_add= new (thd->mem_root) Item_func_minus(thd, src_expr, n_val);
1399 else
1400 item_add= new (thd->mem_root) Item_func_plus(thd, src_expr, n_val);
1401
1402 item_add->fix_fields(thd, &item_add);
1403 }
1404
init(READ_RECORD * info)1405 void init(READ_RECORD *info)
1406 {
1407 cursor.init(info);
1408 }
1409
pre_next_partition(ha_rows rownum)1410 void pre_next_partition(ha_rows rownum)
1411 {
1412 // Save the value of FUNC(current_row)
1413 range_expr->fetch_value_from(item_add);
1414
1415 cursor.on_next_partition(rownum);
1416 end_of_partition= false;
1417 added_values= false;
1418 }
1419
next_partition(ha_rows rownum)1420 void next_partition(ha_rows rownum)
1421 {
1422 cursor.move_to(rownum);
1423 walk_till_non_peer();
1424 }
1425
pre_next_row()1426 void pre_next_row()
1427 {
1428 if (end_of_partition)
1429 return;
1430 range_expr->fetch_value_from(item_add);
1431 }
1432
next_row()1433 void next_row()
1434 {
1435 if (end_of_partition)
1436 return;
1437 /*
1438 Ok, our cursor is at the first row R where
1439 (prev_row + n) >= R
1440 We need to check about the current row.
1441 */
1442 walk_till_non_peer();
1443 }
1444
is_outside_computation_bounds() const1445 bool is_outside_computation_bounds() const
1446 {
1447 if (!added_values)
1448 return true;
1449 return false;
1450 }
1451
get_curr_rownum() const1452 ha_rows get_curr_rownum() const
1453 {
1454 if (end_of_partition)
1455 return cursor.get_rownum(); // Cursor does not pass over partition bound.
1456 else
1457 return cursor.get_rownum() - 1; // Cursor is placed on first non peer.
1458 }
1459
1460 private:
1461 bool added_values;
1462
walk_till_non_peer()1463 void walk_till_non_peer()
1464 {
1465 cursor.fetch();
1466 // Current row is not a peer.
1467 if (order_direction * range_expr->cmp_read_only() < 0)
1468 return;
1469
1470 add_value_to_items(); // Add current row.
1471 added_values= true;
1472 int res;
1473 while (!(res= cursor.next()))
1474 {
1475 if (order_direction * range_expr->cmp_read_only() < 0)
1476 break;
1477 add_value_to_items();
1478 }
1479 if (res)
1480 end_of_partition= true;
1481 }
1482 };
1483
1484
1485 /*
1486 RANGE BETWEEN ... AND CURRENT ROW, bottom frame bound for CURRENT ROW
1487 ...
1488 | peer1
1489 | peer2 <----- current_row
1490 | peer3
1491 +-peer4 <----- the cursor points here. peer4 itself is included.
1492 nonpeer1
1493 nonpeer2
1494
1495 This bound moves in front of the current_row. It should be a the first row
1496 that is still a peer of the current row.
1497 */
1498
1499 class Frame_range_current_row_bottom: public Frame_cursor
1500 {
1501 Partition_read_cursor cursor;
1502
1503 Group_bound_tracker peer_tracker;
1504
1505 bool dont_move;
1506 public:
Frame_range_current_row_bottom(THD * thd,SQL_I_List<ORDER> * partition_list,SQL_I_List<ORDER> * order_list)1507 Frame_range_current_row_bottom(THD *thd,
1508 SQL_I_List<ORDER> *partition_list,
1509 SQL_I_List<ORDER> *order_list) :
1510 cursor(thd, partition_list), peer_tracker(thd, order_list)
1511 {
1512 }
1513
init(READ_RECORD * info)1514 void init(READ_RECORD *info)
1515 {
1516 cursor.init(info);
1517 peer_tracker.init();
1518 }
1519
pre_next_partition(ha_rows rownum)1520 void pre_next_partition(ha_rows rownum)
1521 {
1522 // Save the value of the current_row
1523 peer_tracker.check_if_next_group();
1524 cursor.on_next_partition(rownum);
1525 // Add the current row now because our cursor has already seen it
1526 add_value_to_items();
1527 }
1528
next_partition(ha_rows rownum)1529 void next_partition(ha_rows rownum)
1530 {
1531 walk_till_non_peer();
1532 }
1533
pre_next_row()1534 void pre_next_row()
1535 {
1536 dont_move= !peer_tracker.check_if_next_group();
1537 }
1538
next_row()1539 void next_row()
1540 {
1541 // Check if our cursor is pointing at a peer of the current row.
1542 // If not, move forward until that becomes true
1543 if (dont_move)
1544 {
1545 /*
1546 Our current is not a peer of the current row.
1547 No need to move the bound.
1548 */
1549 return;
1550 }
1551 walk_till_non_peer();
1552 }
1553
get_curr_rownum() const1554 ha_rows get_curr_rownum() const
1555 {
1556 return cursor.get_rownum();
1557 }
1558
1559 private:
walk_till_non_peer()1560 void walk_till_non_peer()
1561 {
1562 /*
1563 Walk forward until we've met first row that's not a peer of the current
1564 row
1565 */
1566 while (!cursor.next())
1567 {
1568 if (peer_tracker.compare_with_cache())
1569 {
1570 cursor.prev(); // Move to our peer.
1571 break;
1572 }
1573
1574 add_value_to_items();
1575 }
1576 }
1577 };
1578
1579
1580 /*
1581 RANGE BETWEEN CURRENT ROW AND .... Top CURRENT ROW, RANGE-type frame bound
1582
1583 nonpeer1
1584 nonpeer2
1585 +-peer1 <----- the cursor points here. peer1 itself is included.
1586 | peer2
1587 | peer3 <----- current_row
1588 | peer4
1589 ...
1590
1591 It moves behind the current_row. It is located right after the first peer of
1592 the current_row.
1593 */
1594
1595 class Frame_range_current_row_top : public Frame_cursor
1596 {
1597 Group_bound_tracker bound_tracker;
1598
1599 Table_read_cursor cursor;
1600 Group_bound_tracker peer_tracker;
1601
1602 bool move;
1603 public:
Frame_range_current_row_top(THD * thd,SQL_I_List<ORDER> * partition_list,SQL_I_List<ORDER> * order_list)1604 Frame_range_current_row_top(THD *thd,
1605 SQL_I_List<ORDER> *partition_list,
1606 SQL_I_List<ORDER> *order_list) :
1607 bound_tracker(thd, partition_list), cursor(), peer_tracker(thd, order_list),
1608 move(false)
1609 {}
1610
init(READ_RECORD * info)1611 void init(READ_RECORD *info)
1612 {
1613 bound_tracker.init();
1614
1615 cursor.init(info);
1616 peer_tracker.init();
1617 }
1618
pre_next_partition(ha_rows rownum)1619 void pre_next_partition(ha_rows rownum)
1620 {
1621 // Fetch the value from the first row
1622 peer_tracker.check_if_next_group();
1623 cursor.move_to(rownum);
1624 }
1625
next_partition(ha_rows rownum)1626 void next_partition(ha_rows rownum) {}
1627
pre_next_row()1628 void pre_next_row()
1629 {
1630 // Check if the new current_row is a peer of the row that our cursor is
1631 // pointing to.
1632 move= peer_tracker.check_if_next_group();
1633 }
1634
next_row()1635 void next_row()
1636 {
1637 if (move)
1638 {
1639 /*
1640 Our cursor is pointing at the first row that was a peer of the previous
1641 current row. Or, it was the first row in the partition.
1642 */
1643 if (cursor.fetch())
1644 return;
1645
1646 // todo: need the following check ?
1647 if (!peer_tracker.compare_with_cache())
1648 return;
1649 remove_value_from_items();
1650
1651 do
1652 {
1653 if (cursor.next() || cursor.fetch())
1654 return;
1655 if (!peer_tracker.compare_with_cache())
1656 return;
1657 remove_value_from_items();
1658 }
1659 while (1);
1660 }
1661 }
1662
get_curr_rownum() const1663 ha_rows get_curr_rownum() const
1664 {
1665 return cursor.get_rownum();
1666 }
1667 };
1668
1669
1670 /////////////////////////////////////////////////////////////////////////////
1671 // UNBOUNDED frame bounds (shared between RANGE and ROWS)
1672 /////////////////////////////////////////////////////////////////////////////
1673
1674 /*
1675 UNBOUNDED PRECEDING frame bound
1676 */
1677 class Frame_unbounded_preceding : public Frame_cursor
1678 {
1679 public:
Frame_unbounded_preceding(THD * thd,SQL_I_List<ORDER> * partition_list,SQL_I_List<ORDER> * order_list)1680 Frame_unbounded_preceding(THD *thd,
1681 SQL_I_List<ORDER> *partition_list,
1682 SQL_I_List<ORDER> *order_list)
1683 {}
1684
init(READ_RECORD * info)1685 void init(READ_RECORD *info) {}
1686
next_partition(ha_rows rownum)1687 void next_partition(ha_rows rownum)
1688 {
1689 /*
1690 UNBOUNDED PRECEDING frame end just stays on the first row of the
1691 partition. We are top of the frame, so we don't need to update the sum
1692 function.
1693 */
1694 curr_rownum= rownum;
1695 }
1696
next_row()1697 void next_row()
1698 {
1699 /* Do nothing, UNBOUNDED PRECEDING frame end doesn't move. */
1700 }
1701
get_curr_rownum() const1702 ha_rows get_curr_rownum() const
1703 {
1704 return curr_rownum;
1705 }
1706
1707 private:
1708 ha_rows curr_rownum;
1709 };
1710
1711
1712 /*
1713 UNBOUNDED FOLLOWING frame bound
1714 */
1715
1716 class Frame_unbounded_following : public Frame_cursor
1717 {
1718 protected:
1719 Partition_read_cursor cursor;
1720
1721 public:
Frame_unbounded_following(THD * thd,SQL_I_List<ORDER> * partition_list,SQL_I_List<ORDER> * order_list)1722 Frame_unbounded_following(THD *thd,
1723 SQL_I_List<ORDER> *partition_list,
1724 SQL_I_List<ORDER> *order_list) :
1725 cursor(thd, partition_list) {}
1726
init(READ_RECORD * info)1727 void init(READ_RECORD *info)
1728 {
1729 cursor.init(info);
1730 }
1731
pre_next_partition(ha_rows rownum)1732 void pre_next_partition(ha_rows rownum)
1733 {
1734 cursor.on_next_partition(rownum);
1735 }
1736
next_partition(ha_rows rownum)1737 void next_partition(ha_rows rownum)
1738 {
1739 /* Activate the first row */
1740 cursor.fetch();
1741 add_value_to_items();
1742
1743 /* Walk to the end of the partition, updating the SUM function */
1744 while (!cursor.next())
1745 {
1746 add_value_to_items();
1747 }
1748 }
1749
next_row()1750 void next_row()
1751 {
1752 /* Do nothing, UNBOUNDED FOLLOWING frame end doesn't move */
1753 }
1754
get_curr_rownum() const1755 ha_rows get_curr_rownum() const
1756 {
1757 return cursor.get_rownum();
1758 }
1759 };
1760
1761
1762 class Frame_unbounded_following_set_count : public Frame_unbounded_following
1763 {
1764 public:
Frame_unbounded_following_set_count(THD * thd,SQL_I_List<ORDER> * partition_list,SQL_I_List<ORDER> * order_list)1765 Frame_unbounded_following_set_count(
1766 THD *thd,
1767 SQL_I_List<ORDER> *partition_list, SQL_I_List<ORDER> *order_list) :
1768 Frame_unbounded_following(thd, partition_list, order_list) {}
1769
next_partition(ha_rows rownum)1770 void next_partition(ha_rows rownum)
1771 {
1772 ha_rows num_rows_in_partition= 0;
1773 if (cursor.fetch())
1774 return;
1775 num_rows_in_partition++;
1776
1777 /* Walk to the end of the partition, find how many rows there are. */
1778 while (!cursor.next())
1779 num_rows_in_partition++;
1780 set_win_funcs_row_count(num_rows_in_partition);
1781 }
1782
get_curr_rownum() const1783 ha_rows get_curr_rownum() const
1784 {
1785 return cursor.get_rownum();
1786 }
1787
1788 protected:
set_win_funcs_row_count(ha_rows num_rows_in_partition)1789 void set_win_funcs_row_count(ha_rows num_rows_in_partition)
1790 {
1791 List_iterator_fast<Item_sum> it(sum_functions);
1792 Item_sum* item;
1793 while ((item= it++))
1794 item->set_partition_row_count(num_rows_in_partition);
1795 }
1796 };
1797
1798 class Frame_unbounded_following_set_count_no_nulls:
1799 public Frame_unbounded_following_set_count
1800 {
1801
1802 public:
Frame_unbounded_following_set_count_no_nulls(THD * thd,SQL_I_List<ORDER> * partition_list,SQL_I_List<ORDER> * order_list)1803 Frame_unbounded_following_set_count_no_nulls(THD *thd,
1804 SQL_I_List<ORDER> *partition_list,
1805 SQL_I_List<ORDER> *order_list) :
1806 Frame_unbounded_following_set_count(thd,partition_list, order_list)
1807 {
1808 order_item= order_list->first->item[0];
1809 }
next_partition(ha_rows rownum)1810 void next_partition(ha_rows rownum)
1811 {
1812 ha_rows num_rows_in_partition= 0;
1813 if (cursor.fetch())
1814 return;
1815
1816 /* Walk to the end of the partition, find how many rows there are. */
1817 do
1818 {
1819 if (!order_item->is_null())
1820 num_rows_in_partition++;
1821 } while (!cursor.next());
1822
1823 set_win_funcs_row_count(num_rows_in_partition);
1824 }
1825
get_curr_rownum() const1826 ha_rows get_curr_rownum() const
1827 {
1828 return cursor.get_rownum();
1829 }
1830
1831 private:
1832 Item* order_item;
1833 };
1834
1835 /////////////////////////////////////////////////////////////////////////////
1836 // ROWS-type frame bounds
1837 /////////////////////////////////////////////////////////////////////////////
1838 /*
1839 ROWS $n PRECEDING frame bound
1840
1841 */
1842 class Frame_n_rows_preceding : public Frame_cursor
1843 {
1844 /* Whether this is top of the frame or bottom */
1845 const bool is_top_bound;
1846 const ha_rows n_rows;
1847
1848 /* Number of rows that we need to skip before our cursor starts moving */
1849 ha_rows n_rows_behind;
1850
1851 Table_read_cursor cursor;
1852 public:
Frame_n_rows_preceding(bool is_top_bound_arg,ha_rows n_rows_arg)1853 Frame_n_rows_preceding(bool is_top_bound_arg, ha_rows n_rows_arg) :
1854 is_top_bound(is_top_bound_arg), n_rows(n_rows_arg), n_rows_behind(0)
1855 {}
1856
init(READ_RECORD * info)1857 void init(READ_RECORD *info)
1858 {
1859 cursor.init(info);
1860 }
1861
next_partition(ha_rows rownum)1862 void next_partition(ha_rows rownum)
1863 {
1864 /*
1865 Position our cursor to point at the first row in the new partition
1866 (for rownum=0, it is already there, otherwise, it lags behind)
1867 */
1868 cursor.move_to(rownum);
1869 /* Cursor is in the same spot as current row. */
1870 n_rows_behind= 0;
1871
1872 /*
1873 Suppose the bound is ROWS 2 PRECEDING, and current row is row#n:
1874 ...
1875 n-3
1876 n-2 --- bound row
1877 n-1
1878 n --- current_row
1879 ...
1880 The bound should point at row #(n-2). Bounds are inclusive, so
1881 - bottom bound should add row #(n-2) into the window function
1882 - top bound should remove row (#n-3) from the window function.
1883 */
1884 move_cursor_if_possible();
1885
1886 }
1887
next_row()1888 void next_row()
1889 {
1890 n_rows_behind++;
1891 move_cursor_if_possible();
1892 }
1893
is_outside_computation_bounds() const1894 bool is_outside_computation_bounds() const
1895 {
1896 /* As a bottom boundary, rows have not yet been added. */
1897 if (!is_top_bound && n_rows - n_rows_behind)
1898 return true;
1899 return false;
1900 }
1901
get_curr_rownum() const1902 ha_rows get_curr_rownum() const
1903 {
1904 return cursor.get_rownum();
1905 }
1906
1907 private:
move_cursor_if_possible()1908 void move_cursor_if_possible()
1909 {
1910 longlong rows_difference= n_rows - n_rows_behind;
1911 if (rows_difference > 0) /* We still have to wait. */
1912 return;
1913
1914 /* The cursor points to the first row in the frame. */
1915 if (rows_difference == 0)
1916 {
1917 if (!is_top_bound)
1918 {
1919 cursor.fetch();
1920 add_value_to_items();
1921 }
1922 /* For top bound we don't have to remove anything as nothing was added. */
1923 return;
1924 }
1925
1926 /* We need to catch up by one row. */
1927 DBUG_ASSERT(rows_difference == -1);
1928
1929 if (is_top_bound)
1930 {
1931 cursor.fetch();
1932 remove_value_from_items();
1933 cursor.next();
1934 }
1935 else
1936 {
1937 cursor.next();
1938 cursor.fetch();
1939 add_value_to_items();
1940 }
1941 /* We've advanced one row. We are no longer behind. */
1942 n_rows_behind--;
1943 }
1944 };
1945
1946
1947 /*
1948 ROWS ... CURRENT ROW, Bottom bound.
1949
1950 This case is moved to separate class because here we don't need to maintain
1951 our own cursor, or check for partition bound.
1952 */
1953
1954 class Frame_rows_current_row_bottom : public Frame_cursor
1955 {
1956 public:
1957
Frame_rows_current_row_bottom()1958 Frame_rows_current_row_bottom() : curr_rownum(0) {}
1959
pre_next_partition(ha_rows rownum)1960 void pre_next_partition(ha_rows rownum)
1961 {
1962 add_value_to_items();
1963 curr_rownum= rownum;
1964 }
1965
next_partition(ha_rows rownum)1966 void next_partition(ha_rows rownum) {}
1967
pre_next_row()1968 void pre_next_row()
1969 {
1970 /* Temp table's current row is current_row. Add it to the window func */
1971 add_value_to_items();
1972 }
1973
next_row()1974 void next_row()
1975 {
1976 curr_rownum++;
1977 };
1978
get_curr_rownum() const1979 ha_rows get_curr_rownum() const
1980 {
1981 return curr_rownum;
1982 }
1983
1984 private:
1985 ha_rows curr_rownum;
1986 };
1987
1988
1989 /*
1990 ROWS-type CURRENT ROW, top bound.
1991
1992 This serves for processing "ROWS BETWEEN CURRENT ROW AND ..." frames.
1993
1994 n-1
1995 n --+ --- current_row, and top frame bound
1996 n+1 |
1997 ... |
1998
1999 when the current_row moves to row #n, this frame bound should remove the
2000 row #(n-1) from the window function.
2001
2002 In other words, we need what "ROWS PRECEDING 0" provides.
2003 */
2004 class Frame_rows_current_row_top: public Frame_n_rows_preceding
2005
2006 {
2007 public:
Frame_rows_current_row_top()2008 Frame_rows_current_row_top() :
2009 Frame_n_rows_preceding(true /*top*/, 0 /* n_rows */)
2010 {}
2011 };
2012
2013
2014 /*
2015 ROWS $n FOLLOWING frame bound.
2016 */
2017
2018 class Frame_n_rows_following : public Frame_cursor
2019 {
2020 /* Whether this is top of the frame or bottom */
2021 const bool is_top_bound;
2022 const ha_rows n_rows;
2023
2024 Partition_read_cursor cursor;
2025 bool at_partition_end;
2026 public:
Frame_n_rows_following(THD * thd,SQL_I_List<ORDER> * partition_list,SQL_I_List<ORDER> * order_list,bool is_top_bound_arg,ha_rows n_rows_arg)2027 Frame_n_rows_following(THD *thd,
2028 SQL_I_List<ORDER> *partition_list,
2029 SQL_I_List<ORDER> *order_list,
2030 bool is_top_bound_arg, ha_rows n_rows_arg) :
2031 is_top_bound(is_top_bound_arg), n_rows(n_rows_arg),
2032 cursor(thd, partition_list)
2033 {
2034 }
2035
init(READ_RECORD * info)2036 void init(READ_RECORD *info)
2037 {
2038 cursor.init(info);
2039 at_partition_end= false;
2040 }
2041
pre_next_partition(ha_rows rownum)2042 void pre_next_partition(ha_rows rownum)
2043 {
2044 at_partition_end= false;
2045
2046 cursor.on_next_partition(rownum);
2047 }
2048
2049 /* Move our cursor to be n_rows ahead. */
next_partition(ha_rows rownum)2050 void next_partition(ha_rows rownum)
2051 {
2052 if (is_top_bound)
2053 next_part_top(rownum);
2054 else
2055 next_part_bottom(rownum);
2056 }
2057
next_row()2058 void next_row()
2059 {
2060 if (is_top_bound)
2061 next_row_top();
2062 else
2063 next_row_bottom();
2064 }
2065
is_outside_computation_bounds() const2066 bool is_outside_computation_bounds() const
2067 {
2068 /*
2069 The top bound can go over the current partition. In this case,
2070 the sum function has 0 values added to it.
2071 */
2072 if (at_partition_end && is_top_bound)
2073 return true;
2074 return false;
2075 }
2076
get_curr_rownum() const2077 ha_rows get_curr_rownum() const
2078 {
2079 return cursor.get_rownum();
2080 }
2081
2082 private:
next_part_top(ha_rows rownum)2083 void next_part_top(ha_rows rownum)
2084 {
2085 for (ha_rows i= 0; i < n_rows; i++)
2086 {
2087 if (cursor.fetch())
2088 break;
2089 remove_value_from_items();
2090 if (cursor.next())
2091 at_partition_end= true;
2092 }
2093 }
2094
next_part_bottom(ha_rows rownum)2095 void next_part_bottom(ha_rows rownum)
2096 {
2097 if (cursor.fetch())
2098 return;
2099 add_value_to_items();
2100
2101 for (ha_rows i= 0; i < n_rows; i++)
2102 {
2103 if (cursor.next())
2104 {
2105 at_partition_end= true;
2106 break;
2107 }
2108 add_value_to_items();
2109 }
2110 return;
2111 }
2112
next_row_top()2113 void next_row_top()
2114 {
2115 if (cursor.fetch()) // PART END OR FAILURE
2116 {
2117 at_partition_end= true;
2118 return;
2119 }
2120 remove_value_from_items();
2121 if (cursor.next())
2122 {
2123 at_partition_end= true;
2124 return;
2125 }
2126 }
2127
next_row_bottom()2128 void next_row_bottom()
2129 {
2130 if (at_partition_end)
2131 return;
2132
2133 if (cursor.next())
2134 {
2135 at_partition_end= true;
2136 return;
2137 }
2138
2139 add_value_to_items();
2140
2141 }
2142 };
2143
2144 /*
2145 A cursor that performs a table scan between two indices. The indices
2146 are provided by the two cursors representing the top and bottom bound
2147 of the window function's frame definition.
2148
2149 Each scan clears the sum function.
2150
2151 NOTE:
2152 The cursor does not alter the top and bottom cursors.
2153 This type of cursor is expensive computational wise. This is only to be
2154 used when the sum functions do not support removal.
2155 */
2156 class Frame_scan_cursor : public Frame_cursor
2157 {
2158 public:
Frame_scan_cursor(const Frame_cursor & top_bound,const Frame_cursor & bottom_bound)2159 Frame_scan_cursor(const Frame_cursor &top_bound,
2160 const Frame_cursor &bottom_bound) :
2161 top_bound(top_bound), bottom_bound(bottom_bound) {}
2162
init(READ_RECORD * info)2163 void init(READ_RECORD *info)
2164 {
2165 cursor.init(info);
2166 }
2167
pre_next_partition(ha_rows rownum)2168 void pre_next_partition(ha_rows rownum)
2169 {
2170 /* TODO(cvicentiu) Sum functions get cleared on next partition anyway during
2171 the window function computation algorithm. Either perform this only in
2172 cursors, or remove it from pre_next_partition.
2173 */
2174 curr_rownum= rownum;
2175 clear_sum_functions();
2176 }
2177
next_partition(ha_rows rownum)2178 void next_partition(ha_rows rownum)
2179 {
2180 compute_values_for_current_row();
2181 }
2182
pre_next_row()2183 void pre_next_row()
2184 {
2185 clear_sum_functions();
2186 }
2187
next_row()2188 void next_row()
2189 {
2190 curr_rownum++;
2191 compute_values_for_current_row();
2192 }
2193
get_curr_rownum() const2194 ha_rows get_curr_rownum() const
2195 {
2196 return curr_rownum;
2197 }
2198
2199 private:
2200 const Frame_cursor &top_bound;
2201 const Frame_cursor &bottom_bound;
2202 Table_read_cursor cursor;
2203 ha_rows curr_rownum;
2204
2205 /* Scan the rows between the top bound and bottom bound. Add all the values
2206 between them, top bound row and bottom bound row inclusive. */
compute_values_for_current_row()2207 void compute_values_for_current_row()
2208 {
2209 if (top_bound.is_outside_computation_bounds() ||
2210 bottom_bound.is_outside_computation_bounds())
2211 return;
2212
2213 ha_rows start_rownum= top_bound.get_curr_rownum();
2214 ha_rows bottom_rownum= bottom_bound.get_curr_rownum();
2215 DBUG_PRINT("info", ("COMPUTING (%llu %llu)", start_rownum, bottom_rownum));
2216
2217 cursor.move_to(start_rownum);
2218
2219 for (ha_rows idx= start_rownum; idx <= bottom_rownum; idx++)
2220 {
2221 if (cursor.fetch()) //EOF
2222 break;
2223 add_value_to_items();
2224 if (cursor.next()) // EOF
2225 break;
2226 }
2227 }
2228 };
2229
2230 /* A cursor that follows a target cursor. Each time a new row is added,
2231 the window functions are cleared and only have the row at which the target
2232 is point at added to them.
2233
2234 The window functions are cleared if the bounds or the position cursors are
2235 outside computational bounds.
2236 */
2237 class Frame_positional_cursor : public Frame_cursor
2238 {
2239 public:
Frame_positional_cursor(const Frame_cursor & position_cursor)2240 Frame_positional_cursor(const Frame_cursor &position_cursor) :
2241 position_cursor(position_cursor), top_bound(NULL),
2242 bottom_bound(NULL), offset(NULL), overflowed(false),
2243 negative_offset(false) {}
2244
Frame_positional_cursor(const Frame_cursor & position_cursor,const Frame_cursor & top_bound,const Frame_cursor & bottom_bound,Item & offset,bool negative_offset)2245 Frame_positional_cursor(const Frame_cursor &position_cursor,
2246 const Frame_cursor &top_bound,
2247 const Frame_cursor &bottom_bound,
2248 Item &offset,
2249 bool negative_offset) :
2250 position_cursor(position_cursor), top_bound(&top_bound),
2251 bottom_bound(&bottom_bound), offset(&offset),
2252 negative_offset(negative_offset) {}
2253
init(READ_RECORD * info)2254 void init(READ_RECORD *info)
2255 {
2256 cursor.init(info);
2257 }
2258
pre_next_partition(ha_rows rownum)2259 void pre_next_partition(ha_rows rownum)
2260 {
2261 /* The offset is dependant on the current row values. We can only get
2262 * it here accurately. When fetching other rows, it changes. */
2263 save_offset_value();
2264 }
2265
next_partition(ha_rows rownum)2266 void next_partition(ha_rows rownum)
2267 {
2268 save_positional_value();
2269 }
2270
pre_next_row()2271 void pre_next_row()
2272 {
2273 /* The offset is dependant on the current row values. We can only get
2274 * it here accurately. When fetching other rows, it changes. */
2275 save_offset_value();
2276 }
2277
next_row()2278 void next_row()
2279 {
2280 save_positional_value();
2281 }
2282
get_curr_rownum() const2283 ha_rows get_curr_rownum() const
2284 {
2285 return position_cursor.get_curr_rownum();
2286 }
2287
2288 private:
2289 /* Check if a our position is within bounds.
2290 * The position is passed as a parameter to avoid recalculating it. */
position_is_within_bounds()2291 bool position_is_within_bounds()
2292 {
2293 if (!offset)
2294 return !position_cursor.is_outside_computation_bounds();
2295
2296 if (overflowed)
2297 return false;
2298
2299 /* No valid bound to compare to. */
2300 if (position_cursor.is_outside_computation_bounds() ||
2301 top_bound->is_outside_computation_bounds() ||
2302 bottom_bound->is_outside_computation_bounds())
2303 return false;
2304
2305 /* We are over the bound. */
2306 if (position < top_bound->get_curr_rownum())
2307 return false;
2308 if (position > bottom_bound->get_curr_rownum())
2309 return false;
2310
2311 return true;
2312 }
2313
2314 /* Get the current position, accounting for the offset value, if present.
2315 NOTE: This function does not check over/underflow.
2316 */
get_current_position()2317 void get_current_position()
2318 {
2319 position = position_cursor.get_curr_rownum();
2320 overflowed= false;
2321 if (offset)
2322 {
2323 if (offset_value < 0 &&
2324 position + offset_value > position)
2325 {
2326 overflowed= true;
2327 }
2328 if (offset_value > 0 &&
2329 position + offset_value < position)
2330 {
2331 overflowed= true;
2332 }
2333 position += offset_value;
2334 }
2335 }
2336
save_offset_value()2337 void save_offset_value()
2338 {
2339 if (offset)
2340 offset_value= offset->val_int() * (negative_offset ? -1 : 1);
2341 else
2342 offset_value= 0;
2343 }
2344
save_positional_value()2345 void save_positional_value()
2346 {
2347 get_current_position();
2348 if (!position_is_within_bounds())
2349 clear_sum_functions();
2350 else
2351 {
2352 cursor.move_to(position);
2353 cursor.fetch();
2354 add_value_to_items();
2355 }
2356 }
2357
2358 const Frame_cursor &position_cursor;
2359 const Frame_cursor *top_bound;
2360 const Frame_cursor *bottom_bound;
2361 Item *offset;
2362 Table_read_cursor cursor;
2363 ha_rows position;
2364 longlong offset_value;
2365 bool overflowed;
2366
2367 bool negative_offset;
2368 };
2369
2370
2371 /*
2372 Get a Frame_cursor for a frame bound. This is a "factory function".
2373 */
get_frame_cursor(THD * thd,Window_spec * spec,bool is_top_bound)2374 Frame_cursor *get_frame_cursor(THD *thd, Window_spec *spec, bool is_top_bound)
2375 {
2376 Window_frame *frame= spec->window_frame;
2377 if (!frame)
2378 {
2379 /*
2380 The docs say this about the lack of frame clause:
2381
2382 Let WD be a window structure descriptor.
2383 ...
2384 If WD has no window framing clause, then
2385 Case:
2386 i) If the window ordering clause of WD is not present, then WF is the
2387 window partition of R.
2388 ii) Otherwise, WF consists of all rows of the partition of R that
2389 precede R or are peers of R in the window ordering of the window
2390 partition defined by the window ordering clause.
2391
2392 For case #ii, the frame bounds essentially are "RANGE BETWEEN UNBOUNDED
2393 PRECEDING AND CURRENT ROW".
2394 For the case #i, without ordering clause all rows are considered peers,
2395 so again the same frame bounds can be used.
2396 */
2397 if (is_top_bound)
2398 return new Frame_unbounded_preceding(thd,
2399 spec->partition_list,
2400 spec->order_list);
2401 else
2402 return new Frame_range_current_row_bottom(thd,
2403 spec->partition_list,
2404 spec->order_list);
2405 }
2406
2407 Window_frame_bound *bound= is_top_bound? frame->top_bound :
2408 frame->bottom_bound;
2409
2410 if (bound->precedence_type == Window_frame_bound::PRECEDING ||
2411 bound->precedence_type == Window_frame_bound::FOLLOWING)
2412 {
2413 bool is_preceding= (bound->precedence_type ==
2414 Window_frame_bound::PRECEDING);
2415
2416 if (bound->offset == NULL) /* this is UNBOUNDED */
2417 {
2418 /* The following serve both RANGE and ROWS: */
2419 if (is_preceding)
2420 return new Frame_unbounded_preceding(thd,
2421 spec->partition_list,
2422 spec->order_list);
2423
2424 return new Frame_unbounded_following(thd,
2425 spec->partition_list,
2426 spec->order_list);
2427 }
2428
2429 if (frame->units == Window_frame::UNITS_ROWS)
2430 {
2431 ha_rows n_rows= bound->offset->val_int();
2432 /* These should be handled in the parser */
2433 DBUG_ASSERT(!bound->offset->null_value);
2434 DBUG_ASSERT((longlong) n_rows >= 0);
2435 if (is_preceding)
2436 return new Frame_n_rows_preceding(is_top_bound, n_rows);
2437
2438 return new Frame_n_rows_following(
2439 thd, spec->partition_list, spec->order_list,
2440 is_top_bound, n_rows);
2441 }
2442 else
2443 {
2444 if (is_top_bound)
2445 return new Frame_range_n_top(
2446 thd, spec->partition_list, spec->order_list,
2447 is_preceding, bound->offset);
2448
2449 return new Frame_range_n_bottom(thd,
2450 spec->partition_list, spec->order_list,
2451 is_preceding, bound->offset);
2452 }
2453 }
2454
2455 if (bound->precedence_type == Window_frame_bound::CURRENT)
2456 {
2457 if (frame->units == Window_frame::UNITS_ROWS)
2458 {
2459 if (is_top_bound)
2460 return new Frame_rows_current_row_top;
2461
2462 return new Frame_rows_current_row_bottom;
2463 }
2464 else
2465 {
2466 if (is_top_bound)
2467 return new Frame_range_current_row_top(
2468 thd, spec->partition_list, spec->order_list);
2469
2470 return new Frame_range_current_row_bottom(
2471 thd, spec->partition_list, spec->order_list);
2472 }
2473 }
2474 return NULL;
2475 }
2476
2477 static
add_special_frame_cursors(THD * thd,Cursor_manager * cursor_manager,Item_window_func * window_func)2478 void add_special_frame_cursors(THD *thd, Cursor_manager *cursor_manager,
2479 Item_window_func *window_func)
2480 {
2481 Window_spec *spec= window_func->window_spec;
2482 Item_sum *item_sum= window_func->window_func();
2483 DBUG_PRINT("info", ("Get arg count: %d", item_sum->get_arg_count()));
2484 Frame_cursor *fc;
2485 switch (item_sum->sum_func())
2486 {
2487 case Item_sum::CUME_DIST_FUNC:
2488 fc= new Frame_unbounded_preceding(thd,
2489 spec->partition_list,
2490 spec->order_list);
2491 fc->add_sum_func(item_sum);
2492 cursor_manager->add_cursor(fc);
2493 fc= new Frame_range_current_row_bottom(thd,
2494 spec->partition_list,
2495 spec->order_list);
2496 fc->add_sum_func(item_sum);
2497 cursor_manager->add_cursor(fc);
2498 break;
2499 case Item_sum::LEAD_FUNC:
2500 case Item_sum::LAG_FUNC:
2501 {
2502 Frame_cursor *bottom_bound= new Frame_unbounded_following(thd,
2503 spec->partition_list,
2504 spec->order_list);
2505 Frame_cursor *top_bound= new Frame_unbounded_preceding(thd,
2506 spec->partition_list,
2507 spec->order_list);
2508 Frame_cursor *current_row_pos= new Frame_rows_current_row_bottom;
2509 cursor_manager->add_cursor(bottom_bound);
2510 cursor_manager->add_cursor(top_bound);
2511 cursor_manager->add_cursor(current_row_pos);
2512 DBUG_ASSERT(item_sum->fixed);
2513 bool negative_offset= item_sum->sum_func() == Item_sum::LAG_FUNC;
2514 fc= new Frame_positional_cursor(*current_row_pos,
2515 *top_bound, *bottom_bound,
2516 *item_sum->get_arg(1),
2517 negative_offset);
2518 fc->add_sum_func(item_sum);
2519 cursor_manager->add_cursor(fc);
2520 break;
2521 }
2522 case Item_sum::FIRST_VALUE_FUNC:
2523 {
2524 Frame_cursor *bottom_bound= get_frame_cursor(thd, spec, false);
2525 Frame_cursor *top_bound= get_frame_cursor(thd, spec, true);
2526 cursor_manager->add_cursor(bottom_bound);
2527 cursor_manager->add_cursor(top_bound);
2528 DBUG_ASSERT(item_sum->fixed);
2529 Item *offset_item= new (thd->mem_root) Item_int(thd, 0);
2530 offset_item->fix_fields(thd, &offset_item);
2531 fc= new Frame_positional_cursor(*top_bound,
2532 *top_bound, *bottom_bound,
2533 *offset_item, false);
2534 fc->add_sum_func(item_sum);
2535 cursor_manager->add_cursor(fc);
2536 break;
2537 }
2538 case Item_sum::LAST_VALUE_FUNC:
2539 {
2540 Frame_cursor *bottom_bound= get_frame_cursor(thd, spec, false);
2541 Frame_cursor *top_bound= get_frame_cursor(thd, spec, true);
2542 cursor_manager->add_cursor(bottom_bound);
2543 cursor_manager->add_cursor(top_bound);
2544 DBUG_ASSERT(item_sum->fixed);
2545 Item *offset_item= new (thd->mem_root) Item_int(thd, 0);
2546 offset_item->fix_fields(thd, &offset_item);
2547 fc= new Frame_positional_cursor(*bottom_bound,
2548 *top_bound, *bottom_bound,
2549 *offset_item, false);
2550 fc->add_sum_func(item_sum);
2551 cursor_manager->add_cursor(fc);
2552 break;
2553 }
2554 case Item_sum::NTH_VALUE_FUNC:
2555 {
2556 Frame_cursor *bottom_bound= get_frame_cursor(thd, spec, false);
2557 Frame_cursor *top_bound= get_frame_cursor(thd, spec, true);
2558 cursor_manager->add_cursor(bottom_bound);
2559 cursor_manager->add_cursor(top_bound);
2560 DBUG_ASSERT(item_sum->fixed);
2561 Item *int_item= new (thd->mem_root) Item_int(thd, 1);
2562 Item *offset_func= new (thd->mem_root)
2563 Item_func_minus(thd, item_sum->get_arg(1),
2564 int_item);
2565 offset_func->fix_fields(thd, &offset_func);
2566 fc= new Frame_positional_cursor(*top_bound,
2567 *top_bound, *bottom_bound,
2568 *offset_func, false);
2569 fc->add_sum_func(item_sum);
2570 cursor_manager->add_cursor(fc);
2571 break;
2572 }
2573 case Item_sum::PERCENTILE_CONT_FUNC:
2574 case Item_sum::PERCENTILE_DISC_FUNC:
2575 {
2576 fc= new Frame_unbounded_preceding(thd,
2577 spec->partition_list,
2578 spec->order_list);
2579 fc->add_sum_func(item_sum);
2580 cursor_manager->add_cursor(fc);
2581 fc= new Frame_unbounded_following(thd,
2582 spec->partition_list,
2583 spec->order_list);
2584 fc->add_sum_func(item_sum);
2585 cursor_manager->add_cursor(fc);
2586 break;
2587 }
2588 default:
2589 fc= new Frame_unbounded_preceding(
2590 thd, spec->partition_list, spec->order_list);
2591 fc->add_sum_func(item_sum);
2592 cursor_manager->add_cursor(fc);
2593
2594 fc= new Frame_rows_current_row_bottom;
2595 fc->add_sum_func(item_sum);
2596 cursor_manager->add_cursor(fc);
2597 }
2598 }
2599
2600
is_computed_with_remove(Item_sum::Sumfunctype sum_func)2601 static bool is_computed_with_remove(Item_sum::Sumfunctype sum_func)
2602 {
2603 switch (sum_func)
2604 {
2605 case Item_sum::CUME_DIST_FUNC:
2606 case Item_sum::ROW_NUMBER_FUNC:
2607 case Item_sum::RANK_FUNC:
2608 case Item_sum::DENSE_RANK_FUNC:
2609 case Item_sum::NTILE_FUNC:
2610 case Item_sum::FIRST_VALUE_FUNC:
2611 case Item_sum::LAST_VALUE_FUNC:
2612 case Item_sum::PERCENTILE_CONT_FUNC:
2613 case Item_sum::PERCENTILE_DISC_FUNC:
2614 return false;
2615 default:
2616 return true;
2617 }
2618 }
2619 /*
2620 Create required frame cursors for the list of window functions.
2621 Register all functions to their appropriate cursors.
2622 If the window functions share the same frame specification,
2623 those window functions will be registered to the same cursor.
2624 */
get_window_functions_required_cursors(THD * thd,List<Item_window_func> & window_functions,List<Cursor_manager> * cursor_managers)2625 void get_window_functions_required_cursors(
2626 THD *thd,
2627 List<Item_window_func>& window_functions,
2628 List<Cursor_manager> *cursor_managers)
2629 {
2630 List_iterator_fast<Item_window_func> it(window_functions);
2631 Item_window_func* item_win_func;
2632 Item_sum *sum_func;
2633 while ((item_win_func= it++))
2634 {
2635 Cursor_manager *cursor_manager = new Cursor_manager();
2636 sum_func = item_win_func->window_func();
2637 Frame_cursor *fc;
2638 /*
2639 Some window functions require the partition size for computing values.
2640 Add a cursor that retrieves it as the first one in the list if necessary.
2641 */
2642 if (item_win_func->requires_partition_size())
2643 {
2644 if (item_win_func->only_single_element_order_list())
2645 {
2646 fc= new Frame_unbounded_following_set_count_no_nulls(thd,
2647 item_win_func->window_spec->partition_list,
2648 item_win_func->window_spec->order_list);
2649 }
2650 else
2651 {
2652 fc= new Frame_unbounded_following_set_count(thd,
2653 item_win_func->window_spec->partition_list,
2654 item_win_func->window_spec->order_list);
2655 }
2656 fc->add_sum_func(sum_func);
2657 cursor_manager->add_cursor(fc);
2658 }
2659
2660 /*
2661 If it is not a regular window function that follows frame specifications,
2662 and/or specific cursors are required. ROW_NUM, RANK, NTILE and others
2663 follow such rules. Check is_frame_prohibited check for the full list.
2664
2665 TODO(cvicentiu) This approach is messy. Every time a function allows
2666 computation in a certain way, we have to add an extra method to this
2667 factory function. It is better to have window functions output
2668 their own cursors, as needed. This way, the logic is bound
2669 only to the implementation of said window function. Regular aggregate
2670 functions can keep the default frame generating code, overwrite it or
2671 add to it.
2672 */
2673 if (item_win_func->is_frame_prohibited() ||
2674 item_win_func->requires_special_cursors())
2675 {
2676 add_special_frame_cursors(thd, cursor_manager, item_win_func);
2677 cursor_managers->push_back(cursor_manager);
2678 continue;
2679 }
2680
2681 Frame_cursor *frame_bottom= get_frame_cursor(thd,
2682 item_win_func->window_spec, false);
2683 Frame_cursor *frame_top= get_frame_cursor(thd,
2684 item_win_func->window_spec, true);
2685
2686 frame_bottom->add_sum_func(sum_func);
2687 frame_top->add_sum_func(sum_func);
2688
2689 /*
2690 The order of these cursors is important. A sum function
2691 must first add values (via frame_bottom) then remove them via
2692 frame_top. Removing items first doesn't make sense in the case of all
2693 window functions.
2694 */
2695 cursor_manager->add_cursor(frame_bottom);
2696 cursor_manager->add_cursor(frame_top);
2697 if (is_computed_with_remove(sum_func->sum_func()) &&
2698 !sum_func->supports_removal())
2699 {
2700 frame_bottom->set_no_action();
2701 frame_top->set_no_action();
2702 Frame_cursor *scan_cursor= new Frame_scan_cursor(*frame_top,
2703 *frame_bottom);
2704 scan_cursor->add_sum_func(sum_func);
2705 cursor_manager->add_cursor(scan_cursor);
2706
2707 }
2708 cursor_managers->push_back(cursor_manager);
2709 }
2710 }
2711
2712 /**
2713 Helper function that takes a list of window functions and writes
2714 their values in the current table record.
2715 */
2716 static
save_window_function_values(List<Item_window_func> & window_functions,TABLE * tbl,uchar * rowid_buf)2717 bool save_window_function_values(List<Item_window_func>& window_functions,
2718 TABLE *tbl, uchar *rowid_buf)
2719 {
2720 List_iterator_fast<Item_window_func> iter(window_functions);
2721 JOIN_TAB *join_tab= tbl->reginfo.join_tab;
2722 tbl->file->ha_rnd_pos(tbl->record[0], rowid_buf);
2723 store_record(tbl, record[1]);
2724 while (Item_window_func *item_win= iter++)
2725 item_win->save_in_field(item_win->result_field, true);
2726
2727 /*
2728 In case we have window functions present, an extra step is required
2729 to compute all the fields from the temporary table.
2730 In case we have a compound expression such as: expr + expr,
2731 where one of the terms has a window function inside it, only
2732 after computing window function values we actually know the true
2733 final result of the compounded expression.
2734
2735 Go through all the func items and save their values once again in the
2736 corresponding temp table fields. Do this for each row in the table.
2737
2738 This needs to be done earlier because ORDER BY clause can also have
2739 a window function, so we need to make sure all the fields of the temp.table
2740 are updated before we do the filesort. So is best to update the other fields
2741 that contain the window functions along with the computation of window
2742 functions.
2743 */
2744
2745 Item **func_ptr= join_tab->tmp_table_param->items_to_copy;
2746 Item *func;
2747 for (; (func = *func_ptr) ; func_ptr++)
2748 {
2749 if (func->with_window_func && func->type() != Item::WINDOW_FUNC_ITEM)
2750 func->save_in_result_field(true);
2751 }
2752
2753 int err= tbl->file->ha_update_row(tbl->record[1], tbl->record[0]);
2754 if (err && err != HA_ERR_RECORD_IS_THE_SAME)
2755 return true;
2756
2757 return false;
2758 }
2759
2760 /*
2761 TODO(cvicentiu) update this comment to reflect the new execution.
2762
2763 Streamed window function computation with window frames.
2764
2765 We make a single pass over the ordered temp.table, but we're using three
2766 cursors:
2767 - current row - the row that we're computing window func value for)
2768 - start_bound - the start of the frame
2769 - bottom_bound - the end of the frame
2770
2771 All three cursors move together.
2772
2773 @todo
2774 Provided bounds have their 'cursors'... is it better to re-clone their
2775 cursors or re-position them onto the current row?
2776
2777 @detail
2778 ROWS BETWEEN 3 PRECEDING -- frame start
2779 AND 3 FOLLOWING -- frame end
2780
2781 /------ frame end (aka BOTTOM)
2782 Dataset start |
2783 --------====*=======[*]========*========-------->> dataset end
2784 | \
2785 | +-------- current row
2786 |
2787 \-------- frame start ("TOP")
2788
2789 - frame_end moves forward and adds rows into the aggregate function.
2790 - frame_start follows behind and removes rows from the aggregate function.
2791 - current_row is the row where the value of aggregate function is stored.
2792
2793 @TODO: Only the first cursor needs to check for run-out-of-partition
2794 condition (Others can catch up by counting rows?)
2795
2796 */
compute_window_func(THD * thd,List<Item_window_func> & window_functions,List<Cursor_manager> & cursor_managers,TABLE * tbl,SORT_INFO * filesort_result)2797 bool compute_window_func(THD *thd,
2798 List<Item_window_func>& window_functions,
2799 List<Cursor_manager>& cursor_managers,
2800 TABLE *tbl,
2801 SORT_INFO *filesort_result)
2802 {
2803 List_iterator_fast<Item_window_func> iter_win_funcs(window_functions);
2804 List_iterator_fast<Cursor_manager> iter_cursor_managers(cursor_managers);
2805 uint err;
2806
2807 READ_RECORD info;
2808
2809 if (init_read_record(&info, current_thd, tbl, NULL/*select*/, filesort_result,
2810 0, 1, FALSE))
2811 return true;
2812
2813 Cursor_manager *cursor_manager;
2814 while ((cursor_manager= iter_cursor_managers++))
2815 cursor_manager->initialize_cursors(&info);
2816
2817 /* One partition tracker for each window function. */
2818 List<Group_bound_tracker> partition_trackers;
2819 Item_window_func *win_func;
2820 while ((win_func= iter_win_funcs++))
2821 {
2822 Group_bound_tracker *tracker= new Group_bound_tracker(thd,
2823 win_func->window_spec->partition_list);
2824 // TODO(cvicentiu) This should be removed and placed in constructor.
2825 tracker->init();
2826 partition_trackers.push_back(tracker);
2827 }
2828
2829 List_iterator_fast<Group_bound_tracker> iter_part_trackers(partition_trackers);
2830 ha_rows rownum= 0;
2831 uchar *rowid_buf= (uchar*) my_malloc(PSI_INSTRUMENT_ME, tbl->file->ref_length, MYF(0));
2832
2833 while (true)
2834 {
2835 if ((err= info.read_record()))
2836 break; // End of file.
2837
2838 /* Remember current row so that we can restore it before computing
2839 each window function. */
2840 tbl->file->position(tbl->record[0]);
2841 memcpy(rowid_buf, tbl->file->ref, tbl->file->ref_length);
2842
2843 iter_win_funcs.rewind();
2844 iter_part_trackers.rewind();
2845 iter_cursor_managers.rewind();
2846
2847 Group_bound_tracker *tracker;
2848 while ((win_func= iter_win_funcs++) &&
2849 (tracker= iter_part_trackers++) &&
2850 (cursor_manager= iter_cursor_managers++))
2851 {
2852 if (tracker->check_if_next_group() || (rownum == 0))
2853 {
2854 /* TODO(cvicentiu)
2855 Clearing window functions should happen through cursors. */
2856 win_func->window_func()->clear();
2857 cursor_manager->notify_cursors_partition_changed(rownum);
2858 }
2859 else
2860 {
2861 cursor_manager->notify_cursors_next_row();
2862 }
2863
2864 /* Check if we found any error in the window function while adding values
2865 through cursors. */
2866 if (unlikely(thd->is_error() || thd->is_killed()))
2867 break;
2868
2869
2870 /* Return to current row after notifying cursors for each window
2871 function. */
2872 tbl->file->ha_rnd_pos(tbl->record[0], rowid_buf);
2873 }
2874
2875 /* We now have computed values for each window function. They can now
2876 be saved in the current row. */
2877 save_window_function_values(window_functions, tbl, rowid_buf);
2878
2879 rownum++;
2880 }
2881
2882 my_free(rowid_buf);
2883 partition_trackers.delete_elements();
2884 end_read_record(&info);
2885
2886 return false;
2887 }
2888
2889 /* Make a list that is a concation of two lists of ORDER elements */
2890
concat_order_lists(MEM_ROOT * mem_root,ORDER * list1,ORDER * list2)2891 static ORDER* concat_order_lists(MEM_ROOT *mem_root, ORDER *list1, ORDER *list2)
2892 {
2893 if (!list1)
2894 {
2895 list1= list2;
2896 list2= NULL;
2897 }
2898
2899 ORDER *res= NULL; // first element in the new list
2900 ORDER *prev= NULL; // last element in the new list
2901 ORDER *cur_list= list1; // this goes through list1, list2
2902 while (cur_list)
2903 {
2904 for (ORDER *cur= cur_list; cur; cur= cur->next)
2905 {
2906 ORDER *copy= (ORDER*)alloc_root(mem_root, sizeof(ORDER));
2907 memcpy(copy, cur, sizeof(ORDER));
2908 if (prev)
2909 prev->next= copy;
2910 prev= copy;
2911 if (!res)
2912 res= copy;
2913 }
2914
2915 cur_list= (cur_list == list1)? list2: NULL;
2916 }
2917
2918 if (prev)
2919 prev->next= NULL;
2920
2921 return res;
2922 }
2923
add_function_to_run(Item_window_func * win_func)2924 bool Window_func_runner::add_function_to_run(Item_window_func *win_func)
2925 {
2926
2927 Item_sum *sum_func= win_func->window_func();
2928 sum_func->setup_window_func(current_thd, win_func->window_spec);
2929
2930 Item_sum::Sumfunctype type= win_func->window_func()->sum_func();
2931
2932 switch (type)
2933 {
2934 /* Distinct is not yet supported. */
2935 case Item_sum::GROUP_CONCAT_FUNC:
2936 my_error(ER_NOT_SUPPORTED_YET, MYF(0),
2937 "GROUP_CONCAT() aggregate as window function");
2938 return true;
2939 case Item_sum::SUM_DISTINCT_FUNC:
2940 my_error(ER_NOT_SUPPORTED_YET, MYF(0),
2941 "SUM(DISTINCT) aggregate as window function");
2942 return true;
2943 case Item_sum::AVG_DISTINCT_FUNC:
2944 my_error(ER_NOT_SUPPORTED_YET, MYF(0),
2945 "AVG(DISTINCT) aggregate as window function");
2946 return true;
2947 case Item_sum::COUNT_DISTINCT_FUNC:
2948 my_error(ER_NOT_SUPPORTED_YET, MYF(0),
2949 "COUNT(DISTINCT) aggregate as window function");
2950 return true;
2951 case Item_sum::JSON_ARRAYAGG_FUNC:
2952 my_error(ER_NOT_SUPPORTED_YET, MYF(0),
2953 "JSON_ARRAYAGG() aggregate as window function");
2954 return true;
2955 case Item_sum::JSON_OBJECTAGG_FUNC:
2956 my_error(ER_NOT_SUPPORTED_YET, MYF(0),
2957 "JSON_OBJECTAGG() aggregate as window function");
2958 return true;
2959 default:
2960 break;
2961 }
2962
2963 return window_functions.push_back(win_func);
2964 }
2965
2966
2967 /*
2968 Compute the value of window function for all rows.
2969 */
exec(THD * thd,TABLE * tbl,SORT_INFO * filesort_result)2970 bool Window_func_runner::exec(THD *thd, TABLE *tbl, SORT_INFO *filesort_result)
2971 {
2972 List_iterator_fast<Item_window_func> it(window_functions);
2973 Item_window_func *win_func;
2974 while ((win_func= it++))
2975 {
2976 win_func->set_phase_to_computation();
2977 // TODO(cvicentiu) Setting the aggregator should probably be done during
2978 // setup of Window_funcs_sort.
2979 win_func->window_func()->set_aggregator(Aggregator::SIMPLE_AGGREGATOR);
2980 }
2981 it.rewind();
2982
2983 List<Cursor_manager> cursor_managers;
2984 get_window_functions_required_cursors(thd, window_functions,
2985 &cursor_managers);
2986
2987 /* Go through the sorted array and compute the window function */
2988 bool is_error= compute_window_func(thd,
2989 window_functions,
2990 cursor_managers,
2991 tbl, filesort_result);
2992 while ((win_func= it++))
2993 {
2994 win_func->set_phase_to_retrieval();
2995 }
2996
2997 cursor_managers.delete_elements();
2998
2999 return is_error;
3000 }
3001
3002
exec(JOIN * join,bool keep_filesort_result)3003 bool Window_funcs_sort::exec(JOIN *join, bool keep_filesort_result)
3004 {
3005 THD *thd= join->thd;
3006 JOIN_TAB *join_tab= join->join_tab + join->total_join_tab_cnt();
3007
3008 /* Sort the table based on the most specific sorting criteria of
3009 the window functions. */
3010 if (create_sort_index(thd, join, join_tab, filesort))
3011 return true;
3012
3013 TABLE *tbl= join_tab->table;
3014 SORT_INFO *filesort_result= join_tab->filesort_result;
3015
3016 bool is_error= runner.exec(thd, tbl, filesort_result);
3017
3018 if (!keep_filesort_result)
3019 {
3020 delete join_tab->filesort_result;
3021 join_tab->filesort_result= NULL;
3022 }
3023 return is_error;
3024 }
3025
3026
setup(THD * thd,SQL_SELECT * sel,List_iterator<Item_window_func> & it,JOIN_TAB * join_tab)3027 bool Window_funcs_sort::setup(THD *thd, SQL_SELECT *sel,
3028 List_iterator<Item_window_func> &it,
3029 JOIN_TAB *join_tab)
3030 {
3031 Window_spec *spec;
3032 Item_window_func *win_func= it.peek();
3033 Item_window_func *win_func_with_longest_order= NULL;
3034 int longest_order_elements= -1;
3035
3036 /* The iterator should point to a valid function at the start of execution. */
3037 DBUG_ASSERT(win_func);
3038 do
3039 {
3040 spec= win_func->window_spec;
3041 int win_func_order_elements= spec->partition_list->elements +
3042 spec->order_list->elements;
3043 if (win_func_order_elements > longest_order_elements)
3044 {
3045 win_func_with_longest_order= win_func;
3046 longest_order_elements= win_func_order_elements;
3047 }
3048 if (runner.add_function_to_run(win_func))
3049 return true;
3050 it++;
3051 win_func= it.peek();
3052 } while (win_func && !(win_func->marker & SORTORDER_CHANGE_FLAG));
3053
3054 /*
3055 The sort criteria must be taken from the last win_func in the group of
3056 adjacent win_funcs that do not have SORTORDER_CHANGE_FLAG. This is
3057 because the sort order must be the most specific sorting criteria defined
3058 within the window function group. This ensures that we sort the table
3059 in a way that the result is valid for all window functions belonging to
3060 this Window_funcs_sort.
3061 */
3062 spec= win_func_with_longest_order->window_spec;
3063
3064 ORDER* sort_order= concat_order_lists(thd->mem_root,
3065 spec->partition_list->first,
3066 spec->order_list->first);
3067 if (sort_order == NULL) // No partition or order by clause.
3068 {
3069 /* TODO(cvicentiu) This is used as a way to allow an empty OVER ()
3070 clause for window functions. However, a better approach is
3071 to not call Filesort at all in this case and just read whatever order
3072 the temporary table has.
3073 Due to cursors not working for out_of_memory cases (yet!), we have to run
3074 filesort to generate a sort buffer of the results.
3075 In this case we sort by the first field of the temporary table.
3076 We should have this field available, even if it is a window_function
3077 field. We don't care of the particular sorting result in this case.
3078 */
3079 ORDER *order= (ORDER *)alloc_root(thd->mem_root, sizeof(ORDER));
3080 memset(order, 0, sizeof(*order));
3081 Item *item= new (thd->mem_root) Item_temptable_field(thd,
3082 join_tab->table->field[0]);
3083 order->item= (Item **)alloc_root(thd->mem_root, 2 * sizeof(Item *));
3084 order->item[1]= NULL;
3085 order->item[0]= item;
3086 order->field= join_tab->table->field[0];
3087 sort_order= order;
3088 }
3089 filesort= new (thd->mem_root) Filesort(sort_order, HA_POS_ERROR, true, NULL);
3090
3091 /* Apply the same condition that the subsequent sort has. */
3092 filesort->select= sel;
3093
3094 return false;
3095 }
3096
3097
setup(THD * thd,List<Item_window_func> * window_funcs,JOIN_TAB * tab)3098 bool Window_funcs_computation::setup(THD *thd,
3099 List<Item_window_func> *window_funcs,
3100 JOIN_TAB *tab)
3101 {
3102 order_window_funcs_by_window_specs(window_funcs);
3103
3104 SQL_SELECT *sel= NULL;
3105 /*
3106 If the tmp table is filtered during sorting
3107 (ex: SELECT with HAVING && ORDER BY), we must make sure to keep the
3108 filtering conditions when we perform sorting for window function
3109 computation.
3110 */
3111 if (tab->filesort && tab->filesort->select)
3112 {
3113 sel= tab->filesort->select;
3114 DBUG_ASSERT(!sel->quick);
3115 }
3116
3117 Window_funcs_sort *srt;
3118 List_iterator<Item_window_func> iter(*window_funcs);
3119 while (iter.peek())
3120 {
3121 if (!(srt= new Window_funcs_sort()) ||
3122 srt->setup(thd, sel, iter, tab))
3123 {
3124 return true;
3125 }
3126 win_func_sorts.push_back(srt, thd->mem_root);
3127 }
3128 return false;
3129 }
3130
3131
exec(JOIN * join,bool keep_last_filesort_result)3132 bool Window_funcs_computation::exec(JOIN *join, bool keep_last_filesort_result)
3133 {
3134 List_iterator<Window_funcs_sort> it(win_func_sorts);
3135 Window_funcs_sort *srt;
3136 uint counter= 0; /* Count how many sorts we've executed. */
3137 /* Execute each sort */
3138 while ((srt = it++))
3139 {
3140 counter++;
3141 bool keep_filesort_result= keep_last_filesort_result &&
3142 counter == win_func_sorts.elements;
3143 if (srt->exec(join, keep_filesort_result))
3144 return true;
3145 }
3146 return false;
3147 }
3148
3149
cleanup()3150 void Window_funcs_computation::cleanup()
3151 {
3152 List_iterator<Window_funcs_sort> it(win_func_sorts);
3153 Window_funcs_sort *srt;
3154 while ((srt = it++))
3155 {
3156 srt->cleanup();
3157 delete srt;
3158 }
3159 }
3160
3161
3162 Explain_aggr_window_funcs*
save_explain_plan(MEM_ROOT * mem_root,bool is_analyze)3163 Window_funcs_computation::save_explain_plan(MEM_ROOT *mem_root,
3164 bool is_analyze)
3165 {
3166 Explain_aggr_window_funcs *xpl= new Explain_aggr_window_funcs;
3167 List_iterator<Window_funcs_sort> it(win_func_sorts);
3168 Window_funcs_sort *srt;
3169 if (!xpl)
3170 return 0;
3171 while ((srt = it++))
3172 {
3173 Explain_aggr_filesort *eaf=
3174 new Explain_aggr_filesort(mem_root, is_analyze, srt->filesort);
3175 if (!eaf)
3176 return 0;
3177 xpl->sorts.push_back(eaf, mem_root);
3178 }
3179 return xpl;
3180 }
3181
3182
add_window_func(Item_window_func * win_func)3183 bool st_select_lex::add_window_func(Item_window_func *win_func)
3184 {
3185 if (parsing_place != SELECT_LIST)
3186 fields_in_window_functions+= win_func->window_func()->argument_count();
3187 return window_funcs.push_back(win_func);
3188 }
3189
3190 /////////////////////////////////////////////////////////////////////////////
// Unneeded comments (will be removed when we develop a replacement for
// the feature that was attempted here)
3193 /////////////////////////////////////////////////////////////////////////////
3194 /*
3195 TODO Get this code to set can_compute_window_function during preparation,
3196 not during execution.
3197
3198 The reason for this is the following:
3199 Our single scan optimization for window functions without tmp table,
3200 is valid, if and only if, we only need to perform one sorting operation,
3201 via filesort. The cases where we need to perform one sorting operation only:
3202
3203 * A select with only one window function.
3204 * A select with multiple window functions, but they must have their
3205 partition and order by clauses compatible. This means that one ordering
3206 is acceptable for both window functions.
3207
3208 For example:
3209 partition by a, b, c; order by d, e results in sorting by a b c d e.
3210 partition by a; order by d results in sorting by a d.
3211
3212 This kind of sorting is compatible. The less specific partition does
3213 not care for the order of b and c columns so it is valid if we sort
3214 by those in case of equality over a.
3215
3216 partition by a, b; order by d, e results in sorting by a b d e
3217 partition by a; order by e results in sorting by a e
3218
3219 This sorting is incompatible due to the order by clause. The partition by
3220 clause is compatible, (partition by a) is a prefix for (partition by a, b)
3221 However, order by e is not a prefix for order by d, e, thus it is not
3222 compatible.
3223
3224 The rule for having compatible sorting is thus:
3225 Each partition order must contain the other window functions partitions
3226 prefixes, or be a prefix itself. This must hold true for all partitions.
3227 Analog for the order by clause.
3228 */
#if 0
  /*
    Disabled prototype: when no temporary table is needed and the SELECT
    reads a single table, sort that table once with filesort so that all
    window functions can be computed in one pass. The surrounding #if 0
    excludes this code from compilation.

    NOTE(review): this is a fragment of a function body, not a standalone
    definition. It relies on names that are not declared here (it, item,
    need_tmp, table_count, table, join_tab, thd, this->select,
    this->explain). The trailing "else" before #endif expects the host
    function to supply the statement it governs. If re-enabled at this
    location it would not compile.

    NOTE(review): issues to address before re-enabling:
    - PARTITION BY lists and ORDER BY lists are checked for compatibility
      separately, and the sort key is largest_partition followed by
      largest_order_by. That key does not necessarily give the ordering
      needed by a function whose partition list is shorter but whose
      ORDER BY list is non-empty. For example, with
      "partition by a order by d" and "partition by a, b order by d" the
      sort key is (a, b, d), but the first function needs rows ordered by
      d within each partition of a.
    - The "empty partition list" check below only fires when a function
      with a non-empty partition list was seen first. The reverse order is
      not rejected by that check; whether test_if_order_compatible()
      rejects it is not visible here (TODO confirm).
    - The s_order size is sizeof(SORT_FIELD) * (n) + 1, which is one extra
      byte rather than one extra SORT_FIELD. Presumably
      sizeof(SORT_FIELD) * (n + 1) was intended; confirm.
    - Neither my_malloc() result is checked for NULL. The IO_CACHE assigned
      to table[0]->sort.io_cache is not freed within this fragment.
  */
  List<Item_window_func> window_functions;
  SQL_I_List<ORDER> largest_partition;  // longest compatible PARTITION BY list seen so far
  SQL_I_List<ORDER> largest_order_by;   // longest compatible ORDER BY list seen so far
  // Single-sort computation is only attempted when no tmp table is needed.
  bool can_compute_window_live = !need_tmp;
  // Construct the window_functions item list and check if they can be
  // computed using only one sorting.
  //
  // TODO: Perhaps group functions into compatible sorting bins
  // to minimize the number of sorting passes required to compute all of them.
  while ((item= it++))
  {
    if (item->type() == Item::WINDOW_FUNC_ITEM)
    {
      Item_window_func *item_win = (Item_window_func *) item;
      // Every window function is collected, even after compatibility fails.
      window_functions.push_back(item_win);
      if (!can_compute_window_live)
        continue; // No point checking since we have to perform multiple sorts.
      Window_spec *spec = item_win->window_spec;
      // Having an empty partition list on one window function and a
      // non-empty list on a separate window function causes the sorting
      // to be incompatible.
      //
      // Example:
      // over (partition by a order by x) and over (order by x).
      //
      // The first function requires an ordering by a first and then by x,
      // while the second function requires an ordering by x first.
      // The same restriction is not required for the order by clause.
      if (largest_partition.elements && !spec->partition_list.elements)
      {
        can_compute_window_live= FALSE;
        continue;
      }
      can_compute_window_live= test_if_order_compatible(largest_partition,
                                                        spec->partition_list);
      if (!can_compute_window_live)
        continue;

      can_compute_window_live= test_if_order_compatible(largest_order_by,
                                                        spec->order_list);
      if (!can_compute_window_live)
        continue;

      // Keep the longer list of each kind; the final sort key is built
      // from these two lists.
      if (largest_partition.elements < spec->partition_list.elements)
        largest_partition = spec->partition_list;
      if (largest_order_by.elements < spec->order_list.elements)
        largest_order_by = spec->order_list;
    }
  }
  if (can_compute_window_live && window_functions.elements && table_count == 1)
  {
    ha_rows examined_rows = 0;
    ha_rows found_rows = 0;
    ha_rows filesort_retval;
    // Sort key: largest_partition columns followed by largest_order_by
    // columns.
    // NOTE(review): "+ 1" adds one byte, not one SORT_FIELD.
    SORT_FIELD *s_order= (SORT_FIELD *) my_malloc(sizeof(SORT_FIELD) *
                         (largest_partition.elements + largest_order_by.elements) + 1,
                         MYF(MY_WME | MY_ZEROFILL | MY_THREAD_SPECIFIC));

    size_t pos= 0;
    for (ORDER* curr = largest_partition.first; curr; curr=curr->next, pos++)
      s_order[pos].item = *curr->item;

    for (ORDER* curr = largest_order_by.first; curr; curr=curr->next, pos++)
      s_order[pos].item = *curr->item;

    // NOTE(review): allocation is unchecked and not freed in this fragment.
    table[0]->sort.io_cache=(IO_CACHE*) my_malloc(sizeof(IO_CACHE),
                                                  MYF(MY_WME | MY_ZEROFILL|
                                                      MY_THREAD_SPECIFIC));


    // Sort the single table once by the combined key; the return value is
    // stored as the table's found_records.
    filesort_retval= filesort(thd, table[0], s_order,
                              (largest_partition.elements + largest_order_by.elements),
                              this->select, HA_POS_ERROR, FALSE,
                              &examined_rows, &found_rows,
                              this->explain->ops_tracker.report_sorting(thd));
    table[0]->sort.found_records= filesort_retval;

    // Read rows through join_init_read_record and record the row count
    // reported by filesort.
    join_tab->read_first_record = join_init_read_record;
    join_tab->records= found_rows;

    my_free(s_order);
  }
  // NOTE(review): dangling "else"; the statement it governs lies in the host
  // function, outside this fragment.
  else
#endif
3314